1use std::collections::{HashMap, VecDeque};
2use std::fmt;
3use std::sync::{Arc, Mutex as StdMutex};
4use std::time::{Duration, Instant};
5
6use tokio::sync::broadcast;
7
8use crate::runtime::{LashRuntime, RuntimeSessionState};
9
10const SESSION_CURSOR_PREFIX: &str = "lashsc1:";
11const DEFAULT_LIVE_REPLAY_CAPACITY: usize = 2048;
12const DEFAULT_LIVE_REPLAY_TTL: Duration = Duration::from_secs(120);
13
14#[derive(
15 Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
16)]
17#[serde(transparent)]
18pub struct SessionRevision(pub u64);
19
20impl SessionRevision {
21 pub fn new(revision: u64) -> Self {
22 Self(revision)
23 }
24
25 pub fn as_u64(self) -> u64 {
26 self.0
27 }
28
29 pub(super) fn from_runtime(runtime: &LashRuntime) -> Self {
30 Self::from_state(&runtime.export_persisted_state())
31 }
32
33 pub(super) fn from_state(state: &RuntimeSessionState) -> Self {
34 Self(state.head_revision.unwrap_or(state.turn_index as u64))
35 }
36}
37
38#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
39#[serde(transparent)]
40pub struct SessionCursor(String);
41
42impl SessionCursor {
43 pub(crate) fn new(
44 session_id: impl AsRef<str>,
45 revision: SessionRevision,
46 live_position: u64,
47 ) -> Self {
48 Self(format!(
49 "{SESSION_CURSOR_PREFIX}{}:{live_position}:{}",
50 revision.0,
51 session_id.as_ref()
52 ))
53 }
54
55 #[cfg(test)]
56 pub(super) fn from_raw_for_testing(raw: impl Into<String>) -> Self {
57 Self(raw.into())
58 }
59
60 pub fn as_str(&self) -> &str {
61 &self.0
62 }
63
64 pub(crate) fn parse_for_session(
65 &self,
66 expected_session_id: &str,
67 ) -> Result<ParsedSessionCursor, SessionCursorError> {
68 let parsed = self.parse()?;
69 if parsed.session_id != expected_session_id {
70 return Err(SessionCursorError::WrongSession {
71 expected_session_id: expected_session_id.to_string(),
72 actual_session_id: parsed.session_id,
73 });
74 }
75 Ok(parsed)
76 }
77
78 fn parse(&self) -> Result<ParsedSessionCursor, SessionCursorError> {
79 let payload = self.0.strip_prefix(SESSION_CURSOR_PREFIX).ok_or_else(|| {
80 SessionCursorError::Malformed {
81 message: "missing cursor prefix".to_string(),
82 }
83 })?;
84 let mut parts = payload.splitn(3, ':');
85 let revision = parts
86 .next()
87 .ok_or_else(|| SessionCursorError::Malformed {
88 message: "missing session revision".to_string(),
89 })?
90 .parse::<u64>()
91 .map_err(|err| SessionCursorError::Malformed {
92 message: format!("invalid session revision: {err}"),
93 })?;
94 let live_position = parts
95 .next()
96 .ok_or_else(|| SessionCursorError::Malformed {
97 message: "missing live replay position".to_string(),
98 })?
99 .parse::<u64>()
100 .map_err(|err| SessionCursorError::Malformed {
101 message: format!("invalid live replay position: {err}"),
102 })?;
103 let session_id = parts
104 .next()
105 .filter(|value| !value.is_empty())
106 .ok_or_else(|| SessionCursorError::Malformed {
107 message: "missing session id".to_string(),
108 })?
109 .to_string();
110 Ok(ParsedSessionCursor {
111 session_id,
112 revision: SessionRevision(revision),
113 live_position,
114 })
115 }
116}
117
118impl fmt::Debug for SessionCursor {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 f.write_str("SessionCursor(<opaque>)")
121 }
122}
123
124impl fmt::Display for SessionCursor {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.write_str(&self.0)
127 }
128}
129
130#[derive(Clone, Debug)]
131pub(crate) struct ParsedSessionCursor {
132 pub session_id: String,
133 pub revision: SessionRevision,
134 pub live_position: u64,
135}
136
137#[derive(Clone, Debug, thiserror::Error)]
138pub enum SessionCursorError {
139 #[error("malformed session cursor: {message}")]
140 Malformed { message: String },
141 #[error("session cursor belongs to `{actual_session_id}`, not `{expected_session_id}`")]
142 WrongSession {
143 expected_session_id: String,
144 actual_session_id: String,
145 },
146}
147
148#[derive(Clone, Debug)]
149pub struct SessionObservation {
150 pub read_view: crate::SessionReadView,
151 pub cursor: SessionCursor,
152}
153
154#[derive(Clone, Debug)]
155pub struct SessionObservationEvent {
156 pub session_id: String,
157 pub revision: SessionRevision,
158 pub cursor: SessionCursor,
159 pub payload: SessionObservationEventPayload,
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
163#[serde(rename_all = "snake_case")]
164pub enum SessionQueueEventKind {
165 Enqueued,
166 Cancelled,
167}
168
169#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
170#[serde(rename_all = "snake_case")]
171pub enum SessionProcessEventKind {
172 Started,
173 Cancelled,
174}
175
176#[derive(Clone, Debug)]
177#[allow(clippy::large_enum_variant)]
178pub enum SessionObservationEventPayload {
179 TurnActivity(crate::TurnActivity),
180 Committed {
181 read_view: crate::SessionReadView,
182 },
183 AgentFrameSwitched {
184 frame_id: String,
185 },
186 QueueChanged {
187 kind: SessionQueueEventKind,
188 batch_ids: Vec<String>,
189 },
190 ProcessChanged {
191 kind: SessionProcessEventKind,
192 process_ids: Vec<String>,
193 },
194}
195
196#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
197pub struct LiveReplayGap {
198 pub session_id: String,
199 pub requested_cursor: SessionCursor,
200 pub latest_cursor: SessionCursor,
201 pub latest_revision: SessionRevision,
202 pub reason: LiveReplayGapReason,
203}
204
205#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
206#[serde(rename_all = "snake_case")]
207pub enum LiveReplayGapReason {
208 Trimmed,
209 Unavailable,
210}
211
212#[derive(Clone, Debug, thiserror::Error)]
213pub enum LiveReplayStoreError {
214 #[error("{0}")]
215 Cursor(#[from] SessionCursorError),
216 #[error("live replay store error: {0}")]
217 Store(String),
218 #[error("live replay subscriber lagged by {0} events")]
219 SubscriberLagged(u64),
220 #[error("live replay channel closed")]
221 Closed,
222}
223
224#[derive(Clone, Debug)]
225pub enum LiveReplayResult {
226 Replayed(Vec<SessionObservationEvent>),
227 Gap(LiveReplayGapReason),
228}
229
230pub enum LiveReplaySubscribeResult {
231 Subscribed(LiveReplaySubscription),
232 Gap(LiveReplayGapReason),
233}
234
235pub struct LiveReplaySubscription {
236 replay: VecDeque<SessionObservationEvent>,
237 receiver: broadcast::Receiver<SessionObservationEvent>,
238}
239
240impl LiveReplaySubscription {
241 fn new(
242 replay: Vec<SessionObservationEvent>,
243 receiver: broadcast::Receiver<SessionObservationEvent>,
244 ) -> Self {
245 Self {
246 replay: replay.into(),
247 receiver,
248 }
249 }
250
251 pub async fn next_event(&mut self) -> Result<SessionObservationEvent, LiveReplayStoreError> {
252 if let Some(event) = self.replay.pop_front() {
253 return Ok(event);
254 }
255 match self.receiver.recv().await {
256 Ok(event) => Ok(event),
257 Err(broadcast::error::RecvError::Lagged(count)) => {
258 Err(LiveReplayStoreError::SubscriberLagged(count))
259 }
260 Err(broadcast::error::RecvError::Closed) => Err(LiveReplayStoreError::Closed),
261 }
262 }
263}
264
265#[derive(Clone, Debug)]
266pub enum SessionResume {
267 Replayed {
268 events: Vec<SessionObservationEvent>,
269 },
270 Gap {
271 observation: SessionObservation,
272 gap: LiveReplayGap,
273 },
274}
275
276pub enum SessionObservationSubscription {
277 Subscribed(LiveReplaySubscription),
278 Gap {
279 observation: SessionObservation,
280 gap: LiveReplayGap,
281 },
282}
283
284pub trait LiveReplayStore: Send + Sync {
292 fn append(
296 &self,
297 session_id: &str,
298 revision: SessionRevision,
299 payload: SessionObservationEventPayload,
300 ) -> Result<SessionObservationEvent, LiveReplayStoreError>;
301
302 fn replay_after_cursor(
306 &self,
307 cursor: &SessionCursor,
308 ) -> Result<LiveReplayResult, LiveReplayStoreError>;
309
310 fn subscribe_after_cursor(
314 &self,
315 cursor: &SessionCursor,
316 ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError>;
317
318 fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor;
322
323 fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError>;
327}
328
329#[derive(Clone, Debug)]
330pub struct InMemoryLiveReplayStoreConfig {
331 pub max_events_per_session: usize,
332 pub max_age: Duration,
333}
334
335impl Default for InMemoryLiveReplayStoreConfig {
336 fn default() -> Self {
337 Self {
338 max_events_per_session: DEFAULT_LIVE_REPLAY_CAPACITY,
339 max_age: DEFAULT_LIVE_REPLAY_TTL,
340 }
341 }
342}
343
344#[derive(Debug)]
345pub struct InMemoryLiveReplayStore {
346 config: InMemoryLiveReplayStoreConfig,
347 clock: Arc<dyn crate::Clock>,
348 sessions: StdMutex<HashMap<String, LiveReplaySessionBuffer>>,
349}
350
351impl InMemoryLiveReplayStore {
352 pub fn new(config: InMemoryLiveReplayStoreConfig) -> Self {
353 Self::with_clock(config, Arc::new(crate::SystemClock))
354 }
355
356 pub fn with_clock(config: InMemoryLiveReplayStoreConfig, clock: Arc<dyn crate::Clock>) -> Self {
357 Self {
358 config,
359 clock,
360 sessions: StdMutex::new(HashMap::new()),
361 }
362 }
363
364 pub fn with_bounds(max_events_per_session: usize, max_age: Duration) -> Self {
365 Self::new(InMemoryLiveReplayStoreConfig {
366 max_events_per_session,
367 max_age,
368 })
369 }
370}
371
372impl Default for InMemoryLiveReplayStore {
373 fn default() -> Self {
374 Self::new(InMemoryLiveReplayStoreConfig::default())
375 }
376}
377
378#[derive(Debug)]
379struct LiveReplaySessionBuffer {
380 events: VecDeque<StoredObservationEvent>,
381 tail_position: u64,
382 sender: Option<broadcast::Sender<SessionObservationEvent>>,
383}
384
385impl LiveReplaySessionBuffer {
386 fn new() -> Self {
387 Self {
388 events: VecDeque::new(),
389 tail_position: 0,
390 sender: None,
391 }
392 }
393
394 fn subscribe(
395 &mut self,
396 channel_capacity: usize,
397 ) -> broadcast::Receiver<SessionObservationEvent> {
398 match self.sender.as_ref() {
399 Some(sender) => sender.subscribe(),
400 None => {
401 let (sender, receiver) = broadcast::channel(channel_capacity.max(1));
402 self.sender = Some(sender);
403 receiver
404 }
405 }
406 }
407
408 fn publish(&mut self, event: SessionObservationEvent) {
409 let Some(sender) = self.sender.as_ref() else {
410 return;
411 };
412 if sender.send(event).is_err() {
413 self.sender = None;
414 }
415 }
416}
417
418#[derive(Clone, Debug)]
419struct StoredObservationEvent {
420 position: u64,
421 appended_at: Instant,
422 event: SessionObservationEvent,
423}
424
425impl InMemoryLiveReplayStore {
426 fn trim_locked(
427 config: &InMemoryLiveReplayStoreConfig,
428 buffer: &mut LiveReplaySessionBuffer,
429 now: Instant,
430 ) {
431 while buffer.events.len() > config.max_events_per_session {
432 buffer.events.pop_front();
433 }
434 while buffer
435 .events
436 .front()
437 .is_some_and(|event| now.duration_since(event.appended_at) > config.max_age)
438 {
439 buffer.events.pop_front();
440 }
441 }
442
443 fn gap_reason_for_cursor(
444 buffer: Option<&LiveReplaySessionBuffer>,
445 cursor_position: u64,
446 ) -> Option<LiveReplayGapReason> {
447 let Some(buffer) = buffer else {
448 return (cursor_position > 0).then_some(LiveReplayGapReason::Unavailable);
449 };
450 if cursor_position > buffer.tail_position {
451 return Some(LiveReplayGapReason::Unavailable);
452 }
453 let Some(first) = buffer.events.front() else {
454 return (cursor_position < buffer.tail_position)
455 .then_some(LiveReplayGapReason::Trimmed);
456 };
457 if cursor_position + 1 < first.position {
458 Some(LiveReplayGapReason::Trimmed)
459 } else {
460 None
461 }
462 }
463}
464
465impl LiveReplayStore for InMemoryLiveReplayStore {
466 fn append(
467 &self,
468 session_id: &str,
469 revision: SessionRevision,
470 payload: SessionObservationEventPayload,
471 ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
472 let now = self.clock.now();
473 let mut sessions = self
474 .sessions
475 .lock()
476 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
477 let buffer = sessions
478 .entry(session_id.to_string())
479 .or_insert_with(LiveReplaySessionBuffer::new);
480 buffer.tail_position = buffer.tail_position.saturating_add(1);
481 let cursor = SessionCursor::new(session_id, revision, buffer.tail_position);
482 let event = SessionObservationEvent {
483 session_id: session_id.to_string(),
484 revision,
485 cursor,
486 payload,
487 };
488 buffer.events.push_back(StoredObservationEvent {
489 position: buffer.tail_position,
490 appended_at: now,
491 event: event.clone(),
492 });
493 Self::trim_locked(&self.config, buffer, now);
494 buffer.publish(event.clone());
495 Ok(event)
496 }
497
498 fn replay_after_cursor(
499 &self,
500 cursor: &SessionCursor,
501 ) -> Result<LiveReplayResult, LiveReplayStoreError> {
502 let parsed = cursor.parse()?;
503 let _cursor_revision = parsed.revision;
504 let now = self.clock.now();
505 let mut sessions = self
506 .sessions
507 .lock()
508 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
509 if let Some(buffer) = sessions.get_mut(&parsed.session_id) {
510 Self::trim_locked(&self.config, buffer, now);
511 }
512 let buffer = sessions.get(&parsed.session_id);
513 if let Some(reason) = Self::gap_reason_for_cursor(buffer, parsed.live_position) {
514 return Ok(LiveReplayResult::Gap(reason));
515 }
516 let events = buffer
517 .map(|buffer| {
518 buffer
519 .events
520 .iter()
521 .filter(|event| event.position > parsed.live_position)
522 .map(|event| event.event.clone())
523 .collect()
524 })
525 .unwrap_or_default();
526 Ok(LiveReplayResult::Replayed(events))
527 }
528
529 fn subscribe_after_cursor(
530 &self,
531 cursor: &SessionCursor,
532 ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
533 let parsed = cursor.parse()?;
534 let _cursor_revision = parsed.revision;
535 let now = self.clock.now();
536 let mut sessions = self
537 .sessions
538 .lock()
539 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
540 let buffer = sessions
541 .entry(parsed.session_id.clone())
542 .or_insert_with(LiveReplaySessionBuffer::new);
543 Self::trim_locked(&self.config, buffer, now);
544 if let Some(reason) = Self::gap_reason_for_cursor(Some(buffer), parsed.live_position) {
545 return Ok(LiveReplaySubscribeResult::Gap(reason));
546 }
547 let replay = buffer
548 .events
549 .iter()
550 .filter(|event| event.position > parsed.live_position)
551 .map(|event| event.event.clone())
552 .collect();
553 let receiver = buffer.subscribe(self.config.max_events_per_session);
554 Ok(LiveReplaySubscribeResult::Subscribed(
555 LiveReplaySubscription::new(replay, receiver),
556 ))
557 }
558
559 fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
560 let tail_position = self
561 .sessions
562 .lock()
563 .ok()
564 .and_then(|sessions| sessions.get(session_id).map(|buffer| buffer.tail_position))
565 .unwrap_or(0);
566 SessionCursor::new(session_id, revision, tail_position)
567 }
568
569 fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError> {
570 let now = self.clock.now();
571 let mut sessions = self
572 .sessions
573 .lock()
574 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
575 if let Some(buffer) = sessions.get_mut(session_id) {
576 Self::trim_locked(&self.config, buffer, now);
577 }
578 Ok(())
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585
586 fn activity(text: &str) -> SessionObservationEventPayload {
587 SessionObservationEventPayload::TurnActivity(crate::TurnActivity::independent(
588 crate::TurnEvent::AssistantProseDelta {
589 text: text.to_string(),
590 },
591 ))
592 }
593
594 #[test]
595 fn session_cursor_round_trips_and_debug_is_opaque() {
596 let cursor = SessionCursor::new("session:with:colon", SessionRevision(3), 9);
597 let encoded = serde_json::to_string(&cursor).expect("serialize");
598 let decoded: SessionCursor = serde_json::from_str(&encoded).expect("deserialize");
599 assert_eq!(decoded, cursor);
600 assert_eq!(format!("{cursor:?}"), "SessionCursor(<opaque>)");
601 let parsed = cursor
602 .parse_for_session("session:with:colon")
603 .expect("parse");
604 assert_eq!(parsed.revision, SessionRevision(3));
605 assert_eq!(parsed.live_position, 9);
606 }
607
608 #[test]
609 fn session_cursor_rejects_malformed_and_wrong_session() {
610 let malformed = SessionCursor::from_raw_for_testing("bad");
611 assert!(matches!(
612 malformed.parse_for_session("s"),
613 Err(SessionCursorError::Malformed { .. })
614 ));
615 let cursor = SessionCursor::new("actual", SessionRevision(0), 0);
616 assert!(matches!(
617 cursor.parse_for_session("expected"),
618 Err(SessionCursorError::WrongSession { .. })
619 ));
620 }
621
622 #[test]
623 fn in_memory_replay_store_replays_after_cursor_in_order() {
624 let store = InMemoryLiveReplayStore::default();
625 let start = store.current_cursor("s", SessionRevision(0));
626 store
627 .append("s", SessionRevision(0), activity("a"))
628 .expect("append a");
629 store
630 .append("s", SessionRevision(0), activity("b"))
631 .expect("append b");
632 let LiveReplayResult::Replayed(events) = store.replay_after_cursor(&start).expect("replay")
633 else {
634 panic!("expected replay");
635 };
636 assert_eq!(events.len(), 2);
637 match &events[0].payload {
638 SessionObservationEventPayload::TurnActivity(activity) => match &activity.event {
639 crate::TurnEvent::AssistantProseDelta { text } => assert_eq!(text, "a"),
640 _ => panic!("wrong event"),
641 },
642 _ => panic!("wrong payload"),
643 }
644 }
645
646 #[test]
647 fn in_memory_replay_store_reports_gap_after_capacity_trim() {
648 let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
649 let start = store.current_cursor("s", SessionRevision(0));
650 store
651 .append("s", SessionRevision(0), activity("a"))
652 .expect("append a");
653 store
654 .append("s", SessionRevision(0), activity("b"))
655 .expect("append b");
656 assert!(matches!(
657 store.replay_after_cursor(&start).expect("gap"),
658 LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
659 ));
660 }
661
662 #[test]
663 fn in_memory_replay_store_reports_gap_after_ttl_trim() {
664 let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
665 let start = store.current_cursor("s", SessionRevision(0));
666 store
667 .append("s", SessionRevision(0), activity("a"))
668 .expect("append a");
669 std::thread::sleep(Duration::from_millis(5));
670 assert!(matches!(
671 store.replay_after_cursor(&start).expect("gap"),
672 LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
673 ));
674 }
675
676 #[test]
677 fn in_memory_replay_store_reports_unavailable_for_cursor_ahead_of_tail() {
678 let store = InMemoryLiveReplayStore::default();
679 let ahead = SessionCursor::new("s", SessionRevision(0), 99);
680 assert!(matches!(
681 store.replay_after_cursor(&ahead).expect("gap"),
682 LiveReplayResult::Gap(LiveReplayGapReason::Unavailable)
683 ));
684 }
685
686 #[tokio::test]
687 async fn in_memory_replay_subscription_yields_replay_then_live() {
688 let store = InMemoryLiveReplayStore::default();
689 let start = store.current_cursor("s", SessionRevision(0));
690 store
691 .append("s", SessionRevision(0), activity("a"))
692 .expect("append a");
693 let LiveReplaySubscribeResult::Subscribed(mut subscription) =
694 store.subscribe_after_cursor(&start).expect("subscribe")
695 else {
696 panic!("expected subscription");
697 };
698 let first = subscription.next_event().await.expect("replay");
699 assert_eq!(first.session_id, "s");
700 store
701 .append("s", SessionRevision(0), activity("b"))
702 .expect("append b");
703 let second = subscription.next_event().await.expect("live");
704 match second.payload {
705 SessionObservationEventPayload::TurnActivity(activity) => match activity.event {
706 crate::TurnEvent::AssistantProseDelta { text } => assert_eq!(text, "b"),
707 _ => panic!("wrong event"),
708 },
709 _ => panic!("wrong payload"),
710 }
711 }
712
713 #[test]
714 fn in_memory_replay_store_allocates_live_channel_lazily() {
715 let store = InMemoryLiveReplayStore::default();
716 let start = store.current_cursor("s", SessionRevision(0));
717 store
718 .append("s", SessionRevision(0), activity("a"))
719 .expect("append a");
720 {
721 let sessions = store.sessions.lock().expect("sessions");
722 assert!(sessions.get("s").expect("buffer").sender.is_none());
723 }
724 let LiveReplaySubscribeResult::Subscribed(subscription) =
725 store.subscribe_after_cursor(&start).expect("subscribe")
726 else {
727 panic!("expected subscription");
728 };
729 {
730 let sessions = store.sessions.lock().expect("sessions");
731 assert!(sessions.get("s").expect("buffer").sender.is_some());
732 }
733 drop(subscription);
734 store
735 .append("s", SessionRevision(0), activity("b"))
736 .expect("append b");
737 let sessions = store.sessions.lock().expect("sessions");
738 assert!(sessions.get("s").expect("buffer").sender.is_none());
739 }
740
741 #[test]
742 fn in_memory_replay_subscription_reports_gap_after_capacity_trim() {
743 let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
744 let start = store.current_cursor("s", SessionRevision(0));
745 store
746 .append("s", SessionRevision(0), activity("a"))
747 .expect("append a");
748 store
749 .append("s", SessionRevision(0), activity("b"))
750 .expect("append b");
751 assert!(matches!(
752 store.subscribe_after_cursor(&start).expect("subscribe"),
753 LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
754 ));
755 }
756
757 #[test]
758 fn in_memory_replay_subscription_reports_gap_after_ttl_trim() {
759 let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
760 let start = store.current_cursor("s", SessionRevision(0));
761 store
762 .append("s", SessionRevision(0), activity("a"))
763 .expect("append a");
764 std::thread::sleep(Duration::from_millis(5));
765 assert!(matches!(
766 store.subscribe_after_cursor(&start).expect("subscribe"),
767 LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
768 ));
769 }
770}