use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::Mutex as StdMutex;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use crate::runtime::{LashRuntime, RuntimeSessionState};
const SESSION_CURSOR_PREFIX: &str = "lashsc1:";
const DEFAULT_LIVE_REPLAY_CAPACITY: usize = 2048;
const DEFAULT_LIVE_REPLAY_TTL: Duration = Duration::from_secs(120);
#[derive(
Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[serde(transparent)]
pub struct SessionRevision(pub u64);
impl SessionRevision {
pub fn new(revision: u64) -> Self {
Self(revision)
}
pub fn as_u64(self) -> u64 {
self.0
}
pub(super) fn from_runtime(runtime: &LashRuntime) -> Self {
Self::from_state(&runtime.export_persisted_state())
}
pub(super) fn from_state(state: &RuntimeSessionState) -> Self {
Self(state.head_revision.unwrap_or(state.turn_index as u64))
}
}
#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct SessionCursor(String);
impl SessionCursor {
pub(crate) fn new(
session_id: impl AsRef<str>,
revision: SessionRevision,
live_position: u64,
) -> Self {
Self(format!(
"{SESSION_CURSOR_PREFIX}{}:{live_position}:{}",
revision.0,
session_id.as_ref()
))
}
#[cfg(test)]
pub(super) fn from_raw_for_testing(raw: impl Into<String>) -> Self {
Self(raw.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
pub(crate) fn parse_for_session(
&self,
expected_session_id: &str,
) -> Result<ParsedSessionCursor, SessionCursorError> {
let parsed = self.parse()?;
if parsed.session_id != expected_session_id {
return Err(SessionCursorError::WrongSession {
expected_session_id: expected_session_id.to_string(),
actual_session_id: parsed.session_id,
});
}
Ok(parsed)
}
fn parse(&self) -> Result<ParsedSessionCursor, SessionCursorError> {
let payload = self.0.strip_prefix(SESSION_CURSOR_PREFIX).ok_or_else(|| {
SessionCursorError::Malformed {
message: "missing cursor prefix".to_string(),
}
})?;
let mut parts = payload.splitn(3, ':');
let revision = parts
.next()
.ok_or_else(|| SessionCursorError::Malformed {
message: "missing session revision".to_string(),
})?
.parse::<u64>()
.map_err(|err| SessionCursorError::Malformed {
message: format!("invalid session revision: {err}"),
})?;
let live_position = parts
.next()
.ok_or_else(|| SessionCursorError::Malformed {
message: "missing live replay position".to_string(),
})?
.parse::<u64>()
.map_err(|err| SessionCursorError::Malformed {
message: format!("invalid live replay position: {err}"),
})?;
let session_id = parts
.next()
.filter(|value| !value.is_empty())
.ok_or_else(|| SessionCursorError::Malformed {
message: "missing session id".to_string(),
})?
.to_string();
Ok(ParsedSessionCursor {
session_id,
revision: SessionRevision(revision),
live_position,
})
}
}
impl fmt::Debug for SessionCursor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("SessionCursor(<opaque>)")
}
}
impl fmt::Display for SessionCursor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Clone, Debug)]
pub(crate) struct ParsedSessionCursor {
pub session_id: String,
pub revision: SessionRevision,
pub live_position: u64,
}
#[derive(Clone, Debug, thiserror::Error)]
pub enum SessionCursorError {
#[error("malformed session cursor: {message}")]
Malformed { message: String },
#[error("session cursor belongs to `{actual_session_id}`, not `{expected_session_id}`")]
WrongSession {
expected_session_id: String,
actual_session_id: String,
},
}
#[derive(Clone, Debug)]
pub struct SessionObservation {
pub read_view: crate::SessionReadView,
pub cursor: SessionCursor,
}
#[derive(Clone, Debug)]
pub struct SessionObservationEvent {
pub session_id: String,
pub revision: SessionRevision,
pub cursor: SessionCursor,
pub payload: SessionObservationEventPayload,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionQueueEventKind {
Enqueued,
Cancelled,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionProcessEventKind {
Started,
Cancelled,
}
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SessionObservationEventPayload {
TurnActivity(crate::TurnActivity),
Committed {
read_view: crate::SessionReadView,
},
AgentFrameSwitched {
frame_id: String,
},
QueueChanged {
kind: SessionQueueEventKind,
batch_ids: Vec<String>,
},
ProcessChanged {
kind: SessionProcessEventKind,
process_ids: Vec<String>,
},
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct LiveReplayGap {
pub session_id: String,
pub requested_cursor: SessionCursor,
pub latest_cursor: SessionCursor,
pub latest_revision: SessionRevision,
pub reason: LiveReplayGapReason,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LiveReplayGapReason {
Trimmed,
Unavailable,
}
#[derive(Clone, Debug, thiserror::Error)]
pub enum LiveReplayStoreError {
#[error("{0}")]
Cursor(#[from] SessionCursorError),
#[error("live replay store error: {0}")]
Store(String),
#[error("live replay subscriber lagged by {0} events")]
SubscriberLagged(u64),
#[error("live replay channel closed")]
Closed,
}
#[derive(Clone, Debug)]
pub enum LiveReplayResult {
Replayed(Vec<SessionObservationEvent>),
Gap(LiveReplayGapReason),
}
pub enum LiveReplaySubscribeResult {
Subscribed(LiveReplaySubscription),
Gap(LiveReplayGapReason),
}
pub struct LiveReplaySubscription {
replay: VecDeque<SessionObservationEvent>,
receiver: broadcast::Receiver<SessionObservationEvent>,
}
impl LiveReplaySubscription {
fn new(
replay: Vec<SessionObservationEvent>,
receiver: broadcast::Receiver<SessionObservationEvent>,
) -> Self {
Self {
replay: replay.into(),
receiver,
}
}
pub async fn next_event(&mut self) -> Result<SessionObservationEvent, LiveReplayStoreError> {
if let Some(event) = self.replay.pop_front() {
return Ok(event);
}
match self.receiver.recv().await {
Ok(event) => Ok(event),
Err(broadcast::error::RecvError::Lagged(count)) => {
Err(LiveReplayStoreError::SubscriberLagged(count))
}
Err(broadcast::error::RecvError::Closed) => Err(LiveReplayStoreError::Closed),
}
}
}
#[derive(Clone, Debug)]
pub enum SessionResume {
Replayed {
events: Vec<SessionObservationEvent>,
},
Gap {
observation: SessionObservation,
gap: LiveReplayGap,
},
}
pub enum SessionObservationSubscription {
Subscribed(LiveReplaySubscription),
Gap {
observation: SessionObservation,
gap: LiveReplayGap,
},
}
pub trait LiveReplayStore: Send + Sync {
fn append(
&self,
session_id: &str,
revision: SessionRevision,
payload: SessionObservationEventPayload,
) -> Result<SessionObservationEvent, LiveReplayStoreError>;
fn replay_after_cursor(
&self,
cursor: &SessionCursor,
) -> Result<LiveReplayResult, LiveReplayStoreError>;
fn subscribe_after_cursor(
&self,
cursor: &SessionCursor,
) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError>;
fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor;
fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError>;
}
#[derive(Clone, Debug)]
pub struct InMemoryLiveReplayStoreConfig {
pub max_events_per_session: usize,
pub max_age: Duration,
}
impl Default for InMemoryLiveReplayStoreConfig {
fn default() -> Self {
Self {
max_events_per_session: DEFAULT_LIVE_REPLAY_CAPACITY,
max_age: DEFAULT_LIVE_REPLAY_TTL,
}
}
}
#[derive(Debug)]
pub struct InMemoryLiveReplayStore {
config: InMemoryLiveReplayStoreConfig,
sessions: StdMutex<HashMap<String, LiveReplaySessionBuffer>>,
}
impl InMemoryLiveReplayStore {
pub fn new(config: InMemoryLiveReplayStoreConfig) -> Self {
Self {
config,
sessions: StdMutex::new(HashMap::new()),
}
}
pub fn with_bounds(max_events_per_session: usize, max_age: Duration) -> Self {
Self::new(InMemoryLiveReplayStoreConfig {
max_events_per_session,
max_age,
})
}
}
impl Default for InMemoryLiveReplayStore {
fn default() -> Self {
Self::new(InMemoryLiveReplayStoreConfig::default())
}
}
#[derive(Debug)]
struct LiveReplaySessionBuffer {
events: VecDeque<StoredObservationEvent>,
tail_position: u64,
sender: broadcast::Sender<SessionObservationEvent>,
}
impl LiveReplaySessionBuffer {
fn new() -> Self {
let (sender, _) = broadcast::channel(DEFAULT_LIVE_REPLAY_CAPACITY);
Self {
events: VecDeque::new(),
tail_position: 0,
sender,
}
}
}
#[derive(Clone, Debug)]
struct StoredObservationEvent {
position: u64,
appended_at: Instant,
event: SessionObservationEvent,
}
impl InMemoryLiveReplayStore {
fn trim_locked(config: &InMemoryLiveReplayStoreConfig, buffer: &mut LiveReplaySessionBuffer) {
let now = Instant::now();
while buffer.events.len() > config.max_events_per_session {
buffer.events.pop_front();
}
while buffer
.events
.front()
.is_some_and(|event| now.duration_since(event.appended_at) > config.max_age)
{
buffer.events.pop_front();
}
}
fn gap_reason_for_cursor(
buffer: Option<&LiveReplaySessionBuffer>,
cursor_position: u64,
) -> Option<LiveReplayGapReason> {
let Some(buffer) = buffer else {
return (cursor_position > 0).then_some(LiveReplayGapReason::Unavailable);
};
if cursor_position > buffer.tail_position {
return Some(LiveReplayGapReason::Unavailable);
}
let Some(first) = buffer.events.front() else {
return (cursor_position < buffer.tail_position)
.then_some(LiveReplayGapReason::Trimmed);
};
if cursor_position + 1 < first.position {
Some(LiveReplayGapReason::Trimmed)
} else {
None
}
}
}
impl LiveReplayStore for InMemoryLiveReplayStore {
fn append(
&self,
session_id: &str,
revision: SessionRevision,
payload: SessionObservationEventPayload,
) -> Result<SessionObservationEvent, LiveReplayStoreError> {
let mut sessions = self
.sessions
.lock()
.map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
let buffer = sessions
.entry(session_id.to_string())
.or_insert_with(LiveReplaySessionBuffer::new);
buffer.tail_position = buffer.tail_position.saturating_add(1);
let cursor = SessionCursor::new(session_id, revision, buffer.tail_position);
let event = SessionObservationEvent {
session_id: session_id.to_string(),
revision,
cursor,
payload,
};
buffer.events.push_back(StoredObservationEvent {
position: buffer.tail_position,
appended_at: Instant::now(),
event: event.clone(),
});
Self::trim_locked(&self.config, buffer);
let _ = buffer.sender.send(event.clone());
Ok(event)
}
fn replay_after_cursor(
&self,
cursor: &SessionCursor,
) -> Result<LiveReplayResult, LiveReplayStoreError> {
let parsed = cursor.parse()?;
let _cursor_revision = parsed.revision;
let mut sessions = self
.sessions
.lock()
.map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
if let Some(buffer) = sessions.get_mut(&parsed.session_id) {
Self::trim_locked(&self.config, buffer);
}
let buffer = sessions.get(&parsed.session_id);
if let Some(reason) = Self::gap_reason_for_cursor(buffer, parsed.live_position) {
return Ok(LiveReplayResult::Gap(reason));
}
let events = buffer
.map(|buffer| {
buffer
.events
.iter()
.filter(|event| event.position > parsed.live_position)
.map(|event| event.event.clone())
.collect()
})
.unwrap_or_default();
Ok(LiveReplayResult::Replayed(events))
}
fn subscribe_after_cursor(
&self,
cursor: &SessionCursor,
) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
let parsed = cursor.parse()?;
let _cursor_revision = parsed.revision;
let mut sessions = self
.sessions
.lock()
.map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
let buffer = sessions
.entry(parsed.session_id.clone())
.or_insert_with(LiveReplaySessionBuffer::new);
Self::trim_locked(&self.config, buffer);
let receiver = buffer.sender.subscribe();
if let Some(reason) = Self::gap_reason_for_cursor(Some(buffer), parsed.live_position) {
return Ok(LiveReplaySubscribeResult::Gap(reason));
}
let replay = buffer
.events
.iter()
.filter(|event| event.position > parsed.live_position)
.map(|event| event.event.clone())
.collect();
Ok(LiveReplaySubscribeResult::Subscribed(
LiveReplaySubscription::new(replay, receiver),
))
}
fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
let tail_position = self
.sessions
.lock()
.ok()
.and_then(|sessions| sessions.get(session_id).map(|buffer| buffer.tail_position))
.unwrap_or(0);
SessionCursor::new(session_id, revision, tail_position)
}
fn trim_session(&self, session_id: &str) -> Result<(), LiveReplayStoreError> {
let mut sessions = self
.sessions
.lock()
.map_err(|_| LiveReplayStoreError::Store("live replay mutex poisoned".to_string()))?;
if let Some(buffer) = sessions.get_mut(session_id) {
Self::trim_locked(&self.config, buffer);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn activity(text: &str) -> SessionObservationEventPayload {
SessionObservationEventPayload::TurnActivity(crate::TurnActivity::independent(
crate::TurnEvent::AssistantProseDelta {
text: text.to_string(),
},
))
}
#[test]
fn session_cursor_round_trips_and_debug_is_opaque() {
let cursor = SessionCursor::new("session:with:colon", SessionRevision(3), 9);
let encoded = serde_json::to_string(&cursor).expect("serialize");
let decoded: SessionCursor = serde_json::from_str(&encoded).expect("deserialize");
assert_eq!(decoded, cursor);
assert_eq!(format!("{cursor:?}"), "SessionCursor(<opaque>)");
let parsed = cursor
.parse_for_session("session:with:colon")
.expect("parse");
assert_eq!(parsed.revision, SessionRevision(3));
assert_eq!(parsed.live_position, 9);
}
#[test]
fn session_cursor_rejects_malformed_and_wrong_session() {
let malformed = SessionCursor::from_raw_for_testing("bad");
assert!(matches!(
malformed.parse_for_session("s"),
Err(SessionCursorError::Malformed { .. })
));
let cursor = SessionCursor::new("actual", SessionRevision(0), 0);
assert!(matches!(
cursor.parse_for_session("expected"),
Err(SessionCursorError::WrongSession { .. })
));
}
#[test]
fn in_memory_replay_store_replays_after_cursor_in_order() {
let store = InMemoryLiveReplayStore::default();
let start = store.current_cursor("s", SessionRevision(0));
store
.append("s", SessionRevision(0), activity("a"))
.expect("append a");
store
.append("s", SessionRevision(0), activity("b"))
.expect("append b");
let LiveReplayResult::Replayed(events) = store.replay_after_cursor(&start).expect("replay")
else {
panic!("expected replay");
};
assert_eq!(events.len(), 2);
match &events[0].payload {
SessionObservationEventPayload::TurnActivity(activity) => match &activity.event {
crate::TurnEvent::AssistantProseDelta { text } => assert_eq!(text, "a"),
_ => panic!("wrong event"),
},
_ => panic!("wrong payload"),
}
}
#[test]
fn in_memory_replay_store_reports_gap_after_capacity_trim() {
let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
let start = store.current_cursor("s", SessionRevision(0));
store
.append("s", SessionRevision(0), activity("a"))
.expect("append a");
store
.append("s", SessionRevision(0), activity("b"))
.expect("append b");
assert!(matches!(
store.replay_after_cursor(&start).expect("gap"),
LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
));
}
#[test]
fn in_memory_replay_store_reports_gap_after_ttl_trim() {
let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
let start = store.current_cursor("s", SessionRevision(0));
store
.append("s", SessionRevision(0), activity("a"))
.expect("append a");
std::thread::sleep(Duration::from_millis(5));
assert!(matches!(
store.replay_after_cursor(&start).expect("gap"),
LiveReplayResult::Gap(LiveReplayGapReason::Trimmed)
));
}
#[test]
fn in_memory_replay_store_reports_unavailable_for_cursor_ahead_of_tail() {
let store = InMemoryLiveReplayStore::default();
let ahead = SessionCursor::new("s", SessionRevision(0), 99);
assert!(matches!(
store.replay_after_cursor(&ahead).expect("gap"),
LiveReplayResult::Gap(LiveReplayGapReason::Unavailable)
));
}
#[tokio::test]
async fn in_memory_replay_subscription_yields_replay_then_live() {
let store = InMemoryLiveReplayStore::default();
let start = store.current_cursor("s", SessionRevision(0));
store
.append("s", SessionRevision(0), activity("a"))
.expect("append a");
let LiveReplaySubscribeResult::Subscribed(mut subscription) =
store.subscribe_after_cursor(&start).expect("subscribe")
else {
panic!("expected subscription");
};
let first = subscription.next_event().await.expect("replay");
assert_eq!(first.session_id, "s");
store
.append("s", SessionRevision(0), activity("b"))
.expect("append b");
let second = subscription.next_event().await.expect("live");
match second.payload {
SessionObservationEventPayload::TurnActivity(activity) => match activity.event {
crate::TurnEvent::AssistantProseDelta { text } => assert_eq!(text, "b"),
_ => panic!("wrong event"),
},
_ => panic!("wrong payload"),
}
}
#[test]
fn in_memory_replay_subscription_reports_gap_after_capacity_trim() {
let store = InMemoryLiveReplayStore::with_bounds(1, Duration::from_secs(120));
let start = store.current_cursor("s", SessionRevision(0));
store
.append("s", SessionRevision(0), activity("a"))
.expect("append a");
store
.append("s", SessionRevision(0), activity("b"))
.expect("append b");
assert!(matches!(
store.subscribe_after_cursor(&start).expect("subscribe"),
LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
));
}
#[test]
fn in_memory_replay_subscription_reports_gap_after_ttl_trim() {
let store = InMemoryLiveReplayStore::with_bounds(16, Duration::from_millis(1));
let start = store.current_cursor("s", SessionRevision(0));
store
.append("s", SessionRevision(0), activity("a"))
.expect("append a");
std::thread::sleep(Duration::from_millis(5));
assert!(matches!(
store.subscribe_after_cursor(&start).expect("subscribe"),
LiveReplaySubscribeResult::Gap(LiveReplayGapReason::Trimmed)
));
}
}