Skip to main content

s2_lite/backend/
read.rs

1use std::time::Duration;
2
3use futures::{Stream, StreamExt as _};
4use s2_common::{
5    basin::BasinName,
6    caps,
7    encryption::{EncryptionKey, EncryptionSpec},
8    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
9    record::{Metered, MeteredSize as _, SeqNum, StreamPosition, Timestamp},
10    stream::{ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
11};
12use s2_storage::record::{
13    StoredReadBatch, StoredReadSessionOutput, StoredSequencedRecord, decrypt_read_session_output,
14};
15use slatedb::config::{DurabilityLevel, ScanOptions};
16use tokio::{sync::broadcast, time::Instant};
17
18use super::{Backend, StreamHandle};
19use crate::{
20    backend::{
21        error::{
22            CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
23        },
24        kv,
25        streamer::GuardedStreamerClient,
26    },
27    stream_id::StreamId,
28};
29
30impl Backend {
31    pub async fn open_for_check_tail(
32        &self,
33        basin: &BasinName,
34        stream: &StreamName,
35    ) -> Result<StreamHandle, CheckTailError> {
36        self.stream_handle_with_auto_create::<CheckTailError>(
37            basin,
38            stream,
39            |config| config.create_stream_on_read,
40            |_| Ok(EncryptionSpec::Plain),
41        )
42        .await
43    }
44
45    pub async fn open_for_read(
46        &self,
47        basin: &BasinName,
48        stream: &StreamName,
49        encryption_key: Option<EncryptionKey>,
50    ) -> Result<StreamHandle, ReadError> {
51        self.stream_handle_with_auto_create::<ReadError>(
52            basin,
53            stream,
54            |config| config.create_stream_on_read,
55            |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
56        )
57        .await
58    }
59}
60
61impl StreamHandle {
62    pub async fn check_tail(self) -> Result<StreamPosition, CheckTailError> {
63        let tail = self.client.check_tail().await?;
64        Ok(tail)
65    }
66
67    pub async fn read(
68        self,
69        start: ReadStart,
70        end: ReadEnd,
71    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
72        let stream_id = self.client.stream_id();
73        let session = read_session(self.db, self.client, start, end).await?;
74        Ok(async_stream::stream! {
75            tokio::pin!(session);
76            while let Some(output) = session.next().await {
77                let output = match output {
78                    Ok(output) => {
79                        decrypt_read_session_output(output, &self.encryption, stream_id.as_bytes())
80                            .map_err(ReadError::from)
81                    }
82                    Err(err) => Err(err),
83                };
84                let should_stop = output.is_err();
85                yield output;
86                if should_stop {
87                    break;
88                }
89            }
90        })
91    }
92}
93
94async fn read_session(
95    db: slatedb::Db,
96    client: GuardedStreamerClient,
97    start: ReadStart,
98    end: ReadEnd,
99) -> Result<impl Stream<Item = Result<StoredReadSessionOutput, ReadError>> + 'static, ReadError> {
100    let stream_id = client.stream_id();
101    let tail = client.check_tail().await?;
102    let mut state = ReadSessionState {
103        start_seq_num: read_start_seq_num(&db, stream_id, start, end, tail).await?,
104        limit: EvaluatedReadLimit::Remaining(end.limit),
105        until: end.until,
106        wait: end.wait,
107        wait_deadline: None,
108        tail,
109    };
110    let session = async_stream::try_stream! {
111        'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
112            if state.start_seq_num < state.tail.seq_num {
113                let start_key = kv::stream_record_data::ser_key(
114                    stream_id,
115                    StreamPosition {
116                        seq_num: state.start_seq_num,
117                        timestamp: 0,
118                    },
119                );
120                let end_key = kv::stream_record_data::ser_key(
121                    stream_id,
122                    StreamPosition {
123                        seq_num: state.tail.seq_num,
124                        timestamp: 0,
125                    },
126                );
127                let scan_opts = ScanOptions {
128                    durability_filter: DurabilityLevel::Remote,
129                    read_ahead_bytes: 1024 * 1024,
130                    cache_blocks: true,
131                    max_fetch_tasks: 8,
132                    ..Default::default()
133                };
134                let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;
135
136                let mut records = Metered::with_capacity(
137                    limit.count()
138                        .unwrap_or(usize::MAX)
139                        .min(caps::RECORD_BATCH_MAX.count),
140                );
141
142                while let EvaluatedReadLimit::Remaining(limit) = state.limit {
143                    let Some(kv) = it.next().await? else {
144                        break;
145                    };
146                    let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
147                    assert_eq!(deser_stream_id, stream_id);
148
149                    let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);
150
151                    if end.until.deny(pos.timestamp)
152                        || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
153                    {
154                        if records.is_empty() {
155                            break 'session;
156                        } else {
157                            break;
158                        }
159                    }
160
161                    if records.len() == caps::RECORD_BATCH_MAX.count
162                        || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
163                    {
164                        let new_records_buf = Metered::with_capacity(
165                            limit.count()
166                                .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
167                                .min(caps::RECORD_BATCH_MAX.count),
168                        );
169                        yield state.on_batch(StoredReadBatch {
170                            records: std::mem::replace(&mut records, new_records_buf),
171                            tail: None,
172                        });
173                    }
174
175                    records.push(record);
176                }
177
178                if !records.is_empty() {
179                    yield state.on_batch(StoredReadBatch {
180                        records,
181                        tail: None,
182                    });
183                } else {
184                    state.start_seq_num = state.tail.seq_num;
185                }
186            } else {
187                assert_eq!(state.start_seq_num, state.tail.seq_num);
188                if !end.may_follow() {
189                    break;
190                }
191                match client.follow(state.start_seq_num).await? {
192                    Ok(mut follow_rx) => {
193                        // Only a delivered batch should reset the absolute wait budget.
194                        state.arm_wait_deadline_if_unset();
195                        if state.wait_deadline_expired() {
196                            break;
197                        }
198                        yield StoredReadSessionOutput::Heartbeat(state.tail);
199                        while let EvaluatedReadLimit::Remaining(limit) = state.limit {
200                            tokio::select! {
201                                biased;
202                                msg = follow_rx.recv() => {
203                                    match msg {
204                                        Ok(mut records) => {
205                                            let count = records.len();
206                                            let tail = super::streamer::next_pos(&records);
207                                            let allowed_count = count_allowed_records(limit, end.until, &records);
208                                            if allowed_count > 0 {
209                                                yield state.on_batch(StoredReadBatch {
210                                                    records: records.drain(..allowed_count).collect(),
211                                                    tail: Some(tail),
212                                                });
213                                            }
214                                            if allowed_count < count {
215                                                break 'session;
216                                            }
217                                            Ok(())
218                                        }
219                                        Err(broadcast::error::RecvError::Lagged(_)) => {
220                                            // Catch up using DB
221                                            continue 'session;
222                                        }
223                                        Err(broadcast::error::RecvError::Closed) => {
224                                            Err(StreamerMissingInActionError)
225                                        }
226                                    }
227                                }
228                                _ = new_heartbeat_sleep() => {
229                                    yield StoredReadSessionOutput::Heartbeat(state.tail);
230                                    Ok(())
231                                }
232                                _ = wait_sleep_until(state.wait_deadline) => {
233                                    break 'session;
234                                }
235                            }?;
236                        }
237                    }
238                    Err(tail) => {
239                        assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
240                        state.tail = tail;
241                    }
242                }
243            }
244        }
245    };
246    Ok(session)
247}
248
249async fn read_start_seq_num(
250    db: &slatedb::Db,
251    stream_id: StreamId,
252    start: ReadStart,
253    end: ReadEnd,
254    tail: StreamPosition,
255) -> Result<SeqNum, ReadError> {
256    let mut read_pos = match start.from {
257        s2_common::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
258        s2_common::stream::ReadFrom::Timestamp(timestamp) => ReadPosition::Timestamp(timestamp),
259        s2_common::stream::ReadFrom::TailOffset(tail_offset) => {
260            ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
261        }
262    };
263    if match read_pos {
264        ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
265        ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
266    } {
267        if start.clamp {
268            read_pos = ReadPosition::SeqNum(tail.seq_num);
269        } else {
270            return Err(UnwrittenError(tail).into());
271        }
272    }
273    if let ReadPosition::SeqNum(start_seq_num) = read_pos
274        && start_seq_num == tail.seq_num
275        && !end.may_follow()
276    {
277        return Err(UnwrittenError(tail).into());
278    }
279    Ok(match read_pos {
280        ReadPosition::SeqNum(start_seq_num) => start_seq_num,
281        ReadPosition::Timestamp(start_timestamp) => {
282            resolve_timestamp(db, stream_id, start_timestamp)
283                .await?
284                .unwrap_or(tail)
285                .seq_num
286        }
287    })
288}
289
290async fn resolve_timestamp(
291    db: &slatedb::Db,
292    stream_id: StreamId,
293    timestamp: Timestamp,
294) -> Result<Option<StreamPosition>, StorageError> {
295    let start_key = kv::stream_record_timestamp::ser_key(
296        stream_id,
297        StreamPosition {
298            seq_num: SeqNum::MIN,
299            timestamp,
300        },
301    );
302    let end_key = kv::stream_record_timestamp::ser_key(
303        stream_id,
304        StreamPosition {
305            seq_num: SeqNum::MAX,
306            timestamp: Timestamp::MAX,
307        },
308    );
309    let scan_opts = ScanOptions {
310        durability_filter: DurabilityLevel::Remote,
311        ..Default::default()
312    };
313    let mut it = db.scan_with_options(start_key..end_key, &scan_opts).await?;
314    Ok(match it.next().await? {
315        Some(kv) => {
316            let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
317            assert_eq!(deser_stream_id, stream_id);
318            assert!(pos.timestamp >= timestamp);
319            kv::stream_record_timestamp::deser_value(kv.value)?;
320            Some(StreamPosition {
321                seq_num: pos.seq_num,
322                timestamp: pos.timestamp,
323            })
324        }
325        None => None,
326    })
327}
328
329struct ReadSessionState {
330    start_seq_num: u64,
331    limit: EvaluatedReadLimit,
332    until: ReadUntil,
333    wait: Option<Duration>,
334    wait_deadline: Option<Instant>,
335    tail: StreamPosition,
336}
337
338impl ReadSessionState {
339    fn arm_wait_deadline_if_unset(&mut self) {
340        if self.wait_deadline.is_none() {
341            self.reset_wait_deadline();
342        }
343    }
344
345    fn reset_wait_deadline(&mut self) {
346        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
347    }
348
349    fn wait_deadline_expired(&self) -> bool {
350        self.wait_deadline
351            .is_some_and(|deadline| deadline <= Instant::now())
352    }
353
354    fn on_batch(&mut self, batch: StoredReadBatch) -> StoredReadSessionOutput {
355        if let Some(tail) = batch.tail {
356            self.tail = tail;
357        }
358        let last_record = batch.records.last().expect("non-empty");
359        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
360            panic!("batch after exhausted limit");
361        };
362        let count = batch.records.len();
363        let bytes = batch.records.metered_size();
364        let last_position = last_record.position();
365        assert!(limit.allow(count, bytes));
366        assert!(self.until.allow(last_position.timestamp));
367        self.start_seq_num = last_position.seq_num + 1;
368        self.limit = limit.remaining(count, bytes);
369        self.reset_wait_deadline();
370        StoredReadSessionOutput::Batch(batch)
371    }
372}
373
374fn count_allowed_records(
375    limit: ReadLimit,
376    until: ReadUntil,
377    records: &[Metered<StoredSequencedRecord>],
378) -> usize {
379    let mut acc_size = 0;
380    let mut acc_count = 0;
381    for record in records {
382        if limit.deny(acc_count + 1, acc_size + record.metered_size())
383            || until.deny(record.position().timestamp)
384        {
385            break;
386        }
387        acc_count += 1;
388        acc_size += record.metered_size();
389    }
390    acc_count
391}
392
393#[cfg(not(test))]
394fn new_heartbeat_sleep() -> tokio::time::Sleep {
395    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
396}
397
398#[cfg(test)]
399fn new_heartbeat_sleep() -> tokio::time::Sleep {
400    tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
401}
402
403async fn wait_sleep_until(deadline: Option<Instant>) {
404    match deadline {
405        Some(deadline) => tokio::time::sleep_until(deadline).await,
406        None => {
407            std::future::pending::<()>().await;
408        }
409    }
410}
411
412#[cfg(test)]
413mod tests {
414    use std::{sync::Arc, task::Poll};
415
416    use bytesize::ByteSize;
417    use futures::StreamExt;
418    use s2_common::{
419        basin::BasinName,
420        config::{BasinConfig, OptionalStreamConfig},
421        read_extent::{ReadLimit, ReadUntil},
422        record::{Metered, Record},
423        resources::ProvisionMode,
424        stream::{
425            AppendInput, AppendRecord, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom,
426            ReadSessionOutput, ReadStart, StreamName,
427        },
428    };
429    use slatedb::{Db, WriteBatch, object_store::memory::InMemory};
430    use tokio::time::Instant;
431
432    use super::*;
433    use crate::{
434        backend::{FOLLOWER_MAX_LAG, kv, streamer::DORMANT_TIMEOUT},
435        stream_id::StreamId,
436    };
437
438    fn append_input(record: Record) -> AppendInput {
439        let record: AppendRecord = AppendRecordParts {
440            timestamp: None,
441            record: Metered::from(record),
442        }
443        .try_into()
444        .unwrap();
445        let records: AppendRecordBatch = vec![record].try_into().unwrap();
446        AppendInput {
447            records,
448            match_seq_num: None,
449            fencing_token: None,
450        }
451    }
452
453    fn map_test_output(
454        output: Option<Result<ReadSessionOutput, ReadError>>,
455    ) -> Option<ReadSessionOutput> {
456        match output {
457            Some(Ok(output)) => Some(output),
458            Some(Err(e)) => panic!("Read error: {e:?}"),
459            None => None,
460        }
461    }
462
463    async fn poll_next_after_advance<S>(
464        session: &mut std::pin::Pin<Box<S>>,
465        advance_by: Duration,
466    ) -> Poll<Option<ReadSessionOutput>>
467    where
468        S: futures::Stream<Item = Result<ReadSessionOutput, ReadError>>,
469    {
470        let mut pinned_session = session.as_mut();
471        let next = pinned_session.next();
472        tokio::pin!(next);
473
474        assert!(
475            matches!(futures::poll!(&mut next), Poll::Pending),
476            "session unexpectedly yielded before time advanced"
477        );
478
479        tokio::time::advance(advance_by).await;
480        tokio::task::yield_now().await;
481
482        match futures::poll!(&mut next) {
483            Poll::Ready(output) => Poll::Ready(map_test_output(output)),
484            Poll::Pending => Poll::Pending,
485        }
486    }
487
488    #[tokio::test]
489    async fn resolve_timestamp_bounded_to_stream() {
490        let object_store = Arc::new(InMemory::new());
491        let db = Db::builder("/test", object_store).build().await.unwrap();
492        let backend = Backend::new(db, ByteSize::mib(10));
493
494        let stream_a: StreamId = [0u8; 32].into();
495        let stream_b: StreamId = [1u8; 32].into();
496
497        backend
498            .db
499            .put(
500                kv::stream_record_timestamp::ser_key(
501                    stream_a,
502                    StreamPosition {
503                        seq_num: 0,
504                        timestamp: 1000,
505                    },
506                ),
507                kv::stream_record_timestamp::ser_value(),
508            )
509            .await
510            .unwrap();
511        backend
512            .db
513            .put(
514                kv::stream_record_timestamp::ser_key(
515                    stream_b,
516                    StreamPosition {
517                        seq_num: 0,
518                        timestamp: 2000,
519                    },
520                ),
521                kv::stream_record_timestamp::ser_value(),
522            )
523            .await
524            .unwrap();
525
526        // Should find record in stream_a
527        let result = resolve_timestamp(&backend.db, stream_a, 500).await.unwrap();
528        assert_eq!(
529            result,
530            Some(StreamPosition {
531                seq_num: 0,
532                timestamp: 1000
533            })
534        );
535
536        // Should return None, not find stream_b's record
537        let result = resolve_timestamp(&backend.db, stream_a, 1500)
538            .await
539            .unwrap();
540        assert_eq!(result, None);
541    }
542
543    #[tokio::test]
544    async fn read_completes_when_all_records_deleted() {
545        let object_store = Arc::new(InMemory::new());
546        let db = Db::builder("/test", object_store).build().await.unwrap();
547        let backend = Backend::new(db, ByteSize::mib(10));
548
549        let basin: BasinName = "test-basin".parse().unwrap();
550        backend
551            .provision_basin(
552                basin.clone(),
553                BasinConfig::default(),
554                ProvisionMode::CreateOnly {
555                    request_token: None,
556                },
557            )
558            .await
559            .unwrap();
560        let stream: StreamName = "test-stream".parse().unwrap();
561        backend
562            .provision_stream(
563                basin.clone(),
564                stream.clone(),
565                OptionalStreamConfig::default(),
566                ProvisionMode::CreateOnly {
567                    request_token: None,
568                },
569            )
570            .await
571            .unwrap();
572
573        let input = append_input(Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap());
574        let ack = backend
575            .open_for_append(&basin, &stream, None)
576            .await
577            .unwrap()
578            .append(input)
579            .await
580            .unwrap();
581        assert!(ack.end.seq_num > 0);
582
583        let stream_id = StreamId::new(&basin, &stream);
584        let mut batch = WriteBatch::new();
585        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
586        backend.db.write(batch).await.unwrap();
587
588        let start = ReadStart {
589            from: ReadFrom::SeqNum(0),
590            clamp: false,
591        };
592        let end = ReadEnd {
593            limit: ReadLimit::Count(10),
594            until: ReadUntil::Unbounded,
595            wait: None,
596        };
597        let session = backend
598            .open_for_read(&basin, &stream, None)
599            .await
600            .unwrap()
601            .read(start, end)
602            .await
603            .unwrap();
604        let records: Vec<_> = tokio::time::timeout(
605            Duration::from_secs(2),
606            futures::StreamExt::collect::<Vec<_>>(session),
607        )
608        .await
609        .expect("read should not spin forever");
610        assert!(records.into_iter().all(|r| r.is_ok()));
611    }
612
613    #[tokio::test(flavor = "current_thread", start_paused = true)]
614    async fn read_wait_is_not_extended_by_heartbeats() {
615        let object_store = Arc::new(InMemory::new());
616        let db = Db::builder("/test", object_store).build().await.unwrap();
617        let backend = Backend::new(db, ByteSize::mib(10));
618
619        let basin: BasinName = "test-basin".parse().unwrap();
620        backend
621            .provision_basin(
622                basin.clone(),
623                BasinConfig::default(),
624                ProvisionMode::CreateOnly {
625                    request_token: None,
626                },
627            )
628            .await
629            .unwrap();
630        let stream: StreamName = "test-stream".parse().unwrap();
631        backend
632            .provision_stream(
633                basin.clone(),
634                stream.clone(),
635                OptionalStreamConfig::default(),
636                ProvisionMode::CreateOnly {
637                    request_token: None,
638                },
639            )
640            .await
641            .unwrap();
642
643        let wait = Duration::from_millis(30);
644        let start = ReadStart {
645            from: ReadFrom::SeqNum(0),
646            clamp: false,
647        };
648        let end = ReadEnd {
649            limit: ReadLimit::Unbounded,
650            until: ReadUntil::Unbounded,
651            wait: Some(wait),
652        };
653
654        let session = backend
655            .open_for_read(&basin, &stream, None)
656            .await
657            .unwrap()
658            .read(start, end)
659            .await
660            .unwrap();
661        let mut session = Box::pin(session);
662        let probe_step = Duration::from_millis(1);
663        let first = session
664            .as_mut()
665            .next()
666            .await
667            .expect("session should enter follow mode")
668            .expect("session should not error");
669        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
670
671        let started = Instant::now();
672        let second = match poll_next_after_advance(&mut session, wait).await {
673            Poll::Ready(Some(output)) => output,
674            Poll::Ready(None) => panic!("session closed before emitting a follow heartbeat"),
675            Poll::Pending => panic!("expected a follow heartbeat before the wait budget expired"),
676        };
677        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
678
679        tokio::task::yield_now().await;
680        let closed_at = loop {
681            match futures::poll!(session.as_mut().next()) {
682                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
683                Poll::Ready(Some(Ok(output))) => {
684                    panic!("unexpected output after wait deadline: {output:?}");
685                }
686                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
687                Poll::Ready(None) => break Instant::now(),
688                Poll::Pending => panic!("session should close once the wait budget expires"),
689            }
690        };
691
692        assert!(closed_at >= started + wait);
693        assert!(closed_at <= started + wait + probe_step);
694    }
695
696    #[tokio::test(flavor = "current_thread", start_paused = true)]
697    async fn read_wait_is_reset_by_delivered_follow_batch() {
698        let object_store = Arc::new(InMemory::new());
699        let db = Db::builder("/test", object_store).build().await.unwrap();
700        let backend = Backend::new(db, ByteSize::mib(10));
701
702        let basin: BasinName = "test-basin".parse().unwrap();
703        backend
704            .provision_basin(
705                basin.clone(),
706                BasinConfig::default(),
707                ProvisionMode::CreateOnly {
708                    request_token: None,
709                },
710            )
711            .await
712            .unwrap();
713        let stream: StreamName = "test-stream".parse().unwrap();
714        backend
715            .provision_stream(
716                basin.clone(),
717                stream.clone(),
718                OptionalStreamConfig::default(),
719                ProvisionMode::CreateOnly {
720                    request_token: None,
721                },
722            )
723            .await
724            .unwrap();
725
726        let initial_input =
727            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
728        backend
729            .open_for_append(&basin, &stream, None)
730            .await
731            .unwrap()
732            .append(initial_input)
733            .await
734            .unwrap();
735
736        let wait = Duration::from_millis(30);
737        let probe_step = Duration::from_millis(1);
738        let start = ReadStart {
739            from: ReadFrom::SeqNum(0),
740            clamp: false,
741        };
742        let end = ReadEnd {
743            limit: ReadLimit::Unbounded,
744            until: ReadUntil::Unbounded,
745            wait: Some(wait),
746        };
747
748        let session = backend
749            .open_for_read(&basin, &stream, None)
750            .await
751            .unwrap()
752            .read(start, end)
753            .await
754            .unwrap();
755        let mut session = Box::pin(session);
756
757        let first = session
758            .as_mut()
759            .next()
760            .await
761            .expect("session should yield the initial batch")
762            .expect("session should not error");
763        let ReadSessionOutput::Batch(batch) = first else {
764            panic!("expected initial batch");
765        };
766        let initial_record = batch
767            .records
768            .first()
769            .expect("batch should contain one record");
770        let Record::Envelope(initial_envelope) = initial_record.inner() else {
771            panic!("expected plaintext envelope record");
772        };
773        assert_eq!(initial_envelope.body().as_ref(), b"initial");
774
775        let second = session
776            .as_mut()
777            .next()
778            .await
779            .expect("session should enter follow mode")
780            .expect("session should not error");
781        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
782
783        tokio::time::advance(Duration::from_millis(20)).await;
784        tokio::task::yield_now().await;
785
786        let follow_input =
787            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
788        backend
789            .open_for_append(&basin, &stream, None)
790            .await
791            .unwrap()
792            .append(follow_input)
793            .await
794            .unwrap();
795
796        let follow = session
797            .as_mut()
798            .next()
799            .await
800            .expect("session should deliver the live batch")
801            .expect("session should not error");
802        let reset_at = Instant::now();
803        let ReadSessionOutput::Batch(batch) = follow else {
804            panic!("expected live batch after append");
805        };
806        let follow_record = batch
807            .records
808            .first()
809            .expect("batch should contain one record");
810        let Record::Envelope(follow_envelope) = follow_record.inner() else {
811            panic!("expected plaintext envelope record");
812        };
813        assert_eq!(follow_envelope.body().as_ref(), b"follow-1");
814
815        tokio::time::advance(wait - probe_step).await;
816        tokio::task::yield_now().await;
817
818        loop {
819            match futures::poll!(session.as_mut().next()) {
820                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
821                Poll::Ready(Some(Ok(output))) => {
822                    panic!("unexpected output before the reset wait deadline: {output:?}");
823                }
824                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
825                Poll::Ready(None) => {
826                    panic!("session closed before the reset wait budget expired");
827                }
828                Poll::Pending => break,
829            }
830        }
831
832        tokio::time::advance(probe_step).await;
833        tokio::task::yield_now().await;
834
835        let closed_at = loop {
836            match futures::poll!(session.as_mut().next()) {
837                Poll::Ready(Some(Ok(ReadSessionOutput::Heartbeat(_)))) => {}
838                Poll::Ready(Some(Ok(output))) => {
839                    panic!("unexpected output after the reset wait deadline: {output:?}");
840                }
841                Poll::Ready(Some(Err(e))) => panic!("Read error: {e:?}"),
842                Poll::Ready(None) => break Instant::now(),
843                Poll::Pending => {
844                    panic!("session should close once the reset wait budget expires");
845                }
846            }
847        };
848
849        assert!(closed_at >= reset_at + wait);
850        assert!(closed_at <= reset_at + wait + probe_step);
851    }
852
853    #[tokio::test(flavor = "current_thread", start_paused = true)]
854    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        // Checks that a read session opened with a 30s `wait` ends (yields `None`) once
        // that wait has elapsed, even though a burst of appends was made in between and
        // the data keys of those appends were deleted before the session was polled again.
        // The runtime clock starts paused, so time only moves via `tokio::time::advance`.

        // Fresh backend on top of an in-memory object store.
855        let object_store = Arc::new(InMemory::new());
856        let db = Db::builder("/test", object_store).build().await.unwrap();
857        let backend = Backend::new(db, ByteSize::mib(10));
858
        // Provision the basin and the stream explicitly with default configs.
859        let basin: BasinName = "test-basin".parse().unwrap();
860        backend
861            .provision_basin(
862                basin.clone(),
863                BasinConfig::default(),
864                ProvisionMode::CreateOnly {
865                    request_token: None,
866                },
867            )
868            .await
869            .unwrap();
870        let stream: StreamName = "test-stream".parse().unwrap();
871        backend
872            .provision_stream(
873                basin.clone(),
874                stream.clone(),
875                OptionalStreamConfig::default(),
876                ProvisionMode::CreateOnly {
877                    request_token: None,
878                },
879            )
880            .await
881            .unwrap();
882
        // Read from sequence number 0 with no record/until limit; only `wait` bounds the session.
883        let wait = Duration::from_secs(30);
884        let start = ReadStart {
885            from: ReadFrom::SeqNum(0),
886            clamp: false,
887        };
888        let end = ReadEnd {
889            limit: ReadLimit::Unbounded,
890            until: ReadUntil::Unbounded,
891            wait: Some(wait),
892        };
893        let session = backend
894            .open_for_read(&basin, &stream, None)
895            .await
896            .unwrap()
897            .read(start, end)
898            .await
899            .unwrap();
        // Pin the session so it can be polled repeatedly through `as_mut()`.
900        let mut session = Box::pin(session);
901
        // Nothing has been appended yet, so the first output is expected to be a heartbeat;
        // the test treats it as the signal that the session is now following the tail.
902        let first = session
903            .as_mut()
904            .next()
905            .await
906            .expect("session should enter follow mode")
907            .expect("session should not error");
908        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));
909
910        let stream_id = StreamId::new(&basin, &stream);
911        let mut delete_batch = WriteBatch::new();
        // NOTE(review): presumably 25 more appends than the follower's lag allowance, so the
        // un-polled session falls behind -- confirm against FOLLOWER_MAX_LAG's definition.
912        let lagged_appends = FOLLOWER_MAX_LAG + 25;
913
        // Append the records one by one while the session is not being polled, queuing a
        // delete of the record-data key at each ack's start position.
914        for i in 0..lagged_appends {
915            let input = append_input(
916                Record::try_from_parts(vec![], bytes::Bytes::from(format!("lagged-{i}"))).unwrap(),
917            );
918            let ack = backend
919                .open_for_append(&basin, &stream, None)
920                .await
921                .unwrap()
922                .append(input)
923                .await
924                .unwrap();
925            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
926        }
927
        // Apply the queued deletes directly to the underlying db. Going by the test name,
        // the intent is that the session finds no records to catch up on afterwards.
928        backend.db.write(delete_batch).await.unwrap();
929
        // Move the paused clock past the original wait, then yield so other tasks get a
        // chance to run before the session is polled again.
930        tokio::time::advance(wait + Duration::from_secs(1)).await;
931        tokio::task::yield_now().await;
932
        // The very next poll must report end-of-stream rather than a batch, heartbeat, or error.
933        let next = session.as_mut().next().await;
934        assert!(
935            next.is_none(),
936            "session should close immediately once the original wait budget has elapsed"
937        );
938    }
939
940    #[tokio::test(flavor = "current_thread", start_paused = true)]
941    async fn unbounded_follow_survives_streamer_dormancy() {
        // Checks that a read session with no `wait` limit stays open while the paused clock
        // is advanced past DORMANT_TIMEOUT, and still delivers a record appended afterwards.
        // The runtime clock starts paused, so time only moves via `tokio::time::advance`.

        // Fresh backend on top of an in-memory object store.
942        let object_store = Arc::new(InMemory::new());
943        let db = Db::builder("/test", object_store).build().await.unwrap();
944        let backend = Backend::new(db, ByteSize::mib(10));
945
        // Provision the basin and the stream explicitly with default configs.
946        let basin: BasinName = "test-basin".parse().unwrap();
947        backend
948            .provision_basin(
949                basin.clone(),
950                BasinConfig::default(),
951                ProvisionMode::CreateOnly {
952                    request_token: None,
953                },
954            )
955            .await
956            .unwrap();
957        let stream: StreamName = "test-stream".parse().unwrap();
958        backend
959            .provision_stream(
960                basin.clone(),
961                stream.clone(),
962                OptionalStreamConfig::default(),
963                ProvisionMode::CreateOnly {
964                    request_token: None,
965                },
966            )
967            .await
968            .unwrap();
969
        // Seed the stream with one record so the session has something to return
        // before it starts following.
970        let initial_input =
971            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("initial")).unwrap());
972        backend
973            .open_for_append(&basin, &stream, None)
974            .await
975            .unwrap()
976            .append(initial_input)
977            .await
978            .unwrap();
979
        // Read from sequence number 0 with no limit, no until bound, and no wait bound.
980        let start = ReadStart {
981            from: ReadFrom::SeqNum(0),
982            clamp: false,
983        };
984        let end = ReadEnd {
985            limit: ReadLimit::Unbounded,
986            until: ReadUntil::Unbounded,
987            wait: None,
988        };
989        let session = backend
990            .open_for_read(&basin, &stream, None)
991            .await
992            .unwrap()
993            .read(start, end)
994            .await
995            .unwrap();
        // Pin the session so it can be polled repeatedly through `as_mut()`.
996        let mut session = Box::pin(session);
997
        // First output: a batch (expected to carry the seeded record).
998        let first = session
999            .as_mut()
1000            .next()
1001            .await
1002            .expect("session should yield initial batch")
1003            .expect("session should not error");
1004        assert!(matches!(first, ReadSessionOutput::Batch(_)));
1005
        // Second output: a heartbeat, which the test treats as the signal that the session
        // is now following the tail.
1006        let second = session
1007            .as_mut()
1008            .next()
1009            .await
1010            .expect("session should enter follow mode")
1011            .expect("session should not error");
1012        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));
1013
        // Move the paused clock one second past DORMANT_TIMEOUT and yield so other tasks
        // get a chance to run.
        // NOTE(review): presumably this is the idle period after which the streamer goes
        // dormant -- confirm against DORMANT_TIMEOUT's definition.
1014        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
1015        tokio::task::yield_now().await;
1016
        // Append a new record through a freshly opened append handle.
1017        let follow_input =
1018            append_input(Record::try_from_parts(vec![], bytes::Bytes::from("follow-1")).unwrap());
1019        backend
1020            .open_for_append(&basin, &stream, None)
1021            .await
1022            .unwrap()
1023            .append(follow_input)
1024            .await
1025            .unwrap();
1026
        // The still-open session must deliver exactly that record as its next output:
        // neither end-of-stream nor an error is acceptable here.
1027        let next = session
1028            .as_mut()
1029            .next()
1030            .await
1031            .expect("session should stay open after dormancy")
1032            .expect("session should not error after dormancy");
1033        let ReadSessionOutput::Batch(batch) = next else {
1034            panic!("expected new batch after append");
1035        };
1036        assert_eq!(batch.records.len(), 1);
1037        let record = batch.records.first().expect("batch should have one record");
        // The record must be an envelope record whose body is the bytes appended above.
1038        let Record::Envelope(envelope) = record.inner() else {
1039            panic!("expected envelope record");
1040        };
1041        assert_eq!(envelope.body().as_ref(), b"follow-1");
1042    }
1043}