Skip to main content

s2_lite/backend/
read.rs

1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5    caps,
6    encryption::{EncryptionKey, EncryptionSpec},
7    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
8    record::{Metered, MeteredSize as _, SeqNum, StoredSequencedRecord, StreamPosition, Timestamp},
9    types::{
10        basin::BasinName,
11        stream::{
12            ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StoredReadBatch,
13            StoredReadSessionOutput, StreamName,
14        },
15    },
16};
17use slatedb::{
18    IterationOrder,
19    config::{DurabilityLevel, ScanOptions},
20};
21use tokio::{sync::broadcast, time::Instant};
22
23use super::{Backend, StreamHandle};
24use crate::{
25    backend::{
26        error::{
27            CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
28        },
29        kv,
30        streamer::GuardedStreamerClient,
31    },
32    stream_id::StreamId,
33};
34
35impl Backend {
36    pub async fn open_for_check_tail(
37        &self,
38        basin: &BasinName,
39        stream: &StreamName,
40    ) -> Result<StreamHandle, CheckTailError> {
41        self.stream_handle_with_auto_create::<CheckTailError>(
42            basin,
43            stream,
44            |config| config.create_stream_on_read,
45            |_| Ok(EncryptionSpec::Plain),
46        )
47        .await
48    }
49
50    pub async fn open_for_read(
51        &self,
52        basin: &BasinName,
53        stream: &StreamName,
54        encryption_key: Option<EncryptionKey>,
55    ) -> Result<StreamHandle, ReadError> {
56        self.stream_handle_with_auto_create::<ReadError>(
57            basin,
58            stream,
59            |config| config.create_stream_on_read,
60            |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
61        )
62        .await
63    }
64}
65
66impl StreamHandle {
67    pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
68        let tail = self.client.check_tail().await?;
69        Ok(tail)
70    }
71
72    pub async fn read(
73        self,
74        start: ReadStart,
75        end: ReadEnd,
76    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
77        let stream_id = self.client.stream_id();
78        let session = read_session(self.db, self.client, start, end).await?;
79        Ok(async_stream::stream! {
80            tokio::pin!(session);
81            while let Some(output) = session.next().await {
82                let output = match output {
83                    Ok(output) => output
84                        .decrypt(&self.encryption, stream_id.as_bytes())
85                        .map_err(ReadError::from),
86                    Err(err) => Err(err),
87                };
88                let should_stop = output.is_err();
89                yield output;
90                if should_stop {
91                    break;
92                }
93            }
94        })
95    }
96}
97
98async fn read_session(
99    db: slatedb::Db,
100    client: GuardedStreamerClient,
101    start: ReadStart,
102    end: ReadEnd,
103) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
104    let stream_id = client.stream_id();
105    let tail = client.check_tail().await?;
106    let mut state = ReadSessionState {
107        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
108        limit: EvaluatedReadLimit::Remaining(end.limit),
109        until: end.until,
110        wait: end.wait,
111        wait_deadline: None,
112        tail,
113    };
114    let session = async_stream::try_stream! {
115        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
116            if state.start_seq_num < state.tail.seq_num {
117                let start_key = kv::stream_record_data::ser_key(
118                    stream_id,
119                    StreamPosition {
120                        seq_num: state.start_seq_num,
121                        timestamp: 0,
122                    },
123                );
124                let end_key = kv::stream_record_data::ser_key(
125                    stream_id,
126                    StreamPosition {
127                        seq_num: state.tail.seq_num,
128                        timestamp: 0,
129                    },
130                );
131                static SCAN_OPTS: ScanOptions = ScanOptions {
132                    durability_filter: DurabilityLevel::Remote,
133                    dirty: false,
134                    read_ahead_bytes: 1024 * 1024,
135                    cache_blocks: true,
136                    max_fetch_tasks: 8,
137                    order: IterationOrder::Ascending,
138                };
139                let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
140
141                let mut records = Metered::with_capacity(
142                    limit.count()
143                        .unwrap_or(usize::MAX)
144                        .min(caps::RECORD_BATCH_MAX.count),
145                );
146
147                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
148                    let Some(kv) = it.next().await? else {
149                        break;
150                    };
151                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
152                    assert_eq!(deser_stream_id, stream_id);
153
154                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);
155
156                    if end.until.deny(pos.timestamp)
157                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
158                    {
159                        if records.is_empty() {
160                            break 'session;
161                        } else {
162                            break;
163                        }
164                    }
165
166                    if records.len() == caps::RECORD_BATCH_MAX.count
167                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
168                    {
169                        let new_records_buf = Metered::with_capacity(
170                            limit.count()
171                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
172                                .min(caps::RECORD_BATCH_MAX.count),
173                        );
174                        yield state.on_batch(StoredReadBatch {
175                            records: std::mem::replace(&mut records, new_records_buf),
176                            tail: None,
177                        });
178                    }
179
180                    records.push(record);
181                }
182
183                if !records.is_empty() {
184                    yield state.on_batch(StoredReadBatch {
185                        records,
186                        tail: None,
187                    });
188                } else {
189                    state.start_seq_num = state.tail.seq_num;
190                }
191            } else {
192                assert_eq!(state.start_seq_num, state.tail.seq_num);
193                if !end.may_follow() {
194                    break;
195                }
196                match client.follow(state.start_seq_num).await? {
197                    Ok(mut follow_rx) => {
198                        // Only a delivered batch should reset the absolute wait budget.
199                        state.arm_wait_deadline_if_unset();
200                        if state.wait_deadline_expired() {
201                            break;
202                        }
203                        yield StoredReadSessionOutput::Heartbeat(state.tail);
204                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
205                            tokio::select! {
206                                biased;
207                                msg = follow_rx.recv() => {
208                                    match msg {
209                                        Ok(mut records) => {
210                                            let count = records.len();
211                                            let tail = super::streamer::next_pos(&records);
212                                            let allowed_count = count_allowed_records(limit, end.until, &records);
213                                            if allowed_count > 0 {
214                                                yield state.on_batch(StoredReadBatch {
215                                                    records: records.drain(..allowed_count).collect(),
216                                                    tail: Some(tail),
217                                                });
218                                            }
219                                            if allowed_count < count {
220                                                break 'session;
221                                            }
222                                            Ok(())
223                                        }
224                                        Err(broadcast::error::RecvError::Lagged(_)) => {
225                                            // Catch up using DB
226                                            continue 'session;
227                                        }
228                                        Err(broadcast::error::RecvError::Closed) => {
229                                            Err(StreamerMissingInActionError)
230                                        }
231                                    }
232                                }
233                                _ = new_heartbeat_sleep() => {
234                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
235                                    Ok(())
236                                }
237                                _ = wait_sleep_until(state.wait_deadline) => {
238                                    break 'session;
239                                }
240                            }?;
241                        }
242                    }
243                    Err(tail) => {
244                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
245                        state.tail = tail;
246                    }
247                }
248            }
249        }
250    };
251    Ok(session)
252}
253
254async fn read_start_seq_num(
255    db: &slatedb::Db,
256    stream_id: StreamId,
257    start: ReadStart,
258    end: ReadEnd,
259    tail: StreamPosition,
260) -> Result<SeqNum, ReadError> {
261    let mut read_pos = match start.from {
262        s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
263        s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
264            ReadPosition::Timestamp(timestamp)
265        }
266        s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
267            ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
268        }
269    };
270    if match read_pos {
271        ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
272        ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
273    } {
274        if start.clamp {
275            read_pos = ReadPosition::SeqNum(tail.seq_num);
276        } else {
277            return Err(UnwrittenError(tail).into());
278        }
279    }
280    if let ReadPosition::SeqNum(start_seq_num) = read_pos
281        && start_seq_num == tail.seq_num
282        && !end.may_follow()
283    {
284        return Err(UnwrittenError(tail).into());
285    }
286    Ok(match read_pos {
287        ReadPosition::SeqNum(start_seq_num) => start_seq_num,
288        ReadPosition::Timestamp(start_timestamp) => {
289            resolve_timestamp(db, stream_id, start_timestamp)
290                .await?
291                .unwrap_or(tail)
292                .seq_num
293        }
294    })
295}
296
297async fn resolve_timestamp(
298    db: &slatedb::Db,
299    stream_id: StreamId,
300    timestamp: Timestamp,
301) -> Result<Option<StreamPosition>, StorageError> {
302    let start_key = kv::stream_record_timestamp::ser_key(
303        stream_id,
304        StreamPosition {
305            seq_num: SeqNum::MIN,
306            timestamp,
307        },
308    );
309    let end_key = kv::stream_record_timestamp::ser_key(
310        stream_id,
311        StreamPosition {
312            seq_num: SeqNum::MAX,
313            timestamp: Timestamp::MAX,
314        },
315    );
316    static SCAN_OPTS: ScanOptions = ScanOptions {
317        durability_filter: DurabilityLevel::Remote,
318        dirty: false,
319        read_ahead_bytes: 1,
320        cache_blocks: false,
321        max_fetch_tasks: 1,
322        order: IterationOrder::Ascending,
323    };
324    let mut it = db.scan_with_options(start_key..end_key, &SCAN_OPTS).await?;
325    Ok(match it.next().await? {
326        Some(kv) => {
327            let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
328            assert_eq!(deser_stream_id, stream_id);
329            assert!(pos.timestamp >= timestamp);
330            kv::stream_record_timestamp::deser_value(kv.value)?;
331            Some(StreamPosition {
332                seq_num: pos.seq_num,
333                timestamp: pos.timestamp,
334            })
335        }
336        None => None,
337    })
338}
339
340struct ReadSessionState {
341    start_seq_num: u64,
342    limit: EvaluatedReadLimit,
343    until: ReadUntil,
344    wait: Option<Duration>,
345    wait_deadline: Option<Instant>,
346    tail: StreamPosition,
347}
348
349impl ReadSessionState {
350    fn arm_wait_deadline_if_unset(&mut self) {
351        if self.wait_deadline.is_none() {
352            self.reset_wait_deadline();
353        }
354    }
355
356    fn reset_wait_deadline(&mut self) {
357        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
358    }
359
360    fn wait_deadline_expired(&self) -> bool {
361        self.wait_deadline
362            .is_some_and(|deadline| deadline <= Instant::now())
363    }
364
365    fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
366        if let Some(tail) = batch.tail {
367            self.tail = tail;
368        }
369        let last_record = batch.records.last().expect("non-empty");
370        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
371            panic!("batch after exhausted limit");
372        };
373        let count = batch.records.len();
374        let bytes = batch.records.metered_size();
375        let last_position = last_record.position();
376        assert!(limit.allow(count, bytes));
377        assert!(self.until.allow(last_position.timestamp));
378        self.start_seq_num = last_position.seq_num + 1;
379        self.limit = limit.remaining(count, bytes);
380        self.reset_wait_deadline();
381        StoredReadSessionOutput::Batch(batch)
382    }
383}
384
385fn count_allowed_records(
386    limit: ReadLimit,
387    until: ReadUntil,
388    records: &[Metered<StoredSequencedRecord>],
389) -> usize {
390    let mut acc_size = 0;
391    let mut acc_count = 0;
392    for record in records {
393        if limit.deny(acc_count + 1, acc_size + record.metered_size())
394            || until.deny(record.position().timestamp)
395        {
396            break;
397        }
398        acc_count += 1;
399        acc_size += record.metered_size();
400    }
401    acc_count
402}
403
404#[cfg(not(test))]
405fn new_heartbeat_sleep() -> tokio::time::Sleep {
406    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
407}
408
/// Test-build variant of the heartbeat pacing sleep: a random delay drawn from
/// 5..15 milliseconds.
#[cfg(test)]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    let delay_ms: u64 = rand::random_range(5..15);
    tokio::time::sleep(Duration::from_millis(delay_ms))
}
413
414async fn wait_sleep_until(deadline: Option<Instant>) {
415    match deadline {
416        Some(deadline) => tokio::time::sleep_until(deadline).await,
417        None => {
418            std::future::pending::<()>().await;
419        }
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use std::{sync::Arc, task::Poll};
426
427    use bytesize::ByteSize;
428    use futures::StreamExt;
429    use s2_common::{
430        read_extent::{ReadLimit, ReadUntil},
431        record::{Metered, Record},
432        types::{
433            basin::BasinName,
434            config::{BasinConfig, OptionalStreamConfig},
435            resources::ProvisionMode,
436            stream::{
437                AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
438                ReadSessionOutput, ReadStart,
439            },
440        },
441    };
442    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
443    use tokio::time::Instant;
444
445    use super::*;
446    use crate::{
447        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
448        stream_id::StreamId,
449    };
450
451    fn append_input(record: Record) -> AppendInput {
452        let record: AppendRecord = AppendRecordParts {
453            timestamp: None,
454            record: Metered::from(record),
455        }
456        .try_into()
457        .unwrap();
458        let records: AppendRecordBatch = vec![record].try_into().unwrap();
459        AppendInput {
460            records,
461            match_seq_num: None,
462            fencing_token: None,
463        }
464    }
465
466    fn map_test_output(
467        output: Option<Result<ReadSessionOutput, ReadError>>,
468    ) -> Option<ReadSessionOutput> {
469        match output {
470            Some(Ok(output)) => Some(output),
471            Some(Err(e)) => panic!("Read error: {e:?}"),
472            None => None,
473        }
474    }
475
476    async fn poll_next_after_advance<S>(
477        session: &mut std::pin::Pin<Box<S>>,
478        advance_by: Duration,
479    ) -> Poll<Option<ReadSessionOutput>>
480    where
481        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
482    {
483        let mut pinned_session = session.as_mut();
484        let next = pinned_session.next();
485        tokio::pin!(next);
486
487        assert!(
488            matches!(futures::poll!(&mut next), Poll::Pending),
489            "session unexpectedly yielded before time advanced"
490        );
491
492        tokio::time::advance(advance_by).await;
493        tokio::task::yield_now().await;
494
495        match futures::poll!(&mut next) {
496            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
497            Poll::Pending => Poll::Pending,
498        }
499    }
500
501    #[tokio::test]
502    async fn resolve_timestamp_bounded_to_stream() {
503        let object_store = Arc::new(InMemory::new());
504        let db = Db::builder("/test", object_store).build().await.unwrap();
505        let backend = Backend::new(db, ByteSize::mib(10));
506
507        let stream_a: StreamId = [0u8; 32].into();
508        let stream_b: StreamId = [1u8; 32].into();
509
510        backend
511            .db
512            .put(
513                kv::stream_record_timestamp::ser_key(
514                    stream_a,
515                    StreamPosition {
516                        seq_num: 0,
517                        timestamp: 1000,
518                    },
519                ),
520                kv::stream_record_timestamp::ser_value(),
521            )
522            .await
523            .unwrap();
524        backend
525            .db
526            .put(
527                kv::stream_record_timestamp::ser_key(
528                    stream_b,
529                    StreamPosition {
530                        seq_num: 0,
531                        timestamp: 2000,
532                    },
533                ),
534                kv::stream_record_timestamp::ser_value(),
535            )
536            .await
537            .unwrap();
538
539        // Should find record in stream_a
540        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
541        assert_eq!(
542            result,
543            Some(StreamPosition {
544                seq_num: 0,
545                timestamp: 1000
546            })
547        );
548
549        // Should return None, not find stream_b's record
550        let result = resolve_timestamp(&backend.db, stream_a, 1500)
551            .await
552            .unwrap();
553        assert_eq!(result, None);
554    }
555
556    #[tokio::test]
557    async fn read_completes_when_all_records_deleted() {
558        let object_store = Arc::new(InMemory::new());
559        let db = Db::builder("/test", object_store).build().await.unwrap();
560        let backend = Backend::new(db, ByteSize::mib(10));
561
562        let basin: BasinName = "test-basin".parse().unwrap();
563        backend
564            .provision_basin(
565                basin.clone(),
566                BasinConfig::default(),
567                ProvisionMode::CreateOnly {
568                    request_token: None,
569                },
570            )
571            .await
572            .unwrap();
573        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
574        backend
575            .provision_stream(
576                basin.clone(),
577                stream.clone(),
578                OptionalStreamConfig::default(),
579                ProvisionMode::CreateOnly {
580                    request_token: None,
581                },
582            )
583            .await
584            .unwrap();
585
586        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
587        let ack = backend
588            .open_for_append(&basin, &stream, None)
589            .await
590            .unwrap()
591            .append(input)
592            .await
593            .unwrap();
594        assert!(ack.end.seq_num > 0);
595
596        let stream_id = StreamId::new(&basin, &stream);
597        let mut batch = WriteBatch::new();
598        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
599        static WRITE_OPTS: WriteOptions = WriteOptions {
600            await_durable: true,
601        };
602        backend
603            .db
604            .write_with_options(batch, &WRITE_OPTS)
605            .await
606            .unwrap();
607
608        let start = ReadStart {
609            from: ReadFrom::SeqNum(0),
610            clamp: false,
611        };
612        let end = ReadEnd {
613            limit: ReadLimit::Count(10),
614            until: ReadUntil::Unbounded,
615            wait: None,
616        };
617        let session = backend
618            .open_for_read(&basin, &stream, None)
619            .await
620            .unwrap()
621            .read(start, end)
622            .await
623            .unwrap();
624        let records: Vec<_> = tokio::time::timeout(
625            Duration::from_secs(2),
626            futures::StreamExt::collect::<Vec<_>>(session),
627        )
628        .await
629        .expect("read should not spin forever");
630        assert!(records.into_iter().all(|r| r.is_ok()));
631    }
632
633    #[tokio::test(flavor = "current_thread", start_paused = true)]
634    async fn read_wait_is_not_extended_by_heartbeats() {
635        let object_store = Arc::new(InMemory::new());
636        let db = Db::builder("/test", object_store).build().await.unwrap();
637        let backend = Backend::new(db, ByteSize::mib(10));
638
639        let basin: BasinName = "test-basin".parse().unwrap();
640        backend
641            .provision_basin(
642                basin.clone(),
643                BasinConfig::default(),
644                ProvisionMode::CreateOnly {
645                    request_token: None,
646                },
647            )
648            .await
649            .unwrap();
650        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
651        backend
652            .provision_stream(
653                basin.clone(),
654                stream.clone(),
655                OptionalStreamConfig::default(),
656                ProvisionMode::CreateOnly {
657                    request_token: None,
658                },
659            )
660            .await
661            .unwrap();
662
663        let wait = Duration::from_millis(30);
664        let start = ReadStart {
665            from: ReadFrom::SeqNum(0),
666            clamp: false,
667        };
668        let end = ReadEnd {
669            limit: ReadLimit::Unbounded,
670            until: ReadUntil::Unbounded,
671            wait: Some(wait),
672        };
673
674        let session = backend
675            .open_for_read(&basin, &stream, None)
676            .await
677            .unwrap()
678            .read(start, end)
679            .await
680            .unwrap();
681        let mut session = Box::pin(session);
682        let probe_step = Duration::from_millis(1);
683        let first = session
684            .as_mut()
685            .next()
686            .await
687            .expect("session should enter follow mode")
688            .expect("session should not error");
689        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
690
691        let started = Instant::now();
692        let second = match poll_next_after_advance(&mut session, wait).await {
693            Poll::Ready(Some(output)) => output,
694            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
695            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
696        };
697        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
698
699        tokio::task::yield_now().await;
700        let closed_at = loop {
701            match futures::poll!(session.as_mut().next()) {
702                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
703                Poll::Ready(Some(Ok(output))) => {
704                    panic!("unexpected output after wait deadline: {output:?}");
705                }
706                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
707                Poll::Ready(None) => break Instant::now(),
708                Poll::Pending => panic!("session should close once the wait budget expires"),
709            }
710        };
711
712        assert!(closed_at >= started + wait);
713        assert!(closed_at <= started + wait + probe_step);
714    }
715
716    #[tokio::test(flavor = "current_thread", start_paused = true)]
717    async fn read_wait_is_reset_by_delivered_follow_batch() {
718        let object_store = Arc::new(InMemory::new());
719        let db = Db::builder("/test", object_store).build().await.unwrap();
720        let backend = Backend::new(db, ByteSize::mib(10));
721
722        let basin: BasinName = "test-basin".parse().unwrap();
723        backend
724            .provision_basin(
725                basin.clone(),
726                BasinConfig::default(),
727                ProvisionMode::CreateOnly {
728                    request_token: None,
729                },
730            )
731            .await
732            .unwrap();
733        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
734        backend
735            .provision_stream(
736                basin.clone(),
737                stream.clone(),
738                OptionalStreamConfig::default(),
739                ProvisionMode::CreateOnly {
740                    request_token: None,
741                },
742            )
743            .await
744            .unwrap();
745
746        let initial_input =
747            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
748        backend
749            .open_for_append(&basin, &stream, None)
750            .await
751            .unwrap()
752            .append(initial_input)
753            .await
754            .unwrap();
755
756        let wait = Duration::from_millis(30);
757        let probe_step = Duration::from_millis(1);
758        let start = ReadStart {
759            from: ReadFrom::SeqNum(0),
760            clamp: false,
761        };
762        let end = ReadEnd {
763            limit: ReadLimit::Unbounded,
764            until: ReadUntil::Unbounded,
765            wait: Some(wait),
766        };
767
768        let session = backend
769            .open_for_read(&basin, &stream, None)
770            .await
771            .unwrap()
772            .read(start, end)
773            .await
774            .unwrap();
775        let mut session = Box::pin(session);
776
777        let first = session
778            .as_mut()
779            .next()
780            .await
781            .expect("session should yield the initial batch")
782            .expect("session should not error");
783        let ReadSessionOutput::Batch(batch) = first else {
784            panic!("expected initial batch");
785        };
786        let initial_record = batch
787            .records
788            .first()
789            .expect("batch should contain one record");
790        let Record::Envelope(initial_envelope) = initial_record.inner() else {
791            panic!("expected plaintext envelope record");
792        };
793        assert_eq!(initial_envelope.body().as_ref(), b"initial");
794
795        let second = session
796            .as_mut()
797            .next()
798            .await
799            .expect("session should enter follow mode")
800            .expect("session should not error");
801        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
802
803        tokio::time::advance(Duration::from_millis(20)).await;
804        tokio::task::yield_now().await;
805
806        let follow_input =
807            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
808        backend
809            .open_for_append(&basin, &stream, None)
810            .await
811            .unwrap()
812            .append(follow_input)
813            .await
814            .unwrap();
815
816        let follow = session
817            .as_mut()
818            .next()
819            .await
820            .expect("session should deliver the live batch")
821            .expect("session should not error");
822        let reset_at = Instant::now();
823        let ReadSessionOutput::Batch(batch) = follow else {
824            panic!("expected live batch after append");
825        };
826        let follow_record = batch
827            .records
828            .first()
829            .expect("batch should contain one record");
830        let Record::Envelope(follow_envelope) = follow_record.inner() else {
831            panic!("expected plaintext envelope record");
832        };
833        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");
834
835        tokio::time::advance(wait - probe_step).await;
836        tokio::task::yield_now().await;
837
838        loop {
839            match futures::poll!(session.as_mut().next()) {
840                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
841                Poll::Ready(Some(Ok(output))) => {
842                    panic!("unexpected output before the reset wait deadline: {output:?}");
843                }
844                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
845                Poll::Ready(None) => {
846                    panic!("session closed before the reset wait budget expired");
847                }
848                Poll::Pending => break,
849            }
850        }
851
852        tokio::time::advance(probe_step).await;
853        tokio::task::yield_now().await;
854
855        let closed_at = loop {
856            match futures::poll!(session.as_mut().next()) {
857                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
858                Poll::Ready(Some(Ok(output))) => {
859                    panic!("unexpected output after the reset wait deadline: {output:?}");
860                }
861                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
862                Poll::Ready(None) => break Instant::now(),
863                Poll::Pending => {
864                    panic!("session should close once the reset wait budget expires");
865                }
866            }
867        };
868
869        assert!(closed_at >= reset_at + wait);
870        assert!(closed_at <= reset_at + wait + probe_step);
871    }
872
873    #[tokio::test(flavor = "current_thread", start_paused = true)]
874    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
875        let object_store = Arc::new(InMemory::new());
876        let db = Db::builder("/test", object_store).build().await.unwrap();
877        let backend = Backend::new(db, ByteSize::mib(10));
878
879        let basin: BasinName = "test-basin".parse().unwrap();
880        backend
881            .provision_basin(
882                basin.clone(),
883                BasinConfig::default(),
884                ProvisionMode::CreateOnly {
885                    request_token: None,
886                },
887            )
888            .await
889            .unwrap();
890        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
891        backend
892            .provision_stream(
893                basin.clone(),
894                stream.clone(),
895                OptionalStreamConfig::default(),
896                ProvisionMode::CreateOnly {
897                    request_token: None,
898                },
899            )
900            .await
901            .unwrap();
902
903        let wait = Duration::from_secs(30);
904        let start = ReadStart {
905            from: ReadFrom::SeqNum(0),
906            clamp: false,
907        };
908        let end = ReadEnd {
909            limit: ReadLimit::Unbounded,
910            until: ReadUntil::Unbounded,
911            wait: Some(wait),
912        };
913        let session = backend
914            .open_for_read(&basin, &stream, None)
915            .await
916            .unwrap()
917            .read(start, end)
918            .await
919            .unwrap();
920        let mut session = Box::pin(session);
921
922        let first = session
923            .as_mut()
924            .next()
925            .await
926            .expect("session should enter follow mode")
927            .expect("session should not error");
928        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
929
930        let stream_id = StreamId::new(&basin, &stream);
931        let mut delete_batch = WriteBatch::new();
932        let lagged_appends = FOLLOWER_MAX_LAG + 25;
933
934        for i in 0..lagged_appends {
935            let input = append_input(
936                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
937            );
938            let ack = backend
939                .open_for_append(&basin, &stream, None)
940                .await
941                .unwrap()
942                .append(input)
943                .await
944                .unwrap();
945            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
946        }
947
948        static WRITE_OPTS: WriteOptions = WriteOptions {
949            await_durable: true,
950        };
951        backend
952            .db
953            .write_with_options(delete_batch, &WRITE_OPTS)
954            .await
955            .unwrap();
956
957        tokio::time::advance(wait + Duration::from_secs(1)).await;
958        tokio::task::yield_now().await;
959
960        let next = session.as_mut().next().await;
961        assert!(
962            next.is_none(),
963            "session should close immediately once the original wait budget has elapsed"
964        );
965    }
966
967    #[tokio::test(flavor = "current_thread", start_paused = true)]
968    async fn unbounded_follow_survives_streamer_dormancy() {
969        let object_store = Arc::new(InMemory::new());
970        let db = Db::builder("/test", object_store).build().await.unwrap();
971        let backend = Backend::new(db, ByteSize::mib(10));
972
973        let basin: BasinName = "test-basin".parse().unwrap();
974        backend
975            .provision_basin(
976                basin.clone(),
977                BasinConfig::default(),
978                ProvisionMode::CreateOnly {
979                    request_token: None,
980                },
981            )
982            .await
983            .unwrap();
984        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
985        backend
986            .provision_stream(
987                basin.clone(),
988                stream.clone(),
989                OptionalStreamConfig::default(),
990                ProvisionMode::CreateOnly {
991                    request_token: None,
992                },
993            )
994            .await
995            .unwrap();
996
997        let initial_input =
998            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
999        backend
1000            .open_for_append(&basin, &stream, None)
1001            .await
1002            .unwrap()
1003            .append(initial_input)
1004            .await
1005            .unwrap();
1006
1007        let start = ReadStart {
1008            from: ReadFrom::SeqNum(0),
1009            clamp: false,
1010        };
1011        let end = ReadEnd {
1012            limit: ReadLimit::Unbounded,
1013            until: ReadUntil::Unbounded,
1014            wait: None,
1015        };
1016        let session = backend
1017            .open_for_read(&basin, &stream, None)
1018            .await
1019            .unwrap()
1020            .read(start, end)
1021            .await
1022            .unwrap();
1023        let mut session = Box::pin(session);
1024
1025        let first = session
1026            .as_mut()
1027            .next()
1028            .await
1029            .expect("session should yield initial batch")
1030            .expect("session should not error");
1031        assert!(matches!(first, ReadSessionOutput::Batch(_)));
1032
1033        let second = session
1034            .as_mut()
1035            .next()
1036            .await
1037            .expect("session should enter follow mode")
1038            .expect("session should not error");
1039        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
1040
1041        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
1042        tokio::task::yield_now().await;
1043
1044        let follow_input =
1045            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
1046        backend
1047            .open_for_append(&basin, &stream, None)
1048            .await
1049            .unwrap()
1050            .append(follow_input)
1051            .await
1052            .unwrap();
1053
1054        let next = session
1055            .as_mut()
1056            .next()
1057            .await
1058            .expect("session should stay open after dormancy")
1059            .expect("session should not error after dormancy");
1060        let ReadSessionOutput::Batch(batch) = next else {
1061            panic!("expected new batch after append");
1062        };
1063        assert_eq!(batch.records.len(), 1);
1064        let record = batch.records.first().expect("batch should have one record");
1065        let Record::Envelope(envelope) = record.inner() else {
1066            panic!("expected envelope record");
1067        };
1068        assert_eq!(envelope.body().as_ref(), b"follow-1");
1069    }
1070}