1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5 caps,
6 encryption::{EncryptionKey, EncryptionSpec},
7 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8 record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9 types::{
10 basin::BasinName,
11 stream::{
12 ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13 StoredReadSessionOutput, StreamName,
14 },
15 },
16};
17use slatedb::{
18 IterationOrder,
19 config::{DurabilityLevel, ScanOptions},
20};
21use tokio::{sync::broadcast, time::Instant};
22
23use super::{Backend, StreamHandle};
24use crate::{
25 backend::{
26 error::{
27 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
28 },
29 kv,
30 streamer::GuardedStreamerClient,
31 },
32 stream_id::StreamId,
33};
34
35impl Backend {
36 pub async fn open_for_check_tail(
37 &self,
38 basin: &BasinName,
39 stream: &StreamName,
40 ) -> Result<StreamHandle, CheckTailError> {
41 self.stream_handle_with_auto_create::<CheckTailError>(
42 basin,
43 stream,
44 |config| config.create_stream_on_read,
45 |_| Ok(EncryptionSpec::Plain),
46 )
47 .await
48 }
49
50 pub async fn open_for_read(
51 &self,
52 basin: &BasinName,
53 stream: &StreamName,
54 encryption_key: Option<EncryptionKey>,
55 ) -> Result<StreamHandle, ReadError> {
56 self.stream_handle_with_auto_create::<ReadError>(
57 basin,
58 stream,
59 |config| config.create_stream_on_read,
60 |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
61 )
62 .await
63 }
64}
65
66impl StreamHandle {
67 pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
68 let tail = self.client.check_tail().await?;
69 Ok(tail)
70 }
71
72 pub async fn read(
73 self,
74 start: ReadStart,
75 end: ReadEnd,
76 ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
77 let stream_id = self.client.stream_id();
78 let session = read_session(self.db, self.client, start, end).await?;
79 Ok(async_stream::stream! {
80 tokio::pin!(session);
81 while let Some(output) = session.next().await {
82 let output = match output {
83 Ok(output) => output
84 .decrypt(&self.encryption, stream_id.as_bytes())
85 .map_err(ReadError::from),
86 Err(err) => Err(err),
87 };
88 let should_stop = output.is_err();
89 yield output;
90 if should_stop {
91 break;
92 }
93 }
94 })
95 }
96}
97
/// Builds the storage-backed read session as a stream of
/// `StoredReadSessionOutput` items.
///
/// The session loops until the evaluated read limit is exhausted (or an end
/// condition fires), alternating between two modes:
/// - catch-up: while the cursor (`state.start_seq_num`) is behind the known
///   tail, records in `[start_seq_num, tail.seq_num)` are scanned out of
///   slatedb and yielded in batches capped by `caps::RECORD_BATCH_MAX`;
/// - follow: once the cursor reaches the tail, the session subscribes to the
///   streamer's live broadcast, emitting periodic heartbeats and honoring
///   the optional `wait` deadline.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        // Outer loop runs as long as any read limit remains.
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up mode: scan stored records in [start_seq_num, tail.seq_num).
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                static SCAN_OPTS: ScanOptions = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    dirty: false,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    order: IterationOrder::Ascending,
                };
                let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;

                // Batch buffer sized to the smaller of the remaining count
                // limit and the per-batch record cap.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // Stop at the `until` bound or when the record would
                    // overshoot the remaining limit; end the whole session
                    // unless a partial batch still needs flushing.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            break 'session;
                        } else {
                            break;
                        }
                    }

                    // Flush a full batch before appending the next record.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    // Flush the final (partial) batch of this scan.
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // Nothing left in storage below the tail (e.g. records
                    // were deleted); advance the cursor to the tail.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                // Follow mode: cursor has caught up with the tail.
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // Start the wait countdown on first entry; it is only
                        // reset when a batch is delivered (see on_batch).
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            tokio::select! {
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            // Trim the live batch to the limit/until bounds.
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            if allowed_count < count {
                                                // A record was denied: the session is done.
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // Fell behind the broadcast; restart the
                                            // outer loop to catch up via storage.
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                _ = new_heartbeat_sleep() => {
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    // Wait budget exhausted with no new records.
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // Streamer reports a newer tail; loop back into catch-up mode.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
253
254async fn read_start_seq_num(
255 db: &slatedb::Db,
256 stream_id: StreamId,
257 start: ReadStart,
258 end: ReadEnd,
259 tail: StreamPosition,
260) -> Result<SeqNum, ReadError> {
261 let mut read_pos = match start.from {
262 s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
263 s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
264 ReadPosition::Timestamp(timestamp)
265 }
266 s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
267 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
268 }
269 };
270 if match read_pos {
271 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
272 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
273 } {
274 if start.clamp {
275 read_pos = ReadPosition::SeqNum(tail.seq_num);
276 } else {
277 return Err(UnwrittenError(tail).into());
278 }
279 }
280 if let ReadPosition::SeqNum(start_seq_num) = read_pos
281 && start_seq_num == tail.seq_num
282 && !end.may_follow()
283 {
284 return Err(UnwrittenError(tail).into());
285 }
286 Ok(match read_pos {
287 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
288 ReadPosition::Timestamp(start_timestamp) => {
289 resolve_timestamp(db, stream_id, start_timestamp)
290 .await?
291 .unwrap_or(tail)
292 .seq_num
293 }
294 })
295}
296
297async fn resolve_timestamp(
298 db: &slatedb::Db,
299 stream_id: StreamId,
300 timestamp: Timestamp,
301) -> Result<Option<StreamPosition>, StorageError> {
302 let start_key = kv::stream_record_timestamp::ser_key(
303 stream_id,
304 StreamPosition {
305 seq_num: SeqNum::MIN,
306 timestamp,
307 },
308 );
309 let end_key = kv::stream_record_timestamp::ser_key(
310 stream_id,
311 StreamPosition {
312 seq_num: SeqNum::MAX,
313 timestamp: Timestamp::MAX,
314 },
315 );
316 static SCAN_OPTS: ScanOptions = ScanOptions {
317 durability_filter: DurabilityLevel::Remote,
318 dirty: false,
319 read_ahead_bytes: 1,
320 cache_blocks: false,
321 max_fetch_tasks: 1,
322 order: IterationOrder::Ascending,
323 };
324 let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
325 Ok(match it.next().await? {
326 Some(kv) => {
327 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
328 assert_eq!(deser_stream_id, stream_id);
329 assert!(pos.timestamp >= timestamp);
330 kv::stream_record_timestamp::deser_value(kv.value)?;
331 Some(StreamPosition {
332 seq_num: pos.seq_num,
333 timestamp: pos.timestamp,
334 })
335 }
336 None => None,
337 })
338}
339
/// Mutable cursor state for an in-flight read session.
struct ReadSessionState {
    // Next sequence number to read.
    start_seq_num: u64,
    // Remaining read limit; the session ends once it is exhausted.
    limit: EvaluatedReadLimit,
    // Upper timestamp bound for records the session may yield.
    until: ReadUntil,
    // Optional budget for waiting on live records while following.
    wait: Option<Duration>,
    // Armed deadline derived from `wait`; `None` until follow mode is entered.
    wait_deadline: Option<Instant>,
    // Last known tail of the stream.
    tail: StreamPosition,
}
348
349impl ReadSessionState {
350 fn arm_wait_deadline_if_unset(&mut self) {
351 if self.wait_deadline.is_none() {
352 self.reset_wait_deadline();
353 }
354 }
355
356 fn reset_wait_deadline(&mut self) {
357 self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
358 }
359
360 fn wait_deadline_expired(&self) -> bool {
361 self.wait_deadline
362 .is_some_and(|deadline| deadline <= Instant::now())
363 }
364
365 fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
366 if let Some(tail) = batch.tail {
367 self.tail = tail;
368 }
369 let last_record = batch.records.last().expect("non-empty");
370 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
371 panic!("batch after exhausted limit");
372 };
373 let count = batch.records.len();
374 let bytes = batch.records.metered_size();
375 let last_position = last_record.position();
376 assert!(limit.allow(count, bytes));
377 assert!(self.until.allow(last_position.timestamp));
378 self.start_seq_num = last_position.seq_num + 1;
379 self.limit = limit.remaining(count, bytes);
380 self.reset_wait_deadline();
381 StoredReadSessionOutput::Batch(batch)
382 }
383}
384
385fn count_allowed_records(
386 limit: ReadLimit,
387 until: ReadUntil,
388 records: &[Metered<StoredSequencedRecord>],
389) -> usize {
390 let mut acc_size = 0;
391 let mut acc_count = 0;
392 for record in records {
393 if limit.deny(acc_count + 1, acc_size + record.metered_size())
394 || until.deny(record.position().timestamp)
395 {
396 break;
397 }
398 acc_count += 1;
399 acc_size += record.metered_size();
400 }
401 acc_count
402}
403
404#[cfg(not(test))]
405fn new_heartbeat_sleep() -> tokio::time::Sleep {
406 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
407}
408
/// Test-mode heartbeat interval: randomized 5–15 ms so paused-clock tests
/// can advance past heartbeats quickly.
#[cfg(test)]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    let jitter_ms: u64 = rand::random_range(5..15);
    tokio::time::sleep(Duration::from_millis(jitter_ms))
}
413
414async fn wait_sleep_until(deadline: Option<Instant>) {
415 match deadline {
416 Some(deadline) => tokio::time::sleep_until(deadline).await,
417 None => {
418 std::future::pending::<()>().await;
419 }
420 }
421}
422
#[cfg(test)]
mod tests {
    use std::{sync::Arc, task::Poll};

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        record::{Metered, Record},
        types::{
            basin::{BasinName, CreateBasinIntent},
            config::{BasinConfig, OptionalStreamConfig},
            stream::{
                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts,
                CreateStreamIntent, ReadEnd, ReadFrom, ReadSessionOutput, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::{
        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
        stream_id::StreamId,
    };

    // Wraps a single record into an AppendInput with no timestamp,
    // no match_seq_num precondition, and no fencing token.
    fn append_input(record: Record) -> AppendInput {
        let record: AppendRecord = AppendRecordParts {
            timestamp: None,
            record: Metered::from(record),
        }
        .try_into()
        .unwrap();
        let records: AppendRecordBatch = vec![record].try_into().unwrap();
        AppendInput {
            records,
            match_seq_num: None,
            fencing_token: None,
        }
    }

    // Unwraps a session item, panicking on read errors.
    fn map_test_output(
        output: Option<Result<ReadSessionOutput, ReadError>>,
    ) -> Option<ReadSessionOutput> {
        match output {
            Some(Ok(output)) => Some(output),
            Some(Err(e)) => panic!("Read error: {e:?}"),
            None => None,
        }
    }

    // Polls the session once (asserting it is pending), advances the paused
    // clock by `advance_by`, then polls again and returns the result.
    async fn poll_next_after_advance<S>(
        session: &mut std::pin::Pin<Box<S>>,
        advance_by: Duration,
    ) -> Poll<Option<ReadSessionOutput>>
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        let mut pinned_session = session.as_mut();
        let next = pinned_session.next();
        tokio::pin!(next);

        assert!(
            matches!(futures::poll!(&mut next), Poll::Pending),
            "session unexpectedly yielded before time advanced"
        );

        tokio::time::advance(advance_by).await;
        tokio::task::yield_now().await;

        match futures::poll!(&mut next) {
            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
            Poll::Pending => Poll::Pending,
        }
    }

    // resolve_timestamp must only consider entries of the requested stream:
    // a match in another stream's index must not leak into the result.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // 500 resolves to stream_a's entry at timestamp 1000.
        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // 1500 finds nothing in stream_a, despite stream_b's entry at 2000.
        let result = resolve_timestamp(&backend.db, stream_a, 1500)
            .await
            .unwrap();
        assert_eq!(result, None);
    }

    // A bounded read over a range whose records were deleted from storage
    // must terminate (not spin) — the catch-up scan finds nothing and the
    // cursor advances to the tail.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        // Append one record, then delete its data key out from under the read.
        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
        let ack = backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        // The session must drain within the timeout rather than spin forever.
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    // Heartbeats alone must not push out the wait deadline: the session
    // closes once the original wait budget elapses.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);
        let probe_step = Duration::from_millis(1);
        // First output: the initial follow-mode heartbeat.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let started = Instant::now();
        // Advancing by the full wait budget should still yield a heartbeat
        // (test-mode heartbeats fire every 5-15 ms).
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        tokio::task::yield_now().await;
        // Drain any remaining heartbeats; the stream must then close.
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        // Closure happens at the original deadline, not extended by heartbeats.
        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }

    // Delivering a live follow batch must re-arm the wait deadline: the
    // session stays open a full `wait` after the batch, then closes.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        // Catch-up phase first: the pre-appended record arrives as a batch.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield the initial batch")
            .expect("session should not error");
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        // Then follow mode is announced via a heartbeat.
        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Burn 20 ms of the 30 ms wait budget before appending.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        let follow = session
            .as_mut()
            .next()
            .await
            .expect("session should deliver the live batch")
            .expect("session should not error");
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Just under a full wait after the batch: still open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        // One more step crosses the reset deadline: the session must close.
        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }

    // When the follower lags the broadcast and the catch-up scan finds no
    // records (all deleted), the wait deadline must NOT be re-armed: the
    // session closes once the original budget elapses.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        let wait = Duration::from_secs(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        // Overflow the broadcast channel so the follower observes Lagged,
        // while queueing deletes for every appended record.
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        for i in 0..lagged_appends {
            let input = append_input(
                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
            );
            let ack = backend
                .open_for_append(&basin, &stream, None)
                .await
                .unwrap()
                .append(input)
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(delete_batch, &WRITE_OPTS)
            .await
            .unwrap();

        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    // An unbounded (no-wait) follow must survive the streamer going dormant
    // and still deliver records appended after the dormancy timeout.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield initial batch")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Let the streamer go dormant before the next append.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let Record::Envelope(envelope) = record.inner() else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}
1069}