// lash_core/runtime/observation/replay.rs

1use std::collections::{HashMap, VecDeque};
2use std::fmt;
3use std::sync::{Arc, Mutex as StdMutex};
4use std::time::{Duration, Instant};
5
6use tokio::sync::broadcast;
7
8use crate::runtime::{LashRuntime, RuntimeSessionState};
9
/// Version tag that prefixes every encoded session cursor string.
const SESSION_CURSOR_PREFIX: &str = "lashsc1:";
/// Default number of events retained per session by the in-memory replay store.
const DEFAULT_LIVE_REPLAY_CAPACITY: usize = 2048;
/// Default maximum age of a retained event in the in-memory replay store.
const DEFAULT_LIVE_REPLAY_TTL: Duration = Duration::from_secs(120);
13
14#[derive(
15    Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
16)]
17#[serde(transparent)]
18pub struct SessionRevision(pub u64);
19
20impl SessionRevision {
21    pub fn new(revision: u64) -> Self {
22        Self(revision)
23    }
24
25    pub fn as_u64(self) -> u64 {
26        self.0
27    }
28
29    pub(super) fn from_runtime(runtime: &LashRuntime) -> Self {
30        Self::from_state(&runtime.export_persisted_state())
31    }
32
33    pub(super) fn from_state(state: &RuntimeSessionState) -> Self {
34        Self(state.head_revision.unwrap_or(state.turn_index as u64))
35    }
36}
37
38#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
39#[serde(transparent)]
40pub struct SessionCursor(String);
41
42impl SessionCursor {
43    pub(crate) fn new(
44        session_id: impl AsRef<str>,
45        revision: SessionRevision,
46        live_position: u64,
47    ) -> Self {
48        Self(format!(
49            "{SESSION_CURSOR_PREFIX}{}:{live_position}:{}",
50            revision.0,
51            session_id.as_ref()
52        ))
53    }
54
55    #[cfg(test)]
56    pub(super) fn from_raw_for_testing(raw: impl Into<String>) -> Self {
57        Self(raw.into())
58    }
59
60    pub fn as_str(&self) -> &str {
61        &self.0
62    }
63
64    pub(crate) fn parse_for_session(
65        &self,
66        expected_session_id: &str,
67    ) -> Result<ParsedSessionCursor, SessionCursorError> {
68        let parsed = self.parse()?;
69        if parsed.session_id != expected_session_id {
70            return Err(SessionCursorError::WrongSession {
71                expected_session_id: expected_session_id.to_string(),
72                actual_session_id: parsed.session_id,
73            });
74        }
75        Ok(parsed)
76    }
77
78    fn parse(&self) -> Result<ParsedSessionCursor, SessionCursorError> {
79        let payload = self.0.strip_prefix(SESSION_CURSOR_PREFIX).ok_or_else(|| {
80            SessionCursorError::Malformed {
81                message: "missing cursor prefix".to_string(),
82            }
83        })?;
84        let mut parts = payload.splitn(3, ':');
85        let revision = parts
86            .next()
87            .ok_or_else(|| SessionCursorError::Malformed {
88                message: "missing session revision".to_string(),
89            })?
90            .parse::<u64>()
91            .map_err(|err| SessionCursorError::Malformed {
92                message: format!("invalid session revision: {err}"),
93            })?;
94        let live_position = parts
95            .next()
96            .ok_or_else(|| SessionCursorError::Malformed {
97                message: "missing live replay position".to_string(),
98            })?
99            .parse::<u64>()
100            .map_err(|err| SessionCursorError::Malformed {
101                message: format!("invalid live replay position: {err}"),
102            })?;
103        let session_id = parts
104            .next()
105            .filter(|value| !value.is_empty())
106            .ok_or_else(|| SessionCursorError::Malformed {
107                message: "missing session id".to_string(),
108            })?
109            .to_string();
110        Ok(ParsedSessionCursor {
111            session_id,
112            revision: SessionRevision(revision),
113            live_position,
114        })
115    }
116}
117
118impl fmt::Debug for SessionCursor {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        f.write_str("SessionCursor(<opaque>)")
121    }
122}
123
124impl fmt::Display for SessionCursor {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        f.write_str(&self.0)
127    }
128}
129
130#[derive(Clone, Debug)]
131pub(crate) struct ParsedSessionCursor {
132    pub session_id: String,
133    pub revision: SessionRevision,
134    pub live_position: u64,
135}
136
137#[derive(Clone, Debug, thiserror::Error)]
138pub enum SessionCursorError {
139    #[error("malformed session cursor: {message}")]
140    Malformed { message: String },
141    #[error("session cursor belongs to `{actual_session_id}`, not `{expected_session_id}`")]
142    WrongSession {
143        expected_session_id: String,
144        actual_session_id: String,
145    },
146}
147
148#[derive(Clone, Debug)]
149pub struct SessionObservation {
150    pub read_view: crate::SessionReadView,
151    pub cursor: SessionCursor,
152}
153
154#[derive(Clone, Debug)]
155pub struct SessionObservationEvent {
156    pub session_id: String,
157    pub revision: SessionRevision,
158    pub cursor: SessionCursor,
159    pub payload: SessionObservationEventPayload,
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
163#[serde(rename_all = "snake_case")]
164pub enum SessionQueueEventKind {
165    Enqueued,
166    Cancelled,
167}
168
169#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
170#[serde(rename_all = "snake_case")]
171pub enum SessionProcessEventKind {
172    Started,
173    Cancelled,
174}
175
176#[derive(Clone, Debug)]
177#[allow(clippy::large_enum_variant)]
178pub enum SessionObservationEventPayload {
179    TurnActivity(crate::TurnActivity),
180    Committed {
181        read_view: crate::SessionReadView,
182    },
183    AgentFrameSwitched {
184        frame_id: String,
185    },
186    QueueChanged {
187        kind: SessionQueueEventKind,
188        batch_ids: Vec<String>,
189    },
190    ProcessChanged {
191        kind: SessionProcessEventKind,
192        process_ids: Vec<String>,
193    },
194}
195
196#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
197pub struct LiveReplayGap {
198    pub session_id: String,
199    pub requested_cursor: SessionCursor,
200    pub latest_cursor: SessionCursor,
201    pub latest_revision: SessionRevision,
202    pub reason: LiveReplayGapReason,
203}
204
205#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
206#[serde(rename_all = "snake_case")]
207pub enum LiveReplayGapReason {
208    Trimmed,
209    Unavailable,
210}
211
212#[derive(Clone, Debug, thiserror::Error)]
213pub enum LiveReplayStoreError {
214    #[error("{0}")]
215    Cursor(#[from] SessionCursorError),
216    #[error("live replay store error: {0}")]
217    Store(String),
218    #[error("live replay subscriber lagged by {0} events")]
219    SubscriberLagged(u64),
220    #[error("live replay channel closed")]
221    Closed,
222}
223
224#[derive(Clone, Debug)]
225pub enum LiveReplayResult {
226    Replayed(Vec<SessionObservationEvent>),
227    Gap(LiveReplayGapReason),
228}
229
230pub enum LiveReplaySubscribeResult {
231    Subscribed(LiveReplaySubscription),
232    Gap(LiveReplayGapReason),
233}
234
235pub struct LiveReplaySubscription {
236    replay: VecDeque<SessionObservationEvent>,
237    receiver: broadcast::Receiver<SessionObservationEvent>,
238}
239
240impl LiveReplaySubscription {
241    fn new(
242        replay: Vec<SessionObservationEvent>,
243        receiver: broadcast::Receiver<SessionObservationEvent>,
244    ) -> Self {
245        Self {
246            replay: replay.into(),
247            receiver,
248        }
249    }
250
251    pub async fn next_event(&mut self) -> Result<SessionObservationEvent, LiveReplayStoreError> {
252        if let Some(event) = self.replay.pop_front() {
253            return Ok(event);
254        }
255        match self.receiver.recv().await {
256            Ok(event) => Ok(event),
257            Err(broadcast::error::RecvError::Lagged(count)) => {
258                Err(LiveReplayStoreError::SubscriberLagged(count))
259            }
260            Err(broadcast::error::RecvError::Closed) => Err(LiveReplayStoreError::Closed),
261        }
262    }
263}
264
265#[derive(Clone, Debug)]
266pub enum SessionResume {
267    Replayed {
268        events: Vec<SessionObservationEvent>,
269    },
270    Gap {
271        observation: SessionObservation,
272        gap: LiveReplayGap,
273    },
274}
275
276pub enum SessionObservationSubscription {
277    Subscribed(LiveReplaySubscription),
278    Gap {
279        observation: SessionObservation,
280        gap: LiveReplayGap,
281    },
282}
283
284/// Bounded, best-effort live replay for host reconnects.
285///
286/// Runtime turn execution calls this trait from synchronous boundary code. All
287/// methods must therefore be fast and nonblocking from the runtime's point of
288/// view. A custom external store should expose local or buffered behavior here,
289/// or offload blocking transport and durability work internally. Runtime turn
290/// execution must not wait for slow network or storage durability in this path.
291pub trait LiveReplayStore: Send + Sync {
292    /// Append one observation event and return its assigned cursor.
293    ///
294    /// This must be fast and nonblocking from the runtime's point of view.
295    fn append(
296        &self,
297        session_id: &str,
298        revision: SessionRevision,
299        payload: SessionObservationEventPayload,
300    ) -> Result<SessionObservationEvent, LiveReplayStoreError>;
301
302    /// Return buffered events after `cursor`, or report a recoverable gap.
303    ///
304    /// This must be fast and nonblocking from the runtime's point of view.
305    fn replay_after_cursor(
306        &self,
307        cursor: &SessionCursor,
308    ) -> Result<LiveReplayResult, LiveReplayStoreError>;
309
310    /// Subscribe after `cursor`, replaying buffered events before live events.
311    ///
312    /// This must be fast and nonblocking from the runtime's point of view.
313    fn subscribe_after_cursor(
314        &self,
315        cursor: &SessionCursor,
316    ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError>;
317
318    /// Return the latest cursor known locally for a session.
319    ///
320    /// This must be fast and nonblocking from the runtime's point of view.
321    fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor;
322
323    /// Apply best-effort retention trimming for a session.
324    ///
325    /// This must be fast and nonblocking from the runtime's point of view.
326    fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError>;
327}
328
/// Retention bounds for [`InMemoryLiveReplayStore`].
#[derive(Debug, Clone)]
pub struct InMemoryLiveReplayStoreConfig {
    /// Maximum number of events kept per session.
    pub max_events_per_session: usize,
    /// Maximum age of a kept event.
    pub max_age: Duration,
}
334
335impl Default for InMemoryLiveReplayStoreConfig {
336    fn default() -> Self {
337        Self {
338            max_events_per_session: DEFAULT_LIVE_REPLAY_CAPACITY,
339            max_age: DEFAULT_LIVE_REPLAY_TTL,
340        }
341    }
342}
343
344#[derive(Debug)]
345pub struct InMemoryLiveReplayStore {
346    config: InMemoryLiveReplayStoreConfig,
347    clock: Arc<dyn crate::Clock>,
348    sessions: StdMutex<HashMap<String, LiveReplaySessionBuffer>>,
349}
350
351impl InMemoryLiveReplayStore {
352    pub fn new(config: InMemoryLiveReplayStoreConfig) -> Self {
353        Self::with_clock(config, Arc::new(crate::SystemClock))
354    }
355
356    pub fn with_clock(config: InMemoryLiveReplayStoreConfig, clock: Arc<dyn crate::Clock>) -> Self {
357        Self {
358            config,
359            clock,
360            sessions: StdMutex::new(HashMap::new()),
361        }
362    }
363
364    pub fn with_bounds(max_events_per_session: usize, max_age: Duration) -> Self {
365        Self::new(InMemoryLiveReplayStoreConfig {
366            max_events_per_session,
367            max_age,
368        })
369    }
370}
371
372impl Default for InMemoryLiveReplayStore {
373    fn default() -> Self {
374        Self::new(InMemoryLiveReplayStoreConfig::default())
375    }
376}
377
378#[derive(Debug)]
379struct LiveReplaySessionBuffer {
380    events: VecDeque<StoredObservationEvent>,
381    tail_position: u64,
382    sender: Option<broadcast::Sender<SessionObservationEvent>>,
383}
384
385impl LiveReplaySessionBuffer {
386    fn new() -> Self {
387        Self {
388            events: VecDeque::new(),
389            tail_position: 0,
390            sender: None,
391        }
392    }
393
394    fn subscribe(
395        &mut self,
396        channel_capacity: usize,
397    ) -> broadcast::Receiver<SessionObservationEvent> {
398        match self.sender.as_ref() {
399            Some(sender) => sender.subscribe(),
400            None => {
401                let (sender, receiver) = broadcast::channel(channel_capacity.max(1));
402                self.sender = Some(sender);
403                receiver
404            }
405        }
406    }
407
408    fn publish(&mut self, event: SessionObservationEvent) {
409        let Some(sender) = self.sender.as_ref() else {
410            return;
411        };
412        if sender.send(event).is_err() {
413            self.sender = None;
414        }
415    }
416}
417
418#[derive(Clone, Debug)]
419struct StoredObservationEvent {
420    position: u64,
421    appended_at: Instant,
422    event: SessionObservationEvent,
423}
424
425impl InMemoryLiveReplayStore {
426    fn trim_locked(
427        config: &InMemoryLiveReplayStoreConfig,
428        buffer: &mut LiveReplaySessionBuffer,
429        now: Instant,
430    ) {
431        while buffer.events.len() > config.max_events_per_session {
432            buffer.events.pop_front();
433        }
434        while buffer
435            .events
436            .front()
437            .is_some_and(|event| now.duration_since(event.appended_at) > config.max_age)
438        {
439            buffer.events.pop_front();
440        }
441    }
442
443    fn gap_reason_for_cursor(
444        buffer: Option<&LiveReplaySessionBuffer>,
445        cursor_position: u64,
446    ) -> Option<LiveReplayGapReason> {
447        let Some(buffer) = buffer else {
448            return (cursor_position > 0).then_some(LiveReplayGapReason::Unavailable);
449        };
450        if cursor_position > buffer.tail_position {
451            return Some(LiveReplayGapReason::Unavailable);
452        }
453        let Some(first) = buffer.events.front() else {
454            return (cursor_position < buffer.tail_position)
455                .then_some(LiveReplayGapReason::Trimmed);
456        };
457        if cursor_position + 1 < first.position {
458            Some(LiveReplayGapReason::Trimmed)
459        } else {
460            None
461        }
462    }
463}
464
465impl LiveReplayStore for InMemoryLiveReplayStore {
466    fn append(
467        &self,
468        session_id: &str,
469        revision: SessionRevision,
470        payload: SessionObservationEventPayload,
471    ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
472        let now = self.clock.now();
473        let mut sessions = self
474            .sessions
475            .lock()
476            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
477        let buffer = sessions
478            .entry(session_id.to_string())
479            .or_insert_with(LiveReplaySessionBuffer::new);
480        buffer.tail_position = buffer.tail_position.saturating_add(1);
481        let cursor = SessionCursor::new(session_id, revision, buffer.tail_position);
482        let event = SessionObservationEvent {
483            session_id: session_id.to_string(),
484            revision,
485            cursor,
486            payload,
487        };
488        buffer.events.push_back(StoredObservationEvent {
489            position: buffer.tail_position,
490            appended_at: now,
491            event: event.clone(),
492        });
493        Self::trim_locked(&self.config, buffer, now);
494        buffer.publish(event.clone());
495        Ok(event)
496    }
497
498    fn replay_after_cursor(
499        &self,
500        cursor: &SessionCursor,
501    ) -> Result<LiveReplayResult, LiveReplayStoreError> {
502        let parsed = cursor.parse()?;
503        let _cursor_revision = parsed.revision;
504        let now = self.clock.now();
505        let mut sessions = self
506            .sessions
507            .lock()
508            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
509        if let Some(buffer) = sessions.get_mut(&parsed.session_id) {
510            Self::trim_locked(&self.config, buffer, now);
511        }
512        let buffer = sessions.get(&parsed.session_id);
513        if let Some(reason) = Self::gap_reason_for_cursor(buffer, parsed.live_position) {
514            return Ok(LiveReplayResult::Gap(reason));
515        }
516        let events = buffer
517            .map(|buffer| {
518                buffer
519                    .events
520                    .iter()
521                    .filter(|event| event.position > parsed.live_position)
522                    .map(|event| event.event.clone())
523                    .collect()
524            })
525            .unwrap_or_default();
526        Ok(LiveReplayResult::Replayed(events))
527    }
528
529    fn subscribe_after_cursor(
530        &self,
531        cursor: &SessionCursor,
532    ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
533        let parsed = cursor.parse()?;
534        let _cursor_revision = parsed.revision;
535        let now = self.clock.now();
536        let mut sessions = self
537            .sessions
538            .lock()
539            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
540        let buffer = sessions
541            .entry(parsed.session_id.clone())
542            .or_insert_with(LiveReplaySessionBuffer::new);
543        Self::trim_locked(&self.config, buffer, now);
544        if let Some(reason) = Self::gap_reason_for_cursor(Some(buffer), parsed.live_position) {
545            return Ok(LiveReplaySubscribeResult::Gap(reason));
546        }
547        let replay = buffer
548            .events
549            .iter()
550            .filter(|event| event.position > parsed.live_position)
551            .map(|event| event.event.clone())
552            .collect();
553        let receiver = buffer.subscribe(self.config.max_events_per_session);
554        Ok(LiveReplaySubscribeResult::Subscribed(
555            LiveReplaySubscription::new(replay, receiver),
556        ))
557    }
558
559    fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
560        let tail_position = self
561            .sessions
562            .lock()
563            .ok()
564            .and_then(|sessions| sessions.get(session_id).map(|buffer| buffer.tail_position))
565            .unwrap_or(0);
566        SessionCursor::new(session_id, revision, tail_position)
567    }
568
569    fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError> {
570        let now = self.clock.now();
571        let mut sessions = self
572            .sessions
573            .lock()
574            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
575        if let Some(buffer) = sessions.get_mut(session_id) {
576            Self::trim_locked(&self.config, buffer, now);
577        }
578        Ok(())
579    }
580}
581
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a turn-activity payload carrying an assistant prose delta.
    fn activity(text: &str) -> SessionObservationEventPayload {
        let event = crate::TurnEvent::AssistantProseDelta {
            text: text.to_string(),
        };
        SessionObservationEventPayload::TurnActivity(crate::TurnActivity::independent(event))
    }

    /// Appends a prose delta to session `"s"` at revision 0, panicking on failure.
    fn append_prose(store: &InMemoryLiveReplayStore, text: &str) -> SessionObservationEvent {
        match store.append("s", SessionRevision(0), activity(text)) {
            Ok(event) => event,
            Err(err) => panic!("append {text}: {err:?}"),
        }
    }

    /// Extracts the prose delta text from an event, panicking on any other shape.
    fn prose_text(event: &SessionObservationEvent) -> &str {
        match &event.payload {
            SessionObservationEventPayload::TurnActivity(activity) => match &activity.event {
                crate::TurnEvent::AssistantProseDelta { text } => text.as_str(),
                _ => panic!("wrong event"),
            },
            _ => panic!("wrong payload"),
        }
    }

    /// Reports whether session `"s"` currently has a live broadcast sender.
    fn has_live_channel(store: &InMemoryLiveReplayStore) -> bool {
        let sessions = store.sessions.lock().expect("sessions");
        sessions.get("s").expect("buffer").sender.is_some()
    }

    #[test]
    fn session_cursor_round_trips_and_debug_is_opaque() {
        let cursor = SessionCursor::new("session:with:colon", SessionRevision(3), 9);

        let encoded = serde_json::to_string(&cursor).expect("serialize");
        let decoded: SessionCursor = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(decoded, cursor);

        assert_eq!(format!("{cursor:?}"), "SessionCursor(<opaque>)");

        let parsed = cursor
            .parse_for_session("session:with:colon")
            .expect("parse");
        assert_eq!(parsed.revision, SessionRevision(3));
        assert_eq!(parsed.live_position, 9);
    }

    #[test]
    fn session_cursor_rejects_malformed_and_wrong_session() {
        let malformed = SessionCursor::from_raw_for_testing("bad").parse_for_session("s");
        assert!(matches!(
            malformed,
            Err(SessionCursorError::Malformed { .. })
        ));

        let wrong_session =
            SessionCursor::new("actual", SessionRevision(0), 0).parse_for_session("expected");
        assert!(matches!(
            wrong_session,
            Err(SessionCursorError::WrongSession { .. })
        ));
    }

    #[test]
    fn in_memory_replay_store_replays_after_cursor_in_order() {
        let store = InMemoryLiveReplayStore::default();
        let start = store.current_cursor("s", SessionRevision(0));
        append_prose(&store, "a");
        append_prose(&store, "b");

        let events = match store.replay_after_cursor(&start).expect("replay") {
            LiveReplayResult::Replayed(events) => events,
            LiveReplayResult::Gap(_) => panic!("expected replay"),
        };

        assert_eq!(events.len(), 2);
        assert_eq!(prose_text(&events[0]), "a");
    }

    #[test]
    fn in_memory_replay_store_reports_gap_after_capacity_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
        let start = store.current_cursor("s", SessionRevision(0));
        append_prose(&store, "a");
        append_prose(&store, "b");

        let outcome = store.replay_after_cursor(&start).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_store_reports_gap_after_ttl_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
        let start = store.current_cursor("s", SessionRevision(0));
        append_prose(&store, "a");
        // Let the single event outlive the 1 ms retention window.
        std::thread::sleep(Duration::from_millis(5));

        let outcome = store.replay_after_cursor(&start).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_store_reports_unavailable_for_cursor_ahead_of_tail() {
        let store = InMemoryLiveReplayStore::default();
        let ahead = SessionCursor::new("s", SessionRevision(0), 99);

        let outcome = store.replay_after_cursor(&ahead).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Unavailable)
        ));
    }

    #[tokio::test]
    async fn in_memory_replay_subscription_yields_replay_then_live() {
        let store = InMemoryLiveReplayStore::default();
        let start = store.current_cursor("s", SessionRevision(0));
        append_prose(&store, "a");

        let mut subscription = match store.subscribe_after_cursor(&start).expect("subscribe") {
            LiveReplaySubscribeResult::Subscribed(subscription) => subscription,
            LiveReplaySubscribeResult::Gap(_) => panic!("expected subscription"),
        };

        let first = subscription.next_event().await.expect("replay");
        assert_eq!(first.session_id, "s");

        append_prose(&store, "b");
        let second = subscription.next_event().await.expect("live");
        assert_eq!(prose_text(&second), "b");
    }

    #[test]
    fn in_memory_replay_store_allocates_live_channel_lazily() {
        let store = InMemoryLiveReplayStore::default();
        let start = store.current_cursor("s", SessionRevision(0));

        // Appending alone must not create a channel.
        append_prose(&store, "a");
        assert!(!has_live_channel(&store));

        // The first subscription creates it.
        let subscription = match store.subscribe_after_cursor(&start).expect("subscribe") {
            LiveReplaySubscribeResult::Subscribed(subscription) => subscription,
            LiveReplaySubscribeResult::Gap(_) => panic!("expected subscription"),
        };
        assert!(has_live_channel(&store));

        // With the only receiver gone, the next append drops the sender again.
        drop(subscription);
        append_prose(&store, "b");
        assert!(!has_live_channel(&store));
    }

    #[test]
    fn in_memory_replay_subscription_reports_gap_after_capacity_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
        let start = store.current_cursor("s", SessionRevision(0));
        append_prose(&store, "a");
        append_prose(&store, "b");

        let outcome = store.subscribe_after_cursor(&start).expect("subscribe");
        assert!(matches!(
            outcome,
            LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_subscription_reports_gap_after_ttl_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
        let start = store.current_cursor("s", SessionRevision(0));
        append_prose(&store, "a");
        // Let the single event outlive the 1 ms retention window.
        std::thread::sleep(Duration::from_millis(5));

        let outcome = store.subscribe_after_cursor(&start).expect("subscribe");
        assert!(matches!(
            outcome,
            LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }
}