Skip to main content

s2_lite/backend/
read.rs

1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5    caps,
6    encryption::{EncryptionKey, EncryptionSpec},
7    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8    record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9    types::{
10        basin::BasinName,
11        stream::{
12            ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13            StoredReadSessionOutput, StreamName,
14        },
15    },
16};
17use slatedb::{
18    IterationOrder,
19    config::{DurabilityLevel, ScanOptions},
20};
21use tokio::{sync::broadcast, time::Instant};
22
23use super::{Backend, StreamHandle};
24use crate::{
25    backend::{
26        error::{
27            CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
28        },
29        kv,
30        streamer::GuardedStreamerClient,
31    },
32    stream_id::StreamId,
33};
34
35impl Backend {
36    pub async fn open_for_check_tail(
37        &self,
38        basin: &BasinName,
39        stream: &StreamName,
40    ) -> Result<StreamHandle, CheckTailError> {
41        self.stream_handle_with_auto_create::<CheckTailError>(
42            basin,
43            stream,
44            |config| config.create_stream_on_read,
45            |_| Ok(EncryptionSpec::Plain),
46        )
47        .await
48    }
49
50    pub async fn open_for_read(
51        &self,
52        basin: &BasinName,
53        stream: &StreamName,
54        encryption_key: Option<EncryptionKey>,
55    ) -> Result<StreamHandle, ReadError> {
56        self.stream_handle_with_auto_create::<ReadError>(
57            basin,
58            stream,
59            |config| config.create_stream_on_read,
60            |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
61        )
62        .await
63    }
64}
65
66impl StreamHandle {
67    pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
68        let tail = self.client.check_tail().await?;
69        Ok(tail)
70    }
71
72    pub async fn read(
73        self,
74        start: ReadStart,
75        end: ReadEnd,
76    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
77        let stream_id = self.client.stream_id();
78        let session = read_session(self.db, self.client, start, end).await?;
79        Ok(async_stream::stream! {
80            tokio::pin!(session);
81            while let Some(output) = session.next().await {
82                let output = match output {
83                    Ok(output) => output
84                        .decrypt(&self.encryption, stream_id.as_bytes())
85                        .map_err(ReadError::from),
86                    Err(err) => Err(err),
87                };
88                let should_stop = output.is_err();
89                yield output;
90                if should_stop {
91                    break;
92                }
93            }
94        })
95    }
96}
97
/// Builds the stream of stored (not yet decrypted) outputs for one read session.
///
/// Setup happens eagerly: the current tail is fetched from `client` and the
/// starting sequence number is resolved via `read_start_seq_num`; failures in
/// either step are returned before any stream exists.
///
/// The returned stream then loops while the read limit still has budget:
/// - when `start_seq_num` is behind the known tail, it scans the DB for record
///   keys in `[start_seq_num, tail.seq_num)` and yields them in batches capped
///   by `caps::RECORD_BATCH_MAX`;
/// - when caught up with the tail, it ends unless `end.may_follow()`, otherwise
///   it follows live records from `client.follow`, emitting heartbeats and
///   honoring the wait deadline tracked in `ReadSessionState`.
async fn read_session(
    db: slatedb::Db,
    client: GuardedStreamerClient,
    start: ReadStart,
    end: ReadEnd,
) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
    let stream_id = client.stream_id();
    let tail = client.check_tail().await?;
    let mut state = ReadSessionState {
        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
        limit: EvaluatedReadLimit::Remaining(end.limit),
        until: end.until,
        wait: end.wait,
        wait_deadline: None,
        tail,
    };
    let session = async_stream::try_stream! {
        // Each pass either catches up from the DB or follows live records;
        // the session ends once the limit is no longer `Remaining`.
        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
            if state.start_seq_num < state.tail.seq_num {
                // Catch-up: scan record keys from the start position up to
                // (excluding) the known tail. Both keys use timestamp 0.
                let start_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.start_seq_num,
                        timestamp: 0,
                    },
                );
                let end_key = kv::stream_record_data::ser_key(
                    stream_id,
                    StreamPosition {
                        seq_num: state.tail.seq_num,
                        timestamp: 0,
                    },
                );
                static SCAN_OPTS: ScanOptions = ScanOptions {
                    durability_filter: DurabilityLevel::Remote,
                    dirty: false,
                    read_ahead_bytes: 1024 * 1024,
                    cache_blocks: true,
                    max_fetch_tasks: 8,
                    order: IterationOrder::Ascending,
                };
                let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;

                // Pre-size the batch buffer to the smaller of the remaining
                // count limit and the per-batch record cap.
                let mut records = Metered::with_capacity(
                    limit.count()
                        .unwrap_or(usize::MAX)
                        .min(caps::RECORD_BATCH_MAX.count),
                );

                // `state.limit` is re-read on every iteration because
                // `on_batch` shrinks it whenever a full batch is yielded below.
                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                    let Some(kv) = it.next().await? else {
                        break;
                    };
                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                    assert_eq!(deser_stream_id, stream_id);

                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                    // Stop at the first record rejected by `until` or by the
                    // remaining limit. If nothing is buffered, the session is
                    // over; otherwise flush what has been collected first.
                    if end.until.deny(pos.timestamp)
                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                    {
                        if records.is_empty() {
                            break 'session;
                        } else {
                            break;
                        }
                    }

                    // Adding this record would exceed the per-batch caps:
                    // yield the buffered batch and start a fresh buffer.
                    if records.len() == caps::RECORD_BATCH_MAX.count
                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                    {
                        let new_records_buf = Metered::with_capacity(
                            limit.count()
                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                .min(caps::RECORD_BATCH_MAX.count),
                        );
                        yield state.on_batch(StoredReadBatch {
                            records: std::mem::replace(&mut records, new_records_buf),
                            tail: None,
                        });
                    }

                    records.push(record);
                }

                if !records.is_empty() {
                    yield state.on_batch(StoredReadBatch {
                        records,
                        tail: None,
                    });
                } else {
                    // Nothing left to deliver below the tail; jump to the tail
                    // so the outer loop does not rescan the same empty range.
                    state.start_seq_num = state.tail.seq_num;
                }
            } else {
                // Caught up with the known tail.
                assert_eq!(state.start_seq_num, state.tail.seq_num);
                if !end.may_follow() {
                    break;
                }
                match client.follow(state.start_seq_num).await? {
                    Ok(mut follow_rx) => {
                        // Only a delivered batch should reset the absolute wait budget.
                        state.arm_wait_deadline_if_unset();
                        if state.wait_deadline_expired() {
                            break;
                        }
                        // Emit a heartbeat carrying the known tail on entering follow mode.
                        yield StoredReadSessionOutput::Heartbeat(state.tail);
                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            // `biased` makes `select!` poll the arms in order:
                            // live records, then heartbeat timer, then wait deadline.
                            tokio::select! {
                                biased;
                                msg = follow_rx.recv() => {
                                    match msg {
                                        Ok(mut records) => {
                                            let count = records.len();
                                            let tail = super::streamer::next_pos(&records);
                                            // Deliver only the prefix allowed by the limit and `until`.
                                            let allowed_count = count_allowed_records(limit, end.until, &records);
                                            if allowed_count > 0 {
                                                yield state.on_batch(StoredReadBatch {
                                                    records: records.drain(..allowed_count).collect(),
                                                    tail: Some(tail),
                                                });
                                            }
                                            // A truncated batch means a bound was hit: end the session.
                                            if allowed_count < count {
                                                break 'session;
                                            }
                                            Ok(())
                                        }
                                        Err(broadcast::error::RecvError::Lagged(_)) => {
                                            // Catch up using DB
                                            continue 'session;
                                        }
                                        Err(broadcast::error::RecvError::Closed) => {
                                            // The broadcast sender is gone; report it as an error.
                                            Err(StreamerMissingInActionError)
                                        }
                                    }
                                }
                                _ = new_heartbeat_sleep() => {
                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
                                    Ok(())
                                }
                                _ = wait_sleep_until(state.wait_deadline) => {
                                    // Wait budget exhausted without a delivered batch.
                                    break 'session;
                                }
                            }?;
                        }
                    }
                    Err(tail) => {
                        // `follow` handed back a tail instead of a receiver;
                        // record it so the next pass catches up from the DB.
                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                        state.tail = tail;
                    }
                }
            }
        }
    };
    Ok(session)
}
253
254async fn read_start_seq_num(
255    db: &slatedb::Db,
256    stream_id: StreamId,
257    start: ReadStart,
258    end: ReadEnd,
259    tail: StreamPosition,
260) -> Result<SeqNum, ReadError> {
261    let mut read_pos = match start.from {
262        s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
263        s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
264            ReadPosition::Timestamp(timestamp)
265        }
266        s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
267            ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
268        }
269    };
270    if match read_pos {
271        ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
272        ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
273    } {
274        if start.clamp {
275            read_pos = ReadPosition::SeqNum(tail.seq_num);
276        } else {
277            return Err(UnwrittenError(tail).into());
278        }
279    }
280    if let ReadPosition::SeqNum(start_seq_num) = read_pos
281        && start_seq_num == tail.seq_num
282        && !end.may_follow()
283    {
284        return Err(UnwrittenError(tail).into());
285    }
286    Ok(match read_pos {
287        ReadPosition::SeqNum(start_seq_num) => start_seq_num,
288        ReadPosition::Timestamp(start_timestamp) => {
289            resolve_timestamp(db, stream_id, start_timestamp)
290                .await?
291                .unwrap_or(tail)
292                .seq_num
293        }
294    })
295}
296
297async fn resolve_timestamp(
298    db: &slatedb::Db,
299    stream_id: StreamId,
300    timestamp: Timestamp,
301) -> Result<Option<StreamPosition>, StorageError> {
302    let start_key = kv::stream_record_timestamp::ser_key(
303        stream_id,
304        StreamPosition {
305            seq_num: SeqNum::MIN,
306            timestamp,
307        },
308    );
309    let end_key = kv::stream_record_timestamp::ser_key(
310        stream_id,
311        StreamPosition {
312            seq_num: SeqNum::MAX,
313            timestamp: Timestamp::MAX,
314        },
315    );
316    static SCAN_OPTS: ScanOptions = ScanOptions {
317        durability_filter: DurabilityLevel::Remote,
318        dirty: false,
319        read_ahead_bytes: 1,
320        cache_blocks: false,
321        max_fetch_tasks: 1,
322        order: IterationOrder::Ascending,
323    };
324    let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
325    Ok(match it.next().await? {
326        Some(kv) => {
327            let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
328            assert_eq!(deser_stream_id, stream_id);
329            assert!(pos.timestamp >= timestamp);
330            kv::stream_record_timestamp::deser_value(kv.value)?;
331            Some(StreamPosition {
332                seq_num: pos.seq_num,
333                timestamp: pos.timestamp,
334            })
335        }
336        None => None,
337    })
338}
339
340struct ReadSessionState {
341    start_seq_num: u64,
342    limit: EvaluatedReadLimit,
343    until: ReadUntil,
344    wait: Option<Duration>,
345    wait_deadline: Option<Instant>,
346    tail: StreamPosition,
347}
348
349impl ReadSessionState {
350    fn arm_wait_deadline_if_unset(&mut self) {
351        if self.wait_deadline.is_none() {
352            self.reset_wait_deadline();
353        }
354    }
355
356    fn reset_wait_deadline(&mut self) {
357        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
358    }
359
360    fn wait_deadline_expired(&self) -> bool {
361        self.wait_deadline
362            .is_some_and(|deadline| deadline <= Instant::now())
363    }
364
365    fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
366        if let Some(tail) = batch.tail {
367            self.tail = tail;
368        }
369        let last_record = batch.records.last().expect("non-empty");
370        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
371            panic!("batch after exhausted limit");
372        };
373        let count = batch.records.len();
374        let bytes = batch.records.metered_size();
375        let last_position = last_record.position();
376        assert!(limit.allow(count, bytes));
377        assert!(self.until.allow(last_position.timestamp));
378        self.start_seq_num = last_position.seq_num + 1;
379        self.limit = limit.remaining(count, bytes);
380        self.reset_wait_deadline();
381        StoredReadSessionOutput::Batch(batch)
382    }
383}
384
385fn count_allowed_records(
386    limit: ReadLimit,
387    until: ReadUntil,
388    records: &[Metered<StoredSequencedRecord>],
389) -> usize {
390    let mut acc_size = 0;
391    let mut acc_count = 0;
392    for record in records {
393        if limit.deny(acc_count + 1, acc_size + record.metered_size())
394            || until.deny(record.position().timestamp)
395        {
396            break;
397        }
398        acc_count += 1;
399        acc_size += record.metered_size();
400    }
401    acc_count
402}
403
404#[cfg(not(test))]
405fn new_heartbeat_sleep() -> tokio::time::Sleep {
406    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
407}
408
/// Creates the heartbeat timer for test builds: a sleep of a random duration
/// in the range 5..15 milliseconds.
#[cfg(test)]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    let interval_ms = rand::random_range(5..15);
    tokio::time::sleep(Duration::from_millis(interval_ms))
}
413
414async fn wait_sleep_until(deadline: Option<Instant>) {
415    match deadline {
416        Some(deadline) => tokio::time::sleep_until(deadline).await,
417        None => {
418            std::future::pending::<()>().await;
419        }
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use std::{sync::Arc, task::Poll};
426
427    use bytesize::ByteSize;
428    use futures::StreamExt;
429    use s2_common::{
430        read_extent::{ReadLimit, ReadUntil},
431        record::{Metered, Record},
432        types::{
433            basin::{BasinName, CreateBasinIntent},
434            config::{BasinConfig, OptionalStreamConfig},
435            stream::{
436                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts,
437                CreateStreamIntent, ReadEnd, ReadFrom, ReadSessionOutput, ReadStart,
438            },
439        },
440    };
441    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
442    use tokio::time::Instant;
443
444    use super::*;
445    use crate::{
446        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
447        stream_id::StreamId,
448    };
449
450    fn append_input(record: Record) -> AppendInput {
451        let record: AppendRecord = AppendRecordParts {
452            timestamp: None,
453            record: Metered::from(record),
454        }
455        .try_into()
456        .unwrap();
457        let records: AppendRecordBatch = vec![record].try_into().unwrap();
458        AppendInput {
459            records,
460            match_seq_num: None,
461            fencing_token: None,
462        }
463    }
464
465    fn map_test_output(
466        output: Option<Result<ReadSessionOutput, ReadError>>,
467    ) -> Option<ReadSessionOutput> {
468        match output {
469            Some(Ok(output)) => Some(output),
470            Some(Err(e)) => panic!("Read error: {e:?}"),
471            None => None,
472        }
473    }
474
475    async fn poll_next_after_advance<S>(
476        session: &mut std::pin::Pin<Box<S>>,
477        advance_by: Duration,
478    ) -> Poll<Option<ReadSessionOutput>>
479    where
480        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
481    {
482        let mut pinned_session = session.as_mut();
483        let next = pinned_session.next();
484        tokio::pin!(next);
485
486        assert!(
487            matches!(futures::poll!(&mut next), Poll::Pending),
488            "session unexpectedly yielded before time advanced"
489        );
490
491        tokio::time::advance(advance_by).await;
492        tokio::task::yield_now().await;
493
494        match futures::poll!(&mut next) {
495            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
496            Poll::Pending => Poll::Pending,
497        }
498    }
499
500    #[tokio::test]
501    async fn resolve_timestamp_bounded_to_stream() {
502        let object_store = Arc::new(InMemory::new());
503        let db = Db::builder("/test", object_store).build().await.unwrap();
504        let backend = Backend::new(db, ByteSize::mib(10));
505
506        let stream_a: StreamId = [0u8; 32].into();
507        let stream_b: StreamId = [1u8; 32].into();
508
509        backend
510            .db
511            .put(
512                kv::stream_record_timestamp::ser_key(
513                    stream_a,
514                    StreamPosition {
515                        seq_num: 0,
516                        timestamp: 1000,
517                    },
518                ),
519                kv::stream_record_timestamp::ser_value(),
520            )
521            .await
522            .unwrap();
523        backend
524            .db
525            .put(
526                kv::stream_record_timestamp::ser_key(
527                    stream_b,
528                    StreamPosition {
529                        seq_num: 0,
530                        timestamp: 2000,
531                    },
532                ),
533                kv::stream_record_timestamp::ser_value(),
534            )
535            .await
536            .unwrap();
537
538        // Should find record in stream_a
539        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
540        assert_eq!(
541            result,
542            Some(StreamPosition {
543                seq_num: 0,
544                timestamp: 1000
545            })
546        );
547
548        // Should return None, not find stream_b's record
549        let result = resolve_timestamp(&backend.db, stream_a, 1500)
550            .await
551            .unwrap();
552        assert_eq!(result, None);
553    }
554
555    #[tokio::test]
556    async fn read_completes_when_all_records_deleted() {
557        let object_store = Arc::new(InMemory::new());
558        let db = Db::builder("/test", object_store).build().await.unwrap();
559        let backend = Backend::new(db, ByteSize::mib(10));
560
561        let basin: BasinName = "test-basin".parse().unwrap();
562        backend
563            .create_basin(
564                basin.clone(),
565                CreateBasinIntent::CreateOnly {
566                    config: BasinConfig::default(),
567                    request_token: None,
568                },
569            )
570            .await
571            .unwrap();
572        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
573        backend
574            .create_stream(
575                basin.clone(),
576                stream.clone(),
577                CreateStreamIntent::CreateOnly {
578                    config: OptionalStreamConfig::default(),
579                    request_token: None,
580                },
581            )
582            .await
583            .unwrap();
584
585        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
586        let ack = backend
587            .open_for_append(&basin, &stream, None)
588            .await
589            .unwrap()
590            .append(input)
591            .await
592            .unwrap();
593        assert!(ack.end.seq_num > 0);
594
595        let stream_id = StreamId::new(&basin, &stream);
596        let mut batch = WriteBatch::new();
597        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
598        static WRITE_OPTS: WriteOptions = WriteOptions {
599            await_durable: true,
600        };
601        backend
602            .db
603            .write_with_options(batch, &WRITE_OPTS)
604            .await
605            .unwrap();
606
607        let start = ReadStart {
608            from: ReadFrom::SeqNum(0),
609            clamp: false,
610        };
611        let end = ReadEnd {
612            limit: ReadLimit::Count(10),
613            until: ReadUntil::Unbounded,
614            wait: None,
615        };
616        let session = backend
617            .open_for_read(&basin, &stream, None)
618            .await
619            .unwrap()
620            .read(start, end)
621            .await
622            .unwrap();
623        let records: Vec<_> = tokio::time::timeout(
624            Duration::from_secs(2),
625            futures::StreamExt::collect::<Vec<_>>(session),
626        )
627        .await
628        .expect("read should not spin forever");
629        assert!(records.into_iter().all(|r| r.is_ok()));
630    }
631
    // Verifies that heartbeats emitted while following do not push back the
    // wait deadline: on an empty stream the session must close once `wait`
    // has elapsed since it entered follow mode. Runs on a paused tokio clock
    // so time only moves via `tokio::time::advance`.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        // The wait budget (30ms) is longer than the test heartbeat interval
        // (5..15ms), so a heartbeat is due before the budget runs out.
        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);
        let probe_step = Duration::from_millis(1);
        // Nothing was appended, so the first output is the heartbeat emitted
        // on entering follow mode.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        // Jump the clock by the full wait budget; the next output must still
        // be a heartbeat.
        let started = Instant::now();
        let second = match poll_next_after_advance(&mut session, wait).await {
            Poll::Ready(Some(output)) => output,
            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
        };
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Without advancing time further, drain any remaining heartbeats; the
        // session must then report completion rather than stay pending.
        tokio::task::yield_now().await;
        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => panic!("session should close once the wait budget expires"),
            }
        };

        // Closure must land within one probe step of `started + wait`.
        assert!(closed_at >= started + wait);
        assert!(closed_at <= started + wait + probe_step);
    }
714
    // Verifies that a batch delivered while following restarts the wait
    // budget: after a live batch arrives 20ms into a 30ms wait, the session
    // must stay open until `wait` has elapsed since that batch, then close.
    // Runs on a paused tokio clock so time only moves via
    // `tokio::time::advance`.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_reset_by_delivered_follow_batch() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                CreateBasinIntent::CreateOnly {
                    config: BasinConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                CreateStreamIntent::CreateOnly {
                    config: OptionalStreamConfig::default(),
                    request_token: None,
                },
            )
            .await
            .unwrap();

        // Seed the stream with one record so the session starts by reading it.
        let initial_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(initial_input)
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let probe_step = Duration::from_millis(1);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend
            .open_for_read(&basin, &stream, None)
            .await
            .unwrap()
            .read(start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        // First output: the batch holding the seeded record.
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield the initial batch")
            .expect("session should not error");
        let ReadSessionOutput::Batch(batch) = first else {
            panic!("expected initial batch");
        };
        let initial_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(initial_envelope) = initial_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(initial_envelope.body().as_ref(), b"initial");

        // Second output: the heartbeat emitted on entering follow mode.
        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Use up 20ms of the 30ms wait budget before appending a live record.
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;

        let follow_input =
            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
        backend
            .open_for_append(&basin, &stream, None)
            .await
            .unwrap()
            .append(follow_input)
            .await
            .unwrap();

        // The live record must be delivered; its delivery time is the
        // reference point for the restarted wait budget.
        let follow = session
            .as_mut()
            .next()
            .await
            .expect("session should deliver the live batch")
            .expect("session should not error");
        let reset_at = Instant::now();
        let ReadSessionOutput::Batch(batch) = follow else {
            panic!("expected live batch after append");
        };
        let follow_record = batch
            .records
            .first()
            .expect("batch should contain one record");
        let Record::Envelope(follow_envelope) = follow_record.inner() else {
            panic!("expected plaintext envelope record");
        };
        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");

        // Advance to one probe step short of the restarted deadline: only
        // heartbeats may appear and the session must still be open.
        tokio::time::advance(wait - probe_step).await;
        tokio::task::yield_now().await;

        loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output before the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => {
                    panic!("session closed before the reset wait budget expired");
                }
                Poll::Pending => break,
            }
        }

        // Cross the restarted deadline; the session must now close.
        tokio::time::advance(probe_step).await;
        tokio::task::yield_now().await;

        let closed_at = loop {
            match futures::poll!(session.as_mut().next()) {
                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
                Poll::Ready(Some(Ok(output))) => {
                    panic!("unexpected output after the reset wait deadline: {output:?}");
                }
                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
                Poll::Ready(None) => break Instant::now(),
                Poll::Pending => {
                    panic!("session should close once the reset wait budget expires");
                }
            }
        };

        // Closure must land within one probe step of `reset_at + wait`.
        assert!(closed_at >= reset_at + wait);
        assert!(closed_at <= reset_at + wait + probe_step);
    }
871
872    #[tokio::test(flavor = "current_thread", start_paused = true)]
873    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
874        let object_store = Arc::new(InMemory::new());
875        let db = Db::builder("/test", object_store).build().await.unwrap();
876        let backend = Backend::new(db, ByteSize::mib(10));
877
878        let basin: BasinName = "test-basin".parse().unwrap();
879        backend
880            .create_basin(
881                basin.clone(),
882                CreateBasinIntent::CreateOnly {
883                    config: BasinConfig::default(),
884                    request_token: None,
885                },
886            )
887            .await
888            .unwrap();
889        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
890        backend
891            .create_stream(
892                basin.clone(),
893                stream.clone(),
894                CreateStreamIntent::CreateOnly {
895                    config: OptionalStreamConfig::default(),
896                    request_token: None,
897                },
898            )
899            .await
900            .unwrap();
901
902        let wait = Duration::from_secs(30);
903        let start = ReadStart {
904            from: ReadFrom::SeqNum(0),
905            clamp: false,
906        };
907        let end = ReadEnd {
908            limit: ReadLimit::Unbounded,
909            until: ReadUntil::Unbounded,
910            wait: Some(wait),
911        };
912        let session = backend
913            .open_for_read(&basin, &stream, None)
914            .await
915            .unwrap()
916            .read(start, end)
917            .await
918            .unwrap();
919        let mut session = Box::pin(session);
920
921        let first = session
922            .as_mut()
923            .next()
924            .await
925            .expect("session should enter follow mode")
926            .expect("session should not error");
927        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
928
929        let stream_id = StreamId::new(&basin, &stream);
930        let mut delete_batch = WriteBatch::new();
931        let lagged_appends = FOLLOWER_MAX_LAG + 25;
932
933        for i in 0..lagged_appends {
934            let input = append_input(
935                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
936            );
937            let ack = backend
938                .open_for_append(&basin, &stream, None)
939                .await
940                .unwrap()
941                .append(input)
942                .await
943                .unwrap();
944            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
945        }
946
947        static WRITE_OPTS: WriteOptions = WriteOptions {
948            await_durable: true,
949        };
950        backend
951            .db
952            .write_with_options(delete_batch, &WRITE_OPTS)
953            .await
954            .unwrap();
955
956        tokio::time::advance(wait + Duration::from_secs(1)).await;
957        tokio::task::yield_now().await;
958
959        let next = session.as_mut().next().await;
960        assert!(
961            next.is_none(),
962            "session should close immediately once the original wait budget has elapsed"
963        );
964    }
965
966    #[tokio::test(flavor = "current_thread", start_paused = true)]
967    async fn unbounded_follow_survives_streamer_dormancy() {
968        let object_store = Arc::new(InMemory::new());
969        let db = Db::builder("/test", object_store).build().await.unwrap();
970        let backend = Backend::new(db, ByteSize::mib(10));
971
972        let basin: BasinName = "test-basin".parse().unwrap();
973        backend
974            .create_basin(
975                basin.clone(),
976                CreateBasinIntent::CreateOnly {
977                    config: BasinConfig::default(),
978                    request_token: None,
979                },
980            )
981            .await
982            .unwrap();
983        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
984        backend
985            .create_stream(
986                basin.clone(),
987                stream.clone(),
988                CreateStreamIntent::CreateOnly {
989                    config: OptionalStreamConfig::default(),
990                    request_token: None,
991                },
992            )
993            .await
994            .unwrap();
995
996        let initial_input =
997            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
998        backend
999            .open_for_append(&basin, &stream, None)
1000            .await
1001            .unwrap()
1002            .append(initial_input)
1003            .await
1004            .unwrap();
1005
1006        let start = ReadStart {
1007            from: ReadFrom::SeqNum(0),
1008            clamp: false,
1009        };
1010        let end = ReadEnd {
1011            limit: ReadLimit::Unbounded,
1012            until: ReadUntil::Unbounded,
1013            wait: None,
1014        };
1015        let session = backend
1016            .open_for_read(&basin, &stream, None)
1017            .await
1018            .unwrap()
1019            .read(start, end)
1020            .await
1021            .unwrap();
1022        let mut session = Box::pin(session);
1023
1024        let first = session
1025            .as_mut()
1026            .next()
1027            .await
1028            .expect("session should yield initial batch")
1029            .expect("session should not error");
1030        assert!(matches!(first, ReadSessionOutput::Batch(_)));
1031
1032        let second = session
1033            .as_mut()
1034            .next()
1035            .await
1036            .expect("session should enter follow mode")
1037            .expect("session should not error");
1038        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
1039
1040        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
1041        tokio::task::yield_now().await;
1042
1043        let follow_input =
1044            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
1045        backend
1046            .open_for_append(&basin, &stream, None)
1047            .await
1048            .unwrap()
1049            .append(follow_input)
1050            .await
1051            .unwrap();
1052
1053        let next = session
1054            .as_mut()
1055            .next()
1056            .await
1057            .expect("session should stay open after dormancy")
1058            .expect("session should not error after dormancy");
1059        let ReadSessionOutput::Batch(batch) = next else {
1060            panic!("expected new batch after append");
1061        };
1062        assert_eq!(batch.records.len(), 1);
1063        let record = batch.records.first().expect("batch should have one record");
1064        let Record::Envelope(envelope) = record.inner() else {
1065            panic!("expected envelope record");
1066        };
1067        assert_eq!(envelope.body().as_ref(), b"follow-1");
1068    }
1069}