//! s2_lite/backend/read.rs — read path for the s2_lite backend.
use std::time::Duration;

use futures::Stream;
use s2_common::{
    caps,
    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
    types::{
        basin::BasinName,
        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
    },
};
use slatedb::config::{DurabilityLevel, ScanOptions};
use tokio::{sync::broadcast, time::Instant};

use super::Backend;
use crate::backend::{
    error::{
        CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
    },
    kv,
    stream_id::StreamId,
};
24
impl Backend {
    /// Resolve the caller's `ReadStart` into a concrete starting sequence
    /// number, validated against the current `tail` of the stream.
    ///
    /// A start beyond the tail is clamped to the tail when `start.clamp` is
    /// set, otherwise rejected as [`UnwrittenError`]. A start exactly at the
    /// tail is also rejected when `end` cannot follow future writes, since
    /// such a session could never yield anything. Timestamp starts are
    /// resolved via the timestamp index (see `resolve_timestamp`); a
    /// timestamp with no indexed record at or after it resolves to the tail.
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                // Saturating: an offset larger than the tail means "from the beginning".
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        // Requested position lies past the tail: clamp or reject.
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        // Exactly at the tail and the session cannot wait for new writes:
        // there is nothing it could ever return.
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                // No indexed record at/after the timestamp => start at the tail.
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

    /// Return the current tail position of `stream` in `basin`, creating the
    /// stream first when the basin config enables create-on-read.
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

    /// Open a read session over `stream`, yielding record batches (and
    /// heartbeats while tailing) until `end`'s limit/until/wait is satisfied.
    ///
    /// The session alternates between two modes:
    /// * catch-up — while behind the known tail, scan persisted records
    ///   directly from the DB;
    /// * follow — once caught up, subscribe to the live streamer broadcast,
    ///   emitting heartbeats and enforcing the absolute `wait` budget
    ///   (which only a delivered batch may reset).
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            wait: end.wait,
            wait_deadline: None,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up mode: scan persisted records in
                    // [start_seq_num, tail.seq_num) from the DB.
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    // Size the batch buffer to whichever is smaller: the
                    // remaining count limit or the hard per-batch cap.
                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        // Record would exceed `until` or the limit budget:
                        // flush what we have (if anything) and stop.
                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size()) {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        // Batch is at the per-batch cap: flush before pushing.
                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    } else {
                        // Scan produced nothing (e.g. records were trimmed):
                        // jump the cursor to the tail so follow mode can run.
                        state.start_seq_num = state.tail.seq_num;
                    }
                } else {
                    // Follow mode: caught up with the known tail.
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            // Only a delivered batch should reset the absolute wait budget.
                            state.arm_wait_deadline_if_unset();
                            if state.wait_deadline_expired() {
                                break;
                            }
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                if allowed_count < count {
                                                    // Limit or `until` truncated the batch: done.
                                                    break 'session;
                                                }
                                                Ok(())
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Catch up using DB
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                Err(StreamerMissingInActionError)
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                        Ok(())
                                    }
                                    _ = wait_sleep_until(state.wait_deadline) => {
                                        break 'session;
                                    }
                                }?;
                            }
                        }
                        Err(tail) => {
                            // Streamer reports a newer tail: resume catch-up from the DB.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }

    /// Find the first indexed position in `stream_id` whose timestamp is at
    /// or after `timestamp`, or `None` when no such record exists for this
    /// stream. The scan is bounded by the stream id on both ends, so entries
    /// belonging to other streams are never returned.
    pub(super) async fn resolve_timestamp(
        &self,
        stream_id: StreamId,
        timestamp: Timestamp,
    ) -> Result<Option<StreamPosition>, StorageError> {
        let start_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MIN,
                timestamp,
            },
        );
        let end_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MAX,
                timestamp: Timestamp::MAX,
            },
        );
        // Point lookup: only the first hit matters, so keep the scan cheap.
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self
            .db
            .scan_with_options(start_key..end_key, &SCAN_OPTS)
            .await?;
        Ok(match it.next().await? {
            Some(kv) => {
                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
                assert_eq!(deser_stream_id, stream_id);
                assert!(pos.timestamp >= timestamp);
                kv::stream_record_timestamp::deser_value(kv.value)?;
                Some(StreamPosition {
                    seq_num: pos.seq_num,
                    timestamp: pos.timestamp,
                })
            }
            None => None,
        })
    }
}
291
/// Mutable state threaded through a single read session.
struct ReadSessionState {
    /// Next sequence number to read.
    start_seq_num: u64,
    /// Remaining count/bytes budget, or exhausted.
    limit: EvaluatedReadLimit,
    /// Timestamp bound past which records must not be delivered.
    until: ReadUntil,
    /// Configured absolute wait budget while tailing, if any.
    wait: Option<Duration>,
    /// Armed deadline derived from `wait`; `None` until follow mode arms it.
    wait_deadline: Option<Instant>,
    /// Latest known tail of the stream.
    tail: StreamPosition,
}
300
301impl ReadSessionState {
302    fn arm_wait_deadline_if_unset(&mut self) {
303        if self.wait_deadline.is_none() {
304            self.reset_wait_deadline();
305        }
306    }
307
308    fn reset_wait_deadline(&mut self) {
309        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
310    }
311
312    fn wait_deadline_expired(&self) -> bool {
313        self.wait_deadline
314            .is_some_and(|deadline| deadline <= Instant::now())
315    }
316
317    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
318        if let Some(tail) = batch.tail {
319            self.tail = tail;
320        }
321        let last_record = batch.records.last().expect("non-empty");
322        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
323            panic!("batch after exhausted limit");
324        };
325        let count = batch.records.len();
326        let bytes = batch.records.metered_size();
327        assert!(limit.allow(count, bytes));
328        assert!(self.until.allow(last_record.position.timestamp));
329        self.start_seq_num = last_record.position.seq_num + 1;
330        self.limit = limit.remaining(count, bytes);
331        self.reset_wait_deadline();
332        ReadSessionOutput::Batch(batch)
333    }
334}
335
336fn count_allowed_records(
337    limit: ReadLimit,
338    until: ReadUntil,
339    records: &[Metered<SequencedRecord>],
340) -> usize {
341    let mut acc_size = 0;
342    let mut acc_count = 0;
343    for record in records {
344        if limit.deny(acc_count + 1, acc_size + record.metered_size())
345            || until.deny(record.position.timestamp)
346        {
347            break;
348        }
349        acc_count += 1;
350        acc_size += record.metered_size();
351    }
352    acc_count
353}
354
355#[cfg(not(test))]
356fn new_heartbeat_sleep() -> tokio::time::Sleep {
357    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
358}
359
/// Jittered 5–15 ms heartbeat pause (shortened for tests).
#[cfg(test)]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    let jitter_ms = rand::random_range(5..15);
    tokio::time::sleep(Duration::from_millis(jitter_ms))
}
364
365async fn wait_sleep_until(deadline: Option<Instant>) {
366    match deadline {
367        Some(deadline) => tokio::time::sleep_until(deadline).await,
368        None => {
369            std::future::pending::<()>().await;
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        types::{
            basin::BasinName,
            config::{BasinConfig, OptionalStreamConfig},
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::backend::{FOLLOWER_MAX_LAG, kv, stream_id::StreamId, streamer::DORMANT_TIMEOUT};

    /// The timestamp-index scan must be bounded to the queried stream id and
    /// never return a neighboring stream's entry.
    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        // Index one entry per stream, at different timestamps.
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // Should find record in stream_a
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Should return None, not find stream_b's record
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }

    /// A read session whose records were deleted from the DB (trimmed) must
    /// still terminate instead of spinning in the catch-up loop.
    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Append a single record, then delete its data key directly.
        let record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap();
        let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
        let parts = AppendRecordParts {
            timestamp: None,
            record: metered,
        };
        let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
        let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
        let input = AppendInput {
            records: batch,
            match_seq_num: None,
            fencing_token: None,
        };
        let ack = backend
            .append(basin.clone(), stream.clone(), input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        // Bounded read over the now-empty range must complete.
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend.read(basin, stream, start, end).await.unwrap();
        let records: Vec<_> = tokio::time::timeout(
            Duration::from_secs(2),
            futures::StreamExt::collect::<Vec<_>>(session),
        )
        .await
        .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    /// Heartbeats must not push back the wait deadline: the session should
    /// close once the original wait budget elapses, after emitting some
    /// heartbeats.
    #[tokio::test]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Wait (30 ms) spans several test-mode heartbeat intervals (5-15 ms).
        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend.read(basin, stream, start, end).await.unwrap();
        let started = Instant::now();
        let outputs = tokio::time::timeout(Duration::from_millis(150), session.collect::<Vec<_>>())
            .await
            .expect("read session should close once wait expires");

        assert!(
            started.elapsed() >= wait,
            "read session ended before wait elapsed"
        );
        assert!(
            outputs.len() > 1,
            "expected heartbeats before wait deadline; got {} output(s)",
            outputs.len()
        );
        assert!(outputs.into_iter().all(|o| o.is_ok()));
    }

    /// Re-entering catch-up after broadcast lag (with no surviving records to
    /// deliver) must not re-arm the wait budget from scratch.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let wait = Duration::from_secs(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };
        let session = backend
            .read(basin.clone(), stream.clone(), start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        // Drive the session into follow mode (first output is a heartbeat).
        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        // Overflow the follower's broadcast buffer to force a Lagged recv,
        // while also deleting every appended record so catch-up finds nothing.
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        for i in 0..lagged_appends {
            let record = s2_common::record::Record::try_from_parts(
                vec![],
                bytes::Bytes::from(format!("lagged-{i}")),
            )
            .unwrap();
            let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
            let parts = AppendRecordParts {
                timestamp: None,
                record: metered,
            };
            let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
            let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
            let input = AppendInput {
                records: batch,
                match_seq_num: None,
                fencing_token: None,
            };
            let ack = backend
                .append(basin.clone(), stream.clone(), input)
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(delete_batch, &WRITE_OPTS)
            .await
            .unwrap();

        // Jump past the original wait budget (paused clock).
        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    /// An unbounded follow session must survive the streamer going dormant
    /// and still deliver records appended afterwards.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Seed one record so the session yields a batch before following.
        let initial_record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("initial"))
                .unwrap();
        let initial_metered: s2_common::record::Metered<s2_common::record::Record> =
            initial_record.into();
        let initial_parts = AppendRecordParts {
            timestamp: None,
            record: initial_metered,
        };
        let initial_append_record: s2_common::types::stream::AppendRecord =
            initial_parts.try_into().unwrap();
        let initial_batch: AppendRecordBatch = vec![initial_append_record].try_into().unwrap();
        let initial_input = AppendInput {
            records: initial_batch,
            match_seq_num: None,
            fencing_token: None,
        };
        backend
            .append(basin.clone(), stream.clone(), initial_input)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .read(basin.clone(), stream.clone(), start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield initial batch")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Let the streamer go dormant (paused clock), then append again.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let follow_record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("follow-1"))
                .unwrap();
        let follow_metered: s2_common::record::Metered<s2_common::record::Record> =
            follow_record.into();
        let follow_parts = AppendRecordParts {
            timestamp: None,
            record: follow_metered,
        };
        let follow_append_record: s2_common::types::stream::AppendRecord =
            follow_parts.try_into().unwrap();
        let follow_batch: AppendRecordBatch = vec![follow_append_record].try_into().unwrap();
        let follow_input = AppendInput {
            records: follow_batch,
            match_seq_num: None,
            fencing_token: None,
        };
        backend.append(basin, stream, follow_input).await.unwrap();

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let s2_common::record::Record::Envelope(envelope) = &record.record else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}