1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use tokio::sync::Mutex;
5
6use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
7
8#[derive(Clone)]
9pub struct RuntimeObservation {
10 pub session_id: Arc<str>,
11 pub policy: crate::SessionPolicy,
12 pub read_view: crate::SessionReadView,
13 pub persisted_state: super::RuntimeSessionState,
14 pub usage_report: super::SessionUsageReport,
15 pub tool_state: Option<crate::ToolState>,
16 pub tool_catalog: Arc<Vec<serde_json::Value>>,
17 pub tool_catalog_error: Option<String>,
18 pub process_registry: Option<Arc<dyn ProcessRegistry>>,
19 pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
20 pub queued_work_poke: Option<super::QueuedWorkPoke>,
21}
22
23impl RuntimeObservation {
24 fn from_runtime(runtime: &LashRuntime, previous: Option<&RuntimeObservation>) -> Self {
25 let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
26 Ok(catalog) => (catalog, None),
27 Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
28 };
29 let tool_state_generation = runtime
30 .session
31 .as_ref()
32 .map(|session| session.plugins().tool_registry().generation());
33 let tool_state = match (
34 tool_state_generation,
35 previous.and_then(|observation| observation.tool_state.as_ref()),
36 ) {
37 (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
38 Some(snapshot.clone())
39 }
40 (Some(_), _) => match runtime.tool_state() {
41 Ok(state) => Some(state),
42 Err(err) => {
43 tracing::warn!(
44 session_id = %runtime.session_id(),
45 error = %err,
46 "failed to capture tool state for observation; omitting the snapshot",
47 );
48 None
49 }
50 },
51 (None, _) => None,
52 };
53 Self {
54 session_id: Arc::from(runtime.session_id()),
55 policy: runtime.read_view().policy().clone(),
56 read_view: runtime.read_view(),
57 persisted_state: runtime.export_persisted_state(),
58 usage_report: runtime.usage_report(),
59 tool_state,
60 tool_catalog,
61 tool_catalog_error,
62 process_registry: runtime.host.process_registry.clone(),
63 queue_store: runtime
64 .session
65 .as_ref()
66 .and_then(|session| session.history_store()),
67 queued_work_poke: runtime.host.queued_work_poke.clone(),
68 }
69 }
70
71 pub fn session_id(&self) -> &str {
72 &self.session_id
73 }
74
75 pub fn process_scope(&self) -> crate::ProcessScope {
76 crate::ProcessScope::new(self.session_id.as_ref())
77 }
78
79 pub fn process_scope_id(&self) -> crate::ProcessScopeId {
80 self.process_scope().id()
81 }
82
83 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
84 let Some(executor) = self.process_registry.as_ref() else {
85 return Vec::new();
86 };
87 self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
88 .await
89 }
90
91 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
92 let Some(executor) = self.process_registry.as_ref() else {
93 return Vec::new();
94 };
95 self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
96 .await
97 }
98
99 async fn list_process_handles_with_mode(
100 &self,
101 executor: &Arc<dyn crate::ProcessRegistry>,
102 mode: crate::ProcessListMode,
103 ) -> Vec<ProcessHandleSummary> {
104 let root_scope = self.process_scope();
105 let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
106 let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
107 if !agent_frame_id.is_empty() {
108 let frame_scope =
109 crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
110 if frame_scope.id() != root_scope.id() {
111 entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
112 entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
113 entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
114 }
115 }
116 entries
117 .into_iter()
118 .map(ProcessHandleSummary::from)
119 .collect()
120 }
121}
122
123async fn list_scope_process_handles(
124 executor: &Arc<dyn crate::ProcessRegistry>,
125 scope: &crate::ProcessScope,
126 mode: crate::ProcessListMode,
127) -> Vec<ProcessHandleGrantEntry> {
128 match mode {
129 crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
130 crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
131 }
132 .unwrap_or_default()
133}
134
135#[derive(Clone)]
136pub struct RuntimeHandle {
137 pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
138 observation: Arc<ArcSwap<RuntimeObservation>>,
139}
140
141impl RuntimeHandle {
142 pub fn new(runtime: LashRuntime) -> Self {
143 let observation = RuntimeObservation::from_runtime(&runtime, None);
144 Self {
145 runtime: Arc::new(Mutex::new(runtime)),
146 observation: Arc::new(ArcSwap::from_pointee(observation)),
147 }
148 }
149
150 pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
151 Arc::clone(&self.runtime)
152 }
153
154 pub fn observe(&self) -> Arc<RuntimeObservation> {
155 self.observation.load_full()
156 }
157
158 pub fn publish_from(&self, runtime: &LashRuntime) {
159 let previous = self.observation.load_full();
160 self.observation
161 .store(Arc::new(RuntimeObservation::from_runtime(
162 runtime,
163 Some(previous.as_ref()),
164 )));
165 }
166
167 pub async fn enqueue_turn_input(
168 &self,
169 input: crate::TurnInput,
170 delivery_policy: crate::DeliveryPolicy,
171 slot_policy: crate::SlotPolicy,
172 source_key: Option<String>,
173 ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
174 let observation = self.observe();
175 let store = observation
176 .queue_store
177 .clone()
178 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
179 super::session_api::enqueue_turn_input_to_store(
180 observation.session_id.as_ref().to_string(),
181 store,
182 observation.queued_work_poke.clone(),
183 input,
184 delivery_policy,
185 slot_policy,
186 source_key,
187 )
188 .await
189 }
190
191 pub async fn cancel_queued_work_batch(
192 &self,
193 session_id: &str,
194 batch_id: &str,
195 ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
196 let observation = self.observe();
197 let store = observation
198 .queue_store
199 .clone()
200 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
201 store
202 .cancel_queued_work_batch(session_id, batch_id)
203 .await
204 .map_err(|err| {
205 crate::RuntimeError::new(
206 crate::RuntimeErrorCode::StoreCommitFailed,
207 err.to_string(),
208 )
209 })
210 }
211
212 pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
213 match Arc::try_unwrap(self.runtime) {
214 Ok(mutex) => Ok(mutex.into_inner()),
215 Err(runtime) => Err(Self {
216 runtime,
217 observation: self.observation,
218 }),
219 }
220 }
221}