1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5 caps,
6 encryption::{EncryptionKey, EncryptionSpec},
7 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8 record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9 types::{
10 basin::BasinName,
11 stream::{
12 ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13 StoredReadSessionOutput, StreamName,
14 },
15 },
16};
17use slatedb::config::{DurabilityLevel, ScanOptions};
18use tokio::{sync::broadcast, time::Instant};
19
20use super::{Backend, StreamHandle};
21use crate::{
22 backend::{
23 error::{
24 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
25 },
26 kv,
27 streamer::GuardedStreamerClient,
28 },
29 stream_id::StreamId,
30};
31
32impl Backend {
33 pub async fn open_for_check_tail(
34 &self,
35 basin: &BasinName,
36 stream: &StreamName,
37 ) -> Result<StreamHandle, CheckTailError> {
38 self.stream_handle_with_auto_create::<CheckTailError>(
39 basin,
40 stream,
41 |config| config.create_stream_on_read,
42 |_| Ok(EncryptionSpec::Plain),
43 )
44 .await
45 }
46
47 pub async fn open_for_read(
48 &self,
49 basin: &BasinName,
50 stream: &StreamName,
51 encryption_key: Option<EncryptionKey>,
52 ) -> Result<StreamHandle, ReadError> {
53 self.stream_handle_with_auto_create::<ReadError>(
54 basin,
55 stream,
56 |config| config.create_stream_on_read,
57 |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
58 )
59 .await
60 }
61}
62
63impl StreamHandle {
64 pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
65 let tail = self.client.check_tail().await?;
66 Ok(tail)
67 }
68
69 pub async fn read(
70 self,
71 start: ReadStart,
72 end: ReadEnd,
73 ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
74 let stream_id = self.client.stream_id();
75 let session = read_session(self.db, self.client, start, end).await?;
76 Ok(async_stream::stream! {
77 tokio::pin!(session);
78 while let Some(output) = session.next().await {
79 let output = match output {
80 Ok(output) => output
81 .decrypt(&self.encryption, stream_id.as_bytes())
82 .map_err(ReadError::from),
83 Err(err) => Err(err),
84 };
85 let should_stop = output.is_err();
86 yield output;
87 if should_stop {
88 break;
89 }
90 }
91 })
92 }
93}
94
/// Builds the stream of stored (still encrypted) outputs for one read session.
///
/// Before the stream is returned, the current tail is fetched from `client`
/// and the starting sequence number is resolved via [`read_start_seq_num`];
/// failures at that stage are returned directly as the outer `Err`.
///
/// The returned stream then alternates between two modes until the read limit
/// is used up or a stop condition is hit:
///
/// * **Catch-up** (`start_seq_num < tail.seq_num`): records are scanned from
///   the database and emitted in batches capped by `caps::RECORD_BATCH_MAX`.
/// * **Follow** (`start_seq_num == tail.seq_num`): if `end.may_follow()`, the
///   session subscribes through `client.follow` and forwards live batches,
///   interleaved with heartbeats, until the wait deadline fires.
///
/// Errors raised inside the stream body surface as `Err` items, courtesy of
/// `try_stream!`.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        // Each pass re-reads `state.limit`; the session ends once it is no longer `Remaining`.
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up: scan stored records in [start_seq_num, tail.seq_num).
                // NOTE(review): timestamp 0 presumably makes each key the lowest key for its
                // seq_num — confirm against `kv::stream_record_data::ser_key` ordering.
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                let scan_opts = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    ..Default::default()
                };
                let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;

                // Pre-size the batch buffer to the smaller of the remaining count limit
                // and the per-batch record cap.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                // `limit` is re-bound from `state.limit` on every iteration, so it reflects
                // batches already yielded from inside this loop.
                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // Stop at the first record rejected by `until` or by the read limit.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            // Nothing is buffered, so the whole session is finished.
                            break 'session;
                        } else {
                            // Leave the scan so the buffered records are flushed below.
                            break;
                        }
                    }

                    // Adding this record would exceed the per-batch caps: emit what is
                    // buffered and start a fresh buffer for the current record.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // Nothing is buffered after the scan: jump to the tail so the next
                    // pass takes the follow branch instead of rescanning the same range.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                // Follow: the session has caught up with the known tail.
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // Arm the wait deadline only if it is not already set, so that
                        // re-entering follow mode does not by itself extend the budget.
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        // Report the tail once on entering follow mode.
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            // `biased` polls the branches in source order: live records,
                            // then the heartbeat timer, then the wait deadline.
                            tokio::select! {
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            // Part of the batch was rejected by the limit or
                                            // `until`, so the session is complete.
                                            if allowed_count < count {
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // The receiver fell behind; restart the outer loop.
                                            // NOTE(review): presumably the next `follow` call then
                                            // reports a newer tail so the gap is read from the
                                            // database — confirm against the streamer.
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                // A fresh heartbeat sleep is created for every `select!` round.
                                _ = new_heartbeat_sleep() => {
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                // Never completes when no wait deadline is set.
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // `follow` handed back a tail instead of a receiver; record it so the
                        // next pass takes the catch-up branch.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
249
250async fn read_start_seq_num(
251 db: &slatedb::Db,
252 stream_id: StreamId,
253 start: ReadStart,
254 end: ReadEnd,
255 tail: StreamPosition,
256) -> Result<SeqNum, ReadError> {
257 let mut read_pos = match start.from {
258 s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
259 s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
260 ReadPosition::Timestamp(timestamp)
261 }
262 s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
263 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
264 }
265 };
266 if match read_pos {
267 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
268 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
269 } {
270 if start.clamp {
271 read_pos = ReadPosition::SeqNum(tail.seq_num);
272 } else {
273 return Err(UnwrittenError(tail).into());
274 }
275 }
276 if let ReadPosition::SeqNum(start_seq_num) = read_pos
277 && start_seq_num == tail.seq_num
278 && !end.may_follow()
279 {
280 return Err(UnwrittenError(tail).into());
281 }
282 Ok(match read_pos {
283 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
284 ReadPosition::Timestamp(start_timestamp) => {
285 resolve_timestamp(db, stream_id, start_timestamp)
286 .await?
287 .unwrap_or(tail)
288 .seq_num
289 }
290 })
291}
292
293async fn resolve_timestamp(
294 db: &slatedb::Db,
295 stream_id: StreamId,
296 timestamp: Timestamp,
297) -> Result<Option<StreamPosition>, StorageError> {
298 let start_key = kv::stream_record_timestamp::ser_key(
299 stream_id,
300 StreamPosition {
301 seq_num: SeqNum::MIN,
302 timestamp,
303 },
304 );
305 let end_key = kv::stream_record_timestamp::ser_key(
306 stream_id,
307 StreamPosition {
308 seq_num: SeqNum::MAX,
309 timestamp: Timestamp::MAX,
310 },
311 );
312 let scan_opts = ScanOptions {
313 durability_filter: DurabilityLevel::Remote,
314 ..Default::default()
315 };
316 let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;
317 Ok(match it.next().await? {
318 Some(kv) => {
319 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
320 assert_eq!(deser_stream_id, stream_id);
321 assert!(pos.timestamp >= timestamp);
322 kv::stream_record_timestamp::deser_value(kv.value)?;
323 Some(StreamPosition {
324 seq_num: pos.seq_num,
325 timestamp: pos.timestamp,
326 })
327 }
328 None => None,
329 })
330}
331
332struct ReadSessionState {
333 start_seq_num: u64,
334 limit: EvaluatedReadLimit,
335 until: ReadUntil,
336 wait: Option<Duration>,
337 wait_deadline: Option<Instant>,
338 tail: StreamPosition,
339}
340
341impl ReadSessionState {
342 fn arm_wait_deadline_if_unset(&mut self) {
343 if self.wait_deadline.is_none() {
344 self.reset_wait_deadline();
345 }
346 }
347
348 fn reset_wait_deadline(&mut self) {
349 self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
350 }
351
352 fn wait_deadline_expired(&self) -> bool {
353 self.wait_deadline
354 .is_some_and(|deadline| deadline <= Instant::now())
355 }
356
357 fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
358 if let Some(tail) = batch.tail {
359 self.tail = tail;
360 }
361 let last_record = batch.records.last().expect("non-empty");
362 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
363 panic!("batch after exhausted limit");
364 };
365 let count = batch.records.len();
366 let bytes = batch.records.metered_size();
367 let last_position = last_record.position();
368 assert!(limit.allow(count, bytes));
369 assert!(self.until.allow(last_position.timestamp));
370 self.start_seq_num = last_position.seq_num + 1;
371 self.limit = limit.remaining(count, bytes);
372 self.reset_wait_deadline();
373 StoredReadSessionOutput::Batch(batch)
374 }
375}
376
377fn count_allowed_records(
378 limit: ReadLimit,
379 until: ReadUntil,
380 records: &[Metered<StoredSequencedRecord>],
381) -> usize {
382 let mut acc_size = 0;
383 let mut acc_count = 0;
384 for record in records {
385 if limit.deny(acc_count + 1, acc_size + record.metered_size())
386 || until.deny(record.position().timestamp)
387 {
388 break;
389 }
390 acc_count += 1;
391 acc_size += record.metered_size();
392 }
393 acc_count
394}
395
396#[cfg(not(test))]
397fn new_heartbeat_sleep() -> tokio::time::Sleep {
398 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
399}
400
401#[cfg(test)]
402fn new_heartbeat_sleep() -> tokio::time::Sleep {
403 tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
404}
405
406async fn wait_sleep_until(deadline: Option<Instant>) {
407 match deadline {
408 Some(deadline) => tokio::time::sleep_until(deadline).await,
409 None => {
410 std::future::pending::<()>().await;
411 }
412 }
413}
414
#[cfg(test)]
mod tests {
    //! Tests for the read path: timestamp resolution, catch-up over deleted
    //! records, and wait-deadline / heartbeat behaviour while following.

    use std::{sync::Arc, task::Poll};

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        record::{Metered, Record},
        types::{
            basin::BasinName,
            config::{BasinConfig, OptionalStreamConfig},
            resources::ProvisionMode,
            stream::{
                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
                ReadSessionOutput, ReadStart, StreamName,
            },
        },
    };
    use slatedb::{Db, WriteBatch, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::{
        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
        stream_id::StreamId,
    };

    /// Builds a `Backend` on top of a fresh in-memory object store.
    async fn test_backend() -> Backend {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        Backend::new(db, ByteSize::mib(10))
    }

    /// Provisions the `test-basin` / `test-stream` pair used by most tests.
    async fn provision_test_stream(backend: &Backend) -> (BasinName, StreamName) {
        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .provision_basin(
                basin.clone(),
                BasinConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: StreamName = "test-stream".parse().unwrap();
        backend
            .provision_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();
        (basin, stream)
    }

    /// Wraps one record into an append input with no timestamp, no
    /// `match_seq_num` and no fencing token.
    fn append_input(record: Record) -> AppendInput {
        let record: AppendRecord = AppendRecordParts {
            timestamp: None,
            record: Metered::from(record),
        }
        .try_into()
        .unwrap();
        let records: AppendRecordBatch = vec![record].try_into().unwrap();
        AppendInput {
            records,
            match_seq_num: None,
            fencing_token: None,
        }
    }

    /// Builds an append input holding a single header-less record with `body`.
    fn body_input(body: impl Into<bytes::Bytes>) -> AppendInput {
        append_input(Record::try_from_parts(vec![], body.into()).unwrap())
    }

    /// Read start used by the session tests: sequence number 0, no clamping.
    fn read_from_start() -> ReadStart {
        ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        }
    }

    /// Read end with no limit and no `until` bound, and the given wait budget.
    fn unbounded_end(wait: Option<Duration>) -> ReadEnd {
        ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait,
        }
    }

    /// Unwraps a polled stream item, panicking on a read error.
    fn map_test_output(
        output: Option<Result<ReadSessionOutput, ReadError>>,
    ) -> Option<ReadSessionOutput> {
        match output {
            Some(Ok(output)) => Some(output),
            Some(Err(e)) => panic!("Read error: {e:?}"),
            None => None,
        }
    }

    /// Awaits the next session output; panics with `closed_msg` if the session
    /// has ended and with a fixed message if it produced an error.
    async fn next_output<S>(
        session: &mut std::pin::Pin<Box<S>>,
        closed_msg: &str,
    ) -> ReadSessionOutput
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        session
            .as_mut()
            .next()
            .await
            .expect(closed_msg)
            .expect("session should not error")
    }

    /// Asserts the session is pending, advances the paused clock by
    /// `advance_by`, then polls the same `next()` future once more.
    async fn poll_next_after_advance<S>(
        session: &mut std::pin::Pin<Box<S>>,
        advance_by: Duration,
    ) -> Poll<Option<ReadSessionOutput>>
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        let mut pinned_session = session.as_mut();
        let next = pinned_session.next();
        tokio::pin!(next);

        assert!(
            matches!(futures::poll!(&mut next), Poll::Pending),
            "session unexpectedly yielded before time advanced"
        );

        tokio::time::advance(advance_by).await;
        tokio::task::yield_now().await;

        match futures::poll!(&mut next) {
            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
            Poll::Pending => Poll::Pending,
        }
    }

    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let backend = test_backend().await;

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // One index entry per stream: stream_a at t=1000, stream_b at t=2000.
        for (stream_id, timestamp) in [(stream_a, 1000), (stream_b, 2000)] {
            backend
                .db
                .put(
                    kv::stream_record_timestamp::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: 0,
                            timestamp,
                        },
                    ),
                    kv::stream_record_timestamp::ser_value(),
                )
                .await
                .unwrap();
        }

        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // stream_a has nothing at or after 1500; stream_b's entry must not leak in.
        let result = resolve_timestamp(&backend.db, stream_a, 1500)
            .await
            .unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let backend = test_backend().await;
        let (basin, stream) = provision_test_stream(&backend).await;

        let ack = backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(body_input("x"))
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Remove the record's data key behind the backend's back.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        backend.db.write(batch).await.unwrap();

        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(read_from_start(), end)
            .await
            .unwrap();
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let backend = test_backend().await;
        let (basin, stream) = provision_test_stream(&backend).await;

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(read_from_start(), unbounded_end(Some(wait)))
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = next_output(&mut session, "session should enter follow mode").await;
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let started = Instant::now();
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        tokio::task::yield_now().await;
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let backend = test_backend().await;
        let (basin, stream) = provision_test_stream(&backend).await;

        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(body_input("initial"))
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(read_from_start(), unbounded_end(Some(wait)))
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = next_output(&mut session, "session should yield the initial batch").await;
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        let second = next_output(&mut session, "session should enter follow mode").await;
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Use up part of the wait budget before the live append arrives.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(body_input("follow-1"))
            .await
            .unwrap();

        let follow = next_output(&mut session, "session should deliver the live batch").await;
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Just short of a full budget after the batch: the session must still be open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let backend = test_backend().await;
        let (basin, stream) = provision_test_stream(&backend).await;

        let wait = Duration::from_secs(30);
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(read_from_start(), unbounded_end(Some(wait)))
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = next_output(&mut session, "session should enter follow mode").await;
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        // Append more than FOLLOWER_MAX_LAG records while the session is not being
        // polled, queueing every data key for deletion.
        for i in 0..lagged_appends {
            let ack = backend
                .open_for_append(&basin, &stream, None)
                .await
                .unwrap()
                .append(body_input(format!("lagged-{i}")))
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        backend.db.write(delete_batch).await.unwrap();

        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let backend = test_backend().await;
        let (basin, stream) = provision_test_stream(&backend).await;

        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(body_input("initial"))
            .await
            .unwrap();

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(read_from_start(), unbounded_end(None))
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = next_output(&mut session, "session should yield initial batch").await;
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = next_output(&mut session, "session should enter follow mode").await;
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Let more than DORMANT_TIMEOUT pass with no activity on the stream.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(body_input("follow-1"))
            .await
            .unwrap();

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let Record::Envelope(envelope) = record.inner() else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}