Skip to main content

lash_core/runtime/
observation.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use tokio::sync::Mutex;
5
6use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
7
8#[derive(Clone)]
9pub struct RuntimeObservation {
10    pub session_id: Arc<str>,
11    pub policy: crate::SessionPolicy,
12    pub read_view: crate::SessionReadView,
13    pub persisted_state: super::RuntimeSessionState,
14    pub usage_report: super::SessionUsageReport,
15    pub tool_state: Option<crate::ToolState>,
16    pub tool_catalog: Arc<Vec<serde_json::Value>>,
17    pub tool_catalog_error: Option<String>,
18    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
19    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
20    pub queued_work_poke: Option<super::QueuedWorkPoke>,
21}
22
23impl RuntimeObservation {
24    fn from_runtime(runtime: &LashRuntime, previous: Option<&RuntimeObservation>) -> Self {
25        let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
26            Ok(catalog) => (catalog, None),
27            Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
28        };
29        let tool_state_generation = runtime
30            .session
31            .as_ref()
32            .map(|session| session.plugins().tool_registry().generation());
33        let tool_state = match (
34            tool_state_generation,
35            previous.and_then(|observation| observation.tool_state.as_ref()),
36        ) {
37            (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
38                Some(snapshot.clone())
39            }
40            (Some(_), _) => match runtime.tool_state() {
41                Ok(state) => Some(state),
42                Err(err) => {
43                    tracing::warn!(
44                        session_id = %runtime.session_id(),
45                        error = %err,
46                        "failed to capture tool state for observation; omitting the snapshot",
47                    );
48                    None
49                }
50            },
51            (None, _) => None,
52        };
53        Self {
54            session_id: Arc::from(runtime.session_id()),
55            policy: runtime.read_view().policy().clone(),
56            read_view: runtime.read_view(),
57            persisted_state: runtime.export_persisted_state(),
58            usage_report: runtime.usage_report(),
59            tool_state,
60            tool_catalog,
61            tool_catalog_error,
62            process_registry: runtime.host.process_registry.clone(),
63            queue_store: runtime
64                .session
65                .as_ref()
66                .and_then(|session| session.history_store()),
67            queued_work_poke: runtime.host.queued_work_poke.clone(),
68        }
69    }
70
71    pub fn session_id(&self) -> &str {
72        &self.session_id
73    }
74
75    pub fn process_scope(&self) -> crate::ProcessScope {
76        crate::ProcessScope::new(self.session_id.as_ref())
77    }
78
79    pub fn process_scope_id(&self) -> crate::ProcessScopeId {
80        self.process_scope().id()
81    }
82
83    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
84        let Some(executor) = self.process_registry.as_ref() else {
85            return Vec::new();
86        };
87        self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
88            .await
89    }
90
91    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
92        let Some(executor) = self.process_registry.as_ref() else {
93            return Vec::new();
94        };
95        self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
96            .await
97    }
98
99    async fn list_process_handles_with_mode(
100        &self,
101        executor: &Arc<dyn crate::ProcessRegistry>,
102        mode: crate::ProcessListMode,
103    ) -> Vec<ProcessHandleSummary> {
104        let root_scope = self.process_scope();
105        let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
106        let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
107        if !agent_frame_id.is_empty() {
108            let frame_scope =
109                crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
110            if frame_scope.id() != root_scope.id() {
111                entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
112                entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
113                entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
114            }
115        }
116        entries
117            .into_iter()
118            .map(ProcessHandleSummary::from)
119            .collect()
120    }
121}
122
123async fn list_scope_process_handles(
124    executor: &Arc<dyn crate::ProcessRegistry>,
125    scope: &crate::ProcessScope,
126    mode: crate::ProcessListMode,
127) -> Vec<ProcessHandleGrantEntry> {
128    match mode {
129        crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
130        crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
131    }
132    .unwrap_or_default()
133}
134
135#[derive(Clone)]
136pub struct RuntimeHandle {
137    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
138    observation: Arc<ArcSwap<RuntimeObservation>>,
139}
140
141impl RuntimeHandle {
142    pub fn new(runtime: LashRuntime) -> Self {
143        let observation = RuntimeObservation::from_runtime(&runtime, None);
144        Self {
145            runtime: Arc::new(Mutex::new(runtime)),
146            observation: Arc::new(ArcSwap::from_pointee(observation)),
147        }
148    }
149
150    pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
151        Arc::clone(&self.runtime)
152    }
153
154    pub fn observe(&self) -> Arc<RuntimeObservation> {
155        self.observation.load_full()
156    }
157
158    pub fn publish_from(&self, runtime: &LashRuntime) {
159        let previous = self.observation.load_full();
160        self.observation
161            .store(Arc::new(RuntimeObservation::from_runtime(
162                runtime,
163                Some(previous.as_ref()),
164            )));
165    }
166
167    pub async fn enqueue_turn_input(
168        &self,
169        input: crate::TurnInput,
170        delivery_policy: crate::DeliveryPolicy,
171        slot_policy: crate::SlotPolicy,
172        source_key: Option<String>,
173    ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
174        let observation = self.observe();
175        let store = observation
176            .queue_store
177            .clone()
178            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
179        super::session_api::enqueue_turn_input_to_store(
180            observation.session_id.as_ref().to_string(),
181            store,
182            observation.queued_work_poke.clone(),
183            input,
184            delivery_policy,
185            slot_policy,
186            source_key,
187        )
188        .await
189    }
190
191    pub async fn cancel_queued_work_batch(
192        &self,
193        session_id: &str,
194        batch_id: &str,
195    ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
196        let observation = self.observe();
197        let store = observation
198            .queue_store
199            .clone()
200            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
201        store
202            .cancel_queued_work_batch(session_id, batch_id)
203            .await
204            .map_err(|err| {
205                crate::RuntimeError::new(
206                    crate::RuntimeErrorCode::StoreCommitFailed,
207                    err.to_string(),
208                )
209            })
210    }
211
212    pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
213        match Arc::try_unwrap(self.runtime) {
214            Ok(mutex) => Ok(mutex.into_inner()),
215            Err(runtime) => Err(Self {
216                runtime,
217                observation: self.observation,
218            }),
219        }
220    }
221}