1mod replay;
2
3use arc_swap::ArcSwap;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
8
9pub use replay::{
10 InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
11 LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
12 LiveReplaySubscription, SessionCursor, SessionCursorError, SessionObservation,
13 SessionObservationEvent, SessionObservationEventPayload, SessionObservationSubscription,
14 SessionProcessEventKind, SessionQueueEventKind, SessionResume, SessionRevision,
15};
16
/// Snapshot of a `LashRuntime`'s readable state, built by
/// `RuntimeObservation::from_runtime` and published through `RuntimeHandle`.
///
/// Optional pieces that could not be captured (tool state, plugin query
/// services) are left as `None` rather than failing the whole snapshot.
#[derive(Clone)]
pub struct RuntimeObservation {
    /// Id of the session the snapshot was captured from.
    pub session_id: Arc<str>,
    /// Session revision at capture time (`SessionRevision::from_runtime`).
    pub revision: SessionRevision,
    /// Replay cursor this snapshot was published with.
    pub cursor: SessionCursor,
    /// Session policy, cloned from the runtime's read view.
    pub policy: crate::SessionPolicy,
    /// Read view of the session at capture time.
    pub read_view: crate::SessionReadView,
    /// Persisted runtime state exported at capture time; its
    /// `current_agent_frame_id` selects the extra process scope listed
    /// alongside the session's root scope.
    pub persisted_state: super::RuntimeSessionState,
    /// Usage report at capture time.
    pub usage_report: super::SessionUsageReport,
    /// Tool-state snapshot; `None` when the runtime has no session or capturing
    /// it failed. Reused across publishes while the tool registry generation
    /// is unchanged.
    pub tool_state: Option<crate::ToolState>,
    /// Active tool catalog; empty when loading it failed (see
    /// `tool_catalog_error`).
    pub tool_catalog: Arc<Vec<serde_json::Value>>,
    /// Error message from loading the active tool catalog, if it failed.
    pub tool_catalog_error: Option<String>,
    /// Plugin session used by `query_plugin`; `None` when there is no runtime
    /// session or the plugin query services could not be obtained.
    pub plugin_session: Option<Arc<crate::PluginSession>>,
    /// Session read service handed to plugin queries (same availability as
    /// `plugin_session`).
    pub session_read_service: Option<Arc<dyn crate::plugin::SessionReadService>>,
    /// Process read service handed to plugin queries (same availability as
    /// `plugin_session`).
    pub process_read_service: Option<Arc<dyn crate::plugin::ProcessReadService>>,
    /// Host process registry used to list process handles, if configured.
    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
    /// Session history store used for queued turn-input operations; `None`
    /// without a session or when the session exposes no store.
    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Host queued-work driver, passed along when enqueueing turn input.
    pub queued_work_driver: Option<super::QueuedWorkDriver>,
}
36
37impl RuntimeObservation {
38 fn from_runtime(
39 runtime: &LashRuntime,
40 cursor: SessionCursor,
41 previous: Option<&RuntimeObservation>,
42 ) -> Self {
43 let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
44 Ok(catalog) => (catalog, None),
45 Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
46 };
47 let tool_state_generation = runtime
48 .session
49 .as_ref()
50 .map(|session| session.plugins().tool_registry().generation());
51 let tool_state = match (
52 tool_state_generation,
53 previous.and_then(|observation| observation.tool_state.as_ref()),
54 ) {
55 (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
56 Some(snapshot.clone())
57 }
58 (Some(_), _) => match runtime.tool_state() {
59 Ok(state) => Some(state),
60 Err(err) => {
61 tracing::warn!(
62 session_id = %runtime.session_id(),
63 error = %err,
64 "failed to capture tool state for observation; omitting the snapshot",
65 );
66 None
67 }
68 },
69 (None, _) => None,
70 };
71 let (plugin_session, session_read_service, process_read_service) =
72 match (runtime.session.as_ref(), runtime.runtime_session_services()) {
73 (Some(session), Ok(services)) => (
74 Some(Arc::clone(session.plugins())),
75 Some(services.read_service()),
76 Some(services.process_read_service()),
77 ),
78 (_, Err(err)) => {
79 tracing::warn!(
80 session_id = %runtime.session_id(),
81 error = %err,
82 "failed to capture plugin query services for observation",
83 );
84 (None, None, None)
85 }
86 (None, _) => (None, None, None),
87 };
88 let revision = SessionRevision::from_runtime(runtime);
89 Self {
90 session_id: Arc::from(runtime.session_id()),
91 revision,
92 cursor,
93 policy: runtime.read_view().policy().clone(),
94 read_view: runtime.read_view(),
95 persisted_state: runtime.export_persisted_state(),
96 usage_report: runtime.usage_report(),
97 tool_state,
98 tool_catalog,
99 tool_catalog_error,
100 plugin_session,
101 session_read_service,
102 process_read_service,
103 process_registry: runtime.host.process_registry.clone(),
104 queue_store: runtime
105 .session
106 .as_ref()
107 .and_then(|session| session.history_store()),
108 queued_work_driver: runtime.host.queued_work_driver.clone(),
109 }
110 }
111
112 pub fn session_id(&self) -> &str {
113 &self.session_id
114 }
115
116 pub fn session_revision(&self) -> SessionRevision {
117 self.revision
118 }
119
120 pub fn cursor(&self) -> &SessionCursor {
121 &self.cursor
122 }
123
124 pub fn session_observation(&self) -> SessionObservation {
125 SessionObservation {
126 read_view: self.read_view.clone(),
127 cursor: self.cursor.clone(),
128 }
129 }
130
131 pub fn process_scope(&self) -> crate::SessionScope {
132 crate::SessionScope::new(self.session_id.as_ref())
133 }
134
135 pub fn process_scope_id(&self) -> crate::SessionScopeId {
136 self.process_scope().id()
137 }
138
139 pub async fn query_plugin(
140 &self,
141 name: &str,
142 args: serde_json::Value,
143 session_id: Option<String>,
144 ) -> Result<(String, serde_json::Value), crate::PluginOperationInvokeError> {
145 let Some(plugin_session) = self.plugin_session.as_ref().cloned() else {
146 return Err(crate::PluginOperationInvokeError::Unknown(
147 "runtime session not available".to_string(),
148 ));
149 };
150 let Some(session_read_service) = self.session_read_service.as_ref().cloned() else {
151 return Err(crate::PluginOperationInvokeError::Unknown(
152 "runtime session read service not available".to_string(),
153 ));
154 };
155 let Some(process_read_service) = self.process_read_service.as_ref().cloned() else {
156 return Err(crate::PluginOperationInvokeError::Unknown(
157 "runtime process read service not available".to_string(),
158 ));
159 };
160 plugin_session
161 .query_plugin(
162 name,
163 args,
164 session_id,
165 true,
166 session_read_service,
167 process_read_service,
168 )
169 .await
170 }
171
172 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
173 let Some(executor) = self.process_registry.as_ref() else {
174 return Vec::new();
175 };
176 self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
177 .await
178 }
179
180 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
181 let Some(executor) = self.process_registry.as_ref() else {
182 return Vec::new();
183 };
184 self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
185 .await
186 }
187
188 async fn list_process_handles_with_mode(
189 &self,
190 executor: &Arc<dyn crate::ProcessRegistry>,
191 mode: crate::ProcessListMode,
192 ) -> Vec<ProcessHandleSummary> {
193 let root_scope = self.process_scope();
194 let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
195 let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
196 if !agent_frame_id.is_empty() {
197 let frame_scope =
198 crate::SessionScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
199 if frame_scope.id() != root_scope.id() {
200 entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
201 entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
202 entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
203 }
204 }
205 entries
206 .into_iter()
207 .map(ProcessHandleSummary::from)
208 .collect()
209 }
210}
211
212async fn list_scope_process_handles(
213 executor: &Arc<dyn crate::ProcessRegistry>,
214 scope: &crate::SessionScope,
215 mode: crate::ProcessListMode,
216) -> Vec<ProcessHandleGrantEntry> {
217 match mode {
218 crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
219 crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
220 }
221 .unwrap_or_default()
222}
223
/// Cloneable handle to a `LashRuntime`.
///
/// Writers lock the shared mutex (`writer`). Readers load the most recently
/// published `RuntimeObservation` (`observe`) without taking that mutex.
/// Observation events are recorded in a live replay store.
#[derive(Clone)]
pub struct RuntimeHandle {
    // Shared, mutex-guarded runtime used by writers.
    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
    // Latest observation stored by `publish_from`; replaced wholesale on publish.
    observation: Arc<ArcSwap<RuntimeObservation>>,
    // Store that records observation events and resolves replay cursors.
    live_replay_store: Arc<dyn LiveReplayStore>,
}
230
231impl RuntimeHandle {
232 pub fn new(runtime: LashRuntime) -> Self {
233 Self::with_live_replay_store(runtime, Arc::new(InMemoryLiveReplayStore::default()))
234 }
235
236 pub fn with_live_replay_store(
237 runtime: LashRuntime,
238 live_replay_store: Arc<dyn LiveReplayStore>,
239 ) -> Self {
240 let revision = SessionRevision::from_runtime(&runtime);
241 let cursor = live_replay_store.current_cursor(runtime.session_id(), revision);
242 let observation = RuntimeObservation::from_runtime(&runtime, cursor, None);
243 Self {
244 runtime: Arc::new(Mutex::new(runtime)),
245 observation: Arc::new(ArcSwap::from_pointee(observation)),
246 live_replay_store,
247 }
248 }
249
250 pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
251 Arc::clone(&self.runtime)
252 }
253
254 pub fn observe(&self) -> Arc<RuntimeObservation> {
255 self.observation.load_full()
256 }
257
258 pub fn publish_from(&self, runtime: &LashRuntime) {
259 let revision = SessionRevision::from_runtime(runtime);
260 let previous = self.observation.load_full();
261 let state = runtime.export_persisted_state();
262 if previous.persisted_state.current_agent_frame_id != state.current_agent_frame_id
263 && !state.current_agent_frame_id.is_empty()
264 && let Err(err) = self.live_replay_store.append(
265 runtime.session_id(),
266 revision,
267 SessionObservationEventPayload::AgentFrameSwitched {
268 frame_id: state.current_agent_frame_id.clone(),
269 },
270 )
271 {
272 tracing::warn!(
273 session_id = %runtime.session_id(),
274 error = %err,
275 "failed to append agent-frame observation event; reconnect may require gap recovery",
276 );
277 }
278 let cursor = match self.live_replay_store.append(
279 runtime.session_id(),
280 revision,
281 SessionObservationEventPayload::Committed {
282 read_view: runtime.read_view(),
283 },
284 ) {
285 Ok(event) => event.cursor,
286 Err(err) => {
287 tracing::warn!(
288 session_id = %runtime.session_id(),
289 error = %err,
290 "failed to append session observation commit event; reconnect will fall back to gap recovery",
291 );
292 self.live_replay_store
293 .current_cursor(runtime.session_id(), revision)
294 }
295 };
296 self.observation
297 .store(Arc::new(RuntimeObservation::from_runtime(
298 runtime,
299 cursor,
300 Some(previous.as_ref()),
301 )));
302 }
303
304 pub fn record_turn_activity(&self, activity: crate::TurnActivity) {
305 let observation = self.observe();
306 if let Err(err) = self.live_replay_store.append(
307 observation.session_id(),
308 observation.session_revision(),
309 SessionObservationEventPayload::TurnActivity(activity),
310 ) {
311 tracing::warn!(
312 session_id = %observation.session_id(),
313 error = %err,
314 "failed to append live turn activity to session observation replay; reconnect may require gap recovery",
315 );
316 }
317 }
318
319 pub fn record_queue_changed(&self, kind: SessionQueueEventKind, batch_ids: Vec<String>) {
320 let observation = self.observe();
321 if let Err(err) = self.live_replay_store.append(
322 observation.session_id(),
323 observation.session_revision(),
324 SessionObservationEventPayload::QueueChanged { kind, batch_ids },
325 ) {
326 tracing::warn!(
327 session_id = %observation.session_id(),
328 error = %err,
329 "failed to append queue observation event; reconnect may require gap recovery",
330 );
331 }
332 }
333
334 pub fn record_process_changed(&self, kind: SessionProcessEventKind, process_ids: Vec<String>) {
335 let observation = self.observe();
336 if let Err(err) = self.live_replay_store.append(
337 observation.session_id(),
338 observation.session_revision(),
339 SessionObservationEventPayload::ProcessChanged { kind, process_ids },
340 ) {
341 tracing::warn!(
342 session_id = %observation.session_id(),
343 error = %err,
344 "failed to append process observation event; reconnect may require gap recovery",
345 );
346 }
347 }
348
349 pub fn current_session_observation(&self) -> SessionObservation {
350 let observation = self.observe();
351 self.session_observation_from(observation.as_ref())
352 }
353
354 pub fn resume_session_observation(
355 &self,
356 cursor: &SessionCursor,
357 ) -> Result<SessionResume, LiveReplayStoreError> {
358 let observation = self.observe();
359 cursor.parse_for_session(observation.session_id())?;
360 match self.live_replay_store.replay_after_cursor(cursor)? {
361 LiveReplayResult::Replayed(events) => Ok(SessionResume::Replayed { events }),
362 LiveReplayResult::Gap(reason) => Ok(SessionResume::Gap {
363 gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
364 observation: self.session_observation_from(observation.as_ref()),
365 }),
366 }
367 }
368
369 pub fn subscribe_session_observation(
370 &self,
371 cursor: &SessionCursor,
372 ) -> Result<SessionObservationSubscription, LiveReplayStoreError> {
373 let observation = self.observe();
374 cursor.parse_for_session(observation.session_id())?;
375 match self.live_replay_store.subscribe_after_cursor(cursor)? {
376 LiveReplaySubscribeResult::Subscribed(subscription) => {
377 Ok(SessionObservationSubscription::Subscribed(subscription))
378 }
379 LiveReplaySubscribeResult::Gap(reason) => Ok(SessionObservationSubscription::Gap {
380 gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
381 observation: self.session_observation_from(observation.as_ref()),
382 }),
383 }
384 }
385
386 fn session_observation_from(&self, observation: &RuntimeObservation) -> SessionObservation {
387 SessionObservation {
388 read_view: observation.read_view.clone(),
389 cursor: self
390 .live_replay_store
391 .current_cursor(observation.session_id(), observation.session_revision()),
392 }
393 }
394
395 fn live_replay_gap(
396 &self,
397 requested_cursor: &SessionCursor,
398 reason: LiveReplayGapReason,
399 observation: &RuntimeObservation,
400 ) -> LiveReplayGap {
401 let latest_cursor = self
402 .live_replay_store
403 .current_cursor(observation.session_id(), observation.session_revision());
404 LiveReplayGap {
405 session_id: observation.session_id().to_string(),
406 requested_cursor: requested_cursor.clone(),
407 latest_cursor,
408 latest_revision: observation.session_revision(),
409 reason,
410 }
411 }
412
413 pub async fn enqueue_turn_input(
414 &self,
415 input: crate::TurnInput,
416 ingress: crate::TurnInputIngress,
417 source_key: Option<String>,
418 ) -> Result<crate::PendingTurnInput, crate::RuntimeError> {
419 let observation = self.observe();
420 let store = observation
421 .queue_store
422 .clone()
423 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
424 let is_next_turn = matches!(ingress, crate::TurnInputIngress::NextTurn);
425 super::session_api::enqueue_turn_input_to_store(
426 observation.session_id.as_ref().to_string(),
427 store,
428 observation.queued_work_driver.clone(),
429 input,
430 ingress,
431 source_key,
432 )
433 .await
434 .inspect(|input| {
435 self.record_queue_changed(
436 SessionQueueEventKind::Enqueued,
437 is_next_turn
438 .then(|| vec![input.input_id.clone()])
439 .unwrap_or_default(),
440 );
441 })
442 }
443
444 pub async fn cancel_pending_turn_input(
445 &self,
446 session_id: &str,
447 input_id: &str,
448 ) -> Result<crate::PendingTurnInputCancelOutcome, crate::RuntimeError> {
449 let observation = self.observe();
450 let store = observation
451 .queue_store
452 .clone()
453 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
454 store
455 .cancel_pending_turn_input(session_id, input_id)
456 .await
457 .map_err(|err| {
458 crate::RuntimeError::new(
459 crate::RuntimeErrorCode::StoreCommitFailed,
460 err.to_string(),
461 )
462 })
463 .inspect(|outcome| {
464 if outcome.is_cancelled() {
465 self.record_queue_changed(
466 SessionQueueEventKind::Cancelled,
467 vec![input_id.to_string()],
468 );
469 }
470 })
471 }
472
473 pub async fn cancel_pending_turn_inputs(
474 &self,
475 session_id: &str,
476 targets: &[crate::PendingTurnInputCancelTarget],
477 ) -> Result<Vec<crate::PendingTurnInputCancelResult>, crate::RuntimeError> {
478 let observation = self.observe();
479 let store = observation
480 .queue_store
481 .clone()
482 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
483 store
484 .cancel_pending_turn_inputs(session_id, targets)
485 .await
486 .map_err(|err| {
487 crate::RuntimeError::new(
488 crate::RuntimeErrorCode::StoreCommitFailed,
489 err.to_string(),
490 )
491 })
492 .inspect(|results| {
493 let cancelled_ids = results
494 .iter()
495 .filter_map(|result| match &result.outcome {
496 crate::PendingTurnInputCancelOutcome::Cancelled(input) => {
497 Some(input.input_id.clone())
498 }
499 _ => None,
500 })
501 .collect::<Vec<_>>();
502 if !cancelled_ids.is_empty() {
503 self.record_queue_changed(SessionQueueEventKind::Cancelled, cancelled_ids);
504 }
505 })
506 }
507
508 pub async fn cancel_pending_turn_input_suffix(
509 &self,
510 session_id: &str,
511 anchor: &crate::PendingTurnInputCancelTarget,
512 ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, crate::RuntimeError> {
513 let observation = self.observe();
514 let store = observation
515 .queue_store
516 .clone()
517 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
518 store
519 .cancel_pending_turn_input_suffix(session_id, anchor)
520 .await
521 .map_err(|err| {
522 crate::RuntimeError::new(
523 crate::RuntimeErrorCode::StoreCommitFailed,
524 err.to_string(),
525 )
526 })
527 .inspect(|outcome| {
528 let crate::PendingTurnInputSuffixCancelOutcome::Outcomes { outcomes, .. } = outcome
529 else {
530 return;
531 };
532 let cancelled_ids = outcomes
533 .iter()
534 .filter_map(|outcome| match outcome {
535 crate::PendingTurnInputCancelOutcome::Cancelled(input) => {
536 Some(input.input_id.clone())
537 }
538 _ => None,
539 })
540 .collect::<Vec<_>>();
541 if !cancelled_ids.is_empty() {
542 self.record_queue_changed(SessionQueueEventKind::Cancelled, cancelled_ids);
543 }
544 })
545 }
546
547 pub async fn cancel_queued_work_batch(
548 &self,
549 session_id: &str,
550 batch_id: &str,
551 ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
552 let observation = self.observe();
553 let store = observation
554 .queue_store
555 .clone()
556 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
557 store
558 .cancel_queued_work_batch(session_id, batch_id)
559 .await
560 .map_err(|err| {
561 crate::RuntimeError::new(
562 crate::RuntimeErrorCode::StoreCommitFailed,
563 err.to_string(),
564 )
565 })
566 .inspect(|batch| {
567 if batch.is_some() {
568 self.record_queue_changed(
569 SessionQueueEventKind::Cancelled,
570 vec![batch_id.to_string()],
571 );
572 }
573 })
574 }
575
576 pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
577 match Arc::try_unwrap(self.runtime) {
578 Ok(mutex) => Ok(mutex.into_inner()),
579 Err(runtime) => Err(Self {
580 runtime,
581 observation: self.observation,
582 live_replay_store: self.live_replay_store,
583 }),
584 }
585 }
586}
587
#[cfg(test)]
mod tests {
    use super::*;

    /// Replay store that panics if replay, subscribe or append is reached.
    /// Any call that gets past cursor validation therefore fails the test
    /// loudly. `current_cursor` and `trim_session` are benign because building
    /// a `RuntimeHandle` needs a starting cursor.
    struct PanicLiveReplayStore;

    impl LiveReplayStore for PanicLiveReplayStore {
        fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
            SessionCursor::new(session_id, revision, 0)
        }

        fn trim_session(&self, _session_id: &str) -> Result<(), LiveReplayStoreError> {
            Ok(())
        }

        fn append(
            &self,
            _session_id: &str,
            _revision: SessionRevision,
            _payload: SessionObservationEventPayload,
        ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
            panic!("append should not be called by cursor rejection tests")
        }

        fn replay_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplayResult, LiveReplayStoreError> {
            panic!("replay_after_cursor should not be called for rejected cursors")
        }

        fn subscribe_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
            panic!("subscribe_after_cursor should not be called for rejected cursors")
        }
    }

    /// True when `outcome` is the cursor-level `WrongSession` rejection.
    fn rejected_as_wrong_session<T>(outcome: Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            outcome,
            Err(LiveReplayStoreError::Cursor(SessionCursorError::WrongSession { .. }))
        )
    }

    /// True when `outcome` is the cursor-level `Malformed` rejection.
    fn rejected_as_malformed<T>(outcome: Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            outcome,
            Err(LiveReplayStoreError::Cursor(SessionCursorError::Malformed { .. }))
        )
    }

    #[tokio::test]
    async fn runtime_rejects_bad_cursors_before_replay_store_gap_handling() {
        let policy = crate::SessionPolicy {
            model: crate::ModelSpec::from_token_limits("test-model", None, 1024, None)
                .expect("model"),
            ..Default::default()
        };
        let runtime = LashRuntime::builder()
            .with_session_id("session-a")
            .with_policy(policy)
            .build()
            .await
            .expect("runtime");
        let handle = RuntimeHandle::with_live_replay_store(runtime, Arc::new(PanicLiveReplayStore));

        let foreign = SessionCursor::new("session-b", SessionRevision(0), 99);
        assert!(rejected_as_wrong_session(
            handle.resume_session_observation(&foreign)
        ));
        assert!(rejected_as_wrong_session(
            handle.subscribe_session_observation(&foreign)
        ));

        let garbage = SessionCursor::from_raw_for_testing("bad");
        assert!(rejected_as_malformed(
            handle.resume_session_observation(&garbage)
        ));
        assert!(rejected_as_malformed(
            handle.subscribe_session_observation(&garbage)
        ));
    }
}