1use std::collections::{HashMap, VecDeque};
2use std::fmt;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex as StdMutex};
5use std::task::{Context, Poll, ready};
6use std::time::{Duration, Instant};
7
8use futures_util::Stream;
9use tokio::sync::broadcast;
10use tokio_util::sync::ReusableBoxFuture;
11
12use crate::runtime::{LashRuntime, RuntimeSessionState};
13
/// Prefix that every encoded session cursor string starts with.
const SESSION_CURSOR_PREFIX: &str = "lashsc1:";

/// Default cap on retained events per session for the in-memory replay store.
const DEFAULT_LIVE_REPLAY_CAPACITY: usize = 2048;

/// Default maximum age of a retained event for the in-memory replay store (two minutes).
const DEFAULT_LIVE_REPLAY_TTL: Duration = Duration::from_secs(2 * 60);
17
18#[derive(
19 Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
20)]
21#[serde(transparent)]
22pub struct SessionRevision(pub u64);
23
24impl SessionRevision {
25 pub fn new(revision: u64) -> Self {
26 Self(revision)
27 }
28
29 pub fn as_u64(self) -> u64 {
30 self.0
31 }
32
33 pub(super) fn from_runtime(runtime: &LashRuntime) -> Self {
34 Self::from_state(&runtime.export_persisted_state())
35 }
36
37 pub(super) fn from_state(state: &RuntimeSessionState) -> Self {
38 Self(state.head_revision.unwrap_or(state.turn_index as u64))
39 }
40}
41
42#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
43#[serde(transparent)]
44pub struct SessionCursor(String);
45
46impl SessionCursor {
47 pub(crate) fn new(
48 session_id: impl AsRef<str>,
49 revision: SessionRevision,
50 live_position: u64,
51 ) -> Self {
52 Self(format!(
53 "{SESSION_CURSOR_PREFIX}{}:{live_position}:{}",
54 revision.0,
55 session_id.as_ref()
56 ))
57 }
58
59 #[cfg(test)]
60 pub(super) fn from_raw_for_testing(raw: impl Into<String>) -> Self {
61 Self(raw.into())
62 }
63
64 pub fn as_str(&self) -> &str {
65 &self.0
66 }
67
68 pub(crate) fn parse_for_session(
69 &self,
70 expected_session_id: &str,
71 ) -> Result<ParsedSessionCursor, SessionCursorError> {
72 let parsed = self.parse()?;
73 if parsed.session_id != expected_session_id {
74 return Err(SessionCursorError::WrongSession {
75 expected_session_id: expected_session_id.to_string(),
76 actual_session_id: parsed.session_id,
77 });
78 }
79 Ok(parsed)
80 }
81
82 fn parse(&self) -> Result<ParsedSessionCursor, SessionCursorError> {
83 let payload = self.0.strip_prefix(SESSION_CURSOR_PREFIX).ok_or_else(|| {
84 SessionCursorError::Malformed {
85 message: "missing cursor prefix".to_string(),
86 }
87 })?;
88 let mut parts = payload.splitn(3, ':');
89 let revision = parts
90 .next()
91 .ok_or_else(|| SessionCursorError::Malformed {
92 message: "missing session revision".to_string(),
93 })?
94 .parse::<u64>()
95 .map_err(|err| SessionCursorError::Malformed {
96 message: format!("invalid session revision: {err}"),
97 })?;
98 let live_position = parts
99 .next()
100 .ok_or_else(|| SessionCursorError::Malformed {
101 message: "missing live replay position".to_string(),
102 })?
103 .parse::<u64>()
104 .map_err(|err| SessionCursorError::Malformed {
105 message: format!("invalid live replay position: {err}"),
106 })?;
107 let session_id = parts
108 .next()
109 .filter(|value| !value.is_empty())
110 .ok_or_else(|| SessionCursorError::Malformed {
111 message: "missing session id".to_string(),
112 })?
113 .to_string();
114 Ok(ParsedSessionCursor {
115 session_id,
116 revision: SessionRevision(revision),
117 live_position,
118 })
119 }
120}
121
122impl fmt::Debug for SessionCursor {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.write_str("SessionCursor(<opaque>)")
125 }
126}
127
128impl fmt::Display for SessionCursor {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.write_str(&self.0)
131 }
132}
133
134#[derive(Clone, Debug)]
135pub(crate) struct ParsedSessionCursor {
136 pub session_id: String,
137 pub revision: SessionRevision,
138 pub live_position: u64,
139}
140
141#[derive(Clone, Debug, thiserror::Error)]
142pub enum SessionCursorError {
143 #[error("malformed session cursor: {message}")]
144 Malformed { message: String },
145 #[error("session cursor belongs to `{actual_session_id}`, not `{expected_session_id}`")]
146 WrongSession {
147 expected_session_id: String,
148 actual_session_id: String,
149 },
150}
151
152#[derive(Clone, Debug)]
153pub struct SessionObservation {
154 pub read_view: crate::SessionReadView,
155 pub cursor: SessionCursor,
156}
157
158#[derive(Clone, Debug)]
159pub struct SessionObservationEvent {
160 pub session_id: String,
161 pub revision: SessionRevision,
162 pub cursor: SessionCursor,
163 pub payload: SessionObservationEventPayload,
164}
165
166#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
167#[serde(rename_all = "snake_case")]
168pub enum SessionQueueEventKind {
169 Enqueued,
170 Cancelled,
171}
172
173#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
174#[serde(rename_all = "snake_case")]
175pub enum SessionProcessEventKind {
176 Started,
177 Cancelled,
178}
179
180#[derive(Clone, Debug)]
181#[allow(clippy::large_enum_variant)]
182pub enum SessionObservationEventPayload {
183 TurnActivity(crate::TurnActivity),
184 Committed {
185 read_view: crate::SessionReadView,
186 },
187 AgentFrameSwitched {
188 frame_id: String,
189 },
190 QueueChanged {
191 kind: SessionQueueEventKind,
192 batch_ids: Vec<String>,
193 },
194 ProcessChanged {
195 kind: SessionProcessEventKind,
196 process_ids: Vec<String>,
197 },
198}
199
200#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
201pub struct LiveReplayGap {
202 pub session_id: String,
203 pub requested_cursor: SessionCursor,
204 pub latest_cursor: SessionCursor,
205 pub latest_revision: SessionRevision,
206 pub reason: LiveReplayGapReason,
207}
208
209#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum LiveReplayGapReason {
212 Trimmed,
213 Unavailable,
214}
215
216#[derive(Clone, Debug, thiserror::Error)]
217pub enum LiveReplayStoreError {
218 #[error("{0}")]
219 Cursor(#[from] SessionCursorError),
220 #[error("live replay store error: {0}")]
221 Store(String),
222 #[error("live replay subscriber lagged by {0} events")]
223 SubscriberLagged(u64),
224 #[error("live replay channel closed")]
225 Closed,
226}
227
228#[derive(Clone, Debug)]
229pub enum LiveReplayResult {
230 Replayed(Vec<SessionObservationEvent>),
231 Gap(LiveReplayGapReason),
232}
233
234pub enum LiveReplaySubscribeResult {
235 Subscribed(LiveReplaySubscription),
236 Gap(LiveReplayGapReason),
237}
238
239pub struct LiveReplaySubscription {
240 replay: VecDeque<SessionObservationEvent>,
241 receiver: ReusableBoxFuture<
242 'static,
243 (
244 Result<SessionObservationEvent, broadcast::error::RecvError>,
245 broadcast::Receiver<SessionObservationEvent>,
246 ),
247 >,
248 closed: bool,
249}
250
251impl LiveReplaySubscription {
252 fn new(
253 replay: Vec<SessionObservationEvent>,
254 receiver: broadcast::Receiver<SessionObservationEvent>,
255 ) -> Self {
256 Self {
257 replay: replay.into(),
258 receiver: ReusableBoxFuture::new(live_replay_recv(receiver)),
259 closed: false,
260 }
261 }
262
263 pub async fn next_event(&mut self) -> Result<SessionObservationEvent, LiveReplayStoreError> {
264 futures_util::StreamExt::next(self)
265 .await
266 .unwrap_or(Err(LiveReplayStoreError::Closed))
267 }
268}
269
270async fn live_replay_recv(
271 mut receiver: broadcast::Receiver<SessionObservationEvent>,
272) -> (
273 Result<SessionObservationEvent, broadcast::error::RecvError>,
274 broadcast::Receiver<SessionObservationEvent>,
275) {
276 let result = receiver.recv().await;
277 (result, receiver)
278}
279
280impl Stream for LiveReplaySubscription {
281 type Item = Result<SessionObservationEvent, LiveReplayStoreError>;
282
283 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
284 if let Some(event) = self.replay.pop_front() {
285 return Poll::Ready(Some(Ok(event)));
286 }
287 if self.closed {
288 return Poll::Ready(None);
289 }
290 let (result, receiver) = ready!(self.receiver.poll(cx));
291 self.receiver.set(live_replay_recv(receiver));
292 match result {
293 Ok(event) => Poll::Ready(Some(Ok(event))),
294 Err(broadcast::error::RecvError::Lagged(count)) => {
295 Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(count))))
296 }
297 Err(broadcast::error::RecvError::Closed) => {
298 self.closed = true;
299 Poll::Ready(Some(Err(LiveReplayStoreError::Closed)))
300 }
301 }
302 }
303}
304
305#[derive(Clone, Debug)]
306pub enum SessionResume {
307 Replayed {
308 events: Vec<SessionObservationEvent>,
309 },
310 Gap {
311 observation: SessionObservation,
312 gap: LiveReplayGap,
313 },
314}
315
316pub enum SessionObservationSubscription {
317 Subscribed(LiveReplaySubscription),
318 Gap {
319 observation: SessionObservation,
320 gap: LiveReplayGap,
321 },
322}
323
324pub trait LiveReplayStore: Send + Sync {
332 fn append(
336 &self,
337 session_id: &str,
338 revision: SessionRevision,
339 payload: SessionObservationEventPayload,
340 ) -> Result<SessionObservationEvent, LiveReplayStoreError>;
341
342 fn replay_after_cursor(
346 &self,
347 cursor: &SessionCursor,
348 ) -> Result<LiveReplayResult, LiveReplayStoreError>;
349
350 fn subscribe_after_cursor(
354 &self,
355 cursor: &SessionCursor,
356 ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError>;
357
358 fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor;
362
363 fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError>;
367}
368
/// Retention limits for [`InMemoryLiveReplayStore`].
#[derive(Clone, Debug)]
pub struct InMemoryLiveReplayStoreConfig {
    /// Maximum number of events retained per session; older events are dropped first.
    pub max_events_per_session: usize,
    /// Maximum age of a retained event; older events are dropped when trimming.
    pub max_age: Duration,
}
374
375impl Default for InMemoryLiveReplayStoreConfig {
376 fn default() -> Self {
377 Self {
378 max_events_per_session: DEFAULT_LIVE_REPLAY_CAPACITY,
379 max_age: DEFAULT_LIVE_REPLAY_TTL,
380 }
381 }
382}
383
384#[derive(Debug)]
385pub struct InMemoryLiveReplayStore {
386 config: InMemoryLiveReplayStoreConfig,
387 clock: Arc<dyn crate::Clock>,
388 sessions: StdMutex<HashMap<String, LiveReplaySessionBuffer>>,
389}
390
391impl InMemoryLiveReplayStore {
392 pub fn new(config: InMemoryLiveReplayStoreConfig) -> Self {
393 Self::with_clock(config, Arc::new(crate::SystemClock))
394 }
395
396 pub fn with_clock(config: InMemoryLiveReplayStoreConfig, clock: Arc<dyn crate::Clock>) -> Self {
397 Self {
398 config,
399 clock,
400 sessions: StdMutex::new(HashMap::new()),
401 }
402 }
403
404 pub fn with_bounds(max_events_per_session: usize, max_age: Duration) -> Self {
405 Self::new(InMemoryLiveReplayStoreConfig {
406 max_events_per_session,
407 max_age,
408 })
409 }
410}
411
412impl Default for InMemoryLiveReplayStore {
413 fn default() -> Self {
414 Self::new(InMemoryLiveReplayStoreConfig::default())
415 }
416}
417
418#[derive(Debug)]
419struct LiveReplaySessionBuffer {
420 events: VecDeque<StoredObservationEvent>,
421 tail_position: u64,
422 sender: Option<broadcast::Sender<SessionObservationEvent>>,
423}
424
425impl LiveReplaySessionBuffer {
426 fn new() -> Self {
427 Self {
428 events: VecDeque::new(),
429 tail_position: 0,
430 sender: None,
431 }
432 }
433
434 fn subscribe(
435 &mut self,
436 channel_capacity: usize,
437 ) -> broadcast::Receiver<SessionObservationEvent> {
438 match self.sender.as_ref() {
439 Some(sender) => sender.subscribe(),
440 None => {
441 let (sender, receiver) = broadcast::channel(channel_capacity.max(1));
442 self.sender = Some(sender);
443 receiver
444 }
445 }
446 }
447
448 fn publish(&mut self, event: SessionObservationEvent) {
449 let Some(sender) = self.sender.as_ref() else {
450 return;
451 };
452 if sender.send(event).is_err() {
453 self.sender = None;
454 }
455 }
456}
457
458#[derive(Clone, Debug)]
459struct StoredObservationEvent {
460 position: u64,
461 appended_at: Instant,
462 event: SessionObservationEvent,
463}
464
465impl InMemoryLiveReplayStore {
466 fn trim_locked(
467 config: &InMemoryLiveReplayStoreConfig,
468 buffer: &mut LiveReplaySessionBuffer,
469 now: Instant,
470 ) {
471 while buffer.events.len() > config.max_events_per_session {
472 buffer.events.pop_front();
473 }
474 while buffer
475 .events
476 .front()
477 .is_some_and(|event| now.duration_since(event.appended_at) > config.max_age)
478 {
479 buffer.events.pop_front();
480 }
481 }
482
483 fn gap_reason_for_cursor(
484 buffer: Option<&LiveReplaySessionBuffer>,
485 cursor_position: u64,
486 ) -> Option<LiveReplayGapReason> {
487 let Some(buffer) = buffer else {
488 return (cursor_position > 0).then_some(LiveReplayGapReason::Unavailable);
489 };
490 if cursor_position > buffer.tail_position {
491 return Some(LiveReplayGapReason::Unavailable);
492 }
493 let Some(first) = buffer.events.front() else {
494 return (cursor_position < buffer.tail_position)
495 .then_some(LiveReplayGapReason::Trimmed);
496 };
497 if cursor_position + 1 < first.position {
498 Some(LiveReplayGapReason::Trimmed)
499 } else {
500 None
501 }
502 }
503}
504
505impl LiveReplayStore for InMemoryLiveReplayStore {
506 fn append(
507 &self,
508 session_id: &str,
509 revision: SessionRevision,
510 payload: SessionObservationEventPayload,
511 ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
512 let now = self.clock.now();
513 let mut sessions = self
514 .sessions
515 .lock()
516 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
517 let buffer = sessions
518 .entry(session_id.to_string())
519 .or_insert_with(LiveReplaySessionBuffer::new);
520 buffer.tail_position = buffer.tail_position.saturating_add(1);
521 let cursor = SessionCursor::new(session_id, revision, buffer.tail_position);
522 let event = SessionObservationEvent {
523 session_id: session_id.to_string(),
524 revision,
525 cursor,
526 payload,
527 };
528 buffer.events.push_back(StoredObservationEvent {
529 position: buffer.tail_position,
530 appended_at: now,
531 event: event.clone(),
532 });
533 Self::trim_locked(&self.config, buffer, now);
534 buffer.publish(event.clone());
535 Ok(event)
536 }
537
538 fn replay_after_cursor(
539 &self,
540 cursor: &SessionCursor,
541 ) -> Result<LiveReplayResult, LiveReplayStoreError> {
542 let parsed = cursor.parse()?;
543 let _cursor_revision = parsed.revision;
544 let now = self.clock.now();
545 let mut sessions = self
546 .sessions
547 .lock()
548 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
549 if let Some(buffer) = sessions.get_mut(&parsed.session_id) {
550 Self::trim_locked(&self.config, buffer, now);
551 }
552 let buffer = sessions.get(&parsed.session_id);
553 if let Some(reason) = Self::gap_reason_for_cursor(buffer, parsed.live_position) {
554 return Ok(LiveReplayResult::Gap(reason));
555 }
556 let events = buffer
557 .map(|buffer| {
558 buffer
559 .events
560 .iter()
561 .filter(|event| event.position > parsed.live_position)
562 .map(|event| event.event.clone())
563 .collect()
564 })
565 .unwrap_or_default();
566 Ok(LiveReplayResult::Replayed(events))
567 }
568
569 fn subscribe_after_cursor(
570 &self,
571 cursor: &SessionCursor,
572 ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
573 let parsed = cursor.parse()?;
574 let _cursor_revision = parsed.revision;
575 let now = self.clock.now();
576 let mut sessions = self
577 .sessions
578 .lock()
579 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
580 let buffer = sessions
581 .entry(parsed.session_id.clone())
582 .or_insert_with(LiveReplaySessionBuffer::new);
583 Self::trim_locked(&self.config, buffer, now);
584 if let Some(reason) = Self::gap_reason_for_cursor(Some(buffer), parsed.live_position) {
585 return Ok(LiveReplaySubscribeResult::Gap(reason));
586 }
587 let replay = buffer
588 .events
589 .iter()
590 .filter(|event| event.position > parsed.live_position)
591 .map(|event| event.event.clone())
592 .collect();
593 let receiver = buffer.subscribe(self.config.max_events_per_session);
594 Ok(LiveReplaySubscribeResult::Subscribed(
595 LiveReplaySubscription::new(replay, receiver),
596 ))
597 }
598
599 fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
600 let tail_position = self
601 .sessions
602 .lock()
603 .ok()
604 .and_then(|sessions| sessions.get(session_id).map(|buffer| buffer.tail_position))
605 .unwrap_or(0);
606 SessionCursor::new(session_id, revision, tail_position)
607 }
608
609 fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError> {
610 let now = self.clock.now();
611 let mut sessions = self
612 .sessions
613 .lock()
614 .map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
615 if let Some(buffer) = sessions.get_mut(session_id) {
616 Self::trim_locked(&self.config, buffer, now);
617 }
618 Ok(())
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Session id shared by the store tests.
    const SESSION: &str = "s";
    /// Revision shared by the store tests.
    const REV: SessionRevision = SessionRevision(0);

    /// Builds a turn-activity payload carrying an assistant prose delta.
    fn activity(text: &str) -> SessionObservationEventPayload {
        let event = crate::TurnEvent::AssistantProseDelta {
            text: text.to_string(),
        };
        SessionObservationEventPayload::TurnActivity(crate::TurnActivity::independent(event))
    }

    /// Extracts the prose delta text from a payload, panicking on any other shape.
    fn prose_text(payload: &SessionObservationEventPayload) -> &str {
        let SessionObservationEventPayload::TurnActivity(activity) = payload else {
            panic!("wrong payload");
        };
        let crate::TurnEvent::AssistantProseDelta { text } = &activity.event else {
            panic!("wrong event");
        };
        text.as_str()
    }

    /// Appends a prose delta with `text` to the shared test session.
    fn append_text(store: &InMemoryLiveReplayStore, text: &str) -> SessionObservationEvent {
        store
            .append(SESSION, REV, activity(text))
            .unwrap_or_else(|err| panic!("append {text}: {err:?}"))
    }

    /// Reports whether the shared test session currently has a broadcast sender.
    fn has_live_channel(store: &InMemoryLiveReplayStore) -> bool {
        let sessions = store.sessions.lock().expect("sessions");
        sessions.get(SESSION).expect("buffer").sender.is_some()
    }

    #[test]
    fn session_cursor_round_trips_and_debug_is_opaque() {
        let session_id = "session:with:colon";
        let cursor = SessionCursor::new(session_id, SessionRevision(3), 9);

        let json = serde_json::to_string(&cursor).expect("serialize");
        let round_tripped: SessionCursor = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(round_tripped, cursor);
        assert_eq!(format!("{cursor:?}"), "SessionCursor(<opaque>)");

        let ParsedSessionCursor {
            revision,
            live_position,
            ..
        } = cursor.parse_for_session(session_id).expect("parse");
        assert_eq!(revision, SessionRevision(3));
        assert_eq!(live_position, 9);
    }

    #[test]
    fn session_cursor_rejects_malformed_and_wrong_session() {
        let malformed = SessionCursor::from_raw_for_testing("bad").parse_for_session("s");
        assert!(matches!(
            malformed,
            Err(SessionCursorError::Malformed { .. })
        ));

        let mismatched =
            SessionCursor::new("actual", SessionRevision(0), 0).parse_for_session("expected");
        assert!(matches!(
            mismatched,
            Err(SessionCursorError::WrongSession { .. })
        ));
    }

    #[test]
    fn in_memory_replay_store_replays_after_cursor_in_order() {
        let store = InMemoryLiveReplayStore::default();
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");
        append_text(&store, "b");

        let events = match store.replay_after_cursor(&start).expect("replay") {
            LiveReplayResult::Replayed(events) => events,
            LiveReplayResult::Gap(_) => panic!("expected replay"),
        };
        assert_eq!(events.len(), 2);
        assert_eq!(prose_text(&events[0].payload), "a");
    }

    #[test]
    fn in_memory_replay_store_reports_gap_after_capacity_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");
        append_text(&store, "b");

        let outcome = store.replay_after_cursor(&start).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_store_reports_gap_after_ttl_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");
        // Let the single event outlive the one-millisecond retention window.
        std::thread::sleep(Duration::from_millis(5));

        let outcome = store.replay_after_cursor(&start).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_store_reports_unavailable_for_cursor_ahead_of_tail() {
        let store = InMemoryLiveReplayStore::default();
        let ahead = SessionCursor::new(SESSION, REV, 99);

        let outcome = store.replay_after_cursor(&ahead).expect("gap");
        assert!(matches!(
            outcome,
            LiveReplayResult::Gap(LiveReplayGapReason::Unavailable)
        ));
    }

    #[tokio::test]
    async fn in_memory_replay_subscription_yields_replay_then_live() {
        let store = InMemoryLiveReplayStore::default();
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");

        let mut subscription = match store.subscribe_after_cursor(&start).expect("subscribe") {
            LiveReplaySubscribeResult::Subscribed(subscription) => subscription,
            LiveReplaySubscribeResult::Gap(_) => panic!("expected subscription"),
        };

        let first = subscription.next_event().await.expect("replay");
        assert_eq!(first.session_id, "s");

        append_text(&store, "b");
        let second = subscription.next_event().await.expect("live");
        assert_eq!(prose_text(&second.payload), "b");
    }

    #[test]
    fn in_memory_replay_store_allocates_live_channel_lazily() {
        let store = InMemoryLiveReplayStore::default();
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");
        assert!(!has_live_channel(&store));

        let subscription = match store.subscribe_after_cursor(&start).expect("subscribe") {
            LiveReplaySubscribeResult::Subscribed(subscription) => subscription,
            LiveReplaySubscribeResult::Gap(_) => panic!("expected subscription"),
        };
        assert!(has_live_channel(&store));

        // With the only receiver gone, the next publish finds nobody and drops the sender.
        drop(subscription);
        append_text(&store, "b");
        assert!(!has_live_channel(&store));
    }

    #[test]
    fn in_memory_replay_subscription_reports_gap_after_capacity_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");
        append_text(&store, "b");

        let outcome = store.subscribe_after_cursor(&start).expect("subscribe");
        assert!(matches!(
            outcome,
            LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }

    #[test]
    fn in_memory_replay_subscription_reports_gap_after_ttl_trim() {
        let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
        let start = store.current_cursor(SESSION, REV);
        append_text(&store, "a");
        // Let the single event outlive the one-millisecond retention window.
        std::thread::sleep(Duration::from_millis(5));

        let outcome = store.subscribe_after_cursor(&start).expect("subscribe");
        assert!(matches!(
            outcome,
            LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
        ));
    }
}