1mod replay;
2
3use arc_swap::ArcSwap;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
8
9pub use replay::{
10 InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
11 LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
12 LiveReplaySubscription, SessionCursor, SessionCursorError, SessionObservation,
13 SessionObservationEvent, SessionObservationEventPayload, SessionObservationSubscription,
14 SessionProcessEventKind, SessionQueueEventKind, SessionResume, SessionRevision,
15};
16
/// Snapshot of a [`LashRuntime`] published by `RuntimeHandle`, so that readers can
/// inspect session state through `RuntimeHandle::observe` without locking the
/// runtime mutex.
///
/// Within this module, snapshots are built by `RuntimeObservation::from_runtime`.
#[derive(Clone)]
pub struct RuntimeObservation {
    /// Session id reported by the runtime when the snapshot was taken.
    pub session_id: Arc<str>,
    /// Revision computed with `SessionRevision::from_runtime` at snapshot time.
    pub revision: SessionRevision,
    /// Live-replay cursor this snapshot was published with.
    pub cursor: SessionCursor,
    /// Session policy taken from the runtime's read view.
    pub policy: crate::SessionPolicy,
    /// Read view of the session at snapshot time.
    pub read_view: crate::SessionReadView,
    /// Result of `export_persisted_state()`. Its `current_agent_frame_id` selects the
    /// additional process scope consulted by the process-handle listings.
    pub persisted_state: super::RuntimeSessionState,
    /// Usage report captured from the runtime.
    pub usage_report: super::SessionUsageReport,
    /// Tool-state snapshot; `None` when the runtime has no session or capturing the
    /// tool state failed.
    pub tool_state: Option<crate::ToolState>,
    /// Active tool catalog; empty when building it failed (see `tool_catalog_error`).
    pub tool_catalog: Arc<Vec<serde_json::Value>>,
    /// Error message from building the tool catalog, if that failed.
    pub tool_catalog_error: Option<String>,
    /// Process registry from the runtime host, if one is configured.
    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
    /// Persistence used for queued work, taken from the session's history store.
    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Host hook forwarded when turn input is enqueued.
    pub queued_work_poke: Option<super::QueuedWorkPoke>,
}
33
34impl RuntimeObservation {
35 fn from_runtime(
36 runtime: &LashRuntime,
37 cursor: SessionCursor,
38 previous: Option<&RuntimeObservation>,
39 ) -> Self {
40 let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
41 Ok(catalog) => (catalog, None),
42 Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
43 };
44 let tool_state_generation = runtime
45 .session
46 .as_ref()
47 .map(|session| session.plugins().tool_registry().generation());
48 let tool_state = match (
49 tool_state_generation,
50 previous.and_then(|observation| observation.tool_state.as_ref()),
51 ) {
52 (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
53 Some(snapshot.clone())
54 }
55 (Some(_), _) => match runtime.tool_state() {
56 Ok(state) => Some(state),
57 Err(err) => {
58 tracing::warn!(
59 session_id = %runtime.session_id(),
60 error = %err,
61 "failed to capture tool state for observation; omitting the snapshot",
62 );
63 None
64 }
65 },
66 (None, _) => None,
67 };
68 let revision = SessionRevision::from_runtime(runtime);
69 Self {
70 session_id: Arc::from(runtime.session_id()),
71 revision,
72 cursor,
73 policy: runtime.read_view().policy().clone(),
74 read_view: runtime.read_view(),
75 persisted_state: runtime.export_persisted_state(),
76 usage_report: runtime.usage_report(),
77 tool_state,
78 tool_catalog,
79 tool_catalog_error,
80 process_registry: runtime.host.process_registry.clone(),
81 queue_store: runtime
82 .session
83 .as_ref()
84 .and_then(|session| session.history_store()),
85 queued_work_poke: runtime.host.queued_work_poke.clone(),
86 }
87 }
88
89 pub fn session_id(&self) -> &str {
90 &self.session_id
91 }
92
93 pub fn session_revision(&self) -> SessionRevision {
94 self.revision
95 }
96
97 pub fn cursor(&self) -> &SessionCursor {
98 &self.cursor
99 }
100
101 pub fn session_observation(&self) -> SessionObservation {
102 SessionObservation {
103 read_view: self.read_view.clone(),
104 cursor: self.cursor.clone(),
105 }
106 }
107
108 pub fn process_scope(&self) -> crate::ProcessScope {
109 crate::ProcessScope::new(self.session_id.as_ref())
110 }
111
112 pub fn process_scope_id(&self) -> crate::ProcessScopeId {
113 self.process_scope().id()
114 }
115
116 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
117 let Some(executor) = self.process_registry.as_ref() else {
118 return Vec::new();
119 };
120 self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
121 .await
122 }
123
124 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
125 let Some(executor) = self.process_registry.as_ref() else {
126 return Vec::new();
127 };
128 self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
129 .await
130 }
131
132 async fn list_process_handles_with_mode(
133 &self,
134 executor: &Arc<dyn crate::ProcessRegistry>,
135 mode: crate::ProcessListMode,
136 ) -> Vec<ProcessHandleSummary> {
137 let root_scope = self.process_scope();
138 let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
139 let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
140 if !agent_frame_id.is_empty() {
141 let frame_scope =
142 crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
143 if frame_scope.id() != root_scope.id() {
144 entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
145 entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
146 entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
147 }
148 }
149 entries
150 .into_iter()
151 .map(ProcessHandleSummary::from)
152 .collect()
153 }
154}
155
156async fn list_scope_process_handles(
157 executor: &Arc<dyn crate::ProcessRegistry>,
158 scope: &crate::ProcessScope,
159 mode: crate::ProcessListMode,
160) -> Vec<ProcessHandleGrantEntry> {
161 match mode {
162 crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
163 crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
164 }
165 .unwrap_or_default()
166}
167
/// Shared handle to a [`LashRuntime`].
///
/// It combines three shared pieces:
/// - the runtime behind an async mutex, for writers;
/// - an `ArcSwap`-held [`RuntimeObservation`] snapshot, for readers;
/// - the live replay store that session observation events are appended to.
///
/// Cloning the handle shares all three.
#[derive(Clone)]
pub struct RuntimeHandle {
    /// The runtime itself; mutable access goes through this `tokio` mutex.
    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
    /// Most recently published snapshot; replaced by `publish_from`.
    observation: Arc<ArcSwap<RuntimeObservation>>,
    /// Store that records observation events and resolves replay cursors.
    live_replay_store: Arc<dyn LiveReplayStore>,
}
174
175impl RuntimeHandle {
176 pub fn new(runtime: LashRuntime) -> Self {
177 Self::with_live_replay_store(runtime, Arc::new(InMemoryLiveReplayStore::default()))
178 }
179
180 pub fn with_live_replay_store(
181 runtime: LashRuntime,
182 live_replay_store: Arc<dyn LiveReplayStore>,
183 ) -> Self {
184 let revision = SessionRevision::from_runtime(&runtime);
185 let cursor = live_replay_store.current_cursor(runtime.session_id(), revision);
186 let observation = RuntimeObservation::from_runtime(&runtime, cursor, None);
187 Self {
188 runtime: Arc::new(Mutex::new(runtime)),
189 observation: Arc::new(ArcSwap::from_pointee(observation)),
190 live_replay_store,
191 }
192 }
193
194 pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
195 Arc::clone(&self.runtime)
196 }
197
198 pub fn observe(&self) -> Arc<RuntimeObservation> {
199 self.observation.load_full()
200 }
201
202 pub fn publish_from(&self, runtime: &LashRuntime) {
203 let revision = SessionRevision::from_runtime(runtime);
204 let previous = self.observation.load_full();
205 let state = runtime.export_persisted_state();
206 if previous.persisted_state.current_agent_frame_id != state.current_agent_frame_id
207 && !state.current_agent_frame_id.is_empty()
208 && let Err(err) = self.live_replay_store.append(
209 runtime.session_id(),
210 revision,
211 SessionObservationEventPayload::AgentFrameSwitched {
212 frame_id: state.current_agent_frame_id.clone(),
213 },
214 )
215 {
216 tracing::warn!(
217 session_id = %runtime.session_id(),
218 error = %err,
219 "failed to append agent-frame observation event; reconnect may require gap recovery",
220 );
221 }
222 let cursor = match self.live_replay_store.append(
223 runtime.session_id(),
224 revision,
225 SessionObservationEventPayload::Committed {
226 read_view: runtime.read_view(),
227 },
228 ) {
229 Ok(event) => event.cursor,
230 Err(err) => {
231 tracing::warn!(
232 session_id = %runtime.session_id(),
233 error = %err,
234 "failed to append session observation commit event; reconnect will fall back to gap recovery",
235 );
236 self.live_replay_store
237 .current_cursor(runtime.session_id(), revision)
238 }
239 };
240 self.observation
241 .store(Arc::new(RuntimeObservation::from_runtime(
242 runtime,
243 cursor,
244 Some(previous.as_ref()),
245 )));
246 }
247
248 pub fn record_turn_activity(&self, activity: crate::TurnActivity) {
249 let observation = self.observe();
250 if let Err(err) = self.live_replay_store.append(
251 observation.session_id(),
252 observation.session_revision(),
253 SessionObservationEventPayload::TurnActivity(activity),
254 ) {
255 tracing::warn!(
256 session_id = %observation.session_id(),
257 error = %err,
258 "failed to append live turn activity to session observation replay; reconnect may require gap recovery",
259 );
260 }
261 }
262
263 pub fn record_queue_changed(&self, kind: SessionQueueEventKind, batch_ids: Vec<String>) {
264 let observation = self.observe();
265 if let Err(err) = self.live_replay_store.append(
266 observation.session_id(),
267 observation.session_revision(),
268 SessionObservationEventPayload::QueueChanged { kind, batch_ids },
269 ) {
270 tracing::warn!(
271 session_id = %observation.session_id(),
272 error = %err,
273 "failed to append queue observation event; reconnect may require gap recovery",
274 );
275 }
276 }
277
278 pub fn record_process_changed(&self, kind: SessionProcessEventKind, process_ids: Vec<String>) {
279 let observation = self.observe();
280 if let Err(err) = self.live_replay_store.append(
281 observation.session_id(),
282 observation.session_revision(),
283 SessionObservationEventPayload::ProcessChanged { kind, process_ids },
284 ) {
285 tracing::warn!(
286 session_id = %observation.session_id(),
287 error = %err,
288 "failed to append process observation event; reconnect may require gap recovery",
289 );
290 }
291 }
292
293 pub fn current_session_observation(&self) -> SessionObservation {
294 let observation = self.observe();
295 self.session_observation_from(observation.as_ref())
296 }
297
298 pub fn resume_session_observation(
299 &self,
300 cursor: &SessionCursor,
301 ) -> Result<SessionResume, LiveReplayStoreError> {
302 let observation = self.observe();
303 cursor.parse_for_session(observation.session_id())?;
304 match self.live_replay_store.replay_after_cursor(cursor)? {
305 LiveReplayResult::Replayed(events) => Ok(SessionResume::Replayed { events }),
306 LiveReplayResult::Gap(reason) => Ok(SessionResume::Gap {
307 gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
308 observation: self.session_observation_from(observation.as_ref()),
309 }),
310 }
311 }
312
313 pub fn subscribe_session_observation(
314 &self,
315 cursor: &SessionCursor,
316 ) -> Result<SessionObservationSubscription, LiveReplayStoreError> {
317 let observation = self.observe();
318 cursor.parse_for_session(observation.session_id())?;
319 match self.live_replay_store.subscribe_after_cursor(cursor)? {
320 LiveReplaySubscribeResult::Subscribed(subscription) => {
321 Ok(SessionObservationSubscription::Subscribed(subscription))
322 }
323 LiveReplaySubscribeResult::Gap(reason) => Ok(SessionObservationSubscription::Gap {
324 gap: self.live_replay_gap(cursor, reason, observation.as_ref()),
325 observation: self.session_observation_from(observation.as_ref()),
326 }),
327 }
328 }
329
330 fn session_observation_from(&self, observation: &RuntimeObservation) -> SessionObservation {
331 SessionObservation {
332 read_view: observation.read_view.clone(),
333 cursor: self
334 .live_replay_store
335 .current_cursor(observation.session_id(), observation.session_revision()),
336 }
337 }
338
339 fn live_replay_gap(
340 &self,
341 requested_cursor: &SessionCursor,
342 reason: LiveReplayGapReason,
343 observation: &RuntimeObservation,
344 ) -> LiveReplayGap {
345 let latest_cursor = self
346 .live_replay_store
347 .current_cursor(observation.session_id(), observation.session_revision());
348 LiveReplayGap {
349 session_id: observation.session_id().to_string(),
350 requested_cursor: requested_cursor.clone(),
351 latest_cursor,
352 latest_revision: observation.session_revision(),
353 reason,
354 }
355 }
356
357 pub async fn enqueue_turn_input(
358 &self,
359 input: crate::TurnInput,
360 delivery_policy: crate::DeliveryPolicy,
361 slot_policy: crate::SlotPolicy,
362 source_key: Option<String>,
363 ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
364 let observation = self.observe();
365 let store = observation
366 .queue_store
367 .clone()
368 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
369 super::session_api::enqueue_turn_input_to_store(
370 observation.session_id.as_ref().to_string(),
371 store,
372 observation.queued_work_poke.clone(),
373 input,
374 delivery_policy,
375 slot_policy,
376 source_key,
377 )
378 .await
379 .inspect(|batch| {
380 self.record_queue_changed(
381 SessionQueueEventKind::Enqueued,
382 vec![batch.batch_id.clone()],
383 );
384 })
385 }
386
387 pub async fn cancel_queued_work_batch(
388 &self,
389 session_id: &str,
390 batch_id: &str,
391 ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
392 let observation = self.observe();
393 let store = observation
394 .queue_store
395 .clone()
396 .ok_or_else(super::session_api::queued_turn_input_store_required)?;
397 store
398 .cancel_queued_work_batch(session_id, batch_id)
399 .await
400 .map_err(|err| {
401 crate::RuntimeError::new(
402 crate::RuntimeErrorCode::StoreCommitFailed,
403 err.to_string(),
404 )
405 })
406 .inspect(|batch| {
407 if batch.is_some() {
408 self.record_queue_changed(
409 SessionQueueEventKind::Cancelled,
410 vec![batch_id.to_string()],
411 );
412 }
413 })
414 }
415
416 pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
417 match Arc::try_unwrap(self.runtime) {
418 Ok(mutex) => Ok(mutex.into_inner()),
419 Err(runtime) => Err(Self {
420 runtime,
421 observation: self.observation,
422 live_replay_store: self.live_replay_store,
423 }),
424 }
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Replay store for tests that must never reach replay, subscription, or append.
    ///
    /// Any of those calls panics. Only cursor lookup and trimming are usable.
    struct PanicLiveReplayStore;

    impl LiveReplayStore for PanicLiveReplayStore {
        fn append(
            &self,
            _session_id: &str,
            _revision: SessionRevision,
            _payload: SessionObservationEventPayload,
        ) -> Result<SessionObservationEvent, LiveReplayStoreError> {
            panic!("append should not be called by cursor rejection tests")
        }

        fn replay_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplayResult, LiveReplayStoreError> {
            panic!("replay_after_cursor should not be called for rejected cursors")
        }

        fn subscribe_after_cursor(
            &self,
            _cursor: &SessionCursor,
        ) -> Result<LiveReplaySubscribeResult, LiveReplayStoreError> {
            panic!("subscribe_after_cursor should not be called for rejected cursors")
        }

        fn current_cursor(&self, session_id: &str, revision: SessionRevision) -> SessionCursor {
            SessionCursor::new(session_id, revision, 0)
        }

        fn trim_session(&self, _session_id: &str) -> Result<(), LiveReplayStoreError> {
            Ok(())
        }
    }

    /// True when `outcome` is a wrong-session cursor rejection.
    fn is_wrong_session<T>(outcome: &Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            outcome,
            Err(LiveReplayStoreError::Cursor(SessionCursorError::WrongSession { .. }))
        )
    }

    /// True when `outcome` is a malformed-cursor rejection.
    fn is_malformed<T>(outcome: &Result<T, LiveReplayStoreError>) -> bool {
        matches!(
            outcome,
            Err(LiveReplayStoreError::Cursor(SessionCursorError::Malformed { .. }))
        )
    }

    #[tokio::test]
    async fn runtime_rejects_bad_cursors_before_replay_store_gap_handling() {
        let model = crate::ModelSpec::from_token_limits("test-model", None, 1024, None)
            .expect("model");
        let runtime = LashRuntime::builder()
            .with_session_id("session-a")
            .with_policy(crate::SessionPolicy {
                model,
                ..Default::default()
            })
            .build()
            .await
            .expect("runtime");
        let handle = RuntimeHandle::with_live_replay_store(runtime, Arc::new(PanicLiveReplayStore));

        // A well-formed cursor for a different session is rejected up front.
        let foreign = SessionCursor::new("session-b", SessionRevision(0), 99);
        assert!(is_wrong_session(&handle.resume_session_observation(&foreign)));
        assert!(is_wrong_session(&handle.subscribe_session_observation(&foreign)));

        // An unparsable cursor is rejected up front as well.
        let garbage = SessionCursor::from_raw_for_testing("bad");
        assert!(is_malformed(&handle.resume_session_observation(&garbage)));
        assert!(is_malformed(&handle.subscribe_session_observation(&garbage)));
    }
}