1mod replay;
2
3use arc_swap::ArcSwap;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
8
9pub use replay::{
10 InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
11 LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
12 LiveReplaySubscription, SessionCursor, SessionCursorError, SessionObservation,
13 SessionObservationEvent, SessionObservationEventPayload, SessionObservationSubscription,
14 SessionProcessEventKind, SessionQueueEventKind, SessionResume, SessionRevision,
15};
16
/// Point-in-time snapshot of a [`LashRuntime`], readable without locking the
/// runtime.
///
/// Snapshots are built by `RuntimeObservation::from_runtime` and published by
/// [`RuntimeHandle`]. Optional fields are `None` when the runtime has no
/// session or when capturing that piece failed.
#[derive(Clone)]
pub struct RuntimeObservation {
    /// Id of the observed session (`runtime.session_id()`).
    pub session_id: Arc<str>,
    /// Session revision at capture time (`SessionRevision::from_runtime`).
    pub revision: SessionRevision,
    /// Live-replay cursor this snapshot was published with.
    pub cursor: SessionCursor,
    /// Session policy, cloned from the read view at capture time.
    pub policy: crate::SessionPolicy,
    /// Read view of the session at capture time.
    pub read_view: crate::SessionReadView,
    /// Result of `runtime.export_persisted_state()` at capture time.
    pub persisted_state: super::RuntimeSessionState,
    /// Result of `runtime.usage_report()` at capture time.
    pub usage_report: super::SessionUsageReport,
    /// Tool-state snapshot; `None` without a session or when capture failed.
    pub tool_state: Option<crate::ToolState>,
    /// Active tool catalog; empty when building it failed.
    pub tool_catalog: Arc<Vec<serde_json::Value>>,
    /// Error text from building the tool catalog, if that failed.
    pub tool_catalog_error: Option<String>,
    /// Plugin session of the runtime's session, used for plugin queries.
    pub plugin_session: Option<Arc<crate::PluginSession>>,
    /// Session read service handed to plugin queries.
    pub session_read_service: Option<Arc<dyn crate::plugin::SessionReadService>>,
    /// Process read service handed to plugin queries.
    pub process_read_service: Option<Arc<dyn crate::plugin::ProcessReadService>>,
    /// Host process registry used to list process handle grants, if configured.
    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
    /// The session's history store, used to enqueue/cancel queued work.
    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Host driver passed along when enqueueing turn input, if configured.
    pub queued_work_driver: Option<super::QueuedWorkDriver>,
}
36
37impl RuntimeObservation {
38 fn from_runtime(
39 runtime: &LashRuntime,
40 cursor: SessionCursor,
41 previous: Option<&RuntimeObservation>,
42 ) -> Self {
43 let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
44 Ok(catalog) => (catalog, None),
45 Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
46 };
47 let tool_state_generation = runtime
48 .session
49 .as_ref()
50 .map(|session| session.plugins().tool_registry().generation());
51 let tool_state = match (
52 tool_state_generation,
53 previous.and_then(|observation| observation.tool_state.as_ref()),
54 ) {
55 (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
56 Some(snapshot.clone())
57 }
58 (Some(_), _) => match runtime.tool_state() {
59 Ok(state) => Some(state),
60 Err(err) => {
61 tracing::warn!(
62 session_id = %runtime.session_id(),
63 error = %err,
64 "failed to capture tool state for observation; omitting the snapshot",
65 );
66 None
67 }
68 },
69 (None, _) => None,
70 };
71 let (plugin_session, session_read_service, process_read_service) =
72 match (runtime.session.as_ref(), runtime.runtime_session_services()) {
73 (Some(session), Ok(services)) => (
74 Some(Arc::clone(session.plugins())),
75 Some(services.read_service()),
76 Some(services.process_read_service()),
77 ),
78 (_, Err(err)) => {
79 tracing::warn!(
80 session_id = %runtime.session_id(),
81 error = %err,
82 "failed to capture plugin query services for observation",
83 );
84 (None, None, None)
85 }
86 (None, _) => (None, None, None),
87 };
88 let revision = SessionRevision::from_runtime(runtime);
89 Self {
90 session_id: Arc::from(runtime.session_id()),
91 revision,
92 cursor,
93 policy: runtime.read_view().policy().clone(),
94 read_view: runtime.read_view(),
95 persisted_state: runtime.export_persisted_state(),
96 usage_report: runtime.usage_report(),
97 tool_state,
98 tool_catalog,
99 tool_catalog_error,
100 plugin_session,
101 session_read_service,
102 process_read_service,
103 process_registry: runtime.host.process_registry.clone(),
104 queue_store: runtime
105 .session
106 .as_ref()
107 .and_then(|session| session.history_store()),
108 queued_work_driver: runtime.host.queued_work_driver.clone(),
109 }
110 }
111
112 pub fn session_id(&self) -> &str {
113 &self.session_id
114 }
115
116 pub fn session_revision(&self) -> SessionRevision {
117 self.revision
118 }
119
120 pub fn cursor(&self) -> &SessionCursor {
121 &self.cursor
122 }
123
124 pub fn session_observation(&self) -> SessionObservation {
125 SessionObservation {
126 read_view: self.read_view.clone(),
127 cursor: self.cursor.clone(),
128 }
129 }
130
131 pub fn process_scope(&self) -> crate::SessionScope {
132 crate::SessionScope::new(self.session_id.as_ref())
133 }
134
135 pub fn process_scope_id(&self) -> crate::SessionScopeId {
136 self.process_scope().id()
137 }
138
139 pub async fn query_plugin(
140 &self,
141 name: &str,
142 args: serde_json::Value,
143 session_id: Option<String>,
144 ) -> Result<(String, serde_json::Value), crate::PluginOperationInvokeError> {
145 let Some(plugin_session) = self.plugin_session.as_ref().cloned() else {
146 return Err(crate::PluginOperationInvokeError::Unknown(
147 "runtime session not available".to_string(),
148 ));
149 };
150 let Some(session_read_service) = self.session_read_service.as_ref().cloned() else {
151 return Err(crate::PluginOperationInvokeError::Unknown(
152 "runtime session read service not available".to_string(),
153 ));
154 };
155 let Some(process_read_service) = self.process_read_service.as_ref().cloned() else {
156 return Err(crate::PluginOperationInvokeError::Unknown(
157 "runtime process read service not available".to_string(),
158 ));
159 };
160 plugin_session
161 .query_plugin(
162 name,
163 args,
164 session_id,
165 true,
166 session_read_service,
167 process_read_service,
168 )
169 .await
170 }
171
172 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
173 let Some(executor) = self.process_registry.as_ref() else {
174 return Vec::new();
175 };
176 self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
177 .await
178 }
179
180 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
181 let Some(executor) = self.process_registry.as_ref() else {
182 return Vec::new();
183 };
184 self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
185 .await
186 }
187
188 async fn list_process_handles_with_mode(
189 &self,
190 executor: &Arc<dyn crate::ProcessRegistry>,
191 mode: crate::ProcessListMode,
192 ) -> Vec<ProcessHandleSummary> {
193 let root_scope = self.process_scope();
194 let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
195 let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
196 if !agent_frame_id.is_empty() {
197 let frame_scope =
198 crate::SessionScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
199 if frame_scope.id() != root_scope.id() {
200 entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
201 entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
202 entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
203 }
204 }
205 entries
206 .into_iter()
207 .map(ProcessHandleSummary::from)
208 .collect()
209 }
210}
211
212async fn list_scope_process_handles(
213 executor: &Arc<dyn crate::ProcessRegistry>,
214 scope: &crate::SessionScope,
215 mode: crate::ProcessListMode,
216) -> Vec<ProcessHandleGrantEntry> {
217 match mode {
218 crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
219 crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
220 }
221 .unwrap_or_default()
222}
223
/// Cloneable handle pairing a mutex-guarded [`LashRuntime`] with its most
/// recently published [`RuntimeObservation`] and the [`LiveReplayStore`] that
/// receives observation events.
#[derive(Clone)]
pub struct RuntimeHandle {
    /// Writer side: the runtime itself, serialized behind an async mutex.
    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
    /// Latest published snapshot, swapped atomically; readers load it via
    /// `observe` without taking the runtime mutex.
    observation: Arc<ArcSwap<RuntimeObservation>>,
    /// Store that observation events are appended to and replayed from.
    live_replay_store: Arc<dyn LiveReplayStore>,
}
230
231impl RuntimeHandle {
232 pub fn new(runtime: LashRuntime) -> Self {
233 Self::with_live_replay_store(runtime, Arc::new(InMemoryLiveReplayStore::default()))
234 }
235
236 pub fn with_live_replay_store(
237 runtime: LashRuntime,
238 live_replay_store: Arc<dyn LiveReplayStore>,
239 ) -> Self {
240 let revision = SessionRevision::from_runtime(&runtime);
241 let cursor = live_replay_store.current_cursor(runtime.session_id(), revision);
242 let observation = RuntimeObservation::from_runtime(&runtime, cursor, None);
243 Self {
244 runtime: Arc::new(Mutex::new(runtime)),
245 observation: Arc::new(ArcSwap::from_pointee(observation)),
246 live_replay_store,
247 }
248 }
249
250 pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
251 Arc::clone(&self.runtime)
252 }
253
254 pub fn observe(&self) -> Arc<RuntimeObservation> {
255 self.observation.load_full()
256 }
257
258 pub fn publish_from(&self, runtime: &LashRuntime) {
259 let revision = SessionRevision::from_runtime(runtime);
260 let previous = self.observation.load_full();
261 let state = runtime.export_persisted_state();
262 if previous.persisted_state.current_agent_frame_id != state.current_agent_frame_id
263 && !state.current_agent_frame_id.is_empty()
264 && let Err(err) = self.live_replay_store.append(
265 runtime.session_id(),
266 revision,
267 SessionObservationEventPayload::AgentFrameSwitched {
268 frame_id: state.current_agent_frame_id.clone(),
269 },
270 )
271 {
272 tracing::warn!(
273 session_id = %runtime.session_id(),
274 error = %err,
275 "failed to append agent-frame observation event; reconnect may require gap recovery",
276 );
277 }
278 let cursor = match self.live_replay_store.append(
279 runtime.session_id(),
280 revision,
281 SessionObservationEventPayload::Committed {
282 read_view: runtime.read_view(),
283 },
284 ) {
285 Ok(event) => event.cursor,
286 Err(err) => {
287 tracing::warn!(
288 session_id = %runtime.session_id(),
289 error = %err,
290 "failed to append session observation commit event; reconnect will fall back to gap recovery",
291 );
292 self.live_replay_store
293 .current_cursor(runtime.session_id(), revision)
294 }
295 };
296 self.observation
297 .store(Arc::new(RuntimeObservation::from_runtime(
298 runtime,
299 cursor,
300 Some(previous.as_ref()),
301 )));
302 }
303
304 pub fn record_turn_activity(&self, activity: crate::TurnActivity) {
305 let observation = self.observe();
306 if let Err(err) = self.live_replay_store.append(
307 observation.session_id(),
308 observation.session_revision(),
309 SessionObservationEventPayload::TurnActivity(activity),
310 ) {
311 tracing::warn!(
312 session_id = %observation.session_id(),
313 error = %err,
314 "failed to append live turn activity to session observation replay; reconnect may require gap recovery",
315 );
316 }
317 }
318
319 pub fn record_queue_changed(&self, kind: SessionQueueEventKind, batch_ids: Vec<String>) {
320 let observation = self.observe();
321 if let Err(err) = self.live_replay_store.append(
322 observation.session_id(),
323 observation.session_revision(),
324 SessionObservationEventPayload::QueueChanged { kind, batch_ids },
325 ) {
326 tracing::warn!(
327 session_id = %observation.session_id(),
328 error = %err,
329 "failed to append queue observation event; reconnect may require gap recovery",
330 );
331 }
332 }
333
334 pub fn record_process_changed(&self, kind: SessionProcessEventKind, process_ids: Vec<String>) {
335 let observation = self.observe();
336 if let Err(err) = self.live_replay_store.append(
337 observation.session_id(),
338 observation.session_revision(),
339 SessionObservationEventPayload::ProcessChanged { kind, process_ids },
340 ) {
341 tracing::warn!(
342 session_id = %observation.session_id(),
343 error = %err,
344 "failed to append process observation event; reconnect may require gap recovery",
345 );
346 }
347 }
348
349 pub fn current_session_observation(&self) -> SessionObservation {
350 let observation = self.observe();
351 self.session_observation_from(observation.as_ref())
352 }
353
354 pub fn resume_session_observation(
355 &self,
356 cursor: &SessionCursor,
357 ) -> Result<SessionResume, LiveReplayStoreError> {
358 let observation = self.observe();
359 cursor.parse_for_session(observation.session_id())?;
360 match self.live_replay_store.replay_after_cursor(cursor)? {
361 LiveReplayResult::Replayed(events) => Ok(SessionResume::Replayed { events }),
362 LiveReplayResult::Gap(reason) => Ok(SessionResume::Gap {
363 gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
364 observation: self.session_observation_from(observation.as_ref()),
365 }),
366 }
367 }
368
369 pub fn subscribe_session_observation(
370 &self,
371 cursor: &SessionCursor,
372 ) -> Result<SessionObservationSubscription, LiveReplayStoreError> {
373 let observation = self.observe();
374 cursor.parse_for_session(observation.session_id())?;
375 match self.live_replay_store.subscribe_after_cursor(cursor)? {
376 LiveReplaySubscribeResult::Subscribed(subscription) => {
377 Ok(SessionObservationSubscription::Subscribed(subscription))
378 }
379 LiveReplaySubscribeResult::Gap(reason) => Ok(SessionObservationSubscription::Gap {
380 gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
381 observation: self.session_observation_from(observation.as_ref()),
382 }),
383 }
384 }
385
386 fn session_observation_from(&self, observation: &RuntimeObservation) -> SessionObservation {
387 SessionObservation {
388 read_view: observation.read_view.clone(),
389 cursor: self
390 .live_replay_store
391 .current_cursor(observation.session_id(), observation.session_revision()),
392 }
393 }
394
395 fn live_replay_gap(
396 &self,
397 requested_cursor: &SessionCursor,
398 reason: LiveReplayGapReason,
399 observation: &RuntimeObservation,
400 ) -> LiveReplayGap {
401 let latest_cursor = self
402 .live_replay_store
403 .current_cursor(observation.session_id(), observation.session_revision());
404 LiveReplayGap {
405 session_id: observation.session_id().to_string(),
406 requested_cursor: requested_cursor.clone(),
407 latest_cursor,
408 latest_revision: observation.session_revision(),
409 reason,
410 }
411 }
412
413 pub async fn enqueue_turn_input(
414 &self,
415 input: crate::TurnInput,
416 delivery_policy: crate::DeliveryPolicy,
417 slot_policy: crate::SlotPolicy,
418 source_key: Option<String>,
419 ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
420 let observation = self.observe();
421 let store = observation
422 .queue_store
423 .clone()
424 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
425 super::session_api::enqueue_turn_input_to_store(
426 observation.session_id.as_ref().to_string(),
427 store,
428 observation.queued_work_driver.clone(),
429 input,
430 delivery_policy,
431 slot_policy,
432 source_key,
433 )
434 .await
435 .inspect(|batch| {
436 self.record_queue_changed(
437 SessionQueueEventKind::Enqueued,
438 vec![batch.batch_id.clone()],
439 );
440 })
441 }
442
443 pub async fn cancel_queued_work_batch(
444 &self,
445 session_id: &str,
446 batch_id: &str,
447 ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
448 let observation = self.observe();
449 let store = observation
450 .queue_store
451 .clone()
452 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
453 store
454 .cancel_queued_work_batch(session_id, batch_id)
455 .await
456 .map_err(|err| {
457 crate::RuntimeError::new(
458 crate::RuntimeErrorCode::StoreCommitFailed,
459 err.to_string(),
460 )
461 })
462 .inspect(|batch| {
463 if batch.is_some() {
464 self.record_queue_changed(
465 SessionQueueEventKind::Cancelled,
466 vec![batch_id.to_string()],
467 );
468 }
469 })
470 }
471
472 pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
473 match Arc::try_unwrap(self.runtime) {
474 Ok(mutex) => Ok(mutex.into_inner()),
475 Err(runtime) => Err(Self {
476 runtime,
477 observation: self.observation,
478 live_replay_store: self.live_replay_store,
479 }),
480 }
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Replay store whose replay, subscribe, and append methods all panic.
    ///
    /// A handle must reject foreign or malformed cursors before it touches
    /// those methods, so any call to them fails the test. `current_cursor`
    /// still answers, because constructing a `RuntimeHandle` asks for the
    /// initial cursor.
    struct PanicLiveReplayStore;

    impl LiveReplayStore for PanicLiveReplayStore {
        fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
            SessionCursor::new(session_id, revision, 0)
        }

        fn trim_session(&self, _session_id: &str) -> Result<(), LiveReplayStoreError> {
            Ok(())
        }

        fn append(
            &self,
            _session_id: &str,
            _revision: SessionRevision,
            _payload: SessionObservationEventPayload,
        ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
            panic!("append should not be called by cursor rejection tests")
        }

        fn replay_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplayResult, LiveReplayStoreError> {
            panic!("replay_after_cursor should not be called for rejected cursors")
        }

        fn subscribe_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
            panic!("subscribe_after_cursor should not be called for rejected cursors")
        }
    }

    /// True when `result` failed because the cursor names another session.
    fn is_wrong_session<T>(result: &Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            result,
            Err(LiveReplayStoreError::Cursor(
                SessionCursorError::WrongSession { .. }
            ))
        )
    }

    /// True when `result` failed because the cursor could not be parsed.
    fn is_malformed<T>(result: &Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            result,
            Err(LiveReplayStoreError::Cursor(
                SessionCursorError::Malformed { .. }
            ))
        )
    }

    #[tokio::test]
    async fn runtime_rejects_bad_cursors_before_replay_store_gap_handling() {
        let policy = crate::SessionPolicy {
            model: crate::ModelSpec::from_token_limits("test-model", None, 1024, None)
                .expect("model"),
            ..Default::default()
        };
        let runtime = LashRuntime::builder()
            .with_session_id("session-a")
            .with_policy(policy)
            .build()
            .await
            .expect("runtime");
        let handle = RuntimeHandle::with_live_replay_store(runtime, Arc::new(PanicLiveReplayStore));

        // The handle observes "session-a"; this cursor belongs to "session-b".
        let wrong_session = SessionCursor::new("session-b", SessionRevision(0), 99);
        let malformed = SessionCursor::from_raw_for_testing("bad");

        assert!(is_wrong_session(
            &handle.resume_session_observation(&wrong_session)
        ));
        assert!(is_wrong_session(
            &handle.subscribe_session_observation(&wrong_session)
        ));
        assert!(is_malformed(&handle.resume_session_observation(&malformed)));
        assert!(is_malformed(
            &handle.subscribe_session_observation(&malformed)
        ));
    }
}