Skip to main content

lash_core/runtime/observation/
replay.rs

1use std::collections::{HashMap, VecDeque};
2use std::fmt;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex as StdMutex};
5use std::task::{Context, Poll, ready};
6use std::time::{Duration, Instant};
7
8use futures_util::Stream;
9use tokio::sync::broadcast;
10use tokio_util::sync::ReusableBoxFuture;
11
12use crate::runtime::{LashRuntime, RuntimeSessionState};
13
/// Literal prefix that every encoded [`SessionCursor`] string starts with.
const SESSION_CURSOR_PREFIX: &str = "lashsc1:";
/// Default cap on buffered events per session for the in-memory replay store.
const DEFAULT_LIVE_REPLAY_CAPACITY: usize = 2048;
/// Default maximum age of a buffered event in the in-memory replay store.
const DEFAULT_LIVE_REPLAY_TTL: Duration = Duration::from_secs(2 * 60);
17
18#[derive(
19    Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
20)]
21#[serde(transparent)]
22pub struct SessionRevision(pub u64);
23
24impl SessionRevision {
25    pub fn new(revision: u64) -> Self {
26        Self(revision)
27    }
28
29    pub fn as_u64(self) -> u64 {
30        self.0
31    }
32
33    pub(super) fn from_runtime(runtime: &LashRuntime) -> Self {
34        Self::from_state(&runtime.export_persisted_state())
35    }
36
37    pub(super) fn from_state(state: &RuntimeSessionState) -> Self {
38        Self(state.head_revision.unwrap_or(state.turn_index as u64))
39    }
40}
41
42#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
43#[serde(transparent)]
44pub struct SessionCursor(String);
45
46impl SessionCursor {
47    pub(crate) fn new(
48        session_id: impl AsRef<str>,
49        revision: SessionRevision,
50        live_position: u64,
51    ) -> Self {
52        Self(format!(
53            "{SESSION_CURSOR_PREFIX}{}:{live_position}:{}",
54            revision.0,
55            session_id.as_ref()
56        ))
57    }
58
59    #[cfg(test)]
60    pub(super) fn from_raw_for_testing(raw: impl Into<String>) -> Self {
61        Self(raw.into())
62    }
63
64    pub fn as_str(&self) -> &str {
65        &self.0
66    }
67
68    pub(crate) fn parse_for_session(
69        &self,
70        expected_session_id: &str,
71    ) -> Result<ParsedSessionCursor, SessionCursorError> {
72        let parsed = self.parse()?;
73        if parsed.session_id != expected_session_id {
74            return Err(SessionCursorError::WrongSession {
75                expected_session_id: expected_session_id.to_string(),
76                actual_session_id: parsed.session_id,
77            });
78        }
79        Ok(parsed)
80    }
81
82    fn parse(&self) -> Result<ParsedSessionCursor, SessionCursorError> {
83        let payload = self.0.strip_prefix(SESSION_CURSOR_PREFIX).ok_or_else(|| {
84            SessionCursorError::Malformed {
85                message: "missing cursor prefix".to_string(),
86            }
87        })?;
88        let mut parts = payload.splitn(3, ':');
89        let revision = parts
90            .next()
91            .ok_or_else(|| SessionCursorError::Malformed {
92                message: "missing session revision".to_string(),
93            })?
94            .parse::<u64>()
95            .map_err(|err| SessionCursorError::Malformed {
96                message: format!("invalid session revision: {err}"),
97            })?;
98        let live_position = parts
99            .next()
100            .ok_or_else(|| SessionCursorError::Malformed {
101                message: "missing live replay position".to_string(),
102            })?
103            .parse::<u64>()
104            .map_err(|err| SessionCursorError::Malformed {
105                message: format!("invalid live replay position: {err}"),
106            })?;
107        let session_id = parts
108            .next()
109            .filter(|value| !value.is_empty())
110            .ok_or_else(|| SessionCursorError::Malformed {
111                message: "missing session id".to_string(),
112            })?
113            .to_string();
114        Ok(ParsedSessionCursor {
115            session_id,
116            revision: SessionRevision(revision),
117            live_position,
118        })
119    }
120}
121
122impl fmt::Debug for SessionCursor {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.write_str("SessionCursor(<opaque>)")
125    }
126}
127
128impl fmt::Display for SessionCursor {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.write_str(&self.0)
131    }
132}
133
134#[derive(Clone, Debug)]
135pub(crate) struct ParsedSessionCursor {
136    pub session_id: String,
137    pub revision: SessionRevision,
138    pub live_position: u64,
139}
140
141#[derive(Clone, Debug, thiserror::Error)]
142pub enum SessionCursorError {
143    #[error("malformed session cursor: {message}")]
144    Malformed { message: String },
145    #[error("session cursor belongs to `{actual_session_id}`, not `{expected_session_id}`")]
146    WrongSession {
147        expected_session_id: String,
148        actual_session_id: String,
149    },
150}
151
152#[derive(Clone, Debug)]
153pub struct SessionObservation {
154    pub read_view: crate::SessionReadView,
155    pub cursor: SessionCursor,
156}
157
158#[derive(Clone, Debug)]
159pub struct SessionObservationEvent {
160    pub session_id: String,
161    pub revision: SessionRevision,
162    pub cursor: SessionCursor,
163    pub payload: SessionObservationEventPayload,
164}
165
166#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
167#[serde(rename_all = "snake_case")]
168pub enum SessionQueueEventKind {
169    Enqueued,
170    Cancelled,
171}
172
173#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
174#[serde(rename_all = "snake_case")]
175pub enum SessionProcessEventKind {
176    Started,
177    Cancelled,
178}
179
180#[derive(Clone, Debug)]
181#[allow(clippy::large_enum_variant)]
182pub enum SessionObservationEventPayload {
183    TurnActivity(crate::TurnActivity),
184    Committed {
185        read_view: crate::SessionReadView,
186    },
187    AgentFrameSwitched {
188        frame_id: String,
189    },
190    QueueChanged {
191        kind: SessionQueueEventKind,
192        batch_ids: Vec<String>,
193    },
194    ProcessChanged {
195        kind: SessionProcessEventKind,
196        process_ids: Vec<String>,
197    },
198}
199
200#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
201pub struct LiveReplayGap {
202    pub session_id: String,
203    pub requested_cursor: SessionCursor,
204    pub latest_cursor: SessionCursor,
205    pub latest_revision: SessionRevision,
206    pub reason: LiveReplayGapReason,
207}
208
209#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum LiveReplayGapReason {
212    Trimmed,
213    Unavailable,
214}
215
216#[derive(Clone, Debug, thiserror::Error)]
217pub enum LiveReplayStoreError {
218    #[error("{0}")]
219    Cursor(#[from] SessionCursorError),
220    #[error("live replay store error: {0}")]
221    Store(String),
222    #[error("live replay subscriber lagged by {0} events")]
223    SubscriberLagged(u64),
224    #[error("live replay channel closed")]
225    Closed,
226}
227
228#[derive(Clone, Debug)]
229pub enum LiveReplayResult {
230    Replayed(Vec<SessionObservationEvent>),
231    Gap(LiveReplayGapReason),
232}
233
234pub enum LiveReplaySubscribeResult {
235    Subscribed(LiveReplaySubscription),
236    Gap(LiveReplayGapReason),
237}
238
239pub struct LiveReplaySubscription {
240    replay: VecDeque<SessionObservationEvent>,
241    receiver: ReusableBoxFuture<
242        'static,
243        (
244            Result<SessionObservationEvent, broadcast::error::RecvError>,
245            broadcast::Receiver<SessionObservationEvent>,
246        ),
247    >,
248    closed: bool,
249}
250
251impl LiveReplaySubscription {
252    fn new(
253        replay: Vec<SessionObservationEvent>,
254        receiver: broadcast::Receiver<SessionObservationEvent>,
255    ) -> Self {
256        Self {
257            replay: replay.into(),
258            receiver: ReusableBoxFuture::new(live_replay_recv(receiver)),
259            closed: false,
260        }
261    }
262
263    pub async fn next_event(&mut self) -> Result<SessionObservationEvent, LiveReplayStoreError> {
264        futures_util::StreamExt::next(self)
265            .await
266            .unwrap_or(Err(LiveReplayStoreError::Closed))
267    }
268}
269
270async fn live_replay_recv(
271    mut receiver: broadcast::Receiver<SessionObservationEvent>,
272) -> (
273    Result<SessionObservationEvent, broadcast::error::RecvError>,
274    broadcast::Receiver<SessionObservationEvent>,
275) {
276    let result = receiver.recv().await;
277    (result, receiver)
278}
279
280impl Stream for LiveReplaySubscription {
281    type Item = Result<SessionObservationEvent, LiveReplayStoreError>;
282
283    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
284        if let Some(event) = self.replay.pop_front() {
285            return Poll::Ready(Some(Ok(event)));
286        }
287        if self.closed {
288            return Poll::Ready(None);
289        }
290        let (result, receiver) = ready!(self.receiver.poll(cx));
291        self.receiver.set(live_replay_recv(receiver));
292        match result {
293            Ok(event) => Poll::Ready(Some(Ok(event))),
294            Err(broadcast::error::RecvError::Lagged(count)) => {
295                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(count))))
296            }
297            Err(broadcast::error::RecvError::Closed) => {
298                self.closed = true;
299                Poll::Ready(Some(Err(LiveReplayStoreError::Closed)))
300            }
301        }
302    }
303}
304
305#[derive(Clone, Debug)]
306pub enum SessionResume {
307    Replayed {
308        events: Vec<SessionObservationEvent>,
309    },
310    Gap {
311        observation: SessionObservation,
312        gap: LiveReplayGap,
313    },
314}
315
316pub enum SessionObservationSubscription {
317    Subscribed(LiveReplaySubscription),
318    Gap {
319        observation: SessionObservation,
320        gap: LiveReplayGap,
321    },
322}
323
324/// Bounded, best-effort live replay for host reconnects.
325///
326/// Runtime turn execution calls this trait from synchronous boundary code. All
327/// methods must therefore be fast and nonblocking from the runtime's point of
328/// view. A custom external store should expose local or buffered behavior here,
329/// or offload blocking transport and durability work internally. Runtime turn
330/// execution must not wait for slow network or storage durability in this path.
331pub trait LiveReplayStore: Send + Sync {
332    /// Append one observation event and return its assigned cursor.
333    ///
334    /// This must be fast and nonblocking from the runtime's point of view.
335    fn append(
336        &self,
337        session_id: &str,
338        revision: SessionRevision,
339        payload: SessionObservationEventPayload,
340    ) -> Result<SessionObservationEvent, LiveReplayStoreError>;
341
342    /// Return buffered events after `cursor`, or report a recoverable gap.
343    ///
344    /// This must be fast and nonblocking from the runtime's point of view.
345    fn replay_after_cursor(
346        &self,
347        cursor: &SessionCursor,
348    ) -> Result<LiveReplayResult, LiveReplayStoreError>;
349
350    /// Subscribe after `cursor`, replaying buffered events before live events.
351    ///
352    /// This must be fast and nonblocking from the runtime's point of view.
353    fn subscribe_after_cursor(
354        &self,
355        cursor: &SessionCursor,
356    ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError>;
357
358    /// Return the latest cursor known locally for a session.
359    ///
360    /// This must be fast and nonblocking from the runtime's point of view.
361    fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor;
362
363    /// Apply best-effort retention trimming for a session.
364    ///
365    /// This must be fast and nonblocking from the runtime's point of view.
366    fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError>;
367}
368
/// Retention bounds for [`InMemoryLiveReplayStore`].
#[derive(Debug, Clone)]
pub struct InMemoryLiveReplayStoreConfig {
    /// Maximum number of events kept per session; older ones are dropped first.
    pub max_events_per_session: usize,
    /// Events older than this are dropped when the buffer is trimmed.
    pub max_age: Duration,
}
374
375impl Default for InMemoryLiveReplayStoreConfig {
376    fn default() -> Self {
377        Self {
378            max_events_per_session: DEFAULT_LIVE_REPLAY_CAPACITY,
379            max_age: DEFAULT_LIVE_REPLAY_TTL,
380        }
381    }
382}
383
384#[derive(Debug)]
385pub struct InMemoryLiveReplayStore {
386    config: InMemoryLiveReplayStoreConfig,
387    clock: Arc<dyn crate::Clock>,
388    sessions: StdMutex<HashMap<String, LiveReplaySessionBuffer>>,
389}
390
391impl InMemoryLiveReplayStore {
392    pub fn new(config: InMemoryLiveReplayStoreConfig) -> Self {
393        Self::with_clock(config, Arc::new(crate::SystemClock))
394    }
395
396    pub fn with_clock(config: InMemoryLiveReplayStoreConfig, clock: Arc<dyn crate::Clock>) -> Self {
397        Self {
398            config,
399            clock,
400            sessions: StdMutex::new(HashMap::new()),
401        }
402    }
403
404    pub fn with_bounds(max_events_per_session: usize, max_age: Duration) -> Self {
405        Self::new(InMemoryLiveReplayStoreConfig {
406            max_events_per_session,
407            max_age,
408        })
409    }
410}
411
412impl Default for InMemoryLiveReplayStore {
413    fn default() -> Self {
414        Self::new(InMemoryLiveReplayStoreConfig::default())
415    }
416}
417
418#[derive(Debug)]
419struct LiveReplaySessionBuffer {
420    events: VecDeque<StoredObservationEvent>,
421    tail_position: u64,
422    sender: Option<broadcast::Sender<SessionObservationEvent>>,
423}
424
425impl LiveReplaySessionBuffer {
426    fn new() -> Self {
427        Self {
428            events: VecDeque::new(),
429            tail_position: 0,
430            sender: None,
431        }
432    }
433
434    fn subscribe(
435        &mut self,
436        channel_capacity: usize,
437    ) -> broadcast::Receiver<SessionObservationEvent> {
438        match self.sender.as_ref() {
439            Some(sender) => sender.subscribe(),
440            None => {
441                let (sender, receiver) = broadcast::channel(channel_capacity.max(1));
442                self.sender = Some(sender);
443                receiver
444            }
445        }
446    }
447
448    fn publish(&mut self, event: SessionObservationEvent) {
449        let Some(sender) = self.sender.as_ref() else {
450            return;
451        };
452        if sender.send(event).is_err() {
453            self.sender = None;
454        }
455    }
456}
457
458#[derive(Clone, Debug)]
459struct StoredObservationEvent {
460    position: u64,
461    appended_at: Instant,
462    event: SessionObservationEvent,
463}
464
465impl InMemoryLiveReplayStore {
466    fn trim_locked(
467        config: &InMemoryLiveReplayStoreConfig,
468        buffer: &mut LiveReplaySessionBuffer,
469        now: Instant,
470    ) {
471        while buffer.events.len() > config.max_events_per_session {
472            buffer.events.pop_front();
473        }
474        while buffer
475            .events
476            .front()
477            .is_some_and(|event| now.duration_since(event.appended_at) > config.max_age)
478        {
479            buffer.events.pop_front();
480        }
481    }
482
483    fn gap_reason_for_cursor(
484        buffer: Option<&LiveReplaySessionBuffer>,
485        cursor_position: u64,
486    ) -> Option<LiveReplayGapReason> {
487        let Some(buffer) = buffer else {
488            return (cursor_position > 0).then_some(LiveReplayGapReason::Unavailable);
489        };
490        if cursor_position > buffer.tail_position {
491            return Some(LiveReplayGapReason::Unavailable);
492        }
493        let Some(first) = buffer.events.front() else {
494            return (cursor_position < buffer.tail_position)
495                .then_some(LiveReplayGapReason::Trimmed);
496        };
497        if cursor_position + 1 < first.position {
498            Some(LiveReplayGapReason::Trimmed)
499        } else {
500            None
501        }
502    }
503}
504
505impl LiveReplayStore for InMemoryLiveReplayStore {
506    fn append(
507        &self,
508        session_id: &str,
509        revision: SessionRevision,
510        payload: SessionObservationEventPayload,
511    ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
512        let now = self.clock.now();
513        let mut sessions = self
514            .sessions
515            .lock()
516            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
517        let buffer = sessions
518            .entry(session_id.to_string())
519            .or_insert_with(LiveReplaySessionBuffer::new);
520        buffer.tail_position = buffer.tail_position.saturating_add(1);
521        let cursor = SessionCursor::new(session_id, revision, buffer.tail_position);
522        let event = SessionObservationEvent {
523            session_id: session_id.to_string(),
524            revision,
525            cursor,
526            payload,
527        };
528        buffer.events.push_back(StoredObservationEvent {
529            position: buffer.tail_position,
530            appended_at: now,
531            event: event.clone(),
532        });
533        Self::trim_locked(&self.config, buffer, now);
534        buffer.publish(event.clone());
535        Ok(event)
536    }
537
538    fn replay_after_cursor(
539        &self,
540        cursor: &SessionCursor,
541    ) -> Result<LiveReplayResult, LiveReplayStoreError> {
542        let parsed = cursor.parse()?;
543        let _cursor_revision = parsed.revision;
544        let now = self.clock.now();
545        let mut sessions = self
546            .sessions
547            .lock()
548            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
549        if let Some(buffer) = sessions.get_mut(&parsed.session_id) {
550            Self::trim_locked(&self.config, buffer, now);
551        }
552        let buffer = sessions.get(&parsed.session_id);
553        if let Some(reason) = Self::gap_reason_for_cursor(buffer, parsed.live_position) {
554            return Ok(LiveReplayResult::Gap(reason));
555        }
556        let events = buffer
557            .map(|buffer| {
558                buffer
559                    .events
560                    .iter()
561                    .filter(|event| event.position > parsed.live_position)
562                    .map(|event| event.event.clone())
563                    .collect()
564            })
565            .unwrap_or_default();
566        Ok(LiveReplayResult::Replayed(events))
567    }
568
569    fn subscribe_after_cursor(
570        &self,
571        cursor: &SessionCursor,
572    ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
573        let parsed = cursor.parse()?;
574        let _cursor_revision = parsed.revision;
575        let now = self.clock.now();
576        let mut sessions = self
577            .sessions
578            .lock()
579            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
580        let buffer = sessions
581            .entry(parsed.session_id.clone())
582            .or_insert_with(LiveReplaySessionBuffer::new);
583        Self::trim_locked(&self.config, buffer, now);
584        if let Some(reason) = Self::gap_reason_for_cursor(Some(buffer), parsed.live_position) {
585            return Ok(LiveReplaySubscribeResult::Gap(reason));
586        }
587        let replay = buffer
588            .events
589            .iter()
590            .filter(|event| event.position > parsed.live_position)
591            .map(|event| event.event.clone())
592            .collect();
593        let receiver = buffer.subscribe(self.config.max_events_per_session);
594        Ok(LiveReplaySubscribeResult::Subscribed(
595            LiveReplaySubscription::new(replay, receiver),
596        ))
597    }
598
599    fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
600        let tail_position = self
601            .sessions
602            .lock()
603            .ok()
604            .and_then(|sessions| sessions.get(session_id).map(|buffer| buffer.tail_position))
605            .unwrap_or(0);
606        SessionCursor::new(session_id, revision, tail_position)
607    }
608
609    fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError> {
610        let now = self.clock.now();
611        let mut sessions = self
612            .sessions
613            .lock()
614            .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
615        if let Some(buffer) = sessions.get_mut(session_id) {
616            Self::trim_locked(&self.config, buffer, now);
617        }
618        Ok(())
619    }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Session id used by all store tests.
    const SESSION: &str = "s";
    /// Revision used by all store tests.
    const REV: SessionRevision = SessionRevision(0);

    /// Builds a turn-activity payload carrying an assistant prose delta.
    fn activity(text: &str) -> SessionObservationEventPayload {
        let event = crate::TurnEvent::AssistantProseDelta {
            text: text.to_string(),
        };
        SessionObservationEventPayload::TurnActivity(crate::TurnActivity::independent(event))
    }

    /// Extracts the prose delta text from a payload, panicking on any other
    /// payload or event shape.
    fn prose_text(payload: &SessionObservationEventPayload) -> &str {
        match payload {
            SessionObservationEventPayload::TurnActivity(activity) => match &activity.event {
                crate::TurnEvent::AssistantProseDelta { text } => text,
                _ => panic!("wrong event"),
            },
            _ => panic!("wrong payload"),
        }
    }

    #[test]
    fn session_cursor_round_trips_and_debug_is_opaque() {
        let original = SessionCursor::new("session:with:colon", SessionRevision(3), 9);
        let json = serde_json::to_string(&original).expect("serialize");
        let restored: SessionCursor = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(restored, original);
        assert_eq!(format!("{original:?}"), "SessionCursor(<opaque>)");

        let ParsedSessionCursor {
            revision,
            live_position,
            ..
        } = original
            .parse_for_session("session:with:colon")
            .expect("parse");
        assert_eq!(revision, SessionRevision(3));
        assert_eq!(live_position, 9);
    }

    #[test]
    fn session_cursor_rejects_malformed_and_wrong_session() {
        let garbage = SessionCursor::from_raw_for_testing("bad");
        let garbage_result = garbage.parse_for_session("s");
        assert!(matches!(
            garbage_result,
            Err(SessionCursorError::Malformed { .. })
        ));

        let other_session = SessionCursor::new("actual", SessionRevision(0), 0);
        let other_result = other_session.parse_for_session("expected");
        assert!(matches!(
            other_result,
            Err(SessionCursorError::WrongSession { .. })
        ));
    }

    #[test]
    fn in_memory_replay_store_replays_after_cursor_in_order() {
        let store = InMemoryLiveReplayStore::default();
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");
        store
            .append(SESSION, REV, activity("b"))
            .expect("append b");

        let outcome = store.replay_after_cursor(&origin).expect("replay");
        let LiveReplayResult::Replayed(events) = outcome else {
            panic!("expected replay");
        };
        assert_eq!(events.len(), 2);
        assert_eq!(prose_text(&events[0].payload), "a");
    }

    #[test]
    fn in_memory_replay_store_reports_gap_after_capacity_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");
        store
            .append(SESSION, REV, activity("b"))
            .expect("append b");

        let outcome = store.replay_after_cursor(&origin).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_store_reports_gap_after_ttl_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");
        // Sleep past the 1 ms age bound so the next trim drops the event.
        std::thread::sleep(Duration::from_millis(5));

        let outcome = store.replay_after_cursor(&origin).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_store_reports_unavailable_for_cursor_ahead_of_tail() {
        let store = InMemoryLiveReplayStore::default();
        let future_cursor = SessionCursor::new(SESSION, REV, 99);

        let outcome = store.replay_after_cursor(&future_cursor).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Unavailable)
        ));
    }

    #[tokio::test]
    async fn in_memory_replay_subscription_yields_replay_then_live() {
        let store = InMemoryLiveReplayStore::default();
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");

        let outcome = store.subscribe_after_cursor(&origin).expect("subscribe");
        let LiveReplaySubscribeResult::Subscribed(mut subscription) = outcome else {
            panic!("expected subscription");
        };

        let replayed = subscription.next_event().await.expect("replay");
        assert_eq!(replayed.session_id, "s");

        store
            .append(SESSION, REV, activity("b"))
            .expect("append b");
        let live = subscription.next_event().await.expect("live");
        assert_eq!(prose_text(&live.payload), "b");
    }

    #[test]
    fn in_memory_replay_store_allocates_live_channel_lazily() {
        /// Reports whether the test session's buffer currently holds a sender.
        fn has_sender(store: &InMemoryLiveReplayStore) -> bool {
            let sessions = store.sessions.lock().expect("sessions");
            sessions.get("s").expect("buffer").sender.is_some()
        }

        let store = InMemoryLiveReplayStore::default();
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");
        assert!(!has_sender(&store));

        let outcome = store.subscribe_after_cursor(&origin).expect("subscribe");
        let LiveReplaySubscribeResult::Subscribed(subscription) = outcome else {
            panic!("expected subscription");
        };
        assert!(has_sender(&store));

        // With the only receiver gone, the next publish drops the sender.
        drop(subscription);
        store
            .append(SESSION, REV, activity("b"))
            .expect("append b");
        assert!(!has_sender(&store));
    }

    #[test]
    fn in_memory_replay_subscription_reports_gap_after_capacity_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");
        store
            .append(SESSION, REV, activity("b"))
            .expect("append b");

        let outcome = store.subscribe_after_cursor(&origin).expect("subscribe");
        assert!(matches!(
            outcome,
            LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_subscription_reports_gap_after_ttl_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
        let origin = store.current_cursor(SESSION, REV);
        store
            .append(SESSION, REV, activity("a"))
            .expect("append a");
        // Sleep past the 1 ms age bound so the next trim drops the event.
        std::thread::sleep(Duration::from_millis(5));

        let outcome = store.subscribe_after_cursor(&origin).expect("subscribe");
        assert!(matches!(
            outcome,
            LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }
}