1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5 basin::BasinName,
6 caps,
7 encryption::{EncryptionKey, EncryptionSpec},
8 read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
9 record::{Metered, MeteredSize as _, SeqNum, StreamPosition, Timestamp},
10 stream::{ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11};
12use s2_storage::record::{
13 StoredReadBatch, StoredReadSessionOutput, StoredSequencedRecord, decrypt_read_session_output,
14};
15use slatedb::config::{DurabilityLevel, ScanOptions};
16use tokio::{sync::broadcast, time::Instant};
17
18use super::{Backend, StreamHandle};
19use crate::{
20 backend::{
21 error::{
22 CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
23 },
24 kv,
25 streamer::GuardedStreamerClient,
26 },
27 stream_id::StreamId,
28};
29
30impl Backend {
31 pub async fn open_for_check_tail(
32 &self,
33 basin: &BasinName,
34 stream: &StreamName,
35 ) -> Result<StreamHandle, CheckTailError> {
36 self.stream_handle_with_auto_create::<CheckTailError>(
37 basin,
38 stream,
39 |config| config.create_stream_on_read,
40 |_| Ok(EncryptionSpec::Plain),
41 )
42 .await
43 }
44
45 pub async fn open_for_read(
46 &self,
47 basin: &BasinName,
48 stream: &StreamName,
49 encryption_key: Option<EncryptionKey>,
50 ) -> Result<StreamHandle, ReadError> {
51 self.stream_handle_with_auto_create::<ReadError>(
52 basin,
53 stream,
54 |config| config.create_stream_on_read,
55 |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
56 )
57 .await
58 }
59}
60
61impl StreamHandle {
62 pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
63 let tail = self.client.check_tail().await?;
64 Ok(tail)
65 }
66
67 pub async fn read(
68 self,
69 start: ReadStart,
70 end: ReadEnd,
71 ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
72 let stream_id = self.client.stream_id();
73 let session = read_session(self.db, self.client, start, end).await?;
74 Ok(async_stream::stream! {
75 tokio::pin!(session);
76 while let Some(output) = session.next().await {
77 let output = match output {
78 Ok(output) => {
79 decrypt_read_session_output(output, &self.encryption, stream_id.as_bytes())
80 .map_err(ReadError::from)
81 }
82 Err(err) => Err(err),
83 };
84 let should_stop = output.is_err();
85 yield output;
86 if should_stop {
87 break;
88 }
89 }
90 })
91 }
92}
93
/// Builds the stream of stored (still encrypted) outputs for one read session.
///
/// The session alternates between two modes, driven by `state`:
///
/// * **Catch-up** (`start_seq_num < tail.seq_num`): scans the record-data keys in
///   `[start_seq_num, tail.seq_num)` from the DB and yields them as batches capped
///   by `caps::RECORD_BATCH_MAX`.
/// * **Follow** (`start_seq_num == tail.seq_num`): if `end.may_follow()`, subscribes
///   through `client.follow` and yields live batches and periodic heartbeats until
///   the limit, the `until` bound, or the wait deadline ends the session.
///
/// # Errors
///
/// Returns a [`ReadError`] up front if the tail check or the start-position
/// resolution fails. Failures inside the session are yielded as `Err` items by
/// `try_stream!`.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        // The session lives for as long as some read limit remains.
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up mode: scan the half-open key range [start_seq_num, tail.seq_num).
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                let scan_opts = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    ..Default::default()
                };
                let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;

                // Pre-size the buffer to the smaller of the remaining count and the batch cap.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                // `limit` is re-read from `state` on every iteration because `on_batch`
                // shrinks it whenever a batch is yielded.
                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // This record would cross the `until` bound or the read limit.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            // Nothing is pending, so the whole session is finished.
                            break 'session;
                        } else {
                            // Stop scanning; the pending records are flushed after this loop.
                            break;
                        }
                    }

                    // The pending batch is at the count cap, or adding this record would
                    // exceed the byte cap: flush it and start a fresh buffer.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // The scan produced nothing for [start_seq_num, tail): jump straight to
                    // the tail so the outer loop does not rescan the same empty range.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                // Caught up with the known tail.
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // Arm the wait deadline only if it is not already set, so that
                        // re-entering follow mode does not extend an existing budget.
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        // Announce follow mode with the current tail.
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            // Every arm evaluates to a `Result`; the trailing `?` turns an
                            // `Err` into a yielded error that ends the stream.
                            tokio::select! {
                                // `biased` polls the arms in source order: live records
                                // first, then the heartbeat timer, then the wait deadline.
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            // Only the prefix admitted by the limit and the
                                            // `until` bound is delivered.
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            // A truncated batch means a bound was hit.
                                            if allowed_count < count {
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // The receiver fell behind the broadcast channel;
                                            // restart the outer loop from `start_seq_num`.
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                _ = new_heartbeat_sleep() => {
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    // The wait budget ran out without new records.
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // `follow` reported a newer tail instead of a receiver: record it
                        // so the next iteration catches up from the DB.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
248
249async fn read_start_seq_num(
250 db: &slatedb::Db,
251 stream_id: StreamId,
252 start: ReadStart,
253 end: ReadEnd,
254 tail: StreamPosition,
255) -> Result<SeqNum, ReadError> {
256 let mut read_pos = match start.from {
257 s2_common::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
258 s2_common::stream::ReadFrom::Timestamp(timestamp) => ReadPosition::Timestamp(timestamp),
259 s2_common::stream::ReadFrom::TailOffset(tail_offset) => {
260 ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
261 }
262 };
263 if match read_pos {
264 ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
265 ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
266 } {
267 if start.clamp {
268 read_pos = ReadPosition::SeqNum(tail.seq_num);
269 } else {
270 return Err(UnwrittenError(tail).into());
271 }
272 }
273 if let ReadPosition::SeqNum(start_seq_num) = read_pos
274 && start_seq_num == tail.seq_num
275 && !end.may_follow()
276 {
277 return Err(UnwrittenError(tail).into());
278 }
279 Ok(match read_pos {
280 ReadPosition::SeqNum(start_seq_num) => start_seq_num,
281 ReadPosition::Timestamp(start_timestamp) => {
282 resolve_timestamp(db, stream_id, start_timestamp)
283 .await?
284 .unwrap_or(tail)
285 .seq_num
286 }
287 })
288}
289
290async fn resolve_timestamp(
291 db: &slatedb::Db,
292 stream_id: StreamId,
293 timestamp: Timestamp,
294) -> Result<Option<StreamPosition>, StorageError> {
295 let start_key = kv::stream_record_timestamp::ser_key(
296 stream_id,
297 StreamPosition {
298 seq_num: SeqNum::MIN,
299 timestamp,
300 },
301 );
302 let end_key = kv::stream_record_timestamp::ser_key(
303 stream_id,
304 StreamPosition {
305 seq_num: SeqNum::MAX,
306 timestamp: Timestamp::MAX,
307 },
308 );
309 let scan_opts = ScanOptions {
310 durability_filter: DurabilityLevel::Remote,
311 ..Default::default()
312 };
313 let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;
314 Ok(match it.next().await? {
315 Some(kv) => {
316 let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
317 assert_eq!(deser_stream_id, stream_id);
318 assert!(pos.timestamp >= timestamp);
319 kv::stream_record_timestamp::deser_value(kv.value)?;
320 Some(StreamPosition {
321 seq_num: pos.seq_num,
322 timestamp: pos.timestamp,
323 })
324 }
325 None => None,
326 })
327}
328
329struct ReadSessionState {
330 start_seq_num: u64,
331 limit: EvaluatedReadLimit,
332 until: ReadUntil,
333 wait: Option<Duration>,
334 wait_deadline: Option<Instant>,
335 tail: StreamPosition,
336}
337
338impl ReadSessionState {
339 fn arm_wait_deadline_if_unset(&mut self) {
340 if self.wait_deadline.is_none() {
341 self.reset_wait_deadline();
342 }
343 }
344
345 fn reset_wait_deadline(&mut self) {
346 self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
347 }
348
349 fn wait_deadline_expired(&self) -> bool {
350 self.wait_deadline
351 .is_some_and(|deadline| deadline <= Instant::now())
352 }
353
354 fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
355 if let Some(tail) = batch.tail {
356 self.tail = tail;
357 }
358 let last_record = batch.records.last().expect("non-empty");
359 let EvaluatedReadLimit::Remaining(limit) = self.limit else {
360 panic!("batch after exhausted limit");
361 };
362 let count = batch.records.len();
363 let bytes = batch.records.metered_size();
364 let last_position = last_record.position();
365 assert!(limit.allow(count, bytes));
366 assert!(self.until.allow(last_position.timestamp));
367 self.start_seq_num = last_position.seq_num + 1;
368 self.limit = limit.remaining(count, bytes);
369 self.reset_wait_deadline();
370 StoredReadSessionOutput::Batch(batch)
371 }
372}
373
374fn count_allowed_records(
375 limit: ReadLimit,
376 until: ReadUntil,
377 records: &[Metered<StoredSequencedRecord>],
378) -> usize {
379 let mut acc_size = 0;
380 let mut acc_count = 0;
381 for record in records {
382 if limit.deny(acc_count + 1, acc_size + record.metered_size())
383 || until.deny(record.position().timestamp)
384 {
385 break;
386 }
387 acc_count += 1;
388 acc_size += record.metered_size();
389 }
390 acc_count
391}
392
393#[cfg(not(test))]
394fn new_heartbeat_sleep() -> tokio::time::Sleep {
395 tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
396}
397
398#[cfg(test)]
399fn new_heartbeat_sleep() -> tokio::time::Sleep {
400 tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
401}
402
403async fn wait_sleep_until(deadline: Option<Instant>) {
404 match deadline {
405 Some(deadline) => tokio::time::sleep_until(deadline).await,
406 None => {
407 std::future::pending::<()>().await;
408 }
409 }
410}
411
#[cfg(test)]
mod tests {
    use std::{sync::Arc, task::Poll};

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        basin::BasinName,
        config::{BasinConfig, OptionalStreamConfig},
        read_extent::{ReadLimit, ReadUntil},
        record::{Metered, Record},
        resources::ProvisionMode,
        stream::{
            AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
            ReadSessionOutput, ReadStart, StreamName,
        },
    };
    use slatedb::{Db, WriteBatch, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::{
        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
        stream_id::StreamId,
    };

    /// Builds a [`Backend`] on top of a fresh in-memory object store.
    async fn new_backend() -> Backend {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        Backend::new(db, ByteSize::mib(10))
    }

    /// Builds a backend and provisions `test-basin` / `test-stream` with default configs.
    async fn backend_with_stream() -> (Backend, BasinName, StreamName) {
        let backend = new_backend().await;

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .provision_basin(
                basin.clone(),
                BasinConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: StreamName = "test-stream".parse().unwrap();
        backend
            .provision_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();

        (backend, basin, stream)
    }

    /// Wraps a single record into an unconditional append request.
    fn append_input(record: Record) -> AppendInput {
        let record: AppendRecord = AppendRecordParts {
            timestamp: None,
            record: Metered::from(record),
        }
        .try_into()
        .unwrap();
        let records: AppendRecordBatch = vec![record].try_into().unwrap();
        AppendInput {
            records,
            match_seq_num: None,
            fencing_token: None,
        }
    }

    /// Unwraps a polled session item, panicking on a read error.
    fn map_test_output(
        output: Option<Result<ReadSessionOutput, ReadError>>,
    ) -> Option<ReadSessionOutput> {
        match output {
            Some(Ok(output)) => Some(output),
            Some(Err(e)) => panic!("Read error: {e:?}"),
            None => None,
        }
    }

    /// Asserts the session is pending, advances the paused clock by `advance_by`,
    /// then polls the session once more and reports what it produced.
    async fn poll_next_after_advance<S>(
        session: &mut std::pin::Pin<Box<S>>,
        advance_by: Duration,
    ) -> Poll<Option<ReadSessionOutput>>
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        let mut pinned_session = session.as_mut();
        let next = pinned_session.next();
        tokio::pin!(next);

        assert!(
            matches!(futures::poll!(&mut next), Poll::Pending),
            "session unexpectedly yielded before time advanced"
        );

        tokio::time::advance(advance_by).await;
        tokio::task::yield_now().await;

        match futures::poll!(&mut next) {
            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
            Poll::Pending => Poll::Pending,
        }
    }

    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let backend = new_backend().await;

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // A timestamp before stream A's only entry resolves to that entry.
        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // A timestamp after stream A's only entry must not leak into stream B's entry.
        let result = resolve_timestamp(&backend.db, stream_a, 1500)
            .await
            .unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let (backend, basin, stream) = backend_with_stream().await;

        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
        let ack = backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Remove the record's data key directly, leaving the tail ahead of the stored data.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        backend.db.write(batch).await.unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let (backend, basin, stream) = backend_with_stream().await;

        // The wait budget is longer than the 5..15 ms heartbeat interval of test builds.
        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);
        let probe_step = Duration::from_millis(1);
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let started = Instant::now();
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        tokio::task::yield_now().await;
        // Drain any further heartbeats; the session must then close without more time passing.
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let (backend, basin, stream) = backend_with_stream().await;

        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield the initial batch")
            .expect("session should not error");
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Use up part of the wait budget before the live append arrives.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        let follow = session
            .as_mut()
            .next()
            .await
            .expect("session should deliver the live batch")
            .expect("session should not error");
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Just short of a full budget after the batch: the session must still be open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        // One more probe step completes the reset budget: the session must now close.
        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let (backend, basin, stream) = backend_with_stream().await;

        let wait = Duration::from_secs(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        // Append more records than FOLLOWER_MAX_LAG while the session is not being polled.
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        for i in 0..lagged_appends {
            let input = append_input(
                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
            );
            let ack = backend
                .open_for_append(&basin, &stream, None)
                .await
                .unwrap()
                .append(input)
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        // Delete every appended record so the catch-up scan finds nothing to deliver.
        backend.db.write(delete_batch).await.unwrap();

        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let (backend, basin, stream) = backend_with_stream().await;

        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield initial batch")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Let more than DORMANT_TIMEOUT pass with no activity before appending again.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let Record::Envelope(envelope) = record.inner() else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}