Skip to main content

s2_lite/backend/
read.rs

1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5    caps,
6    encryption::{EncryptionKey, EncryptionSpec},
7    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8    record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9    types::{
10        basin::BasinName,
11        stream::{
12            ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13            StoredReadSessionOutput, StreamName,
14        },
15    },
16};
17use slatedb::config::{DurabilityLevel, ScanOptions};
18use tokio::{sync::broadcast, time::Instant};
19
20use super::{Backend, StreamHandle};
21use crate::{
22    backend::{
23        error::{
24            CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
25        },
26        kv,
27        streamer::GuardedStreamerClient,
28    },
29    stream_id::StreamId,
30};
31
32impl Backend {
33    pub async fn open_for_check_tail(
34        &self,
35        basin: &BasinName,
36        stream: &StreamName,
37    ) -> Result<StreamHandle, CheckTailError> {
38        self.stream_handle_with_auto_create::<CheckTailError>(
39            basin,
40            stream,
41            |config| config.create_stream_on_read,
42            |_| Ok(EncryptionSpec::Plain),
43        )
44        .await
45    }
46
47    pub async fn open_for_read(
48        &self,
49        basin: &BasinName,
50        stream: &StreamName,
51        encryption_key: Option<EncryptionKey>,
52    ) -> Result<StreamHandle, ReadError> {
53        self.stream_handle_with_auto_create::<ReadError>(
54            basin,
55            stream,
56            |config| config.create_stream_on_read,
57            |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
58        )
59        .await
60    }
61}
62
63impl StreamHandle {
64    pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
65        let tail = self.client.check_tail().await?;
66        Ok(tail)
67    }
68
69    pub async fn read(
70        self,
71        start: ReadStart,
72        end: ReadEnd,
73    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
74        let stream_id = self.client.stream_id();
75        let session = read_session(self.db, self.client, start, end).await?;
76        Ok(async_stream::stream! {
77            tokio::pin!(session);
78            while let Some(output) = session.next().await {
79                let output = match output {
80                    Ok(output) => output
81                        .decrypt(&self.encryption, stream_id.as_bytes())
82                        .map_err(ReadError::from),
83                    Err(err) => Err(err),
84                };
85                let should_stop = output.is_err();
86                yield output;
87                if should_stop {
88                    break;
89                }
90            }
91        })
92    }
93}
94
/// Builds the stored (still encrypted) read session for one stream; `StreamHandle::read`
/// decrypts its outputs.
///
/// Before returning, this checks the tail and resolves the start position via
/// `read_start_seq_num`, so those failures are returned up front. The returned stream then
/// alternates between two phases while the read limit is still `Remaining`:
/// - catch-up: while `start_seq_num < tail.seq_num`, scan `kv::stream_record_data` over
///   `[start_seq_num, tail.seq_num)` and emit batches capped by `caps::RECORD_BATCH_MAX`,
///   the read limit, and `until`;
/// - follow: once caught up, and only if `end.may_follow()`, subscribe with
///   `client.follow`, emit a heartbeat, then forward live records and periodic heartbeats
///   until the wait deadline passes.
///
/// The session also ends as soon as a record is rejected by the limit or `until`.
/// Errors raised inside the stream (storage, decoding, a closed follow channel) are
/// yielded as `Err` items.
95async fn read_session(
96    db: slatedb::Db,
97    client: GuardedStreamerClient,
98    start: ReadStart,
99    end: ReadEnd,
100) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
101    let stream_id = client.stream_id();
102    let tail = client.check_tail().await?;
103    let mut state = ReadSessionState {
104        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
105        limit: EvaluatedReadLimit::Remaining(end.limit),
106        until: end.until,
107        wait: end.wait,
        // Set by the first delivered batch (`on_batch`) or on first entering follow mode.
108        wait_deadline: None,
109        tail,
110    };
111    let session = async_stream::try_stream! {
        // Each pass either catches up from storage (start behind the tail) or follows
        // live appends (start at the tail).
112        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
113            if state.start_seq_num < state.tail.seq_num {
                // Scan [start_seq_num, tail.seq_num): the end key is exclusive.
114                let start_key = kv::stream_record_data::ser_key(
115                    stream_id,
116                    StreamPosition {
117                        seq_num: state.start_seq_num,
118                        timestamp: 0,
119                    },
120                );
121                let end_key = kv::stream_record_data::ser_key(
122                    stream_id,
123                    StreamPosition {
124                        seq_num: state.tail.seq_num,
125                        timestamp: 0,
126                    },
127                );
128                let scan_opts = ScanOptions {
129                    durability_filter: DurabilityLevel::Remote,
130                    read_ahead_bytes: 1024 * 1024,
131                    cache_blocks: true,
132                    max_fetch_tasks: 8,
133                    ..Default::default()
134                };
135                let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;
136
                // Pre-size the buffer for what the count limit still allows, capped at one batch.
137                let mut records = Metered::with_capacity(
138                    limit.count()
139                        .unwrap_or(usize::MAX)
140                        .min(caps::RECORD_BATCH_MAX.count),
141                );
142
143                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
144                    let Some(kv) = it.next().await? else {
145                        break;
146                    };
147                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
148                    assert_eq!(deser_stream_id, stream_id);
149
150                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);
151
                    // First record rejected by `until` or the limit: if nothing is buffered
                    // there is nothing left to deliver, so end the session; otherwise flush
                    // what is buffered first.
152                    if end.until.deny(pos.timestamp)
153                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
154                    {
155                        if records.is_empty() {
156                            break 'session;
157                        } else {
158                            break;
159                        }
160                    }
161
                    // Flush a full batch (count or byte cap) before buffering this record.
                    // The replacement buffer is sized for the count left after the flush.
162                    if records.len() == caps::RECORD_BATCH_MAX.count
163                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
164                    {
165                        let new_records_buf = Metered::with_capacity(
166                            limit.count()
167                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
168                                .min(caps::RECORD_BATCH_MAX.count),
169                        );
170                        yield state.on_batch(StoredReadBatch {
171                            records: std::mem::replace(&mut records, new_records_buf),
172                            tail: None,
173                        });
174                    }
175
176                    records.push(record);
177                }
178
                // Flush the last partial batch. An empty scan (e.g. records deleted below the
                // tail) means nothing is left before the tail, so jump straight to it.
179                if !records.is_empty() {
180                    yield state.on_batch(StoredReadBatch {
181                        records,
182                        tail: None,
183                    });
184                } else {
185                    state.start_seq_num = state.tail.seq_num;
186                }
187            } else {
                // Caught up with the tail: follow live appends only if `end` allows it.
188                assert_eq!(state.start_seq_num, state.tail.seq_num);
189                if !end.may_follow() {
190                    break;
191                }
                // `Err(tail)` carries a newer tail (asserted below); adopting it sends the
                // next pass back to catch-up.
192                match client.follow(state.start_seq_num).await? {
193                    Ok(mut follow_rx) => {
194                        // Only a delivered batch should reset the absolute wait budget.
195                        state.arm_wait_deadline_if_unset();
196                        if state.wait_deadline_expired() {
197                            break;
198                        }
                        // Announce that the session is now live at the current tail.
199                        yield StoredReadSessionOutput::Heartbeat(state.tail);
200                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                            // `biased`: live records win over heartbeats, which win over the
                            // wait deadline. Arms evaluate to a Result that the trailing `?`
                            // propagates.
201                            tokio::select! {
202                                biased;
203                                msg = follow_rx.recv() => {
204                                    match msg {
205                                        Ok(mut records) => {
206                                            let count = records.len();
                                            // NOTE(review): presumably the position just past the
                                            // last record of this message (including rejected ones).
207                                            let tail = super::streamer::next_pos(&records);
208                                            let allowed_count = count_allowed_records(limit, end.until, &records);
209                                            if allowed_count > 0 {
210                                                yield state.on_batch(StoredReadBatch {
211                                                    records: records.drain(..allowed_count).collect(),
212                                                    tail: Some(tail),
213                                                });
214                                            }
                                            // A rejected record ends the session once the allowed
                                            // prefix has been delivered.
215                                            if allowed_count < count {
216                                                break 'session;
217                                            }
218                                            Ok(())
219                                        }
220                                        Err(broadcast::error::RecvError::Lagged(_)) => {
221                                            // Fell behind the broadcast: restart the outer loop, which
                                            // calls `follow` again and catches up from the DB if the
                                            // tail has moved on.
222                                            continue 'session;
223                                        }
224                                        Err(broadcast::error::RecvError::Closed) => {
                                            // Closed channel: report the streamer as missing.
225                                            Err(StreamerMissingInActionError)
226                                        }
227                                    }
228                                }
229                                _ = new_heartbeat_sleep() => {
                                    // Heartbeats do not reset the wait deadline.
230                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
231                                    Ok(())
232                                }
233                                _ = wait_sleep_until(state.wait_deadline) => {
                                    // No batch delivered before the deadline: end the session.
234                                    break 'session;
235                                }
236                            }?;
237                        }
238                    }
239                    Err(tail) => {
240                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
241                        state.tail = tail;
242                    }
243                }
244            }
245        }
246    };
247    Ok(session)
248}
249
250async fn read_start_seq_num(
251    db: &slatedb::Db,
252    stream_id: StreamId,
253    start: ReadStart,
254    end: ReadEnd,
255    tail: StreamPosition,
256) -> Result<SeqNum, ReadError> {
257    let mut read_pos = match start.from {
258        s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
259        s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
260            ReadPosition::Timestamp(timestamp)
261        }
262        s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
263            ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
264        }
265    };
266    if match read_pos {
267        ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
268        ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
269    } {
270        if start.clamp {
271            read_pos = ReadPosition::SeqNum(tail.seq_num);
272        } else {
273            return Err(UnwrittenError(tail).into());
274        }
275    }
276    if let ReadPosition::SeqNum(start_seq_num) = read_pos
277        && start_seq_num == tail.seq_num
278        && !end.may_follow()
279    {
280        return Err(UnwrittenError(tail).into());
281    }
282    Ok(match read_pos {
283        ReadPosition::SeqNum(start_seq_num) => start_seq_num,
284        ReadPosition::Timestamp(start_timestamp) => {
285            resolve_timestamp(db, stream_id, start_timestamp)
286                .await?
287                .unwrap_or(tail)
288                .seq_num
289        }
290    })
291}
292
293async fn resolve_timestamp(
294    db: &slatedb::Db,
295    stream_id: StreamId,
296    timestamp: Timestamp,
297) -> Result<Option<StreamPosition>, StorageError> {
298    let start_key = kv::stream_record_timestamp::ser_key(
299        stream_id,
300        StreamPosition {
301            seq_num: SeqNum::MIN,
302            timestamp,
303        },
304    );
305    let end_key = kv::stream_record_timestamp::ser_key(
306        stream_id,
307        StreamPosition {
308            seq_num: SeqNum::MAX,
309            timestamp: Timestamp::MAX,
310        },
311    );
312    let scan_opts = ScanOptions {
313        durability_filter: DurabilityLevel::Remote,
314        ..Default::default()
315    };
316    let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;
317    Ok(match it.next().await? {
318        Some(kv) => {
319            let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
320            assert_eq!(deser_stream_id, stream_id);
321            assert!(pos.timestamp >= timestamp);
322            kv::stream_record_timestamp::deser_value(kv.value)?;
323            Some(StreamPosition {
324                seq_num: pos.seq_num,
325                timestamp: pos.timestamp,
326            })
327        }
328        None => None,
329    })
330}
331
332struct ReadSessionState {
333    start_seq_num: u64,
334    limit: EvaluatedReadLimit,
335    until: ReadUntil,
336    wait: Option<Duration>,
337    wait_deadline: Option<Instant>,
338    tail: StreamPosition,
339}
340
341impl ReadSessionState {
342    fn arm_wait_deadline_if_unset(&mut self) {
343        if self.wait_deadline.is_none() {
344            self.reset_wait_deadline();
345        }
346    }
347
348    fn reset_wait_deadline(&mut self) {
349        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
350    }
351
352    fn wait_deadline_expired(&self) -> bool {
353        self.wait_deadline
354            .is_some_and(|deadline| deadline <= Instant::now())
355    }
356
357    fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
358        if let Some(tail) = batch.tail {
359            self.tail = tail;
360        }
361        let last_record = batch.records.last().expect("non-empty");
362        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
363            panic!("batch after exhausted limit");
364        };
365        let count = batch.records.len();
366        let bytes = batch.records.metered_size();
367        let last_position = last_record.position();
368        assert!(limit.allow(count, bytes));
369        assert!(self.until.allow(last_position.timestamp));
370        self.start_seq_num = last_position.seq_num + 1;
371        self.limit = limit.remaining(count, bytes);
372        self.reset_wait_deadline();
373        StoredReadSessionOutput::Batch(batch)
374    }
375}
376
377fn count_allowed_records(
378    limit: ReadLimit,
379    until: ReadUntil,
380    records: &[Metered<StoredSequencedRecord>],
381) -> usize {
382    let mut acc_size = 0;
383    let mut acc_count = 0;
384    for record in records {
385        if limit.deny(acc_count + 1, acc_size + record.metered_size())
386            || until.deny(record.position().timestamp)
387        {
388            break;
389        }
390        acc_count += 1;
391        acc_size += record.metered_size();
392    }
393    acc_count
394}
395
396#[cfg(not(test))]
397fn new_heartbeat_sleep() -> tokio::time::Sleep {
398    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
399}
400
401#[cfg(test)]
402fn new_heartbeat_sleep() -> tokio::time::Sleep {
403    tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
404}
405
406async fn wait_sleep_until(deadline: Option<Instant>) {
407    match deadline {
408        Some(deadline) => tokio::time::sleep_until(deadline).await,
409        None => {
410            std::future::pending::<()>().await;
411        }
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use std::{sync::Arc, task::Poll};
418
419    use bytesize::ByteSize;
420    use futures::StreamExt;
421    use s2_common::{
422        read_extent::{ReadLimit, ReadUntil},
423        record::{Metered, Record},
424        types::{
425            basin::BasinName,
426            config::{BasinConfig, OptionalStreamConfig},
427            resources::ProvisionMode,
428            stream::{
429                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
430                ReadSessionOutput, ReadStart,
431            },
432        },
433    };
434    use slatedb::{Db, WriteBatch, object_store::memory::InMemory};
435    use tokio::time::Instant;
436
437    use super::*;
438    use crate::{
439        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
440        stream_id::StreamId,
441    };
442
443    fn append_input(record: Record) -> AppendInput {
444        let record: AppendRecord = AppendRecordParts {
445            timestamp: None,
446            record: Metered::from(record),
447        }
448        .try_into()
449        .unwrap();
450        let records: AppendRecordBatch = vec![record].try_into().unwrap();
451        AppendInput {
452            records,
453            match_seq_num: None,
454            fencing_token: None,
455        }
456    }
457
458    fn map_test_output(
459        output: Option<Result<ReadSessionOutput, ReadError>>,
460    ) -> Option<ReadSessionOutput> {
461        match output {
462            Some(Ok(output)) => Some(output),
463            Some(Err(e)) => panic!("Read error: {e:?}"),
464            None => None,
465        }
466    }
467
468    async fn poll_next_after_advance<S>(
469        session: &mut std::pin::Pin<Box<S>>,
470        advance_by: Duration,
471    ) -> Poll<Option<ReadSessionOutput>>
472    where
473        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
474    {
475        let mut pinned_session = session.as_mut();
476        let next = pinned_session.next();
477        tokio::pin!(next);
478
479        assert!(
480            matches!(futures::poll!(&mut next), Poll::Pending),
481            "session unexpectedly yielded before time advanced"
482        );
483
484        tokio::time::advance(advance_by).await;
485        tokio::task::yield_now().await;
486
487        match futures::poll!(&mut next) {
488            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
489            Poll::Pending => Poll::Pending,
490        }
491    }
492
493    #[tokio::test]
494    async fn resolve_timestamp_bounded_to_stream() {
495        let object_store = Arc::new(InMemory::new());
496        let db = Db::builder("/test", object_store).build().await.unwrap();
497        let backend = Backend::new(db, ByteSize::mib(10));
498
499        let stream_a: StreamId = [0u8; 32].into();
500        let stream_b: StreamId = [1u8; 32].into();
501
502        backend
503            .db
504            .put(
505                kv::stream_record_timestamp::ser_key(
506                    stream_a,
507                    StreamPosition {
508                        seq_num: 0,
509                        timestamp: 1000,
510                    },
511                ),
512                kv::stream_record_timestamp::ser_value(),
513            )
514            .await
515            .unwrap();
516        backend
517            .db
518            .put(
519                kv::stream_record_timestamp::ser_key(
520                    stream_b,
521                    StreamPosition {
522                        seq_num: 0,
523                        timestamp: 2000,
524                    },
525                ),
526                kv::stream_record_timestamp::ser_value(),
527            )
528            .await
529            .unwrap();
530
531        // Should find record in stream_a
532        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
533        assert_eq!(
534            result,
535            Some(StreamPosition {
536                seq_num: 0,
537                timestamp: 1000
538            })
539        );
540
541        // Should return None, not find stream_b's record
542        let result = resolve_timestamp(&backend.db, stream_a, 1500)
543            .await
544            .unwrap();
545        assert_eq!(result, None);
546    }
547
548    #[tokio::test]
549    async fn read_completes_when_all_records_deleted() {
550        let object_store = Arc::new(InMemory::new());
551        let db = Db::builder("/test", object_store).build().await.unwrap();
552        let backend = Backend::new(db, ByteSize::mib(10));
553
554        let basin: BasinName = "test-basin".parse().unwrap();
555        backend
556            .provision_basin(
557                basin.clone(),
558                BasinConfig::default(),
559                ProvisionMode::CreateOnly {
560                    request_token: None,
561                },
562            )
563            .await
564            .unwrap();
565        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
566        backend
567            .provision_stream(
568                basin.clone(),
569                stream.clone(),
570                OptionalStreamConfig::default(),
571                ProvisionMode::CreateOnly {
572                    request_token: None,
573                },
574            )
575            .await
576            .unwrap();
577
578        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
579        let ack = backend
580            .open_for_append(&basin, &stream, None)
581            .await
582            .unwrap()
583            .append(input)
584            .await
585            .unwrap();
586        assert!(ack.end.seq_num > 0);
587
588        let stream_id = StreamId::new(&basin, &stream);
589        let mut batch = WriteBatch::new();
590        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
591        backend.db.write(batch).await.unwrap();
592
593        let start = ReadStart {
594            from: ReadFrom::SeqNum(0),
595            clamp: false,
596        };
597        let end = ReadEnd {
598            limit: ReadLimit::Count(10),
599            until: ReadUntil::Unbounded,
600            wait: None,
601        };
602        let session = backend
603            .open_for_read(&basin, &stream, None)
604            .await
605            .unwrap()
606            .read(start, end)
607            .await
608            .unwrap();
609        let records: Vec<_> = tokio::time::timeout(
610            Duration::from_secs(2),
611            futures::StreamExt::collect::<Vec<_>>(session),
612        )
613        .await
614        .expect("read should not spin forever");
615        assert!(records.into_iter().all(|r| r.is_ok()));
616    }
617
618    #[tokio::test(flavor = "current_thread", start_paused = true)]
619    async fn read_wait_is_not_extended_by_heartbeats() {
620        let object_store = Arc::new(InMemory::new());
621        let db = Db::builder("/test", object_store).build().await.unwrap();
622        let backend = Backend::new(db, ByteSize::mib(10));
623
624        let basin: BasinName = "test-basin".parse().unwrap();
625        backend
626            .provision_basin(
627                basin.clone(),
628                BasinConfig::default(),
629                ProvisionMode::CreateOnly {
630                    request_token: None,
631                },
632            )
633            .await
634            .unwrap();
635        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
636        backend
637            .provision_stream(
638                basin.clone(),
639                stream.clone(),
640                OptionalStreamConfig::default(),
641                ProvisionMode::CreateOnly {
642                    request_token: None,
643                },
644            )
645            .await
646            .unwrap();
647
648        let wait = Duration::from_millis(30);
649        let start = ReadStart {
650            from: ReadFrom::SeqNum(0),
651            clamp: false,
652        };
653        let end = ReadEnd {
654            limit: ReadLimit::Unbounded,
655            until: ReadUntil::Unbounded,
656            wait: Some(wait),
657        };
658
659        let session = backend
660            .open_for_read(&basin, &stream, None)
661            .await
662            .unwrap()
663            .read(start, end)
664            .await
665            .unwrap();
666        let mut session = Box::pin(session);
667        let probe_step = Duration::from_millis(1);
668        let first = session
669            .as_mut()
670            .next()
671            .await
672            .expect("session should enter follow mode")
673            .expect("session should not error");
674        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
675
676        let started = Instant::now();
677        let second = match poll_next_after_advance(&mut session, wait).await {
678            Poll::Ready(Some(output)) => output,
679            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
680            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
681        };
682        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
683
684        tokio::task::yield_now().await;
685        let closed_at = loop {
686            match futures::poll!(session.as_mut().next()) {
687                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
688                Poll::Ready(Some(Ok(output))) => {
689                    panic!("unexpected output after wait deadline: {output:?}");
690                }
691                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
692                Poll::Ready(None) => break Instant::now(),
693                Poll::Pending => panic!("session should close once the wait budget expires"),
694            }
695        };
696
697        assert!(closed_at >= started + wait);
698        assert!(closed_at <= started + wait + probe_step);
699    }
700
701    #[tokio::test(flavor = "current_thread", start_paused = true)]
702    async fn read_wait_is_reset_by_delivered_follow_batch() {
703        let object_store = Arc::new(InMemory::new());
704        let db = Db::builder("/test", object_store).build().await.unwrap();
705        let backend = Backend::new(db, ByteSize::mib(10));
706
707        let basin: BasinName = "test-basin".parse().unwrap();
708        backend
709            .provision_basin(
710                basin.clone(),
711                BasinConfig::default(),
712                ProvisionMode::CreateOnly {
713                    request_token: None,
714                },
715            )
716            .await
717            .unwrap();
718        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
719        backend
720            .provision_stream(
721                basin.clone(),
722                stream.clone(),
723                OptionalStreamConfig::default(),
724                ProvisionMode::CreateOnly {
725                    request_token: None,
726                },
727            )
728            .await
729            .unwrap();
730
731        let initial_input =
732            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
733        backend
734            .open_for_append(&basin, &stream, None)
735            .await
736            .unwrap()
737            .append(initial_input)
738            .await
739            .unwrap();
740
741        let wait = Duration::from_millis(30);
742        let probe_step = Duration::from_millis(1);
743        let start = ReadStart {
744            from: ReadFrom::SeqNum(0),
745            clamp: false,
746        };
747        let end = ReadEnd {
748            limit: ReadLimit::Unbounded,
749            until: ReadUntil::Unbounded,
750            wait: Some(wait),
751        };
752
753        let session = backend
754            .open_for_read(&basin, &stream, None)
755            .await
756            .unwrap()
757            .read(start, end)
758            .await
759            .unwrap();
760        let mut session = Box::pin(session);
761
762        let first = session
763            .as_mut()
764            .next()
765            .await
766            .expect("session should yield the initial batch")
767            .expect("session should not error");
768        let ReadSessionOutput::Batch(batch) = first else {
769            panic!("expected initial batch");
770        };
771        let initial_record = batch
772            .records
773            .first()
774            .expect("batch should contain one record");
775        let Record::Envelope(initial_envelope) = initial_record.inner() else {
776            panic!("expected plaintext envelope record");
777        };
778        assert_eq!(initial_envelope.body().as_ref(), b"initial");
779
780        let second = session
781            .as_mut()
782            .next()
783            .await
784            .expect("session should enter follow mode")
785            .expect("session should not error");
786        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
787
788        tokio::time::advance(Duration::from_millis(20)).await;
789        tokio::task::yield_now().await;
790
791        let follow_input =
792            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
793        backend
794            .open_for_append(&basin, &stream, None)
795            .await
796            .unwrap()
797            .append(follow_input)
798            .await
799            .unwrap();
800
801        let follow = session
802            .as_mut()
803            .next()
804            .await
805            .expect("session should deliver the live batch")
806            .expect("session should not error");
807        let reset_at = Instant::now();
808        let ReadSessionOutput::Batch(batch) = follow else {
809            panic!("expected live batch after append");
810        };
811        let follow_record = batch
812            .records
813            .first()
814            .expect("batch should contain one record");
815        let Record::Envelope(follow_envelope) = follow_record.inner() else {
816            panic!("expected plaintext envelope record");
817        };
818        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");
819
820        tokio::time::advance(wait - probe_step).await;
821        tokio::task::yield_now().await;
822
823        loop {
824            match futures::poll!(session.as_mut().next()) {
825                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
826                Poll::Ready(Some(Ok(output))) => {
827                    panic!("unexpected output before the reset wait deadline: {output:?}");
828                }
829                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
830                Poll::Ready(None) => {
831                    panic!("session closed before the reset wait budget expired");
832                }
833                Poll::Pending => break,
834            }
835        }
836
837        tokio::time::advance(probe_step).await;
838        tokio::task::yield_now().await;
839
840        let closed_at = loop {
841            match futures::poll!(session.as_mut().next()) {
842                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
843                Poll::Ready(Some(Ok(output))) => {
844                    panic!("unexpected output after the reset wait deadline: {output:?}");
845                }
846                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
847                Poll::Ready(None) => break Instant::now(),
848                Poll::Pending => {
849                    panic!("session should close once the reset wait budget expires");
850                }
851            }
852        };
853
854        assert!(closed_at >= reset_at + wait);
855        assert!(closed_at <= reset_at + wait + probe_step);
856    }
857
858    #[tokio::test(flavor = "current_thread", start_paused = true)]
859    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
860        let object_store = Arc::new(InMemory::new());
861        let db = Db::builder("/test", object_store).build().await.unwrap();
862        let backend = Backend::new(db, ByteSize::mib(10));
863
864        let basin: BasinName = "test-basin".parse().unwrap();
865        backend
866            .provision_basin(
867                basin.clone(),
868                BasinConfig::default(),
869                ProvisionMode::CreateOnly {
870                    request_token: None,
871                },
872            )
873            .await
874            .unwrap();
875        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
876        backend
877            .provision_stream(
878                basin.clone(),
879                stream.clone(),
880                OptionalStreamConfig::default(),
881                ProvisionMode::CreateOnly {
882                    request_token: None,
883                },
884            )
885            .await
886            .unwrap();
887
888        let wait = Duration::from_secs(30);
889        let start = ReadStart {
890            from: ReadFrom::SeqNum(0),
891            clamp: false,
892        };
893        let end = ReadEnd {
894            limit: ReadLimit::Unbounded,
895            until: ReadUntil::Unbounded,
896            wait: Some(wait),
897        };
898        let session = backend
899            .open_for_read(&basin, &stream, None)
900            .await
901            .unwrap()
902            .read(start, end)
903            .await
904            .unwrap();
905        let mut session = Box::pin(session);
906
907        let first = session
908            .as_mut()
909            .next()
910            .await
911            .expect("session should enter follow mode")
912            .expect("session should not error");
913        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
914
915        let stream_id = StreamId::new(&basin, &stream);
916        let mut delete_batch = WriteBatch::new();
917        let lagged_appends = FOLLOWER_MAX_LAG + 25;
918
919        for i in 0..lagged_appends {
920            let input = append_input(
921                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
922            );
923            let ack = backend
924                .open_for_append(&basin, &stream, None)
925                .await
926                .unwrap()
927                .append(input)
928                .await
929                .unwrap();
930            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
931        }
932
933        backend.db.write(delete_batch).await.unwrap();
934
935        tokio::time::advance(wait + Duration::from_secs(1)).await;
936        tokio::task::yield_now().await;
937
938        let next = session.as_mut().next().await;
939        assert!(
940            next.is_none(),
941            "session should close immediately once the original wait budget has elapsed"
942        );
943    }
944
945    #[tokio::test(flavor = "current_thread", start_paused = true)]
946    async fn unbounded_follow_survives_streamer_dormancy() {
947        let object_store = Arc::new(InMemory::new());
948        let db = Db::builder("/test", object_store).build().await.unwrap();
949        let backend = Backend::new(db, ByteSize::mib(10));
950
951        let basin: BasinName = "test-basin".parse().unwrap();
952        backend
953            .provision_basin(
954                basin.clone(),
955                BasinConfig::default(),
956                ProvisionMode::CreateOnly {
957                    request_token: None,
958                },
959            )
960            .await
961            .unwrap();
962        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
963        backend
964            .provision_stream(
965                basin.clone(),
966                stream.clone(),
967                OptionalStreamConfig::default(),
968                ProvisionMode::CreateOnly {
969                    request_token: None,
970                },
971            )
972            .await
973            .unwrap();
974
975        let initial_input =
976            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
977        backend
978            .open_for_append(&basin, &stream, None)
979            .await
980            .unwrap()
981            .append(initial_input)
982            .await
983            .unwrap();
984
985        let start = ReadStart {
986            from: ReadFrom::SeqNum(0),
987            clamp: false,
988        };
989        let end = ReadEnd {
990            limit: ReadLimit::Unbounded,
991            until: ReadUntil::Unbounded,
992            wait: None,
993        };
994        let session = backend
995            .open_for_read(&basin, &stream, None)
996            .await
997            .unwrap()
998            .read(start, end)
999            .await
1000            .unwrap();
1001        let mut session = Box::pin(session);
1002
1003        let first = session
1004            .as_mut()
1005            .next()
1006            .await
1007            .expect("session should yield initial batch")
1008            .expect("session should not error");
1009        assert!(matches!(first, ReadSessionOutput::Batch(_)));
1010
1011        let second = session
1012            .as_mut()
1013            .next()
1014            .await
1015            .expect("session should enter follow mode")
1016            .expect("session should not error");
1017        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
1018
1019        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
1020        tokio::task::yield_now().await;
1021
1022        let follow_input =
1023            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
1024        backend
1025            .open_for_append(&basin, &stream, None)
1026            .await
1027            .unwrap()
1028            .append(follow_input)
1029            .await
1030            .unwrap();
1031
1032        let next = session
1033            .as_mut()
1034            .next()
1035            .await
1036            .expect("session should stay open after dormancy")
1037            .expect("session should not error after dormancy");
1038        let ReadSessionOutput::Batch(batch) = next else {
1039            panic!("expected new batch after append");
1040        };
1041        assert_eq!(batch.records.len(), 1);
1042        let record = batch.records.first().expect("batch should have one record");
1043        let Record::Envelope(envelope) = record.inner() else {
1044            panic!("expected envelope record");
1045        };
1046        assert_eq!(envelope.body().as_ref(), b"follow-1");
1047    }
1048}