1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5 caps,
6 encryption::{EncryptionKey, EncryptionSpec},
7 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8 record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9 types::{
10 basin::BasinName,
11 stream::{
12 ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13 StoredReadSessionOutput, StreamName,
14 },
15 },
16};
17use slatedb::{
18 IterationOrder,
19 config::{DurabilityLevel, ScanOptions},
20};
21use tokio::{sync::broadcast, time::Instant};
22
23use super::{Backend, StreamHandle};
24use crate::{
25 backend::{
26 error::{
27 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
28 },
29 kv,
30 streamer::GuardedStreamerClient,
31 },
32 stream_id::StreamId,
33};
34
35impl Backend {
36 pub async fn open_for_check_tail(
37 &self,
38 basin: &BasinName,
39 stream: &StreamName,
40 ) -> Result<StreamHandle, CheckTailError> {
41 self.stream_handle_with_auto_create::<CheckTailError>(
42 basin,
43 stream,
44 |config| config.create_stream_on_read,
45 |_| Ok(EncryptionSpec::Plain),
46 )
47 .await
48 }
49
50 pub async fn open_for_read(
51 &self,
52 basin: &BasinName,
53 stream: &StreamName,
54 encryption_key: Option<EncryptionKey>,
55 ) -> Result<StreamHandle, ReadError> {
56 self.stream_handle_with_auto_create::<ReadError>(
57 basin,
58 stream,
59 |config| config.create_stream_on_read,
60 |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
61 )
62 .await
63 }
64}
65
66impl StreamHandle {
67 pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
68 let tail = self.client.check_tail().await?;
69 Ok(tail)
70 }
71
72 pub async fn read(
73 self,
74 start: ReadStart,
75 end: ReadEnd,
76 ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
77 let stream_id = self.client.stream_id();
78 let session = read_session(self.db, self.client, start, end).await?;
79 Ok(async_stream::stream! {
80 tokio::pin!(session);
81 while let Some(output) = session.next().await {
82 let output = match output {
83 Ok(output) => output
84 .decrypt(&self.encryption, stream_id.as_bytes())
85 .map_err(ReadError::from),
86 Err(err) => Err(err),
87 };
88 let should_stop = output.is_err();
89 yield output;
90 if should_stop {
91 break;
92 }
93 }
94 })
95 }
96}
97
/// Builds the storage-level read session for `client`'s stream.
///
/// The returned stream alternates between two modes, tracked by [`ReadSessionState`]:
///
/// * **Catch-up**: while `start_seq_num` is behind the known tail, records in
///   `[start_seq_num, tail.seq_num)` are scanned from the database. They are
///   yielded in batches capped both by `caps::RECORD_BATCH_MAX` and by the
///   remaining read limit.
/// * **Follow**: once caught up, and only if `end.may_follow()`, the session
///   subscribes to live records via `client.follow`. It emits heartbeats while
///   idle and stops on the limit/`until` bound or when the wait deadline expires.
///   If `follow` reports a newer tail, or the broadcast receiver lags, the session
///   goes back to catch-up.
///
/// The session also ends when the read limit is no longer `Remaining`, or when
/// following is not allowed once the tail has been reached.
///
/// # Errors
///
/// Returns an error up-front if the tail check or start-position resolution
/// fails; this includes [`UnwrittenError`] from [`read_start_seq_num`]. Once the
/// stream is running, storage and follow errors are yielded as stream items. This
/// includes [`StreamerMissingInActionError`] when the follow channel closes.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    // Tail snapshot taken at session start; it bounds the first catch-up scan.
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        // Armed lazily on first entry into follow mode, or reset by a delivered batch.
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up: read stored records below the known tail.
                // Range bounds use timestamp 0. NOTE(review): this assumes data keys
                // sort by seq_num first; confirm in `kv::stream_record_data`.
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                // Sequential scan of remotely-durable data with a large read-ahead.
                static SCAN_OPTS: ScanOptions = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    dirty: false,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    order: IterationOrder::Ascending,
                };
                let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;

                // Pre-size for the smaller of the remaining count limit and the batch cap.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                // `state.limit` is re-read on every iteration because `on_batch` shrinks it.
                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // This record would violate `until` or the remaining limit. With an
                    // empty buffer the session is complete. Otherwise flush the buffer;
                    // a later pass rescans from the new start and ends the session.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            break 'session;
                        } else {
                            break;
                        }
                    }

                    // Flush when adding this record would exceed a per-batch cap. The
                    // replacement buffer is sized by what the limit allows after the flush.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // Nothing readable below the tail (e.g. records were deleted).
                    // Jump to the tail so the range is not rescanned forever.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                // Follow: caught up with the known tail.
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // Re-entering follow mode (e.g. after lag) keeps an existing deadline.
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            // Each arm evaluates to `Result<(), StreamerMissingInActionError>`,
                            // which the trailing `?` converts into a stream error.
                            tokio::select! {
                                // Prefer delivering records over heartbeats and timeouts.
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            // Deliver the admissible prefix. Any rejected
                                            // record means the session is finished.
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            if allowed_count < count {
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // Missed live records. Restart from the top: `follow`
                                            // is re-requested and (presumably) reports the newer
                                            // tail, so catch-up reads the gap from storage.
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                // Idle heartbeat. This does not reset the wait deadline.
                                _ = new_heartbeat_sleep() => {
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                // Wait budget spent without a delivered batch.
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // `follow` declined and reported a newer tail. Adopt it and catch up.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
253
254async fn read_start_seq_num(
255 db: &slatedb::Db,
256 stream_id: StreamId,
257 start: ReadStart,
258 end: ReadEnd,
259 tail: StreamPosition,
260) -> Result<SeqNum, ReadError> {
261 let mut read_pos = match start.from {
262 s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
263 s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
264 ReadPosition::Timestamp(timestamp)
265 }
266 s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
267 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
268 }
269 };
270 if match read_pos {
271 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
272 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
273 } {
274 if start.clamp {
275 read_pos = ReadPosition::SeqNum(tail.seq_num);
276 } else {
277 return Err(UnwrittenError(tail).into());
278 }
279 }
280 if let ReadPosition::SeqNum(start_seq_num) = read_pos
281 && start_seq_num == tail.seq_num
282 && !end.may_follow()
283 {
284 return Err(UnwrittenError(tail).into());
285 }
286 Ok(match read_pos {
287 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
288 ReadPosition::Timestamp(start_timestamp) => {
289 resolve_timestamp(db, stream_id, start_timestamp)
290 .await?
291 .unwrap_or(tail)
292 .seq_num
293 }
294 })
295}
296
297async fn resolve_timestamp(
298 db: &slatedb::Db,
299 stream_id: StreamId,
300 timestamp: Timestamp,
301) -> Result<Option<StreamPosition>, StorageError> {
302 let start_key = kv::stream_record_timestamp::ser_key(
303 stream_id,
304 StreamPosition {
305 seq_num: SeqNum::MIN,
306 timestamp,
307 },
308 );
309 let end_key = kv::stream_record_timestamp::ser_key(
310 stream_id,
311 StreamPosition {
312 seq_num: SeqNum::MAX,
313 timestamp: Timestamp::MAX,
314 },
315 );
316 static SCAN_OPTS: ScanOptions = ScanOptions {
317 durability_filter: DurabilityLevel::Remote,
318 dirty: false,
319 read_ahead_bytes: 1,
320 cache_blocks: false,
321 max_fetch_tasks: 1,
322 order: IterationOrder::Ascending,
323 };
324 let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
325 Ok(match it.next().await? {
326 Some(kv) => {
327 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
328 assert_eq!(deser_stream_id, stream_id);
329 assert!(pos.timestamp >= timestamp);
330 kv::stream_record_timestamp::deser_value(kv.value)?;
331 Some(StreamPosition {
332 seq_num: pos.seq_num,
333 timestamp: pos.timestamp,
334 })
335 }
336 None => None,
337 })
338}
339
340struct ReadSessionState {
341 start_seq_num: u64,
342 limit: EvaluatedReadLimit,
343 until: ReadUntil,
344 wait: Option<Duration>,
345 wait_deadline: Option<Instant>,
346 tail: StreamPosition,
347}
348
349impl ReadSessionState {
350 fn arm_wait_deadline_if_unset(&mut self) {
351 if self.wait_deadline.is_none() {
352 self.reset_wait_deadline();
353 }
354 }
355
356 fn reset_wait_deadline(&mut self) {
357 self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
358 }
359
360 fn wait_deadline_expired(&self) -> bool {
361 self.wait_deadline
362 .is_some_and(|deadline| deadline <= Instant::now())
363 }
364
365 fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
366 if let Some(tail) = batch.tail {
367 self.tail = tail;
368 }
369 let last_record = batch.records.last().expect("non-empty");
370 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
371 panic!("batch after exhausted limit");
372 };
373 let count = batch.records.len();
374 let bytes = batch.records.metered_size();
375 let last_position = last_record.position();
376 assert!(limit.allow(count, bytes));
377 assert!(self.until.allow(last_position.timestamp));
378 self.start_seq_num = last_position.seq_num + 1;
379 self.limit = limit.remaining(count, bytes);
380 self.reset_wait_deadline();
381 StoredReadSessionOutput::Batch(batch)
382 }
383}
384
385fn count_allowed_records(
386 limit: ReadLimit,
387 until: ReadUntil,
388 records: &[Metered<StoredSequencedRecord>],
389) -> usize {
390 let mut acc_size = 0;
391 let mut acc_count = 0;
392 for record in records {
393 if limit.deny(acc_count + 1, acc_size + record.metered_size())
394 || until.deny(record.position().timestamp)
395 {
396 break;
397 }
398 acc_count += 1;
399 acc_size += record.metered_size();
400 }
401 acc_count
402}
403
404#[cfg(not(test))]
405fn new_heartbeat_sleep() -> tokio::time::Sleep {
406 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
407}
408
409#[cfg(test)]
410fn new_heartbeat_sleep() -> tokio::time::Sleep {
411 tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
412}
413
414async fn wait_sleep_until(deadline: Option<Instant>) {
415 match deadline {
416 Some(deadline) => tokio::time::sleep_until(deadline).await,
417 None => {
418 std::future::pending::<()>().await;
419 }
420 }
421}
422
#[cfg(test)]
mod tests {
    use std::{sync::Arc, task::Poll};

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        record::{Metered, Record},
        types::{
            basin::BasinName,
            config::{BasinConfig, OptionalStreamConfig},
            resources::ProvisionMode,
            stream::{
                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
                ReadSessionOutput, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::{
        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
        stream_id::StreamId,
    };

    /// Write options that wait for the write to become durable.
    static DURABLE_WRITE_OPTS: WriteOptions = WriteOptions {
        await_durable: true,
    };

    /// Builds a [`Backend`] on top of a fresh in-memory object store.
    async fn new_backend() -> Backend {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        Backend::new(db, ByteSize::mib(10))
    }

    /// Builds a fresh backend with basin `test-basin` and stream `test-stream`
    /// provisioned using default configs.
    async fn new_backend_with_stream() -> (
        Backend,
        BasinName,
        s2_common::types::stream::StreamName,
    ) {
        let backend = new_backend().await;
        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .provision_basin(
                basin.clone(),
                BasinConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .provision_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();
        (backend, basin, stream)
    }

    /// Wraps a single record into an [`AppendInput`] with no server timestamp,
    /// match sequence number, or fencing token.
    fn append_input(record: Record) -> AppendInput {
        let record: AppendRecord = AppendRecordParts {
            timestamp: None,
            record: Metered::from(record),
        }
        .try_into()
        .unwrap();
        let records: AppendRecordBatch = vec![record].try_into().unwrap();
        AppendInput {
            records,
            match_seq_num: None,
            fencing_token: None,
        }
    }

    /// Appends one header-less record with `body` to `basin`/`stream`, discarding the ack.
    async fn append_body(
        backend: &Backend,
        basin: &BasinName,
        stream: &s2_common::types::stream::StreamName,
        body: &'static str,
    ) {
        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from(body)).unwrap());
        backend
            .open_for_append(basin, stream, None)
            .await
            .unwrap()
            .append(input)
            .await
            .unwrap();
    }

    /// Unwraps a session item, panicking on a read error.
    fn map_test_output(
        output: Option<Result<ReadSessionOutput, ReadError>>,
    ) -> Option<ReadSessionOutput> {
        match output {
            Some(Ok(output)) => Some(output),
            Some(Err(e)) => panic!("Read error: {e:?}"),
            None => None,
        }
    }

    /// Asserts the next item is not yet ready, advances paused time by
    /// `advance_by`, yields once, then polls the same `next()` future again.
    async fn poll_next_after_advance<S>(
        session: &mut std::pin::Pin<Box<S>>,
        advance_by: Duration,
    ) -> Poll<Option<ReadSessionOutput>>
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        let mut pinned_session = session.as_mut();
        let next = pinned_session.next();
        tokio::pin!(next);

        assert!(
            matches!(futures::poll!(&mut next), Poll::Pending),
            "session unexpectedly yielded before time advanced"
        );

        tokio::time::advance(advance_by).await;
        tokio::task::yield_now().await;

        match futures::poll!(&mut next) {
            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
            Poll::Pending => Poll::Pending,
        }
    }

    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let backend = new_backend().await;

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // stream_a has nothing at or after 1500; stream_b's entry must not leak in.
        let result = resolve_timestamp(&backend.db, stream_a, 1500)
            .await
            .unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let (backend, basin, stream) = new_backend_with_stream().await;

        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
        let ack = backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        backend
            .db
            .write_with_options(batch, &DURABLE_WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let (backend, basin, stream) = new_backend_with_stream().await;

        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);
        let probe_step = Duration::from_millis(1);
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let started = Instant::now();
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Heartbeats may still be pending, but the session must close at the
        // original deadline.
        tokio::task::yield_now().await;
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let (backend, basin, stream) = new_backend_with_stream().await;

        append_body(&backend, &basin, &stream, "initial").await;

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield the initial batch")
            .expect("session should not error");
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Consume part of the wait budget before a live append arrives.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        append_body(&backend, &basin, &stream, "follow-1").await;

        let follow = session
            .as_mut()
            .next()
            .await
            .expect("session should deliver the live batch")
            .expect("session should not error");
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Just before the reset deadline, the session must still be open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let (backend, basin, stream) = new_backend_with_stream().await;

        let wait = Duration::from_secs(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        // Overflow the follower's broadcast buffer, then delete every appended
        // record, so that lag recovery finds nothing to deliver from storage.
        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        for i in 0..lagged_appends {
            let input = append_input(
                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
            );
            let ack = backend
                .open_for_append(&basin, &stream, None)
                .await
                .unwrap()
                .append(input)
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        backend
            .db
            .write_with_options(delete_batch, &DURABLE_WRITE_OPTS)
            .await
            .unwrap();

        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let (backend, basin, stream) = new_backend_with_stream().await;

        append_body(&backend, &basin, &stream, "initial").await;

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield initial batch")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Let the streamer go dormant, then append and expect delivery.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        append_body(&backend, &basin, &stream, "follow-1").await;

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let Record::Envelope(envelope) = record.inner() else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}