mod replay;
use arc_swap::ArcSwap;
use std::sync::Arc;
use tokio::sync::Mutex;
use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
pub use replay::{
InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
LiveReplaySubscription, SessionCursor, SessionCursorError, SessionObservation,
SessionObservationEvent, SessionObservationEventPayload, SessionObservationSubscription,
SessionProcessEventKind, SessionQueueEventKind, SessionResume, SessionRevision,
};
#[derive(Clone)]
pub struct RuntimeObservation {
pub session_id: Arc<str>,
pub revision: SessionRevision,
pub cursor: SessionCursor,
pub policy: crate::SessionPolicy,
pub read_view: crate::SessionReadView,
pub persisted_state: super::RuntimeSessionState,
pub usage_report: super::SessionUsageReport,
pub tool_state: Option<crate::ToolState>,
pub tool_catalog: Arc<Vec<serde_json::Value>>,
pub tool_catalog_error: Option<String>,
pub process_registry: Option<Arc<dyn ProcessRegistry>>,
pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
pub queued_work_poke: Option<super::QueuedWorkPoke>,
}
impl RuntimeObservation {
fn from_runtime(
runtime: &LashRuntime,
cursor: SessionCursor,
previous: Option<&RuntimeObservation>,
) -> Self {
let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
Ok(catalog) => (catalog, None),
Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
};
let tool_state_generation = runtime
.session
.as_ref()
.map(|session| session.plugins().tool_registry().generation());
let tool_state = match (
tool_state_generation,
previous.and_then(|observation| observation.tool_state.as_ref()),
) {
(Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
Some(snapshot.clone())
}
(Some(_), _) => match runtime.tool_state() {
Ok(state) => Some(state),
Err(err) => {
tracing::warn!(
session_id = %runtime.session_id(),
error = %err,
"failed to capture tool state for observation; omitting the snapshot",
);
None
}
},
(None, _) => None,
};
let revision = SessionRevision::from_runtime(runtime);
Self {
session_id: Arc::from(runtime.session_id()),
revision,
cursor,
policy: runtime.read_view().policy().clone(),
read_view: runtime.read_view(),
persisted_state: runtime.export_persisted_state(),
usage_report: runtime.usage_report(),
tool_state,
tool_catalog,
tool_catalog_error,
process_registry: runtime.host.process_registry.clone(),
queue_store: runtime
.session
.as_ref()
.and_then(|session| session.history_store()),
queued_work_poke: runtime.host.queued_work_poke.clone(),
}
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn session_revision(&self) -> SessionRevision {
self.revision
}
pub fn cursor(&self) -> &SessionCursor {
&self.cursor
}
pub fn session_observation(&self) -> SessionObservation {
SessionObservation {
read_view: self.read_view.clone(),
cursor: self.cursor.clone(),
}
}
pub fn process_scope(&self) -> crate::ProcessScope {
crate::ProcessScope::new(self.session_id.as_ref())
}
pub fn process_scope_id(&self) -> crate::ProcessScopeId {
self.process_scope().id()
}
pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
let Some(executor) = self.process_registry.as_ref() else {
return Vec::new();
};
self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
.await
}
pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
let Some(executor) = self.process_registry.as_ref() else {
return Vec::new();
};
self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
.await
}
async fn list_process_handles_with_mode(
&self,
executor: &Arc<dyn crate::ProcessRegistry>,
mode: crate::ProcessListMode,
) -> Vec<ProcessHandleSummary> {
let root_scope = self.process_scope();
let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
if !agent_frame_id.is_empty() {
let frame_scope =
crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
if frame_scope.id() != root_scope.id() {
entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
}
}
entries
.into_iter()
.map(ProcessHandleSummary::from)
.collect()
}
}
async fn list_scope_process_handles(
executor: &Arc<dyn crate::ProcessRegistry>,
scope: &crate::ProcessScope,
mode: crate::ProcessListMode,
) -> Vec<ProcessHandleGrantEntry> {
match mode {
crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
}
.unwrap_or_default()
}
#[derive(Clone)]
pub struct RuntimeHandle {
pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
observation: Arc<ArcSwap<RuntimeObservation>>,
live_replay_store: Arc<dyn LiveReplayStore>,
}
impl RuntimeHandle {
pub fn new(runtime: LashRuntime) -> Self {
Self::with_live_replay_store(runtime, Arc::new(InMemoryLiveReplayStore::default()))
}
pub fn with_live_replay_store(
runtime: LashRuntime,
live_replay_store: Arc<dyn LiveReplayStore>,
) -> Self {
let revision = SessionRevision::from_runtime(&runtime);
let cursor = live_replay_store.current_cursor(runtime.session_id(), revision);
let observation = RuntimeObservation::from_runtime(&runtime, cursor, None);
Self {
runtime: Arc::new(Mutex::new(runtime)),
observation: Arc::new(ArcSwap::from_pointee(observation)),
live_replay_store,
}
}
pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
Arc::clone(&self.runtime)
}
pub fn observe(&self) -> Arc<RuntimeObservation> {
self.observation.load_full()
}
pub fn publish_from(&self, runtime: &LashRuntime) {
let revision = SessionRevision::from_runtime(runtime);
let previous = self.observation.load_full();
let state = runtime.export_persisted_state();
if previous.persisted_state.current_agent_frame_id != state.current_agent_frame_id
&& !state.current_agent_frame_id.is_empty()
&& let Err(err) = self.live_replay_store.append(
runtime.session_id(),
revision,
SessionObservationEventPayload::AgentFrameSwitched {
frame_id: state.current_agent_frame_id.clone(),
},
)
{
tracing::warn!(
session_id = %runtime.session_id(),
error = %err,
"failed to append agent-frame observation event; reconnect may require gap recovery",
);
}
let cursor = match self.live_replay_store.append(
runtime.session_id(),
revision,
SessionObservationEventPayload::Committed {
read_view: runtime.read_view(),
},
) {
Ok(event) => event.cursor,
Err(err) => {
tracing::warn!(
session_id = %runtime.session_id(),
error = %err,
"failed to append session observation commit event; reconnect will fall back to gap recovery",
);
self.live_replay_store
.current_cursor(runtime.session_id(), revision)
}
};
self.observation
.store(Arc::new(RuntimeObservation::from_runtime(
runtime,
cursor,
Some(previous.as_ref()),
)));
}
pub fn record_turn_activity(&self, activity: crate::TurnActivity) {
let observation = self.observe();
if let Err(err) = self.live_replay_store.append(
observation.session_id(),
observation.session_revision(),
SessionObservationEventPayload::TurnActivity(activity),
) {
tracing::warn!(
session_id = %observation.session_id(),
error = %err,
"failed to append live turn activity to session observation replay; reconnect may require gap recovery",
);
}
}
pub fn record_queue_changed(&self, kind: SessionQueueEventKind, batch_ids: Vec<String>) {
let observation = self.observe();
if let Err(err) = self.live_replay_store.append(
observation.session_id(),
observation.session_revision(),
SessionObservationEventPayload::QueueChanged { kind, batch_ids },
) {
tracing::warn!(
session_id = %observation.session_id(),
error = %err,
"failed to append queue observation event; reconnect may require gap recovery",
);
}
}
pub fn record_process_changed(&self, kind: SessionProcessEventKind, process_ids: Vec<String>) {
let observation = self.observe();
if let Err(err) = self.live_replay_store.append(
observation.session_id(),
observation.session_revision(),
SessionObservationEventPayload::ProcessChanged { kind, process_ids },
) {
tracing::warn!(
session_id = %observation.session_id(),
error = %err,
"failed to append process observation event; reconnect may require gap recovery",
);
}
}
pub fn current_session_observation(&self) -> SessionObservation {
let observation = self.observe();
self.session_observation_from(observation.as_ref())
}
pub fn resume_session_observation(
&self,
cursor: &SessionCursor,
) -> Result<SessionResume, LiveReplayStoreError> {
let observation = self.observe();
cursor.parse_for_session(observation.session_id())?;
match self.live_replay_store.replay_after_cursor(cursor)? {
LiveReplayResult::Replayed(events) => Ok(SessionResume::Replayed { events }),
LiveReplayResult::Gap(reason) => Ok(SessionResume::Gap {
gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
observation: self.session_observation_from(observation.as_ref()),
}),
}
}
pub fn subscribe_session_observation(
&self,
cursor: &SessionCursor,
) -> Result<SessionObservationSubscription, LiveReplayStoreError> {
let observation = self.observe();
cursor.parse_for_session(observation.session_id())?;
match self.live_replay_store.subscribe_after_cursor(cursor)? {
LiveReplaySubscribeResult::Subscribed(subscription) => {
Ok(SessionObservationSubscription::Subscribed(subscription))
}
LiveReplaySubscribeResult::Gap(reason) => Ok(SessionObservationSubscription::Gap {
gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
observation: self.session_observation_from(observation.as_ref()),
}),
}
}
fn session_observation_from(&self, observation: &RuntimeObservation) -> SessionObservation {
SessionObservation {
read_view: observation.read_view.clone(),
cursor: self
.live_replay_store
.current_cursor(observation.session_id(), observation.session_revision()),
}
}
fn live_replay_gap(
&self,
requested_cursor: &SessionCursor,
reason: LiveReplayGapReason,
observation: &RuntimeObservation,
) -> LiveReplayGap {
let latest_cursor = self
.live_replay_store
.current_cursor(observation.session_id(), observation.session_revision());
LiveReplayGap {
session_id: observation.session_id().to_string(),
requested_cursor: requested_cursor.clone(),
latest_cursor,
latest_revision: observation.session_revision(),
reason,
}
}
pub async fn enqueue_turn_input(
&self,
input: crate::TurnInput,
delivery_policy: crate::DeliveryPolicy,
slot_policy: crate::SlotPolicy,
source_key: Option<String>,
) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
let observation = self.observe();
let store = observation
.queue_store
.clone()
.ok_or_else(super::session_api::queued_turn_input_store_required)?;
super::session_api::enqueue_turn_input_to_store(
observation.session_id.as_ref().to_string(),
store,
observation.queued_work_poke.clone(),
input,
delivery_policy,
slot_policy,
source_key,
)
.await
.inspect(|batch| {
self.record_queue_changed(
SessionQueueEventKind::Enqueued,
vec![batch.batch_id.clone()],
);
})
}
pub async fn cancel_queued_work_batch(
&self,
session_id: &str,
batch_id: &str,
) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
let observation = self.observe();
let store = observation
.queue_store
.clone()
.ok_or_else(super::session_api::queued_turn_input_store_required)?;
store
.cancel_queued_work_batch(session_id, batch_id)
.await
.map_err(|err| {
crate::RuntimeError::new(
crate::RuntimeErrorCode::StoreCommitFailed,
err.to_string(),
)
})
.inspect(|batch| {
if batch.is_some() {
self.record_queue_changed(
SessionQueueEventKind::Cancelled,
vec![batch_id.to_string()],
);
}
})
}
pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
match Arc::try_unwrap(self.runtime) {
Ok(mutex) => Ok(mutex.into_inner()),
Err(runtime) => Err(Self {
runtime,
observation: self.observation,
live_replay_store: self.live_replay_store,
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
struct PanicLiveReplayStore;
impl LiveReplayStore for PanicLiveReplayStore {
fn append(
&self,
_session_id: &str,
_revision: SessionRevision,
_payload: SessionObservationEventPayload,
) -> Result<SessionObservationEvent, LiveReplayStoreError> {
panic!("append should not be called by cursor rejection tests")
}
fn replay_after_cursor(
&self,
_cursor: &SessionCursor,
) -> Result<LiveReplayResult, LiveReplayStoreError> {
panic!("replay_after_cursor should not be called for rejected cursors")
}
fn subscribe_after_cursor(
&self,
_cursor: &SessionCursor,
) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
panic!("subscribe_after_cursor should not be called for rejected cursors")
}
fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
SessionCursor::new(session_id, revision, 0)
}
fn trim_session(&self, _session_id: &str) -> Result<(), LiveReplayStoreError> {
Ok(())
}
}
#[tokio::test]
async fn runtime_rejects_bad_cursors_before_replay_store_gap_handling() {
let runtime = LashRuntime::builder()
.with_session_id("session-a")
.with_policy(crate::SessionPolicy {
model: crate::ModelSpec::from_token_limits("test-model", None, 1024, None)
.expect("model"),
..Default::default()
})
.build()
.await
.expect("runtime");
let handle = RuntimeHandle::with_live_replay_store(runtime, Arc::new(PanicLiveReplayStore));
let wrong_session = SessionCursor::new("session-b", SessionRevision(0), 99);
let malformed = SessionCursor::from_raw_for_testing("bad");
assert!(matches!(
handle.resume_session_observation(&wrong_session),
Err(LiveReplayStoreError::Cursor(
SessionCursorError::WrongSession { .. }
))
));
assert!(matches!(
handle.subscribe_session_observation(&wrong_session),
Err(LiveReplayStoreError::Cursor(
SessionCursorError::WrongSession { .. }
))
));
assert!(matches!(
handle.resume_session_observation(&malformed),
Err(LiveReplayStoreError::Cursor(
SessionCursorError::Malformed { .. }
))
));
assert!(matches!(
handle.subscribe_session_observation(&malformed),
Err(LiveReplayStoreError::Cursor(
SessionCursorError::Malformed { .. }
))
));
}
}