Skip to main content

lash_core/runtime/
observation.rs

1mod replay;
2
3use arc_swap::ArcSwap;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
8
9pub use replay::{
10    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
11    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
12    LiveReplaySubscription, SessionCursor, SessionCursorError, SessionObservation,
13    SessionObservationEvent, SessionObservationEventPayload, SessionObservationSubscription,
14    SessionProcessEventKind, SessionQueueEventKind, SessionResume, SessionRevision,
15};
16
17#[derive(Clone)]
18pub struct RuntimeObservation {
19    pub session_id: Arc<str>,
20    pub revision: SessionRevision,
21    pub cursor: SessionCursor,
22    pub policy: crate::SessionPolicy,
23    pub read_view: crate::SessionReadView,
24    pub persisted_state: super::RuntimeSessionState,
25    pub usage_report: super::SessionUsageReport,
26    pub tool_state: Option<crate::ToolState>,
27    pub tool_catalog: Arc<Vec<serde_json::Value>>,
28    pub tool_catalog_error: Option<String>,
29    pub plugin_session: Option<Arc<crate::PluginSession>>,
30    pub session_read_service: Option<Arc<dyn crate::plugin::SessionReadService>>,
31    pub process_read_service: Option<Arc<dyn crate::plugin::ProcessReadService>>,
32    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
33    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
34    pub queued_work_driver: Option<super::QueuedWorkDriver>,
35}
36
37impl RuntimeObservation {
38    fn from_runtime(
39        runtime: &LashRuntime,
40        cursor: SessionCursor,
41        previous: Option<&RuntimeObservation>,
42    ) -> Self {
43        let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
44            Ok(catalog) => (catalog, None),
45            Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
46        };
47        let tool_state_generation = runtime
48            .session
49            .as_ref()
50            .map(|session| session.plugins().tool_registry().generation());
51        let tool_state = match (
52            tool_state_generation,
53            previous.and_then(|observation| observation.tool_state.as_ref()),
54        ) {
55            (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
56                Some(snapshot.clone())
57            }
58            (Some(_), _) => match runtime.tool_state() {
59                Ok(state) => Some(state),
60                Err(err) => {
61                    tracing::warn!(
62                        session_id = %runtime.session_id(),
63                        error = %err,
64                        "failed to capture tool state for observation; omitting the snapshot",
65                    );
66                    None
67                }
68            },
69            (None, _) => None,
70        };
71        let (plugin_session, session_read_service, process_read_service) =
72            match (runtime.session.as_ref(), runtime.runtime_session_services()) {
73                (Some(session), Ok(services)) => (
74                    Some(Arc::clone(session.plugins())),
75                    Some(services.read_service()),
76                    Some(services.process_read_service()),
77                ),
78                (_, Err(err)) => {
79                    tracing::warn!(
80                        session_id = %runtime.session_id(),
81                        error = %err,
82                        "failed to capture plugin query services for observation",
83                    );
84                    (None, None, None)
85                }
86                (None, _) => (None, None, None),
87            };
88        let revision = SessionRevision::from_runtime(runtime);
89        Self {
90            session_id: Arc::from(runtime.session_id()),
91            revision,
92            cursor,
93            policy: runtime.read_view().policy().clone(),
94            read_view: runtime.read_view(),
95            persisted_state: runtime.export_persisted_state(),
96            usage_report: runtime.usage_report(),
97            tool_state,
98            tool_catalog,
99            tool_catalog_error,
100            plugin_session,
101            session_read_service,
102            process_read_service,
103            process_registry: runtime.host.process_registry.clone(),
104            queue_store: runtime
105                .session
106                .as_ref()
107                .and_then(|session| session.history_store()),
108            queued_work_driver: runtime.host.queued_work_driver.clone(),
109        }
110    }
111
112    pub fn session_id(&self) -> &str {
113        &self.session_id
114    }
115
116    pub fn session_revision(&self) -> SessionRevision {
117        self.revision
118    }
119
120    pub fn cursor(&self) -> &SessionCursor {
121        &self.cursor
122    }
123
124    pub fn session_observation(&self) -> SessionObservation {
125        SessionObservation {
126            read_view: self.read_view.clone(),
127            cursor: self.cursor.clone(),
128        }
129    }
130
131    pub fn process_scope(&self) -> crate::SessionScope {
132        crate::SessionScope::new(self.session_id.as_ref())
133    }
134
135    pub fn process_scope_id(&self) -> crate::SessionScopeId {
136        self.process_scope().id()
137    }
138
139    pub async fn query_plugin(
140        &self,
141        name: &str,
142        args: serde_json::Value,
143        session_id: Option<String>,
144    ) -> Result<(String, serde_json::Value), crate::PluginOperationInvokeError> {
145        let Some(plugin_session) = self.plugin_session.as_ref().cloned() else {
146            return Err(crate::PluginOperationInvokeError::Unknown(
147                "runtime session not available".to_string(),
148            ));
149        };
150        let Some(session_read_service) = self.session_read_service.as_ref().cloned() else {
151            return Err(crate::PluginOperationInvokeError::Unknown(
152                "runtime session read service not available".to_string(),
153            ));
154        };
155        let Some(process_read_service) = self.process_read_service.as_ref().cloned() else {
156            return Err(crate::PluginOperationInvokeError::Unknown(
157                "runtime process read service not available".to_string(),
158            ));
159        };
160        plugin_session
161            .query_plugin(
162                name,
163                args,
164                session_id,
165                true,
166                session_read_service,
167                process_read_service,
168            )
169            .await
170    }
171
172    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
173        let Some(executor) = self.process_registry.as_ref() else {
174            return Vec::new();
175        };
176        self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
177            .await
178    }
179
180    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
181        let Some(executor) = self.process_registry.as_ref() else {
182            return Vec::new();
183        };
184        self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
185            .await
186    }
187
188    async fn list_process_handles_with_mode(
189        &self,
190        executor: &Arc<dyn crate::ProcessRegistry>,
191        mode: crate::ProcessListMode,
192    ) -> Vec<ProcessHandleSummary> {
193        let root_scope = self.process_scope();
194        let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
195        let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
196        if !agent_frame_id.is_empty() {
197            let frame_scope =
198                crate::SessionScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
199            if frame_scope.id() != root_scope.id() {
200                entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
201                entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
202                entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
203            }
204        }
205        entries
206            .into_iter()
207            .map(ProcessHandleSummary::from)
208            .collect()
209    }
210}
211
212async fn list_scope_process_handles(
213    executor: &Arc<dyn crate::ProcessRegistry>,
214    scope: &crate::SessionScope,
215    mode: crate::ProcessListMode,
216) -> Vec<ProcessHandleGrantEntry> {
217    match mode {
218        crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
219        crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
220    }
221    .unwrap_or_default()
222}
223
224#[derive(Clone)]
225pub struct RuntimeHandle {
226    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
227    observation: Arc<ArcSwap<RuntimeObservation>>,
228    live_replay_store: Arc<dyn LiveReplayStore>,
229}
230
231impl RuntimeHandle {
232    pub fn new(runtime: LashRuntime) -> Self {
233        Self::with_live_replay_store(runtime, Arc::new(InMemoryLiveReplayStore::default()))
234    }
235
236    pub fn with_live_replay_store(
237        runtime: LashRuntime,
238        live_replay_store: Arc<dyn LiveReplayStore>,
239    ) -> Self {
240        let revision = SessionRevision::from_runtime(&runtime);
241        let cursor = live_replay_store.current_cursor(runtime.session_id(), revision);
242        let observation = RuntimeObservation::from_runtime(&runtime, cursor, None);
243        Self {
244            runtime: Arc::new(Mutex::new(runtime)),
245            observation: Arc::new(ArcSwap::from_pointee(observation)),
246            live_replay_store,
247        }
248    }
249
250    pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
251        Arc::clone(&self.runtime)
252    }
253
254    pub fn observe(&self) -> Arc<RuntimeObservation> {
255        self.observation.load_full()
256    }
257
258    pub fn publish_from(&self, runtime: &LashRuntime) {
259        let revision = SessionRevision::from_runtime(runtime);
260        let previous = self.observation.load_full();
261        let state = runtime.export_persisted_state();
262        if previous.persisted_state.current_agent_frame_id != state.current_agent_frame_id
263            && !state.current_agent_frame_id.is_empty()
264            && let Err(err) = self.live_replay_store.append(
265                runtime.session_id(),
266                revision,
267                SessionObservationEventPayload::AgentFrameSwitched {
268                    frame_id: state.current_agent_frame_id.clone(),
269                },
270            )
271        {
272            tracing::warn!(
273                session_id = %runtime.session_id(),
274                error = %err,
275                "failed to append agent-frame observation event; reconnect may require gap recovery",
276            );
277        }
278        let cursor = match self.live_replay_store.append(
279            runtime.session_id(),
280            revision,
281            SessionObservationEventPayload::Committed {
282                read_view: runtime.read_view(),
283            },
284        ) {
285            Ok(event) => event.cursor,
286            Err(err) => {
287                tracing::warn!(
288                    session_id = %runtime.session_id(),
289                    error = %err,
290                    "failed to append session observation commit event; reconnect will fall back to gap recovery",
291                );
292                self.live_replay_store
293                    .current_cursor(runtime.session_id(), revision)
294            }
295        };
296        self.observation
297            .store(Arc::new(RuntimeObservation::from_runtime(
298                runtime,
299                cursor,
300                Some(previous.as_ref()),
301            )));
302    }
303
304    pub fn record_turn_activity(&self, activity: crate::TurnActivity) {
305        let observation = self.observe();
306        if let Err(err) = self.live_replay_store.append(
307            observation.session_id(),
308            observation.session_revision(),
309            SessionObservationEventPayload::TurnActivity(activity),
310        ) {
311            tracing::warn!(
312                session_id = %observation.session_id(),
313                error = %err,
314                "failed to append live turn activity to session observation replay; reconnect may require gap recovery",
315            );
316        }
317    }
318
319    pub fn record_queue_changed(&self, kind: SessionQueueEventKind, batch_ids: Vec<String>) {
320        let observation = self.observe();
321        if let Err(err) = self.live_replay_store.append(
322            observation.session_id(),
323            observation.session_revision(),
324            SessionObservationEventPayload::QueueChanged { kind, batch_ids },
325        ) {
326            tracing::warn!(
327                session_id = %observation.session_id(),
328                error = %err,
329                "failed to append queue observation event; reconnect may require gap recovery",
330            );
331        }
332    }
333
334    pub fn record_process_changed(&self, kind: SessionProcessEventKind, process_ids: Vec<String>) {
335        let observation = self.observe();
336        if let Err(err) = self.live_replay_store.append(
337            observation.session_id(),
338            observation.session_revision(),
339            SessionObservationEventPayload::ProcessChanged { kind, process_ids },
340        ) {
341            tracing::warn!(
342                session_id = %observation.session_id(),
343                error = %err,
344                "failed to append process observation event; reconnect may require gap recovery",
345            );
346        }
347    }
348
349    pub fn current_session_observation(&self) -> SessionObservation {
350        let observation = self.observe();
351        self.session_observation_from(observation.as_ref())
352    }
353
354    pub fn resume_session_observation(
355        &self,
356        cursor: &SessionCursor,
357    ) -> Result<SessionResume, LiveReplayStoreError> {
358        let observation = self.observe();
359        cursor.parse_for_session(observation.session_id())?;
360        match self.live_replay_store.replay_after_cursor(cursor)? {
361            LiveReplayResult::Replayed(events) => Ok(SessionResume::Replayed { events }),
362            LiveReplayResult::Gap(reason) => Ok(SessionResume::Gap {
363                gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
364                observation: self.session_observation_from(observation.as_ref()),
365            }),
366        }
367    }
368
369    pub fn subscribe_session_observation(
370        &self,
371        cursor: &SessionCursor,
372    ) -> Result<SessionObservationSubscription, LiveReplayStoreError> {
373        let observation = self.observe();
374        cursor.parse_for_session(observation.session_id())?;
375        match self.live_replay_store.subscribe_after_cursor(cursor)? {
376            LiveReplaySubscribeResult::Subscribed(subscription) => {
377                Ok(SessionObservationSubscription::Subscribed(subscription))
378            }
379            LiveReplaySubscribeResult::Gap(reason) => Ok(SessionObservationSubscription::Gap {
380                gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
381                observation: self.session_observation_from(observation.as_ref()),
382            }),
383        }
384    }
385
386    fn session_observation_from(&self, observation: &RuntimeObservation) -> SessionObservation {
387        SessionObservation {
388            read_view: observation.read_view.clone(),
389            cursor: self
390                .live_replay_store
391                .current_cursor(observation.session_id(), observation.session_revision()),
392        }
393    }
394
395    fn live_replay_gap(
396        &self,
397        requested_cursor: &SessionCursor,
398        reason: LiveReplayGapReason,
399        observation: &RuntimeObservation,
400    ) -> LiveReplayGap {
401        let latest_cursor = self
402            .live_replay_store
403            .current_cursor(observation.session_id(), observation.session_revision());
404        LiveReplayGap {
405            session_id: observation.session_id().to_string(),
406            requested_cursor: requested_cursor.clone(),
407            latest_cursor,
408            latest_revision: observation.session_revision(),
409            reason,
410        }
411    }
412
413    pub async fn enqueue_turn_input(
414        &self,
415        input: crate::TurnInput,
416        delivery_policy: crate::DeliveryPolicy,
417        slot_policy: crate::SlotPolicy,
418        source_key: Option<String>,
419    ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
420        let observation = self.observe();
421        let store = observation
422            .queue_store
423            .clone()
424            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
425        super::session_api::enqueue_turn_input_to_store(
426            observation.session_id.as_ref().to_string(),
427            store,
428            observation.queued_work_driver.clone(),
429            input,
430            delivery_policy,
431            slot_policy,
432            source_key,
433        )
434        .await
435        .inspect(|batch| {
436            self.record_queue_changed(
437                SessionQueueEventKind::Enqueued,
438                vec![batch.batch_id.clone()],
439            );
440        })
441    }
442
443    pub async fn cancel_queued_work_batch(
444        &self,
445        session_id: &str,
446        batch_id: &str,
447    ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
448        let observation = self.observe();
449        let store = observation
450            .queue_store
451            .clone()
452            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
453        store
454            .cancel_queued_work_batch(session_id, batch_id)
455            .await
456            .map_err(|err| {
457                crate::RuntimeError::new(
458                    crate::RuntimeErrorCode::StoreCommitFailed,
459                    err.to_string(),
460                )
461            })
462            .inspect(|batch| {
463                if batch.is_some() {
464                    self.record_queue_changed(
465                        SessionQueueEventKind::Cancelled,
466                        vec![batch_id.to_string()],
467                    );
468                }
469            })
470    }
471
472    pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
473        match Arc::try_unwrap(self.runtime) {
474            Ok(mutex) => Ok(mutex.into_inner()),
475            Err(runtime) => Err(Self {
476                runtime,
477                observation: self.observation,
478                live_replay_store: self.live_replay_store,
479            }),
480        }
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Replay store whose event-producing methods panic. Any test using it proves that
    /// the handle never reached the store for the exercised paths. `current_cursor` and
    /// `trim_session` are harmless so the handle can still be constructed.
    struct PanicLiveReplayStore;

    impl LiveReplayStore for PanicLiveReplayStore {
        fn append(
            &self,
            _session_id: &str,
            _revision: SessionRevision,
            _payload: SessionObservationEventPayload,
        ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
            panic!("append should not be called by cursor rejection tests")
        }

        fn replay_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplayResult, LiveReplayStoreError> {
            panic!("replay_after_cursor should not be called for rejected cursors")
        }

        fn subscribe_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
            panic!("subscribe_after_cursor should not be called for rejected cursors")
        }

        fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
            SessionCursor::new(session_id, revision, 0)
        }

        fn trim_session(&self, _session_id: &str) -> Result<(), LiveReplayStoreError> {
            Ok(())
        }
    }

    /// True when `result` is a cursor error for a cursor minted for another session.
    fn rejected_for_wrong_session<T>(result: &Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            result,
            Err(LiveReplayStoreError::Cursor(SessionCursorError::WrongSession { .. }))
        )
    }

    /// True when `result` is a cursor error for an unparseable cursor.
    fn rejected_as_malformed<T>(result: &Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            result,
            Err(LiveReplayStoreError::Cursor(SessionCursorError::Malformed { .. }))
        )
    }

    #[tokio::test]
    async fn runtime_rejects_bad_cursors_before_replay_store_gap_handling() {
        let policy = crate::SessionPolicy {
            model: crate::ModelSpec::from_token_limits("test-model", None, 1024, None)
                .expect("model"),
            ..Default::default()
        };
        let runtime = LashRuntime::builder()
            .with_session_id("session-a")
            .with_policy(policy)
            .build()
            .await
            .expect("runtime");
        let handle = RuntimeHandle::with_live_replay_store(runtime, Arc::new(PanicLiveReplayStore));

        let wrong_session = SessionCursor::new("session-b", SessionRevision(0), 99);
        assert!(rejected_for_wrong_session(
            &handle.resume_session_observation(&wrong_session)
        ));
        assert!(rejected_for_wrong_session(
            &handle.subscribe_session_observation(&wrong_session)
        ));

        let malformed = SessionCursor::from_raw_for_testing("bad");
        assert!(rejected_as_malformed(
            &handle.resume_session_observation(&malformed)
        ));
        assert!(rejected_as_malformed(
            &handle.subscribe_session_observation(&malformed)
        ));
    }
}