Skip to main content

s2_lite/backend/
read.rs

1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5    caps,
6    encryption::{EncryptionKey, EncryptionSpec},
7    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8    record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9    types::{
10        basin::BasinName,
11        stream::{
12            ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13            StoredReadSessionOutput, StreamName,
14        },
15    },
16};
17use slatedb::{
18    IterationOrder,
19    config::{DurabilityLevel, ScanOptions},
20};
21use tokio::{sync::broadcast, time::Instant};
22
23use super::{Backend, StreamHandle};
24use crate::{
25    backend::{
26        error::{
27            CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
28        },
29        kv,
30        streamer::GuardedStreamerClient,
31    },
32    stream_id::StreamId,
33};
34
impl Backend {
    /// Opens a handle suitable for a tail check on `basin`/`stream`.
    ///
    /// Auto-creates the stream when the basin config enables
    /// `create_stream_on_read`. A tail check never reads record bodies, so
    /// the handle is opened with `EncryptionSpec::Plain` regardless of the
    /// stream's cipher configuration.
    pub async fn open_for_check_tail(
        &self,
        basin: &BasinName,
        stream: &StreamName,
    ) -> Result<StreamHandle, CheckTailError> {
        self.stream_handle_with_auto_create::<CheckTailError>(
            basin,
            stream,
            |config| config.create_stream_on_read,
            // No key needed: tail metadata is not encrypted payload.
            |_| Ok(EncryptionSpec::Plain),
        )
        .await
    }

    /// Opens a handle for reading records from `basin`/`stream`.
    ///
    /// Auto-creates the stream when the basin config enables
    /// `create_stream_on_read`. The optional `encryption_key` is resolved
    /// against the stream's configured cipher; resolution failure is
    /// converted into a `ReadError` via `?`.
    pub async fn open_for_read(
        &self,
        basin: &BasinName,
        stream: &StreamName,
        encryption_key: Option<EncryptionKey>,
    ) -> Result<StreamHandle, ReadError> {
        self.stream_handle_with_auto_create::<ReadError>(
            basin,
            stream,
            |config| config.create_stream_on_read,
            |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
        )
        .await
    }
}
65
66impl StreamHandle {
67    pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
68        let tail = self.client.check_tail().await?;
69        Ok(tail)
70    }
71
72    pub async fn read(
73        self,
74        start: ReadStart,
75        end: ReadEnd,
76    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
77        let stream_id = self.client.stream_id();
78        let session = read_session(self.db, self.client, start, end).await?;
79        Ok(async_stream::stream! {
80            tokio::pin!(session);
81            while let Some(output) = session.next().await {
82                let output = match output {
83                    Ok(output) => output
84                        .decrypt(&self.encryption, stream_id.as_bytes())
85                        .map_err(ReadError::from),
86                    Err(err) => Err(err),
87                };
88                let should_stop = output.is_err();
89                yield output;
90                if should_stop {
91                    break;
92                }
93            }
94        })
95    }
96}
97
/// Core read-session state machine.
///
/// Alternates between two modes until the read limit is exhausted or the
/// session ends:
/// 1. **Catch-up**: when the cursor is behind the known tail, scan records
///    from the DB and yield them in capped batches.
/// 2. **Follow**: when caught up to the tail, subscribe to the streamer's
///    broadcast channel and relay live batches, emitting heartbeats and
///    honoring the wait deadline.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up mode: scan [start_seq_num, tail.seq_num) from the DB.
                // NOTE(review): keys are built with timestamp 0 — presumably the
                // timestamp component does not participate in data-key ordering;
                // confirm against kv::stream_record_data.
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                // Bulk-scan options: large read-ahead, cached blocks.
                static SCAN_OPTS: ScanOptions = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    dirty: false,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    order: IterationOrder::Ascending,
                };
                let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;

                // Pre-size the batch buffer for the smaller of the remaining
                // count limit and the per-batch record cap.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                // Inner `limit` re-binds from state.limit each iteration, so it
                // reflects consumption by batches yielded inside this loop.
                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // Stop if this record would cross the `until` bound or the
                    // read limit; flush whatever was accumulated first.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            break 'session;
                        } else {
                            break;
                        }
                    }

                    // Roll over to a new batch when the current one would exceed
                    // the per-batch count or byte caps.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // Nothing readable below the tail (e.g. records trimmed):
                    // jump the cursor to the tail so we enter follow mode.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // Only a delivered batch should reset the absolute wait budget.
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            tokio::select! {
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            // Trim the live batch to what the limit
                                            // and `until` bound still allow.
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            // A partial delivery means a bound was
                                            // hit: the session is done.
                                            if allowed_count < count {
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // Catch up using DB
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                _ = new_heartbeat_sleep() => {
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // Streamer reports a newer tail: record it and loop back
                        // into catch-up mode.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
253
254async fn read_start_seq_num(
255    db: &slatedb::Db,
256    stream_id: StreamId,
257    start: ReadStart,
258    end: ReadEnd,
259    tail: StreamPosition,
260) -> Result<SeqNum, ReadError> {
261    let mut read_pos = match start.from {
262        s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
263        s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
264            ReadPosition::Timestamp(timestamp)
265        }
266        s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
267            ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
268        }
269    };
270    if match read_pos {
271        ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
272        ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
273    } {
274        if start.clamp {
275            read_pos = ReadPosition::SeqNum(tail.seq_num);
276        } else {
277            return Err(UnwrittenError(tail).into());
278        }
279    }
280    if let ReadPosition::SeqNum(start_seq_num) = read_pos
281        && start_seq_num == tail.seq_num
282        && !end.may_follow()
283    {
284        return Err(UnwrittenError(tail).into());
285    }
286    Ok(match read_pos {
287        ReadPosition::SeqNum(start_seq_num) => start_seq_num,
288        ReadPosition::Timestamp(start_timestamp) => {
289            resolve_timestamp(db, stream_id, start_timestamp)
290                .await?
291                .unwrap_or(tail)
292                .seq_num
293        }
294    })
295}
296
297async fn resolve_timestamp(
298    db: &slatedb::Db,
299    stream_id: StreamId,
300    timestamp: Timestamp,
301) -> Result<Option<StreamPosition>, StorageError> {
302    let start_key = kv::stream_record_timestamp::ser_key(
303        stream_id,
304        StreamPosition {
305            seq_num: SeqNum::MIN,
306            timestamp,
307        },
308    );
309    let end_key = kv::stream_record_timestamp::ser_key(
310        stream_id,
311        StreamPosition {
312            seq_num: SeqNum::MAX,
313            timestamp: Timestamp::MAX,
314        },
315    );
316    static SCAN_OPTS: ScanOptions = ScanOptions {
317        durability_filter: DurabilityLevel::Remote,
318        dirty: false,
319        read_ahead_bytes: 1,
320        cache_blocks: false,
321        max_fetch_tasks: 1,
322        order: IterationOrder::Ascending,
323    };
324    let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
325    Ok(match it.next().await? {
326        Some(kv) => {
327            let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
328            assert_eq!(deser_stream_id, stream_id);
329            assert!(pos.timestamp >= timestamp);
330            kv::stream_record_timestamp::deser_value(kv.value)?;
331            Some(StreamPosition {
332                seq_num: pos.seq_num,
333                timestamp: pos.timestamp,
334            })
335        }
336        None => None,
337    })
338}
339
/// Mutable cursor and accounting state for an in-flight read session.
struct ReadSessionState {
    // Next sequence number to read (advanced past each delivered batch).
    start_seq_num: u64,
    // Remaining read limit; session ends once it is exhausted.
    limit: EvaluatedReadLimit,
    // Upper timestamp bound for deliverable records.
    until: ReadUntil,
    // Configured wait budget; `None` means wait indefinitely.
    wait: Option<Duration>,
    // Absolute deadline derived from `wait`; armed lazily in follow mode.
    wait_deadline: Option<Instant>,
    // Latest known tail of the stream.
    tail: StreamPosition,
}
348
349impl ReadSessionState {
350    fn arm_wait_deadline_if_unset(&mut self) {
351        if self.wait_deadline.is_none() {
352            self.reset_wait_deadline();
353        }
354    }
355
356    fn reset_wait_deadline(&mut self) {
357        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
358    }
359
360    fn wait_deadline_expired(&self) -> bool {
361        self.wait_deadline
362            .is_some_and(|deadline| deadline <= Instant::now())
363    }
364
365    fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
366        if let Some(tail) = batch.tail {
367            self.tail = tail;
368        }
369        let last_record = batch.records.last().expect("non-empty");
370        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
371            panic!("batch after exhausted limit");
372        };
373        let count = batch.records.len();
374        let bytes = batch.records.metered_size();
375        let last_position = last_record.position();
376        assert!(limit.allow(count, bytes));
377        assert!(self.until.allow(last_position.timestamp));
378        self.start_seq_num = last_position.seq_num + 1;
379        self.limit = limit.remaining(count, bytes);
380        self.reset_wait_deadline();
381        StoredReadSessionOutput::Batch(batch)
382    }
383}
384
385fn count_allowed_records(
386    limit: ReadLimit,
387    until: ReadUntil,
388    records: &[Metered<StoredSequencedRecord>],
389) -> usize {
390    let mut acc_size = 0;
391    let mut acc_count = 0;
392    for record in records {
393        if limit.deny(acc_count + 1, acc_size + record.metered_size())
394            || until.deny(record.position().timestamp)
395        {
396            break;
397        }
398        acc_count += 1;
399        acc_size += record.metered_size();
400    }
401    acc_count
402}
403
404#[cfg(not(test))]
405fn new_heartbeat_sleep() -> tokio::time::Sleep {
406    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
407}
408
409#[cfg(test)]
410fn new_heartbeat_sleep() -> tokio::time::Sleep {
411    tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
412}
413
414async fn wait_sleep_until(deadline: Option<Instant>) {
415    match deadline {
416        Some(deadline) => tokio::time::sleep_until(deadline).await,
417        None => {
418            std::future::pending::<()>().await;
419        }
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use std::{sync::Arc, task::Poll};
426
427    use bytesize::ByteSize;
428    use futures::StreamExt;
429    use s2_common::{
430        read_extent::{ReadLimit, ReadUntil},
431        record::{Metered, Record},
432        types::{
433            basin::BasinName,
434            config::{BasinConfig, OptionalStreamConfig},
435            resources::CreateMode,
436            stream::{
437                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
438                ReadSessionOutput, ReadStart,
439            },
440        },
441    };
442    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
443    use tokio::time::Instant;
444
445    use super::*;
446    use crate::{
447        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
448        stream_id::StreamId,
449    };
450
451    fn append_input(record: Record) -> AppendInput {
452        let record: AppendRecord = AppendRecordParts {
453            timestamp: None,
454            record: Metered::from(record),
455        }
456        .try_into()
457        .unwrap();
458        let records: AppendRecordBatch = vec![record].try_into().unwrap();
459        AppendInput {
460            records,
461            match_seq_num: None,
462            fencing_token: None,
463        }
464    }
465
466    fn map_test_output(
467        output: Option<Result<ReadSessionOutput, ReadError>>,
468    ) -> Option<ReadSessionOutput> {
469        match output {
470            Some(Ok(output)) => Some(output),
471            Some(Err(e)) => panic!("Read error: {e:?}"),
472            None => None,
473        }
474    }
475
    /// Polls the session once (expecting `Pending`), advances paused test
    /// time by `advance_by`, then polls again and reports the result.
    ///
    /// Requires a `start_paused` runtime so `tokio::time::advance` controls
    /// the clock deterministically.
    async fn poll_next_after_advance<S>(
        session: &mut std::pin::Pin<Box<S>>,
        advance_by: Duration,
    ) -> Poll<Option<ReadSessionOutput>>
    where
        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
    {
        let mut pinned_session = session.as_mut();
        let next = pinned_session.next();
        tokio::pin!(next);

        // The first poll registers timers; it must not complete yet.
        assert!(
            matches!(futures::poll!(&mut next), Poll::Pending),
            "session unexpectedly yielded before time advanced"
        );

        tokio::time::advance(advance_by).await;
        // Give woken tasks a chance to run before the second poll.
        tokio::task::yield_now().await;

        match futures::poll!(&mut next) {
            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
            Poll::Pending => Poll::Pending,
        }
    }
500
    /// `resolve_timestamp` must only consider keys belonging to the queried
    /// stream — a matching timestamp in a different stream must not leak in.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // Seed one timestamp-index entry per stream.
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // Should find record in stream_a
        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Should return None, not find stream_b's record
        let result = resolve_timestamp(&backend.db, stream_a, 1500)
            .await
            .unwrap();
        assert_eq!(result, None);
    }
555
    /// A bounded read over a range whose records were all deleted from the
    /// DB must terminate (by advancing the cursor to the tail) rather than
    /// re-scanning the empty range forever.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Append one record so the tail advances past zero.
        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
        let ack = backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the record's data key directly, leaving a gap below the tail.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        // No wait budget: the session must not linger in follow mode.
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        // The timeout is the actual regression check: a spinning session
        // would never let collect() finish.
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }
628
    /// Heartbeats must not push out the wait deadline: with `wait` set and
    /// no records arriving, the session closes once the budget elapses even
    /// though heartbeats were emitted in between.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);
        let probe_step = Duration::from_millis(1);
        // The first output is the follow-mode heartbeat.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let started = Instant::now();
        // Advancing by the full wait budget should surface at least one more
        // heartbeat before the session shuts down.
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        tokio::task::yield_now().await;
        // Drain any residual heartbeats; the stream must then end (Ready(None)).
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        // Closure happens within [wait, wait + probe_step] of the start.
        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }
707
    /// A batch delivered in follow mode must restart the wait budget: after
    /// a live append, the session stays open for a fresh `wait` interval
    /// measured from the delivery, then closes.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Pre-existing record so the session starts with a catch-up batch.
        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        // First output: the catch-up batch containing the initial record.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield the initial batch")
            .expect("session should not error");
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        // Second output: heartbeat marking the switch to follow mode.
        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Burn most of the wait budget before appending.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        // The live batch should arrive and reset the wait deadline.
        let follow = session
            .as_mut()
            .next()
            .await
            .expect("session should deliver the live batch")
            .expect("session should not error");
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Just short of the refreshed deadline the session must stay open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        // Cross the refreshed deadline: now the session must close.
        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        // Closure within [wait, wait + probe_step] of the batch delivery.
        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }
860
861    #[tokio::test(flavor = "current_thread", start_paused = true)]
862    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
863        let object_store = Arc::new(InMemory::new());
864        let db = Db::builder("/test", object_store).build().await.unwrap();
865        let backend = Backend::new(db, ByteSize::mib(10));
866
867        let basin: BasinName = "test-basin".parse().unwrap();
868        backend
869            .create_basin(
870                basin.clone(),
871                BasinConfig::default(),
872                CreateMode::CreateOnly(None),
873            )
874            .await
875            .unwrap();
876        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
877        backend
878            .create_stream(
879                basin.clone(),
880                stream.clone(),
881                OptionalStreamConfig::default(),
882                CreateMode::CreateOnly(None),
883            )
884            .await
885            .unwrap();
886
887        let wait = Duration::from_secs(30);
888        let start = ReadStart {
889            from: ReadFrom::SeqNum(0),
890            clamp: false,
891        };
892        let end = ReadEnd {
893            limit: ReadLimit::Unbounded,
894            until: ReadUntil::Unbounded,
895            wait: Some(wait),
896        };
897        let session = backend
898            .open_for_read(&basin, &stream, None)
899            .await
900            .unwrap()
901            .read(start, end)
902            .await
903            .unwrap();
904        let mut session = Box::pin(session);
905
906        let first = session
907            .as_mut()
908            .next()
909            .await
910            .expect("session should enter follow mode")
911            .expect("session should not error");
912        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
913
914        let stream_id = StreamId::new(&basin, &stream);
915        let mut delete_batch = WriteBatch::new();
916        let lagged_appends = FOLLOWER_MAX_LAG + 25;
917
918        for i in 0..lagged_appends {
919            let input = append_input(
920                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
921            );
922            let ack = backend
923                .open_for_append(&basin, &stream, None)
924                .await
925                .unwrap()
926                .append(input)
927                .await
928                .unwrap();
929            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
930        }
931
932        static WRITE_OPTS: WriteOptions = WriteOptions {
933            await_durable: true,
934        };
935        backend
936            .db
937            .write_with_options(delete_batch, &WRITE_OPTS)
938            .await
939            .unwrap();
940
941        tokio::time::advance(wait + Duration::from_secs(1)).await;
942        tokio::task::yield_now().await;
943
944        let next = session.as_mut().next().await;
945        assert!(
946            next.is_none(),
947            "session should close immediately once the original wait budget has elapsed"
948        );
949    }
950
951    #[tokio::test(flavor = "current_thread", start_paused = true)]
952    async fn unbounded_follow_survives_streamer_dormancy() {
953        let object_store = Arc::new(InMemory::new());
954        let db = Db::builder("/test", object_store).build().await.unwrap();
955        let backend = Backend::new(db, ByteSize::mib(10));
956
957        let basin: BasinName = "test-basin".parse().unwrap();
958        backend
959            .create_basin(
960                basin.clone(),
961                BasinConfig::default(),
962                CreateMode::CreateOnly(None),
963            )
964            .await
965            .unwrap();
966        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
967        backend
968            .create_stream(
969                basin.clone(),
970                stream.clone(),
971                OptionalStreamConfig::default(),
972                CreateMode::CreateOnly(None),
973            )
974            .await
975            .unwrap();
976
977        let initial_input =
978            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
979        backend
980            .open_for_append(&basin, &stream, None)
981            .await
982            .unwrap()
983            .append(initial_input)
984            .await
985            .unwrap();
986
987        let start = ReadStart {
988            from: ReadFrom::SeqNum(0),
989            clamp: false,
990        };
991        let end = ReadEnd {
992            limit: ReadLimit::Unbounded,
993            until: ReadUntil::Unbounded,
994            wait: None,
995        };
996        let session = backend
997            .open_for_read(&basin, &stream, None)
998            .await
999            .unwrap()
1000            .read(start, end)
1001            .await
1002            .unwrap();
1003        let mut session = Box::pin(session);
1004
1005        let first = session
1006            .as_mut()
1007            .next()
1008            .await
1009            .expect("session should yield initial batch")
1010            .expect("session should not error");
1011        assert!(matches!(first, ReadSessionOutput::Batch(_)));
1012
1013        let second = session
1014            .as_mut()
1015            .next()
1016            .await
1017            .expect("session should enter follow mode")
1018            .expect("session should not error");
1019        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
1020
1021        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
1022        tokio::task::yield_now().await;
1023
1024        let follow_input =
1025            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
1026        backend
1027            .open_for_append(&basin, &stream, None)
1028            .await
1029            .unwrap()
1030            .append(follow_input)
1031            .await
1032            .unwrap();
1033
1034        let next = session
1035            .as_mut()
1036            .next()
1037            .await
1038            .expect("session should stay open after dormancy")
1039            .expect("session should not error after dormancy");
1040        let ReadSessionOutput::Batch(batch) = next else {
1041            panic!("expected new batch after append");
1042        };
1043        assert_eq!(batch.records.len(), 1);
1044        let record = batch.records.first().expect("batch should have one record");
1045        let Record::Envelope(envelope) = record.inner() else {
1046            panic!("expected envelope record");
1047        };
1048        assert_eq!(envelope.body().as_ref(), b"follow-1");
1049    }
1050}