use std::time::Duration;

use futures::Stream;
use s2_common::{
    caps,
    read_extent::{EvaluatedReadLimit, ReadLimit, ReadUntil},
    record::{Metered, MeteredSize as _, SeqNum, SequencedRecord, StreamPosition, Timestamp},
    types::{
        basin::BasinName,
        stream::{ReadBatch, ReadEnd, ReadPosition, ReadSessionOutput, ReadStart, StreamName},
    },
};
use slatedb::config::{DurabilityLevel, ScanOptions};
use tokio::{sync::broadcast, time::Instant};

use super::Backend;
use crate::backend::{
    error::{
        CheckTailError, ReadError, StorageError, StreamerMissingInActionError, UnwrittenError,
    },
    kv,
    stream_id::StreamId,
};
impl Backend {
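    /// Resolves the requested read start into a concrete sequence number,
    /// validated against the current tail. A start beyond the tail is either
    /// clamped to the tail (when `start.clamp` is set) or rejected as
    /// unwritten; a start exactly at the tail is also rejected unless the
    /// read extent allows following.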
    async fn read_start_seq_num(
        &self,
        stream_id: StreamId,
        start: ReadStart,
        end: ReadEnd,
        tail: StreamPosition,
    ) -> Result<SeqNum, ReadError> {
        let mut read_pos = match start.from {
            s2_common::types::stream::ReadFrom::SeqNum(seq_num) => ReadPosition::SeqNum(seq_num),
            s2_common::types::stream::ReadFrom::Timestamp(timestamp) => {
                ReadPosition::Timestamp(timestamp)
            }
            s2_common::types::stream::ReadFrom::TailOffset(tail_offset) => {
                ReadPosition::SeqNum(tail.seq_num.saturating_sub(tail_offset))
            }
        };
        if match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num > tail.seq_num,
            ReadPosition::Timestamp(start_timestamp) => start_timestamp > tail.timestamp,
        } {
            if start.clamp {
                read_pos = ReadPosition::SeqNum(tail.seq_num);
            } else {
                return Err(UnwrittenError(tail).into());
            }
        }
        if let ReadPosition::SeqNum(start_seq_num) = read_pos
            && start_seq_num == tail.seq_num
            && !end.may_follow()
        {
            return Err(UnwrittenError(tail).into());
        }
        Ok(match read_pos {
            ReadPosition::SeqNum(start_seq_num) => start_seq_num,
            ReadPosition::Timestamp(start_timestamp) => {
                self.resolve_timestamp(stream_id, start_timestamp)
                    .await?
                    .unwrap_or(tail)
                    .seq_num
            }
        })
    }

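    /// Returns the stream's tail position, i.e. the position at which the
    /// next record will be appended, creating the stream first when the
    /// basin config enables `create_stream_on_read`.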
    pub async fn check_tail(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamPosition, CheckTailError> {
        let client = self
            .streamer_client_with_auto_create::<CheckTailError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let tail = client.check_tail().await?;
        Ok(tail)
    }

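    /// Opens a read session that first catches up on durable records from
    /// storage and then, if the read extent allows it, follows the live
    /// stream via the streamer, emitting heartbeats while idle. The session
    /// ends once the limit, `until` bound, or wait deadline is exhausted.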
    pub async fn read(
        &self,
        basin: BasinName,
        stream: StreamName,
        start: ReadStart,
        end: ReadEnd,
    ) -> Result<impl Stream<Item = Result<ReadSessionOutput, ReadError>> + 'static, ReadError> {
        let client = self
            .streamer_client_with_auto_create::<ReadError>(&basin, &stream, |config| {
                config.create_stream_on_read
            })
            .await?;
        let stream_id = client.stream_id();
        let tail = client.check_tail().await?;
        let mut state = ReadSessionState {
            start_seq_num: self.read_start_seq_num(stream_id, start, end, tail).await?,
            limit: EvaluatedReadLimit::Remaining(end.limit),
            until: end.until,
            wait: end.wait,
            wait_deadline: None,
            tail,
        };
        let db = self.db.clone();
        let session = async_stream::try_stream! {
            'session: while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                if state.start_seq_num < state.tail.seq_num {
                    // Catch-up phase: scan durable records from storage up to the
                    // known tail.
                    let start_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.start_seq_num,
                            timestamp: 0,
                        },
                    );
                    let end_key = kv::stream_record_data::ser_key(
                        stream_id,
                        StreamPosition {
                            seq_num: state.tail.seq_num,
                            timestamp: 0,
                        },
                    );
                    static SCAN_OPTS: ScanOptions = ScanOptions {
                        durability_filter: DurabilityLevel::Remote,
                        dirty: false,
                        read_ahead_bytes: 1024 * 1024,
                        cache_blocks: true,
                        max_fetch_tasks: 8,
                    };
                    let mut it = db
                        .scan_with_options(start_key..end_key, &SCAN_OPTS)
                        .await?;

                    let mut records = Metered::with_capacity(
                        limit.count()
                            .unwrap_or(usize::MAX)
                            .min(caps::RECORD_BATCH_MAX.count),
                    );

                    while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                        let Some(kv) = it.next().await? else {
                            break;
                        };
                        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
                        assert_eq!(deser_stream_id, stream_id);

                        let record = kv::stream_record_data::deser_value(kv.value)?.sequenced(pos);

                        if end.until.deny(pos.timestamp)
                            || limit.deny(records.len() + 1, records.metered_size() + record.metered_size())
                        {
                            if records.is_empty() {
                                break 'session;
                            } else {
                                break;
                            }
                        }

                        if records.len() == caps::RECORD_BATCH_MAX.count
                            || records.metered_size() + record.metered_size() > caps::RECORD_BATCH_MAX.bytes
                        {
                            // The batch is at capacity: flush it and start a new
                            // buffer sized to whatever the limit still allows.
                            let new_records_buf = Metered::with_capacity(
                                limit.count()
                                    .map_or(usize::MAX, |n| n.saturating_sub(records.len()))
                                    .min(caps::RECORD_BATCH_MAX.count),
                            );
                            yield state.on_batch(ReadBatch {
                                records: std::mem::replace(&mut records, new_records_buf),
                                tail: None,
                            });
                        }

                        records.push(record);
                    }

                    if !records.is_empty() {
                        yield state.on_batch(ReadBatch {
                            records,
                            tail: None,
                        });
                    } else {
                        // Nothing scannable remains below the tail (e.g. records
                        // were deleted); skip ahead so the session makes progress.
                        state.start_seq_num = state.tail.seq_num;
                    }
                } else {
                    // Follow phase: caught up with the tail.
                    assert_eq!(state.start_seq_num, state.tail.seq_num);
                    if !end.may_follow() {
                        break;
                    }
                    match client.follow(state.start_seq_num).await? {
                        Ok(mut follow_rx) => {
                            state.arm_wait_deadline_if_unset();
                            if state.wait_deadline_expired() {
                                break;
                            }
                            yield ReadSessionOutput::Heartbeat(state.tail);
                            while let EvaluatedReadLimit::Remaining(limit) = state.limit {
                                tokio::select! {
                                    biased;
                                    msg = follow_rx.recv() => {
                                        match msg {
                                            Ok(mut records) => {
                                                let count = records.len();
                                                let tail = super::streamer::next_pos(&records);
                                                let allowed_count = count_allowed_records(limit, end.until, &records);
                                                if allowed_count > 0 {
                                                    yield state.on_batch(ReadBatch {
                                                        records: records.drain(..allowed_count).collect(),
                                                        tail: Some(tail),
                                                    });
                                                }
                                                if allowed_count < count {
                                                    // The limit or `until` bound cut the batch short.
                                                    break 'session;
                                                }
                                                Ok(())
                                            }
                                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                                // Fell behind the broadcast channel; restart the
                                                // session loop to resynchronize via storage.
                                                continue 'session;
                                            }
                                            Err(broadcast::error::RecvError::Closed) => {
                                                Err(StreamerMissingInActionError)
                                            }
                                        }
                                    }
                                    _ = new_heartbeat_sleep() => {
                                        yield ReadSessionOutput::Heartbeat(state.tail);
                                        Ok(())
                                    }
                                    _ = wait_sleep_until(state.wait_deadline) => {
                                        break 'session;
                                    }
                                }?;
                            }
                        }
                        Err(tail) => {
                            // New records landed since the last tail check; loop
                            // back around to read them from storage.
                            assert!(state.tail.seq_num < tail.seq_num, "tail cannot regress");
                            state.tail = tail;
                        }
                    }
                }
            }
        };
        Ok(session)
    }

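    /// Looks up the earliest position in the stream whose timestamp is at or
    /// after `timestamp`, using the timestamp index; returns `None` when no
    /// such record exists.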
    pub(super) async fn resolve_timestamp(
        &self,
        stream_id: StreamId,
        timestamp: Timestamp,
    ) -> Result<Option<StreamPosition>, StorageError> {
        let start_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MIN,
                timestamp,
            },
        );
        let end_key = kv::stream_record_timestamp::ser_key(
            stream_id,
            StreamPosition {
                seq_num: SeqNum::MAX,
                timestamp: Timestamp::MAX,
            },
        );
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self
            .db
            .scan_with_options(start_key..end_key, &SCAN_OPTS)
            .await?;
        Ok(match it.next().await? {
            Some(kv) => {
                let (deser_stream_id, pos) = kv::stream_record_timestamp::deser_key(kv.key)?;
                assert_eq!(deser_stream_id, stream_id);
                assert!(pos.timestamp >= timestamp);
                kv::stream_record_timestamp::deser_value(kv.value)?;
                Some(StreamPosition {
                    seq_num: pos.seq_num,
                    timestamp: pos.timestamp,
                })
            }
            None => None,
        })
    }
}

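/// Mutable progress of an in-flight read session: the next sequence number to
/// read, the remaining limit and `until` bound, and the optional wait
/// deadline that closes an idle session.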
struct ReadSessionState {
    start_seq_num: u64,
    limit: EvaluatedReadLimit,
    until: ReadUntil,
    wait: Option<Duration>,
    wait_deadline: Option<Instant>,
    tail: StreamPosition,
}

impl ReadSessionState {
    fn arm_wait_deadline_if_unset(&mut self) {
        if self.wait_deadline.is_none() {
            self.reset_wait_deadline();
        }
    }

    fn reset_wait_deadline(&mut self) {
        self.wait_deadline = self.wait.map(|wait| Instant::now() + wait);
    }

    fn wait_deadline_expired(&self) -> bool {
        self.wait_deadline
            .is_some_and(|deadline| deadline <= Instant::now())
    }

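    /// Accounts a non-empty batch against the session state — advancing the
    /// cursor past the last record, consuming the limit, and resetting the
    /// wait deadline — and wraps it as session output.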
    fn on_batch(&mut self, batch: ReadBatch) -> ReadSessionOutput {
        if let Some(tail) = batch.tail {
            self.tail = tail;
        }
        let last_record = batch.records.last().expect("non-empty");
        let EvaluatedReadLimit::Remaining(limit) = self.limit else {
            panic!("batch after exhausted limit");
        };
        let count = batch.records.len();
        let bytes = batch.records.metered_size();
        assert!(limit.allow(count, bytes));
        assert!(self.until.allow(last_record.position.timestamp));
        self.start_seq_num = last_record.position.seq_num + 1;
        self.limit = limit.remaining(count, bytes);
        self.reset_wait_deadline();
        ReadSessionOutput::Batch(batch)
    }
}

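/// Returns how many records from the front of `records` fit within `limit`
/// and the `until` bound; records are admitted greedily until the first
/// denial.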
fn count_allowed_records(
    limit: ReadLimit,
    until: ReadUntil,
    records: &[Metered<SequencedRecord>],
) -> usize {
    let mut acc_size = 0;
    let mut acc_count = 0;
    for record in records {
        if limit.deny(acc_count + 1, acc_size + record.metered_size())
            || until.deny(record.position.timestamp)
        {
            break;
        }
        acc_count += 1;
        acc_size += record.metered_size();
    }
    acc_count
}

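// Heartbeats fire at a randomized interval; the test variant is shortened so
// tests can observe heartbeats without long sleeps.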
#[cfg(not(test))]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    tokio::time::sleep(Duration::from_millis(rand::random_range(5_000..15_000)))
}

#[cfg(test)]
fn new_heartbeat_sleep() -> tokio::time::Sleep {
    tokio::time::sleep(Duration::from_millis(rand::random_range(5..15)))
}

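/// Sleeps until `deadline`, or forever when no deadline is set.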
async fn wait_sleep_until(deadline: Option<Instant>) {
    match deadline {
        Some(deadline) => tokio::time::sleep_until(deadline).await,
        None => {
            std::future::pending::<()>().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytesize::ByteSize;
    use futures::StreamExt;
    use s2_common::{
        read_extent::{ReadLimit, ReadUntil},
        types::{
            basin::BasinName,
            config::{BasinConfig, OptionalStreamConfig},
            resources::CreateMode,
            stream::{
                AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart,
            },
        },
    };
    use slatedb::{Db, WriteBatch, config::WriteOptions, object_store::memory::InMemory};
    use tokio::time::Instant;

    use super::*;
    use crate::backend::{FOLLOWER_MAX_LAG, kv, stream_id::StreamId, streamer::DORMANT_TIMEOUT};

    #[tokio::test]
    async fn resolve_timestamp_bounded_to_stream() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let stream_a: StreamId = [0u8; 32].into();
        let stream_b: StreamId = [1u8; 32].into();

        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_a,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 1000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();
        backend
            .db
            .put(
                kv::stream_record_timestamp::ser_key(
                    stream_b,
                    StreamPosition {
                        seq_num: 0,
                        timestamp: 2000,
                    },
                ),
                kv::stream_record_timestamp::ser_value(),
            )
            .await
            .unwrap();

        // A timestamp below stream A's first record resolves to that record.
        let result = backend.resolve_timestamp(stream_a, 500).await.unwrap();
        assert_eq!(
            result,
            Some(StreamPosition {
                seq_num: 0,
                timestamp: 1000
            })
        );

        // Stream B's record at timestamp 2000 must not leak into stream A.
        let result = backend.resolve_timestamp(stream_a, 1500).await.unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn read_completes_when_all_records_deleted() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("x")).unwrap();
        let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
        let parts = AppendRecordParts {
            timestamp: None,
            record: metered,
        };
        let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
        let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
        let input = AppendInput {
            records: batch,
            match_seq_num: None,
            fencing_token: None,
        };
        let ack = backend
            .append(basin.clone(), stream.clone(), input)
            .await
            .unwrap();
        assert!(ack.end.seq_num > 0);

        // Delete the record's data out from under the read path.
        let stream_id = StreamId::new(&basin, &stream);
        let mut batch = WriteBatch::new();
        batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Count(10),
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend.read(basin, stream, start, end).await.unwrap();
        let records: Vec<_> =
            tokio::time::timeout(Duration::from_secs(2), session.collect::<Vec<_>>())
                .await
                .expect("read should not spin forever");
        assert!(records.into_iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn read_wait_is_not_extended_by_heartbeats() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let wait = Duration::from_millis(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };

        let session = backend.read(basin, stream, start, end).await.unwrap();
        let started = Instant::now();
        let outputs = tokio::time::timeout(Duration::from_millis(150), session.collect::<Vec<_>>())
            .await
            .expect("read session should close once wait expires");

        assert!(
            started.elapsed() >= wait,
            "read session ended before wait elapsed"
        );
        assert!(
            outputs.len() > 1,
            "expected heartbeats before wait deadline; got {} output(s)",
            outputs.len()
        );
        assert!(outputs.into_iter().all(|o| o.is_ok()));
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn read_wait_is_not_reset_after_follow_lag_without_catchup_records() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let wait = Duration::from_secs(30);
        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: Some(wait),
        };
        let session = backend
            .read(basin.clone(), stream.clone(), start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Heartbeat(_)));

        // Overflow the follow channel so the session observes a lag, while
        // deleting every appended record so the catch-up scan finds nothing.
        let stream_id = StreamId::new(&basin, &stream);
        let mut delete_batch = WriteBatch::new();
        let lagged_appends = FOLLOWER_MAX_LAG + 25;

        for i in 0..lagged_appends {
            let record = s2_common::record::Record::try_from_parts(
                vec![],
                bytes::Bytes::from(format!("lagged-{i}")),
            )
            .unwrap();
            let metered: s2_common::record::Metered<s2_common::record::Record> = record.into();
            let parts = AppendRecordParts {
                timestamp: None,
                record: metered,
            };
            let append_record: s2_common::types::stream::AppendRecord = parts.try_into().unwrap();
            let batch: AppendRecordBatch = vec![append_record].try_into().unwrap();
            let input = AppendInput {
                records: batch,
                match_seq_num: None,
                fencing_token: None,
            };
            let ack = backend
                .append(basin.clone(), stream.clone(), input)
                .await
                .unwrap();
            delete_batch.delete(kv::stream_record_data::ser_key(stream_id, ack.start));
        }

        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(delete_batch, &WRITE_OPTS)
            .await
            .unwrap();

        tokio::time::advance(wait + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let next = session.as_mut().next().await;
        assert!(
            next.is_none(),
            "session should close immediately once the original wait budget has elapsed"
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn unbounded_follow_survives_streamer_dormancy() {
        let object_store = Arc::new(InMemory::new());
        let db = Db::builder("/test", object_store).build().await.unwrap();
        let backend = Backend::new(db, ByteSize::mib(10));

        let basin: BasinName = "test-basin".parse().unwrap();
        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        let stream: s2_common::types::stream::StreamName = "test-stream".parse().unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        let initial_record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("initial"))
                .unwrap();
        let initial_metered: s2_common::record::Metered<s2_common::record::Record> =
            initial_record.into();
        let initial_parts = AppendRecordParts {
            timestamp: None,
            record: initial_metered,
        };
        let initial_append_record: s2_common::types::stream::AppendRecord =
            initial_parts.try_into().unwrap();
        let initial_batch: AppendRecordBatch = vec![initial_append_record].try_into().unwrap();
        let initial_input = AppendInput {
            records: initial_batch,
            match_seq_num: None,
            fencing_token: None,
        };
        backend
            .append(basin.clone(), stream.clone(), initial_input)
            .await
            .unwrap();

        let start = ReadStart {
            from: ReadFrom::SeqNum(0),
            clamp: false,
        };
        let end = ReadEnd {
            limit: ReadLimit::Unbounded,
            until: ReadUntil::Unbounded,
            wait: None,
        };
        let session = backend
            .read(basin.clone(), stream.clone(), start, end)
            .await
            .unwrap();
        let mut session = Box::pin(session);

        let first = session
            .as_mut()
            .next()
            .await
            .expect("session should yield initial batch")
            .expect("session should not error");
        assert!(matches!(first, ReadSessionOutput::Batch(_)));

        let second = session
            .as_mut()
            .next()
            .await
            .expect("session should enter follow mode")
            .expect("session should not error");
        assert!(matches!(second, ReadSessionOutput::Heartbeat(_)));

        // Let the streamer go dormant before appending again.
        tokio::time::advance(DORMANT_TIMEOUT + Duration::from_secs(1)).await;
        tokio::task::yield_now().await;

        let follow_record =
            s2_common::record::Record::try_from_parts(vec![], bytes::Bytes::from("follow-1"))
                .unwrap();
        let follow_metered: s2_common::record::Metered<s2_common::record::Record> =
            follow_record.into();
        let follow_parts = AppendRecordParts {
            timestamp: None,
            record: follow_metered,
        };
        let follow_append_record: s2_common::types::stream::AppendRecord =
            follow_parts.try_into().unwrap();
        let follow_batch: AppendRecordBatch = vec![follow_append_record].try_into().unwrap();
        let follow_input = AppendInput {
            records: follow_batch,
            match_seq_num: None,
            fencing_token: None,
        };
        backend.append(basin, stream, follow_input).await.unwrap();

        let next = session
            .as_mut()
            .next()
            .await
            .expect("session should stay open after dormancy")
            .expect("session should not error after dormancy");
        let ReadSessionOutput::Batch(batch) = next else {
            panic!("expected new batch after append");
        };
        assert_eq!(batch.records.len(), 1);
        let record = batch.records.first().expect("batch should have one record");
        let s2_common::record::Record::Envelope(envelope) = &record.record else {
            panic!("expected envelope record");
        };
        assert_eq!(envelope.body().as_ref(), b"follow-1");
    }
}