1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5 caps,
6 encryption::{EncryptionKey, EncryptionSpec},
7 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8 record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9 types::{
10 basin::BasinName,
11 stream::{
12 ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13 StoredReadSessionOutput, StreamName,
14 },
15 },
16};
17use slatedb::{
18 IterationOrder,
19 config::{DurabilityLevel, ScanOptions},
20};
21use tokio::{sync::broadcast, time::Instant};
22
23use super::{Backend, StreamHandle};
24use crate::{
25 backend::{
26 error::{
27 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
28 },
29 kv,
30 streamer::GuardedStreamerClient,
31 },
32 stream_id::StreamId,
33};
34
35impl Backend {
36 pub async fn open_for_check_tail(
37 &self,
38 basin: &BasinName,
39 stream: &StreamName,
40 ) -> Result<StreamHandle, CheckTailError> {
41 self.stream_handle_with_auto_create::<CheckTailError>(
42 basin,
43 stream,
44 |config| config.create_stream_on_read,
45 |_| Ok(EncryptionSpec::Plain),
46 )
47 .await
48 }
49
50 pub async fn open_for_read(
51 &self,
52 basin: &BasinName,
53 stream: &StreamName,
54 encryption_key: Option<EncryptionKey>,
55 ) -> Result<StreamHandle, ReadError> {
56 self.stream_handle_with_auto_create::<ReadError>(
57 basin,
58 stream,
59 |config| config.create_stream_on_read,
60 |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
61 )
62 .await
63 }
64}
65
66impl StreamHandle {
67 pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
68 let tail = self.client.check_tail().await?;
69 Ok(tail)
70 }
71
72 pub async fn read(
73 self,
74 start: ReadStart,
75 end: ReadEnd,
76 ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
77 let stream_id = self.client.stream_id();
78 let session = read_session(self.db, self.client, start, end).await?;
79 Ok(async_stream::stream! {
80 tokio::pin!(session);
81 while let Some(output) = session.next().await {
82 let output = match output {
83 Ok(output) => output
84 .decrypt(&self.encryption, stream_id.as_bytes())
85 .map_err(ReadError::from),
86 Err(err) => Err(err),
87 };
88 let should_stop = output.is_err();
89 yield output;
90 if should_stop {
91 break;
92 }
93 }
94 })
95 }
96}
97
/// Builds the stored-record read session as a lazy stream.
///
/// The session alternates between two modes until the limit is exhausted,
/// the `until` bound is hit, the wait deadline expires, or following is not
/// permitted:
/// - catch-up: scan records from storage while the cursor is behind the tail;
/// - follow: subscribe to the streamer's live-append broadcast once the
///   cursor has reached the tail.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        // Armed lazily, only once the session first reaches follow mode.
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up mode: scan stored records in [cursor, known tail).
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                // Bulk-scan settings: durable reads with read-ahead and block
                // caching, since this path streams a contiguous range.
                static SCAN_OPTS: ScanOptions = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    dirty: false,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    order: IterationOrder::Ascending,
                };
                let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;

                // Batch buffer sized by the remaining count limit, capped at
                // the per-batch maximum.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                // `limit` is rebound each iteration: `state.limit` shrinks as
                // batches are yielded via `state.on_batch`.
                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // Stop before admitting a record that would violate the
                    // `until` bound or the remaining limit.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            // Nothing pending: the session is done.
                            break 'session;
                        } else {
                            // Flush what was collected, then re-check.
                            break;
                        }
                    }

                    // Flush a full batch before admitting this record would
                    // overflow the per-batch count/byte caps.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // Scan produced nothing (e.g. records were deleted):
                    // jump the cursor to the tail instead of spinning.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                // Follow mode: cursor has reached the tail.
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // The wait budget starts the first time we catch up;
                        // re-entering follow mode does not re-arm it.
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            tokio::select! {
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            // Deliver only the prefix allowed
                                            // by the remaining limit / until.
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            if allowed_count < count {
                                                // A record was refused: the
                                                // session has hit its bound.
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // Fell behind the broadcast:
                                            // go back to catch-up from storage.
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                _ = new_heartbeat_sleep() => {
                                    // Heartbeats keep the session alive but do
                                    // not reset the wait deadline.
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // Follow rejected: the tail moved past our cursor;
                        // adopt it and loop back into catch-up mode.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
253
254async fn read_start_seq_num(
255 db: &slatedb::Db,
256 stream_id: StreamId,
257 start: ReadStart,
258 end: ReadEnd,
259 tail: StreamPosition,
260) -> Result<SeqNum, ReadError> {
261 let mut read_pos = match start.from {
262 s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
263 s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
264 ReadPosition::Timestamp(timestamp)
265 }
266 s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
267 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
268 }
269 };
270 if match read_pos {
271 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
272 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
273 } {
274 if start.clamp {
275 read_pos = ReadPosition::SeqNum(tail.seq_num);
276 } else {
277 return Err(UnwrittenError(tail).into());
278 }
279 }
280 if let ReadPosition::SeqNum(start_seq_num) = read_pos
281 && start_seq_num == tail.seq_num
282 && !end.may_follow()
283 {
284 return Err(UnwrittenError(tail).into());
285 }
286 Ok(match read_pos {
287 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
288 ReadPosition::Timestamp(start_timestamp) => {
289 resolve_timestamp(db, stream_id, start_timestamp)
290 .await?
291 .unwrap_or(tail)
292 .seq_num
293 }
294 })
295}
296
297async fn resolve_timestamp(
298 db: &slatedb::Db,
299 stream_id: StreamId,
300 timestamp: Timestamp,
301) -> Result<Option<StreamPosition>, StorageError> {
302 let start_key = kv::stream_record_timestamp::ser_key(
303 stream_id,
304 StreamPosition {
305 seq_num: SeqNum::MIN,
306 timestamp,
307 },
308 );
309 let end_key = kv::stream_record_timestamp::ser_key(
310 stream_id,
311 StreamPosition {
312 seq_num: SeqNum::MAX,
313 timestamp: Timestamp::MAX,
314 },
315 );
316 static SCAN_OPTS: ScanOptions = ScanOptions {
317 durability_filter: DurabilityLevel::Remote,
318 dirty: false,
319 read_ahead_bytes: 1,
320 cache_blocks: false,
321 max_fetch_tasks: 1,
322 order: IterationOrder::Ascending,
323 };
324 let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
325 Ok(match it.next().await? {
326 Some(kv) => {
327 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
328 assert_eq!(deser_stream_id, stream_id);
329 assert!(pos.timestamp >= timestamp);
330 kv::stream_record_timestamp::deser_value(kv.value)?;
331 Some(StreamPosition {
332 seq_num: pos.seq_num,
333 timestamp: pos.timestamp,
334 })
335 }
336 None => None,
337 })
338}
339
/// Mutable cursor state for an in-progress read session.
struct ReadSessionState {
    // Next sequence number to read from storage.
    start_seq_num: u64,
    // Remaining record-count/byte budget for the whole session.
    limit: EvaluatedReadLimit,
    // Timestamp bound past which records must not be delivered.
    until: ReadUntil,
    // Optional budget for how long to linger in follow mode with no records.
    wait: Option<Duration>,
    // Armed when the session first catches up to the tail; reset only when a
    // batch is delivered (see `on_batch`), never by heartbeats.
    wait_deadline: Option<Instant>,
    // Last known tail position of the stream.
    tail: StreamPosition,
}
348
349impl ReadSessionState {
350 fn arm_wait_deadline_if_unset(&mut self) {
351 if self.wait_deadline.is_none() {
352 self.reset_wait_deadline();
353 }
354 }
355
356 fn reset_wait_deadline(&mut self) {
357 self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
358 }
359
360 fn wait_deadline_expired(&self) -> bool {
361 self.wait_deadline
362 .is_some_and(|deadline| deadline <= Instant::now())
363 }
364
365 fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
366 if let Some(tail) = batch.tail {
367 self.tail = tail;
368 }
369 let last_record = batch.records.last().expect("non-empty");
370 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
371 panic!("batch after exhausted limit");
372 };
373 let count = batch.records.len();
374 let bytes = batch.records.metered_size();
375 let last_position = last_record.position();
376 assert!(limit.allow(count, bytes));
377 assert!(self.until.allow(last_position.timestamp));
378 self.start_seq_num = last_position.seq_num + 1;
379 self.limit = limit.remaining(count, bytes);
380 self.reset_wait_deadline();
381 StoredReadSessionOutput::Batch(batch)
382 }
383}
384
385fn count_allowed_records(
386 limit: ReadLimit,
387 until: ReadUntil,
388 records: &[Metered<StoredSequencedRecord>],
389) -> usize {
390 let mut acc_size = 0;
391 let mut acc_count = 0;
392 for record in records {
393 if limit.deny(acc_count + 1, acc_size + record.metered_size())
394 || until.deny(record.position().timestamp)
395 {
396 break;
397 }
398 acc_count += 1;
399 acc_size += record.metered_size();
400 }
401 acc_count
402}
403
404#[cfg(not(test))]
405fn new_heartbeat_sleep() -> tokio::time::Sleep {
406 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
407}
408
/// Test-build heartbeat delay: same shape as production but 1000x shorter
/// (5–15 ms) so paused-clock tests run quickly.
#[cfg(test)]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    let jitter = Duration::from_millis(rand::random_range(5..15));
    tokio::time::sleep(jitter)
}
413
414async fn wait_sleep_until(deadline: Option<Instant>) {
415 match deadline {
416 Some(deadline) => tokio::time::sleep_until(deadline).await,
417 None => {
418 std::future::pending::<()>().await;
419 }
420 }
421}
422
#[cfg(test)]
mod tests {
    use std::{sync::Arc, task::Poll};

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        record::{Metered, Record},
        types::{
            basin::BasinName,
            config::{BasinConfig, OptionalStreamConfig},
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
                ReadSessionOutput, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::{
        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
        stream_id::StreamId,
    };

    // Wraps a single record into an `AppendInput` with no sequencing or
    // fencing preconditions.
    fn append_input(record: Record) -> AppendInput {
        let record: AppendRecord = AppendRecordParts {
            timestamp: None,
            record: Metered::from(record),
        }
        .try_into()
        .unwrap();
        let records: AppendRecordBatch = vec![record].try_into().unwrap();
        AppendInput {
            records,
            match_seq_num: None,
            fencing_token: None,
        }
    }

    // Unwraps a session item, panicking on any read error.
    fn map_test_output(
        output: Option<Result<ReadSessionOutput, ReadError>>,
    ) -> Option<ReadSessionOutput> {
        match output {
            Some(Ok(output)) => Some(output),
            Some(Err(e)) => panic!("Read error: {e:?}"),
            None => None,
        }
    }

    // Asserts the session is pending, advances the paused clock by
    // `advance_by`, then polls once more and returns that poll's result.
    async fn poll_next_after_advance<S>(
        session: &mut std::pin::Pin<Box<S>>,
        advance_by: Duration,
    ) -> Poll<Option<ReadSessionOutput>>
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        let mut pinned_session = session.as_mut();
        let next = pinned_session.next();
        tokio::pin!(next);

        assert!(
            matches!(futures::poll!(&mut next), Poll::Pending),
            "session unexpectedly yielded before time advanced"
        );

        tokio::time::advance(advance_by).await;
        tokio::task::yield_now().await;

        match futures::poll!(&mut next) {
            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
            Poll::Pending => Poll::Pending,
        }
    }

    // The timestamp-index scan must stay within the queried stream's
    // keyspace: stream B's entry at t=2000 must not satisfy a lookup on
    // stream A for t=1500.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        let result = resolve_timestamp(&backend.db, stream_a, 1500)
            .await
            .unwrap();
        assert_eq!(result, None);
    }

    // When the only stored record has been deleted out from under the
    // session, the read must terminate (cursor jumps to tail) instead of
    // rescanning forever.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
        let ack = backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the record's data entry directly, simulating storage-level
        // removal behind the session's back.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        // The timeout is the actual assertion: collect() must finish.
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    // Heartbeats must not push back the wait deadline: with wait=30ms, the
    // session should close ~30ms after entering follow mode even though
    // heartbeats keep arriving.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);
        let probe_step = Duration::from_millis(1);
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let started = Instant::now();
        // Advance exactly the wait budget; the session may emit one more
        // heartbeat but must not deliver anything else.
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        tokio::task::yield_now().await;
        // Drain trailing heartbeats until the stream closes, then check the
        // close happened within [wait, wait + probe_step] of follow start.
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }

    // Delivering a live follow batch must restart the wait countdown: the
    // session should stay open for a full `wait` after the delivery, then
    // close on schedule.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        // First output: the catch-up batch with the pre-existing record.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield the initial batch")
            .expect("session should not error");
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        // Second output: the heartbeat marking entry into follow mode.
        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Burn 20ms of the 30ms budget before appending a live record.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        let follow = session
            .as_mut()
            .next()
            .await
            .expect("session should deliver the live batch")
            .expect("session should not error");
        // The delivery is when the wait countdown should restart.
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Just under a full new wait budget: the session must still be open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        // The final step over the deadline should close the session.
        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }

    // A follow subscription that lags (and whose catch-up scan then finds
    // nothing, because the records were deleted) must not re-arm the wait
    // deadline: the session closes once the original budget elapses.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let wait = Duration::from_secs(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        // Append more records than the follower channel can buffer so the
        // session's subscription lags, and delete them all so the forced
        // catch-up scan delivers nothing.
        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        for i in 0..lagged_appends {
            let input = append_input(
                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
            );
            let ack = backend
                .open_for_append(&basin, &stream, None)
                .await
                .unwrap()
                .append(input)
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(delete_batch, &WRITE_OPTS)
            .await
            .unwrap();

        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    // A waitless (unbounded) follow session must survive the streamer going
    // dormant and still deliver records appended afterwards.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield initial batch")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Push past the streamer's dormancy timeout before appending again.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let Record::Envelope(envelope) = record.inner() else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}
1050}