Skip to main content

lash_core/runtime/
observation.rs

1mod replay;
2
3use arc_swap::ArcSwap;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
8
9pub use replay::{
10    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
11    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
12    LiveReplaySubscription, SessionCursor, SessionCursorError, SessionObservation,
13    SessionObservationEvent, SessionObservationEventPayload, SessionObservationSubscription,
14    SessionProcessEventKind, SessionQueueEventKind, SessionResume, SessionRevision,
15};
16
17#[derive(Clone)]
18pub struct RuntimeObservation {
19    pub session_id: Arc<str>,
20    pub revision: SessionRevision,
21    pub cursor: SessionCursor,
22    pub policy: crate::SessionPolicy,
23    pub read_view: crate::SessionReadView,
24    pub persisted_state: super::RuntimeSessionState,
25    pub usage_report: super::SessionUsageReport,
26    pub tool_state: Option<crate::ToolState>,
27    pub tool_catalog: Arc<Vec<serde_json::Value>>,
28    pub tool_catalog_error: Option<String>,
29    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
30    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
31    pub queued_work_poke: Option<super::QueuedWorkPoke>,
32}
33
34impl RuntimeObservation {
35    fn from_runtime(
36        runtime: &LashRuntime,
37        cursor: SessionCursor,
38        previous: Option<&RuntimeObservation>,
39    ) -> Self {
40        let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
41            Ok(catalog) => (catalog, None),
42            Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
43        };
44        let tool_state_generation = runtime
45            .session
46            .as_ref()
47            .map(|session| session.plugins().tool_registry().generation());
48        let tool_state = match (
49            tool_state_generation,
50            previous.and_then(|observation| observation.tool_state.as_ref()),
51        ) {
52            (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
53                Some(snapshot.clone())
54            }
55            (Some(_), _) => match runtime.tool_state() {
56                Ok(state) => Some(state),
57                Err(err) => {
58                    tracing::warn!(
59                        session_id = %runtime.session_id(),
60                        error = %err,
61                        "failed to capture tool state for observation; omitting the snapshot",
62                    );
63                    None
64                }
65            },
66            (None, _) => None,
67        };
68        let revision = SessionRevision::from_runtime(runtime);
69        Self {
70            session_id: Arc::from(runtime.session_id()),
71            revision,
72            cursor,
73            policy: runtime.read_view().policy().clone(),
74            read_view: runtime.read_view(),
75            persisted_state: runtime.export_persisted_state(),
76            usage_report: runtime.usage_report(),
77            tool_state,
78            tool_catalog,
79            tool_catalog_error,
80            process_registry: runtime.host.process_registry.clone(),
81            queue_store: runtime
82                .session
83                .as_ref()
84                .and_then(|session| session.history_store()),
85            queued_work_poke: runtime.host.queued_work_poke.clone(),
86        }
87    }
88
89    pub fn session_id(&self) -> &str {
90        &self.session_id
91    }
92
93    pub fn session_revision(&self) -> SessionRevision {
94        self.revision
95    }
96
97    pub fn cursor(&self) -> &SessionCursor {
98        &self.cursor
99    }
100
101    pub fn session_observation(&self) -> SessionObservation {
102        SessionObservation {
103            read_view: self.read_view.clone(),
104            cursor: self.cursor.clone(),
105        }
106    }
107
108    pub fn process_scope(&self) -> crate::ProcessScope {
109        crate::ProcessScope::new(self.session_id.as_ref())
110    }
111
112    pub fn process_scope_id(&self) -> crate::ProcessScopeId {
113        self.process_scope().id()
114    }
115
116    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
117        let Some(executor) = self.process_registry.as_ref() else {
118            return Vec::new();
119        };
120        self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
121            .await
122    }
123
124    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
125        let Some(executor) = self.process_registry.as_ref() else {
126            return Vec::new();
127        };
128        self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
129            .await
130    }
131
132    async fn list_process_handles_with_mode(
133        &self,
134        executor: &Arc<dyn crate::ProcessRegistry>,
135        mode: crate::ProcessListMode,
136    ) -> Vec<ProcessHandleSummary> {
137        let root_scope = self.process_scope();
138        let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
139        let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
140        if !agent_frame_id.is_empty() {
141            let frame_scope =
142                crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
143            if frame_scope.id() != root_scope.id() {
144                entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
145                entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
146                entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
147            }
148        }
149        entries
150            .into_iter()
151            .map(ProcessHandleSummary::from)
152            .collect()
153    }
154}
155
156async fn list_scope_process_handles(
157    executor: &Arc<dyn crate::ProcessRegistry>,
158    scope: &crate::ProcessScope,
159    mode: crate::ProcessListMode,
160) -> Vec<ProcessHandleGrantEntry> {
161    match mode {
162        crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
163        crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
164    }
165    .unwrap_or_default()
166}
167
168#[derive(Clone)]
169pub struct RuntimeHandle {
170    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
171    observation: Arc<ArcSwap<RuntimeObservation>>,
172    live_replay_store: Arc<dyn LiveReplayStore>,
173}
174
175impl RuntimeHandle {
176    pub fn new(runtime: LashRuntime) -> Self {
177        Self::with_live_replay_store(runtime, Arc::new(InMemoryLiveReplayStore::default()))
178    }
179
180    pub fn with_live_replay_store(
181        runtime: LashRuntime,
182        live_replay_store: Arc<dyn LiveReplayStore>,
183    ) -> Self {
184        let revision = SessionRevision::from_runtime(&runtime);
185        let cursor = live_replay_store.current_cursor(runtime.session_id(), revision);
186        let observation = RuntimeObservation::from_runtime(&runtime, cursor, None);
187        Self {
188            runtime: Arc::new(Mutex::new(runtime)),
189            observation: Arc::new(ArcSwap::from_pointee(observation)),
190            live_replay_store,
191        }
192    }
193
194    pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
195        Arc::clone(&self.runtime)
196    }
197
198    pub fn observe(&self) -> Arc<RuntimeObservation> {
199        self.observation.load_full()
200    }
201
202    pub fn publish_from(&self, runtime: &LashRuntime) {
203        let revision = SessionRevision::from_runtime(runtime);
204        let previous = self.observation.load_full();
205        let state = runtime.export_persisted_state();
206        if previous.persisted_state.current_agent_frame_id != state.current_agent_frame_id
207            && !state.current_agent_frame_id.is_empty()
208            && let Err(err) = self.live_replay_store.append(
209                runtime.session_id(),
210                revision,
211                SessionObservationEventPayload::AgentFrameSwitched {
212                    frame_id: state.current_agent_frame_id.clone(),
213                },
214            )
215        {
216            tracing::warn!(
217                session_id = %runtime.session_id(),
218                error = %err,
219                "failed to append agent-frame observation event; reconnect may require gap recovery",
220            );
221        }
222        let cursor = match self.live_replay_store.append(
223            runtime.session_id(),
224            revision,
225            SessionObservationEventPayload::Committed {
226                read_view: runtime.read_view(),
227            },
228        ) {
229            Ok(event) => event.cursor,
230            Err(err) => {
231                tracing::warn!(
232                    session_id = %runtime.session_id(),
233                    error = %err,
234                    "failed to append session observation commit event; reconnect will fall back to gap recovery",
235                );
236                self.live_replay_store
237                    .current_cursor(runtime.session_id(), revision)
238            }
239        };
240        self.observation
241            .store(Arc::new(RuntimeObservation::from_runtime(
242                runtime,
243                cursor,
244                Some(previous.as_ref()),
245            )));
246    }
247
248    pub fn record_turn_activity(&self, activity: crate::TurnActivity) {
249        let observation = self.observe();
250        if let Err(err) = self.live_replay_store.append(
251            observation.session_id(),
252            observation.session_revision(),
253            SessionObservationEventPayload::TurnActivity(activity),
254        ) {
255            tracing::warn!(
256                session_id = %observation.session_id(),
257                error = %err,
258                "failed to append live turn activity to session observation replay; reconnect may require gap recovery",
259            );
260        }
261    }
262
263    pub fn record_queue_changed(&self, kind: SessionQueueEventKind, batch_ids: Vec<String>) {
264        let observation = self.observe();
265        if let Err(err) = self.live_replay_store.append(
266            observation.session_id(),
267            observation.session_revision(),
268            SessionObservationEventPayload::QueueChanged { kind, batch_ids },
269        ) {
270            tracing::warn!(
271                session_id = %observation.session_id(),
272                error = %err,
273                "failed to append queue observation event; reconnect may require gap recovery",
274            );
275        }
276    }
277
278    pub fn record_process_changed(&self, kind: SessionProcessEventKind, process_ids: Vec<String>) {
279        let observation = self.observe();
280        if let Err(err) = self.live_replay_store.append(
281            observation.session_id(),
282            observation.session_revision(),
283            SessionObservationEventPayload::ProcessChanged { kind, process_ids },
284        ) {
285            tracing::warn!(
286                session_id = %observation.session_id(),
287                error = %err,
288                "failed to append process observation event; reconnect may require gap recovery",
289            );
290        }
291    }
292
293    pub fn current_session_observation(&self) -> SessionObservation {
294        let observation = self.observe();
295        self.session_observation_from(observation.as_ref())
296    }
297
298    pub fn resume_session_observation(
299        &self,
300        cursor: &SessionCursor,
301    ) -> Result<SessionResume, LiveReplayStoreError> {
302        let observation = self.observe();
303        cursor.parse_for_session(observation.session_id())?;
304        match self.live_replay_store.replay_after_cursor(cursor)? {
305            LiveReplayResult::Replayed(events) => Ok(SessionResume::Replayed { events }),
306            LiveReplayResult::Gap(reason) => Ok(SessionResume::Gap {
307                gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
308                observation: self.session_observation_from(observation.as_ref()),
309            }),
310        }
311    }
312
313    pub fn subscribe_session_observation(
314        &self,
315        cursor: &SessionCursor,
316    ) -> Result<SessionObservationSubscription, LiveReplayStoreError> {
317        let observation = self.observe();
318        cursor.parse_for_session(observation.session_id())?;
319        match self.live_replay_store.subscribe_after_cursor(cursor)? {
320            LiveReplaySubscribeResult::Subscribed(subscription) => {
321                Ok(SessionObservationSubscription::Subscribed(subscription))
322            }
323            LiveReplaySubscribeResult::Gap(reason) => Ok(SessionObservationSubscription::Gap {
324                gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
325                observation: self.session_observation_from(observation.as_ref()),
326            }),
327        }
328    }
329
330    fn session_observation_from(&self, observation: &RuntimeObservation) -> SessionObservation {
331        SessionObservation {
332            read_view: observation.read_view.clone(),
333            cursor: self
334                .live_replay_store
335                .current_cursor(observation.session_id(), observation.session_revision()),
336        }
337    }
338
339    fn live_replay_gap(
340        &self,
341        requested_cursor: &SessionCursor,
342        reason: LiveReplayGapReason,
343        observation: &RuntimeObservation,
344    ) -> LiveReplayGap {
345        let latest_cursor = self
346            .live_replay_store
347            .current_cursor(observation.session_id(), observation.session_revision());
348        LiveReplayGap {
349            session_id: observation.session_id().to_string(),
350            requested_cursor: requested_cursor.clone(),
351            latest_cursor,
352            latest_revision: observation.session_revision(),
353            reason,
354        }
355    }
356
357    pub async fn enqueue_turn_input(
358        &self,
359        input: crate::TurnInput,
360        delivery_policy: crate::DeliveryPolicy,
361        slot_policy: crate::SlotPolicy,
362        source_key: Option<String>,
363    ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
364        let observation = self.observe();
365        let store = observation
366            .queue_store
367            .clone()
368            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
369        super::session_api::enqueue_turn_input_to_store(
370            observation.session_id.as_ref().to_string(),
371            store,
372            observation.queued_work_poke.clone(),
373            input,
374            delivery_policy,
375            slot_policy,
376            source_key,
377        )
378        .await
379        .inspect(|batch| {
380            self.record_queue_changed(
381                SessionQueueEventKind::Enqueued,
382                vec![batch.batch_id.clone()],
383            );
384        })
385    }
386
387    pub async fn cancel_queued_work_batch(
388        &self,
389        session_id: &str,
390        batch_id: &str,
391    ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
392        let observation = self.observe();
393        let store = observation
394            .queue_store
395            .clone()
396            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
397        store
398            .cancel_queued_work_batch(session_id, batch_id)
399            .await
400            .map_err(|err| {
401                crate::RuntimeError::new(
402                    crate::RuntimeErrorCode::StoreCommitFailed,
403                    err.to_string(),
404                )
405            })
406            .inspect(|batch| {
407                if batch.is_some() {
408                    self.record_queue_changed(
409                        SessionQueueEventKind::Cancelled,
410                        vec![batch_id.to_string()],
411                    );
412                }
413            })
414    }
415
416    pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
417        match Arc::try_unwrap(self.runtime) {
418            Ok(mutex) => Ok(mutex.into_inner()),
419            Err(runtime) => Err(Self {
420                runtime,
421                observation: self.observation,
422                live_replay_store: self.live_replay_store,
423            }),
424        }
425    }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Replay store double whose append/replay/subscribe entry points panic, so a test
    /// fails loudly if cursor validation does not short-circuit before the store is used.
    struct PanicLiveReplayStore;

    impl LiveReplayStore for PanicLiveReplayStore {
        fn append(
            &self,
            _session_id: &str,
            _revision: SessionRevision,
            _payload: SessionObservationEventPayload,
        ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
            panic!("append should not be called by cursor rejection tests")
        }

        fn replay_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplayResult, LiveReplayStoreError> {
            panic!("replay_after_cursor should not be called for rejected cursors")
        }

        fn subscribe_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
            panic!("subscribe_after_cursor should not be called for rejected cursors")
        }

        // Needed to construct the handle: the initial snapshot asks for a cursor.
        fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
            SessionCursor::new(session_id, revision, 0)
        }

        fn trim_session(&self, _session_id: &str) -> Result<(), LiveReplayStoreError> {
            Ok(())
        }
    }

    /// Cursors for another session and malformed cursors must be rejected by the handle
    /// itself, before the replay store gets a chance to answer with a gap.
    #[tokio::test]
    async fn runtime_rejects_bad_cursors_before_replay_store_gap_handling() {
        let policy = crate::SessionPolicy {
            model: crate::ModelSpec::from_token_limits("test-model", None, 1024, None)
                .expect("model"),
            ..Default::default()
        };
        let runtime = LashRuntime::builder()
            .with_session_id("session-a")
            .with_policy(policy)
            .build()
            .await
            .expect("runtime");
        let store: Arc<dyn LiveReplayStore> = Arc::new(PanicLiveReplayStore);
        let handle = RuntimeHandle::with_live_replay_store(runtime, store);

        let foreign_cursor = SessionCursor::new("session-b", SessionRevision(0), 99);
        let garbage_cursor = SessionCursor::from_raw_for_testing("bad");

        let resumed_foreign = handle.resume_session_observation(&foreign_cursor);
        assert!(matches!(
            resumed_foreign,
            Err(LiveReplayStoreError::Cursor(
                SessionCursorError::WrongSession { .. }
            ))
        ));

        let subscribed_foreign = handle.subscribe_session_observation(&foreign_cursor);
        assert!(matches!(
            subscribed_foreign,
            Err(LiveReplayStoreError::Cursor(
                SessionCursorError::WrongSession { .. }
            ))
        ));

        let resumed_garbage = handle.resume_session_observation(&garbage_cursor);
        assert!(matches!(
            resumed_garbage,
            Err(LiveReplayStoreError::Cursor(
                SessionCursorError::Malformed { .. }
            ))
        ));

        let subscribed_garbage = handle.subscribe_session_observation(&garbage_cursor);
        assert!(matches!(
            subscribed_garbage,
            Err(LiveReplayStoreError::Cursor(
                SessionCursorError::Malformed { .. }
            ))
        ));
    }
}