Skip to main content

lash_core/runtime/process/
observation.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::plugin::PluginError;
6
7use super::events::{ProcessAwaitOutput, ProcessEvent};
8use super::model::{
9    ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId, ProcessInput,
10    ProcessLifecycleStatus, ProcessListFilter, ProcessOriginator, ProcessRecord, SessionScope,
11    WaitState,
12};
13use super::registry::ProcessRegistry;
14use super::time::epoch_ms_from_system_time;
15
/// Builds serializable observation views (session snapshots, single
/// processes, process lists, event pages) from a shared [`ProcessRegistry`].
#[derive(Clone)]
pub struct ProcessWorkObserver {
    // Registry queried for handle grants, process records, and events.
    registry: Arc<dyn ProcessRegistry>,
}
20
/// Point-in-time view of the process work visible to one session, as built by
/// `ProcessWorkObserver::snapshot_for_session`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessWorkSnapshot {
    /// Session the snapshot was taken for.
    pub session_id: String,
    /// Process ids of `items`, in the same order as `items`.
    pub visible_process_ids: Vec<ProcessId>,
    /// Visible work, ordered by `updated_at_ms` descending, then
    /// `created_at_ms` descending, then `process_id` ascending.
    pub items: Vec<ObservedWorkItem>,
}
27
/// One process visible to a session, joined with the descriptor of the
/// session's handle grant and a tail of recent events.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedWorkItem {
    /// Observed state of the process itself.
    pub process: ObservedProcess,
    /// Descriptor attached to the session's handle grant for this process.
    pub descriptor: ProcessHandleDescriptor,
    /// Recent events; snapshots request at most `SNAPSHOT_EVENT_TAIL` of them.
    pub events: Vec<ObservedProcessEvent>,
    /// Display kind: the descriptor's kind, else the process's stable kind.
    pub kind: String,
    /// Display label: the typed input label, else the descriptor label, else
    /// `kind`.
    pub label: String,
}
36
/// Serializable projection of a single process record.
///
/// `Option` fields are omitted from serialized output when `None` and default
/// to `None` when absent during deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedProcess {
    /// Registry id of the process.
    pub process_id: ProcessId,
    /// Graph node key; `from_record` sets it to `"process:{process_id}"`.
    pub graph_key: String,
    /// Stable kind derived from the input variant: `"tool"`, `"lashlang"`,
    /// `"session_turn"`, or `"external"`.
    pub kind: String,
    /// Lifecycle status derived from the record's status.
    pub lifecycle: ProcessLifecycleStatus,
    /// Display label of `lifecycle`.
    pub status_label: String,
    /// Whether `lifecycle` is a terminal status.
    pub terminal: bool,
    /// Message of a failed or cancelled outcome; `None` on success or while
    /// the process has no outcome.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Creation timestamp (ms) copied from the record.
    pub created_at_ms: u64,
    /// Last-update timestamp (ms) copied from the record; snapshots sort on it
    /// first.
    pub updated_at_ms: u64,
    /// Typed input the process was registered with.
    pub input: ProcessInput,
    /// Originator taken from the record's provenance.
    pub originator: ProcessOriginator,
    /// Execution environment reference, if the record has one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Wake target session scope, if the record has one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Causal reference taken from the record's provenance, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub caused_by: Option<crate::CausalRef>,
    /// External reference, if the record has one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Wait state copied from the record, if set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// Session id requested by a `SessionTurn` input; `None` for other inputs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub child_session_id: Option<String>,
    /// Typed input label, falling back to `kind`.
    pub label: String,
}
65
/// Serializable form of a registry `ProcessEvent`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedProcessEvent {
    /// Sequence number copied from the event; `events_after` pages on it.
    pub sequence: u64,
    /// Event type string, e.g. `"process.cancel_requested"`.
    pub event_type: String,
    /// Occurrence time in epoch milliseconds (via `epoch_ms_from_system_time`).
    pub occurred_at_ms: u64,
    /// Event payload, passed through unchanged.
    pub payload: serde_json::Value,
}
73
/// Maximum number of recent events requested for each item of a session
/// snapshot.
///
/// Docks and UIs poll snapshots, so per-poll cost must stay bounded instead of
/// growing with a process's full event history. Detail views page through
/// `ProcessWorkObserver::events_after` with a sequence cursor instead.
pub const SNAPSHOT_EVENT_TAIL: usize = 32;
79
80impl ProcessWorkObserver {
81    pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
82        Self { registry }
83    }
84
85    pub async fn snapshot_for_session(
86        &self,
87        session_id: impl Into<String>,
88    ) -> Result<ProcessWorkSnapshot, PluginError> {
89        let session_id = session_id.into();
90        let session_scope = SessionScope::new(session_id.clone());
91        let entries = self.registry.list_handle_grants(&session_scope).await?;
92        let mut items = Vec::with_capacity(entries.len());
93        for (grant, record) in entries {
94            let events = self
95                .registry
96                .recent_events(&record.id, SNAPSHOT_EVENT_TAIL)
97                .await?
98                .into_iter()
99                .map(ObservedProcessEvent::from)
100                .collect();
101            let descriptor = grant.descriptor;
102            let process = ObservedProcess::from_record(record);
103            let kind = descriptor
104                .kind
105                .clone()
106                .unwrap_or_else(|| stable_kind(&process.input).to_string());
107            let label = typed_label(&process.input)
108                .or_else(|| descriptor.label.clone())
109                .unwrap_or_else(|| kind.clone());
110            items.push(ObservedWorkItem {
111                process,
112                descriptor,
113                events,
114                kind,
115                label,
116            });
117        }
118        items.sort_by(|left, right| {
119            right
120                .process
121                .updated_at_ms
122                .cmp(&left.process.updated_at_ms)
123                .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
124                .then_with(|| left.process.process_id.cmp(&right.process.process_id))
125        });
126        let visible_process_ids = items
127            .iter()
128            .map(|item| item.process.process_id.clone())
129            .collect();
130        Ok(ProcessWorkSnapshot {
131            session_id,
132            visible_process_ids,
133            items,
134        })
135    }
136
137    pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
138        self.registry
139            .get_process(process_id)
140            .await
141            .map(ObservedProcess::from_record)
142    }
143
144    pub async fn list(
145        &self,
146        filter: &ProcessListFilter,
147    ) -> Result<Vec<ObservedProcess>, PluginError> {
148        Ok(self
149            .registry
150            .list_processes(filter)
151            .await?
152            .into_iter()
153            .map(ObservedProcess::from_record)
154            .collect())
155    }
156
157    pub async fn events_after(
158        &self,
159        process_id: &str,
160        after_sequence: u64,
161    ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
162        Ok(self
163            .registry
164            .events_after(process_id, after_sequence)
165            .await?
166            .into_iter()
167            .map(ObservedProcessEvent::from)
168            .collect())
169    }
170}
171
172impl ObservedProcess {
173    fn from_record(record: ProcessRecord) -> Self {
174        let lifecycle = ProcessLifecycleStatus::from(&record.status);
175        let input = record.input.as_ref().clone();
176        let kind = stable_kind(&input).to_string();
177        let label = typed_label(&input).unwrap_or_else(|| kind.clone());
178        let process_id = record.id;
179        Self {
180            graph_key: format!("process:{process_id}"),
181            process_id,
182            kind,
183            lifecycle,
184            status_label: lifecycle.label().to_string(),
185            terminal: lifecycle.is_terminal(),
186            error: terminal_error(&record.status),
187            created_at_ms: record.created_at_ms,
188            updated_at_ms: record.updated_at_ms,
189            originator: record.provenance.originator,
190            env_ref: record.env_ref,
191            wake_target: record.wake_target,
192            caused_by: record.provenance.caused_by,
193            external_ref: record.external_ref,
194            wait: record.wait,
195            child_session_id: child_session_id(&input),
196            input,
197            label,
198        }
199    }
200}
201
202impl From<ProcessEvent> for ObservedProcessEvent {
203    fn from(event: ProcessEvent) -> Self {
204        Self {
205            sequence: event.sequence,
206            event_type: event.event_type,
207            occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
208            payload: event.payload,
209        }
210    }
211}
212
213fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
214    match status.await_output()? {
215        ProcessAwaitOutput::Failure { message, .. }
216        | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
217        ProcessAwaitOutput::Success { .. } => None,
218    }
219}
220
221fn child_session_id(input: &ProcessInput) -> Option<String> {
222    match input {
223        ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
224        ProcessInput::ToolCall { .. }
225        | ProcessInput::LashlangProcess { .. }
226        | ProcessInput::External { .. } => None,
227    }
228}
229
230fn typed_label(input: &ProcessInput) -> Option<String> {
231    match input {
232        ProcessInput::ToolCall { call } => Some(call.tool_name.clone()),
233        ProcessInput::LashlangProcess { process_name, .. } => Some(process_name.clone()),
234        ProcessInput::SessionTurn { create_request, .. } => create_request
235            .subagent
236            .as_ref()
237            .map(|subagent| subagent.capability.clone())
238            .or_else(|| create_request.usage_source.clone())
239            .or_else(|| create_request.session_id.clone()),
240        ProcessInput::External { metadata } => metadata
241            .get("label")
242            .or_else(|| metadata.get("name"))
243            .or_else(|| metadata.get("title"))
244            .and_then(serde_json::Value::as_str)
245            .map(str::to_string),
246    }
247}
248
249fn stable_kind(input: &ProcessInput) -> &'static str {
250    match input {
251        ProcessInput::ToolCall { .. } => "tool",
252        ProcessInput::LashlangProcess { .. } => "lashlang",
253        ProcessInput::SessionTurn { .. } => "session_turn",
254        ProcessInput::External { .. } => "external",
255    }
256}
257
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use serde_json::json;

    use super::*;
    use crate::{
        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
        ProcessExecutionEnvRef, ProcessProvenance, ProcessRegistration, SessionCreateRequest,
        SessionScope, SessionStartPoint, SubagentSessionContext, ToolFailureClass,
        ToolOutputContract, TurnInput, WaitKind,
    };

    /// Wraps `registry` in a fresh observer.
    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
        ProcessWorkObserver::new(registry)
    }

    /// Host-originated registration of an external process whose metadata
    /// carries `label`.
    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: json!({ "label": label }),
            },
            ProcessProvenance::host("observer-test-host"),
        )
    }

    /// Registers `registration` and grants `scope` a handle to it using
    /// `descriptor`.
    async fn register_visible(
        registry: &Arc<dyn ProcessRegistry>,
        scope: &SessionScope,
        registration: ProcessRegistration,
        descriptor: ProcessHandleDescriptor,
    ) {
        let process_id = registration.id.clone();
        registry
            .register_process(registration)
            .await
            .expect("register process");
        registry
            .grant_handle(scope, &process_id, descriptor)
            .await
            .expect("grant process handle");
    }

    #[tokio::test]
    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let visible_scope = SessionScope::new("visible");
        register_visible(
            &registry,
            &visible_scope,
            external_registration("visible-process", "Visible"),
            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
        )
        .await;
        // Granted to a different session, so it must not appear below.
        register_visible(
            &registry,
            &SessionScope::new("other"),
            external_registration("hidden-process", "Hidden"),
            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
        )
        .await;
        registry
            .append_event(
                "visible-process",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
                    .with_replay_key("visible-process:cancel-requested"),
            )
            .await
            .expect("append event");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.session_id, "visible");
        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].events.len(), 1);
        assert_eq!(
            snapshot.items[0].events[0].event_type,
            "process.cancel_requested"
        );
        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
    }

    #[tokio::test]
    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("sort");
        register_visible(
            &registry,
            &scope,
            external_registration("older", "Older"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        // Sleeps keep each step on a distinct millisecond timestamp.
        tokio::time::sleep(Duration::from_millis(2)).await;
        register_visible(
            &registry,
            &scope,
            external_registration("newer", "Newer"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        tokio::time::sleep(Duration::from_millis(2)).await;
        // Touching "older" last must move it ahead of the newer-created process.
        registry
            .append_event(
                "older",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
                    .with_replay_key("older:cancel-requested"),
            )
            .await
            .expect("update older process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("sort")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
    }

    #[tokio::test]
    async fn observed_process_reports_terminal_status_and_error_messages() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        for process_id in ["failed", "cancelled"] {
            registry
                .register_process(external_registration(process_id, process_id))
                .await
                .expect("register");
        }
        registry
            .complete_process(
                "failed",
                ProcessAwaitOutput::Failure {
                    class: ToolFailureClass::External,
                    code: "boom".to_string(),
                    message: "failed loudly".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("fail process");
        registry
            .complete_process(
                "cancelled",
                ProcessAwaitOutput::Cancelled {
                    message: "cancelled intentionally".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("cancel process");

        let observer = observer(Arc::clone(&registry));
        let failed = observer.process("failed").await.expect("failed process");
        let cancelled = observer
            .process("cancelled")
            .await
            .expect("cancelled process");

        assert_eq!(failed.status_label, "failed");
        assert!(failed.terminal);
        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
        assert_eq!(cancelled.status_label, "cancelled");
        assert!(cancelled.terminal);
        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
    }

    #[tokio::test]
    async fn observed_process_exposes_current_wait_state() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("wait");
        register_visible(
            &registry,
            &scope,
            external_registration("waiting-process", "Waiting"),
            ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
        )
        .await;
        let wait = WaitState {
            since_ms: 1234,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting-process:signal.ready:1".to_string(),
                ordinal: 1,
            },
        };
        registry
            .set_process_wait("waiting-process", wait.clone())
            .await
            .expect("set wait");

        let observer = observer(Arc::clone(&registry));
        let observed = observer
            .process("waiting-process")
            .await
            .expect("waiting process");
        let snapshot = observer
            .snapshot_for_session("wait")
            .await
            .expect("snapshot");

        assert_eq!(observed.wait, Some(wait.clone()));
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].process.wait, Some(wait));
    }

    #[tokio::test]
    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("labels");
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
        let process_ref = lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 1);
        let mut child_request = SessionCreateRequest::child_session(
            "labels",
            SessionStartPoint::Empty,
            PluginOptions::default(),
        )
        .with_session_id("child-session");
        child_request.subagent = Some(SubagentSessionContext {
            parent_session_id: "labels".to_string(),
            capability: "researcher".to_string(),
            depth: 1,
            max_depth: 4,
        });
        // (process id, input, expected process kind, expected label,
        //  expected child session id)
        let cases = [
            (
                "tool",
                ProcessInput::ToolCall {
                    call: PreparedToolCall::from_parts(
                        "call-1",
                        "shell.run",
                        json!({}),
                        None,
                        serde_json::Value::Null,
                    ),
                },
                "tool",
                "shell.run",
                None,
            ),
            (
                "lashlang",
                ProcessInput::LashlangProcess {
                    module_ref,
                    process_ref,
                    required_surface_ref: surface_ref,
                    process_name: "remember".to_string(),
                    args: serde_json::Map::new(),
                },
                "lashlang",
                "remember",
                None,
            ),
            (
                "session",
                ProcessInput::SessionTurn {
                    create_request: Box::new(child_request),
                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
                    output_contract: ToolOutputContract::Static,
                },
                "session_turn",
                "researcher",
                Some("child-session"),
            ),
            (
                "external",
                ProcessInput::External {
                    metadata: json!({ "label": "external job" }),
                },
                "external",
                "external job",
                None,
            ),
        ];
        let mut expected = Vec::with_capacity(cases.len());
        for (process_id, input, expected_kind, expected_label, expected_child) in cases {
            expected.push((process_id, expected_kind, expected_label, expected_child));
            let needs_env = matches!(
                input,
                ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. }
            );
            let mut registration = ProcessRegistration::new(
                process_id,
                input,
                ProcessProvenance::host("observer-test-host"),
            );
            if needs_env {
                registration = registration.with_execution_env_ref(Some(
                    ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
                ));
            }
            register_visible(
                &registry,
                &scope,
                registration,
                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
            )
            .await;
        }

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("labels")
            .await
            .expect("snapshot");
        let by_id = snapshot
            .items
            .iter()
            .map(|item| (item.process.process_id.as_str(), item))
            .collect::<std::collections::BTreeMap<_, _>>();

        assert_eq!(by_id.len(), expected.len());
        for (process_id, expected_kind, expected_label, expected_child) in expected {
            let item = by_id[process_id];
            // The descriptor kind wins for the work item, while the process
            // itself keeps its stable kind.
            assert_eq!(item.kind, "descriptor-kind", "{process_id}: work item kind");
            assert_eq!(item.process.kind, expected_kind, "{process_id}: process kind");
            // Typed labels win over the descriptor label.
            assert_eq!(item.label, expected_label, "{process_id}: work item label");
            assert_eq!(
                item.process.label, expected_label,
                "{process_id}: process label"
            );
            assert_eq!(
                item.process.child_session_id.as_deref(),
                expected_child,
                "{process_id}: child session id"
            );
        }
    }

    #[tokio::test]
    async fn observed_process_missing_lookup_returns_none() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;

        assert!(observer(registry).process("missing").await.is_none());
    }
}