Skip to main content

lash_core/runtime/process/
observation.rs

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8use super::events::{ProcessAwaitOutput, ProcessEvent};
9use super::model::{
10    ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId,
11    ProcessIdentity, ProcessInput, ProcessLifecycleStatus, ProcessListFilter, ProcessOriginator,
12    ProcessRecord, ProcessStatusFilter, SessionScope, WaitState,
13};
14use super::registry::ProcessRegistry;
15use super::time::epoch_ms_from_system_time;
16
17#[derive(Clone)]
18pub struct ProcessWorkObserver {
19    registry: Arc<dyn ProcessRegistry>,
20}
21
22#[derive(Clone, Debug, Serialize, Deserialize)]
23pub struct ProcessWorkSnapshot {
24    pub session_id: String,
25    pub visible_process_ids: Vec<ProcessId>,
26    pub items: Vec<ObservedWorkItem>,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct ObservedWorkItem {
31    pub process: ObservedProcess,
32    pub descriptor: ProcessHandleDescriptor,
33    pub events: Vec<ObservedProcessEvent>,
34    pub kind: String,
35    pub label: String,
36}
37
38#[derive(Clone, Debug, Serialize, Deserialize)]
39pub struct ObservedProcess {
40    pub process_id: ProcessId,
41    pub graph_key: String,
42    pub kind: String,
43    pub lifecycle: ProcessLifecycleStatus,
44    pub identity: ProcessIdentity,
45    pub status_label: String,
46    pub terminal: bool,
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub error: Option<String>,
49    pub created_at_ms: u64,
50    pub updated_at_ms: u64,
51    pub input: ProcessInput,
52    pub originator: ProcessOriginator,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub env_ref: Option<ProcessExecutionEnvRef>,
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub wake_target: Option<SessionScope>,
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub caused_by: Option<crate::CausalRef>,
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub external_ref: Option<ProcessExternalRef>,
61    #[serde(default, skip_serializing_if = "Option::is_none")]
62    pub wait: Option<WaitState>,
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub child_session_id: Option<String>,
65    pub label: String,
66}
67
68#[derive(Clone, Debug, Serialize, Deserialize)]
69pub struct ObservedProcessEvent {
70    pub sequence: u64,
71    pub event_type: String,
72    pub occurred_at_ms: u64,
73    pub payload: serde_json::Value,
74}
75
76/// Per-item event tail in session snapshots. Snapshots are polled by
77/// docks/UIs, so per-poll cost must stay bounded instead of growing with a
78/// process's full event history; detail views page through `events_after`
79/// with a cursor.
80pub const SNAPSHOT_EVENT_TAIL: usize = 32;
81
82impl ProcessWorkObserver {
83    pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
84        Self { registry }
85    }
86
87    pub async fn snapshot_for_session(
88        &self,
89        session_id: impl Into<String>,
90    ) -> Result<ProcessWorkSnapshot, PluginError> {
91        let session_id = session_id.into();
92        let session_scope = SessionScope::new(session_id.clone());
93        let entries = self.registry.list_handle_grants(&session_scope).await?;
94        let mut items = Vec::new();
95        let mut seen_process_ids = BTreeSet::new();
96        for (grant, record) in entries {
97            seen_process_ids.insert(record.id.clone());
98            items.push(self.work_item_from_record(record, grant.descriptor).await?);
99        }
100        let visible_records = self
101            .registry
102            .list_processes(&ProcessListFilter {
103                status: ProcessStatusFilter::Any,
104                ..ProcessListFilter::default()
105            })
106            .await?;
107        for record in visible_records {
108            if seen_process_ids.contains(&record.id)
109                || !process_visible_to_session(&record, &session_id)
110            {
111                continue;
112            }
113            seen_process_ids.insert(record.id.clone());
114            let descriptor = descriptor_from_process_identity(&record.identity);
115            items.push(self.work_item_from_record(record, descriptor).await?);
116        }
117        items.sort_by(|left, right| {
118            right
119                .process
120                .updated_at_ms
121                .cmp(&left.process.updated_at_ms)
122                .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
123                .then_with(|| left.process.process_id.cmp(&right.process.process_id))
124        });
125        let visible_process_ids = items
126            .iter()
127            .map(|item| item.process.process_id.clone())
128            .collect();
129        Ok(ProcessWorkSnapshot {
130            session_id,
131            visible_process_ids,
132            items,
133        })
134    }
135
136    async fn work_item_from_record(
137        &self,
138        record: ProcessRecord,
139        descriptor: ProcessHandleDescriptor,
140    ) -> Result<ObservedWorkItem, PluginError> {
141        let events = self
142            .registry
143            .recent_events(&record.id, SNAPSHOT_EVENT_TAIL)
144            .await?
145            .into_iter()
146            .map(ObservedProcessEvent::from)
147            .collect();
148        let process = ObservedProcess::from_record(record);
149        let kind = process.identity.kind.clone();
150        let label = process
151            .identity
152            .label
153            .clone()
154            .or_else(|| descriptor.label.clone())
155            .unwrap_or_else(|| kind.clone());
156        Ok(ObservedWorkItem {
157            process,
158            descriptor,
159            events,
160            kind,
161            label,
162        })
163    }
164
165    pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
166        self.registry
167            .get_process(process_id)
168            .await
169            .map(ObservedProcess::from_record)
170    }
171
172    pub async fn list(
173        &self,
174        filter: &ProcessListFilter,
175    ) -> Result<Vec<ObservedProcess>, PluginError> {
176        Ok(self
177            .registry
178            .list_processes(filter)
179            .await?
180            .into_iter()
181            .map(ObservedProcess::from_record)
182            .collect())
183    }
184
185    pub async fn events_after(
186        &self,
187        process_id: &str,
188        after_sequence: u64,
189    ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
190        Ok(self
191            .registry
192            .events_after(process_id, after_sequence)
193            .await?
194            .into_iter()
195            .map(ObservedProcessEvent::from)
196            .collect())
197    }
198}
199
200impl ObservedProcess {
201    fn from_record(record: ProcessRecord) -> Self {
202        let lifecycle = ProcessLifecycleStatus::from(&record.status);
203        let input = record.input.as_ref().clone();
204        let identity = record.identity;
205        let kind = identity.kind.clone();
206        let label = identity.label.clone().unwrap_or_else(|| kind.clone());
207        let process_id = record.id;
208        Self {
209            graph_key: format!("process:{process_id}"),
210            process_id,
211            kind,
212            lifecycle,
213            identity,
214            status_label: lifecycle.label().to_string(),
215            terminal: lifecycle.is_terminal(),
216            error: terminal_error(&record.status),
217            created_at_ms: record.created_at_ms,
218            updated_at_ms: record.updated_at_ms,
219            originator: record.provenance.originator,
220            env_ref: record.env_ref,
221            wake_target: record.wake_target,
222            caused_by: record.provenance.caused_by,
223            external_ref: record.external_ref,
224            wait: record.wait,
225            child_session_id: child_session_id(&input),
226            input,
227            label,
228        }
229    }
230}
231
232impl From<ProcessEvent> for ObservedProcessEvent {
233    fn from(event: ProcessEvent) -> Self {
234        Self {
235            sequence: event.sequence,
236            event_type: event.event_type,
237            occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
238            payload: event.payload,
239        }
240    }
241}
242
243fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
244    match status.await_output()? {
245        ProcessAwaitOutput::Failure { message, .. }
246        | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
247        ProcessAwaitOutput::Success { .. } => None,
248    }
249}
250
251fn child_session_id(input: &ProcessInput) -> Option<String> {
252    match input {
253        ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
254        ProcessInput::ToolCall { .. }
255        | ProcessInput::Engine { .. }
256        | ProcessInput::External { .. } => None,
257    }
258}
259
260fn process_visible_to_session(record: &ProcessRecord, session_id: &str) -> bool {
261    record
262        .wake_target
263        .as_ref()
264        .is_some_and(|scope| scope.session_id == session_id)
265}
266
267fn descriptor_from_process_identity(identity: &ProcessIdentity) -> ProcessHandleDescriptor {
268    ProcessHandleDescriptor::new(Some(identity.kind.clone()), identity.label.clone())
269}
270
271#[cfg(test)]
272mod tests {
273    use std::sync::Arc;
274    use std::time::Duration;
275
276    use serde_json::json;
277
278    use super::*;
279    use crate::{
280        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
281        ProcessExecutionEnvRef, ProcessIdentity, ProcessProvenance, ProcessRegistration,
282        SessionCreateRequest, SessionScope, SessionStartPoint, SubagentSessionContext,
283        ToolFailureClass, ToolOutputContract, TurnInput, WaitKind,
284    };
285
286    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
287        ProcessWorkObserver::new(registry)
288    }
289
290    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
291        ProcessRegistration::new(
292            process_id,
293            ProcessInput::External {
294                metadata: json!({ "label": label }),
295            },
296            ProcessProvenance::host(),
297        )
298    }
299
300    async fn register_visible(
301        registry: &Arc<dyn ProcessRegistry>,
302        scope: &SessionScope,
303        registration: ProcessRegistration,
304        descriptor: ProcessHandleDescriptor,
305    ) {
306        let process_id = registration.id.clone();
307        registry
308            .register_process(registration)
309            .await
310            .expect("register process");
311        registry
312            .grant_handle(scope, &process_id, descriptor)
313            .await
314            .expect("grant process handle");
315    }
316
317    #[tokio::test]
318    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
319        let registry =
320            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
321        let visible_scope = SessionScope::new("visible");
322        register_visible(
323            &registry,
324            &visible_scope,
325            external_registration("visible-process", "Visible"),
326            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
327        )
328        .await;
329        register_visible(
330            &registry,
331            &SessionScope::new("other"),
332            external_registration("hidden-process", "Hidden"),
333            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
334        )
335        .await;
336        registry
337            .append_event(
338                "visible-process",
339                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
340                    .with_replay_key("visible-process:cancel-requested"),
341            )
342            .await
343            .expect("append event");
344
345        let snapshot = observer(Arc::clone(&registry))
346            .snapshot_for_session("visible")
347            .await
348            .expect("snapshot");
349
350        assert_eq!(snapshot.session_id, "visible");
351        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
352        assert_eq!(snapshot.items.len(), 1);
353        assert_eq!(snapshot.items[0].events.len(), 1);
354        assert_eq!(
355            snapshot.items[0].events[0].event_type,
356            "process.cancel_requested"
357        );
358        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
359    }
360
361    #[tokio::test]
362    async fn snapshot_for_session_includes_frame_wake_targets_without_handle_grants() {
363        let registry =
364            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
365        let frame_scope = SessionScope::for_agent_frame("visible", "frame-a");
366        registry
367            .register_process(ProcessRegistration::new(
368                "frame-originated",
369                ProcessInput::External {
370                    metadata: json!({ "label": "Frame originated" }),
371                },
372                ProcessProvenance::session(frame_scope.clone()),
373            ))
374            .await
375            .expect("register frame-originated process");
376        registry
377            .register_process(
378                external_registration("frame-wake-targeted", "Frame wake targeted")
379                    .with_wake_target(Some(frame_scope)),
380            )
381            .await
382            .expect("register frame wake-targeted process");
383        registry
384            .register_process(
385                external_registration("hidden-frame", "Hidden")
386                    .with_wake_target(Some(SessionScope::for_agent_frame("other", "frame-b"))),
387            )
388            .await
389            .expect("register hidden process");
390
391        let snapshot = observer(Arc::clone(&registry))
392            .snapshot_for_session("visible")
393            .await
394            .expect("snapshot");
395        let visible_process_ids = snapshot
396            .visible_process_ids
397            .iter()
398            .cloned()
399            .collect::<std::collections::BTreeSet<_>>();
400
401        assert_eq!(
402            visible_process_ids,
403            std::collections::BTreeSet::from(["frame-wake-targeted".to_string()])
404        );
405        assert_eq!(snapshot.items.len(), 1);
406    }
407
408    #[tokio::test]
409    async fn snapshot_for_session_labels_engine_wake_targets_from_identity_without_handle_grants() {
410        let registry =
411            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
412        let scope = SessionScope::new("visible");
413        registry
414            .register_process(
415                ProcessRegistration::new(
416                    "engine-wake-targeted",
417                    ProcessInput::Engine {
418                        kind: "test-engine".to_string(),
419                        payload: json!({}),
420                    },
421                    ProcessProvenance::host(),
422                )
423                .with_identity(
424                    ProcessIdentity::new("test-engine").with_label(Some("remember".to_string())),
425                )
426                .with_execution_env_ref(Some(ProcessExecutionEnvRef::new("process-env:test")))
427                .with_wake_target(Some(scope)),
428            )
429            .await
430            .expect("register engine wake-targeted process");
431
432        let snapshot = observer(Arc::clone(&registry))
433            .snapshot_for_session("visible")
434            .await
435            .expect("snapshot");
436
437        assert_eq!(snapshot.items.len(), 1);
438        assert_eq!(snapshot.items[0].kind, "test-engine");
439        assert_eq!(snapshot.items[0].label, "remember");
440        assert_eq!(
441            snapshot.items[0].descriptor.kind.as_deref(),
442            Some("test-engine")
443        );
444        assert_eq!(
445            snapshot.items[0].descriptor.label.as_deref(),
446            Some("remember")
447        );
448        assert_eq!(snapshot.items[0].process.kind, "test-engine");
449        assert_eq!(snapshot.items[0].process.label, "remember");
450    }
451
452    #[tokio::test]
453    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
454        let registry =
455            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
456        let scope = SessionScope::new("sort");
457        register_visible(
458            &registry,
459            &scope,
460            external_registration("older", "Older"),
461            ProcessHandleDescriptor::new(None::<String>, None::<String>),
462        )
463        .await;
464        tokio::time::sleep(Duration::from_millis(2)).await;
465        register_visible(
466            &registry,
467            &scope,
468            external_registration("newer", "Newer"),
469            ProcessHandleDescriptor::new(None::<String>, None::<String>),
470        )
471        .await;
472        tokio::time::sleep(Duration::from_millis(2)).await;
473        registry
474            .append_event(
475                "older",
476                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
477                    .with_replay_key("older:cancel-requested"),
478            )
479            .await
480            .expect("update older process");
481
482        let snapshot = observer(Arc::clone(&registry))
483            .snapshot_for_session("sort")
484            .await
485            .expect("snapshot");
486
487        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
488    }
489
490    #[tokio::test]
491    async fn observed_process_reports_terminal_status_and_error_messages() {
492        let registry =
493            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
494        for process_id in ["failed", "cancelled"] {
495            registry
496                .register_process(external_registration(process_id, process_id))
497                .await
498                .expect("register");
499        }
500        registry
501            .complete_process(
502                "failed",
503                ProcessAwaitOutput::Failure {
504                    class: ToolFailureClass::External,
505                    code: "boom".to_string(),
506                    message: "failed loudly".to_string(),
507                    raw: None,
508                    control: None,
509                },
510            )
511            .await
512            .expect("fail process");
513        registry
514            .complete_process(
515                "cancelled",
516                ProcessAwaitOutput::Cancelled {
517                    message: "cancelled intentionally".to_string(),
518                    raw: None,
519                    control: None,
520                },
521            )
522            .await
523            .expect("cancel process");
524
525        let observer = observer(Arc::clone(&registry));
526        let failed = observer.process("failed").await.expect("failed process");
527        let cancelled = observer
528            .process("cancelled")
529            .await
530            .expect("cancelled process");
531
532        assert_eq!(failed.status_label, "failed");
533        assert!(failed.terminal);
534        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
535        assert_eq!(cancelled.status_label, "cancelled");
536        assert!(cancelled.terminal);
537        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
538    }
539
540    #[tokio::test]
541    async fn observed_process_exposes_current_wait_state() {
542        let registry =
543            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
544        let scope = SessionScope::new("wait");
545        register_visible(
546            &registry,
547            &scope,
548            external_registration("waiting-process", "Waiting"),
549            ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
550        )
551        .await;
552        let wait = WaitState {
553            since_ms: 1234,
554            kind: WaitKind::Signal {
555                name: "ready".to_string(),
556                event_type: "signal.ready".to_string(),
557                key: "process:waiting-process:signal.ready:1".to_string(),
558                ordinal: 1,
559            },
560        };
561        registry
562            .set_process_wait("waiting-process", wait.clone())
563            .await
564            .expect("set wait");
565
566        let observer = observer(Arc::clone(&registry));
567        let observed = observer
568            .process("waiting-process")
569            .await
570            .expect("waiting process");
571        let snapshot = observer
572            .snapshot_for_session("wait")
573            .await
574            .expect("snapshot");
575
576        assert_eq!(observed.wait, Some(wait.clone()));
577        assert_eq!(snapshot.items.len(), 1);
578        assert_eq!(snapshot.items[0].process.wait, Some(wait));
579    }
580
581    #[tokio::test]
582    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
583        let registry =
584            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
585        let scope = SessionScope::new("labels");
586        let mut child_request = SessionCreateRequest::child_session(
587            "labels",
588            SessionStartPoint::Empty,
589            PluginOptions::default(),
590        )
591        .with_session_id("child-session");
592        child_request.subagent = Some(SubagentSessionContext {
593            parent_session_id: "labels".to_string(),
594            capability: "researcher".to_string(),
595            depth: 1,
596            max_depth: 4,
597        });
598        let cases = [
599            (
600                "tool",
601                ProcessInput::ToolCall {
602                    call: PreparedToolCall::from_parts(
603                        "call-1",
604                        "shell.run",
605                        json!({}),
606                        None,
607                        serde_json::Value::Null,
608                    ),
609                },
610                "tool",
611                "shell.run",
612                None,
613            ),
614            (
615                "engine",
616                ProcessInput::Engine {
617                    kind: "test-engine".to_string(),
618                    payload: json!({}),
619                },
620                "test-engine",
621                "remember",
622                None,
623            ),
624            (
625                "session",
626                ProcessInput::SessionTurn {
627                    create_request: Box::new(child_request),
628                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
629                    output_contract: ToolOutputContract::Static,
630                },
631                "session_turn",
632                "researcher",
633                Some("child-session"),
634            ),
635            (
636                "external",
637                ProcessInput::External {
638                    metadata: json!({ "label": "external job" }),
639                },
640                "external",
641                "external job",
642                None,
643            ),
644        ];
645        for (process_id, input, kind, label, _child_session_id) in cases {
646            let needs_env = matches!(
647                input,
648                ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. }
649            );
650            let mut registration =
651                ProcessRegistration::new(process_id, input, ProcessProvenance::host())
652                    .with_identity(ProcessIdentity::new(kind).with_label(Some(label.to_string())));
653            if needs_env {
654                registration = registration.with_execution_env_ref(Some(
655                    ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
656                ));
657            }
658            register_visible(
659                &registry,
660                &scope,
661                registration,
662                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
663            )
664            .await;
665        }
666
667        let snapshot = observer(Arc::clone(&registry))
668            .snapshot_for_session("labels")
669            .await
670            .expect("snapshot");
671        let by_id = snapshot
672            .items
673            .iter()
674            .map(|item| (item.process.process_id.as_str(), item))
675            .collect::<std::collections::BTreeMap<_, _>>();
676
677        assert_eq!(by_id["tool"].label, "shell.run");
678        assert_eq!(by_id["engine"].label, "remember");
679        assert_eq!(by_id["engine"].process.kind, "test-engine");
680        assert_eq!(by_id["session"].label, "researcher");
681        assert_eq!(
682            by_id["session"].process.child_session_id.as_deref(),
683            Some("child-session")
684        );
685        assert_eq!(by_id["external"].label, "external job");
686    }
687
688    #[tokio::test]
689    async fn observed_process_missing_lookup_returns_none() {
690        let registry =
691            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
692
693        assert!(observer(registry).process("missing").await.is_none());
694    }
695}