Skip to main content

lash_core/runtime/process/
observation.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::plugin::PluginError;
6
7use super::events::{ProcessAwaitOutput, ProcessEvent};
8use super::model::{
9    ProcessExternalRef, ProcessHandleDescriptor, ProcessId, ProcessInput, ProcessLifecycleStatus,
10    ProcessRecord, ProcessScope,
11};
12use super::registry::ProcessRegistry;
13use super::time::epoch_ms_from_system_time;
14
15#[derive(Clone)]
16pub struct ProcessWorkObserver {
17    registry: Arc<dyn ProcessRegistry>,
18}
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct ProcessWorkSnapshot {
22    pub session_id: String,
23    pub visible_process_ids: Vec<ProcessId>,
24    pub items: Vec<ObservedWorkItem>,
25}
26
27#[derive(Clone, Debug, Serialize, Deserialize)]
28pub struct ObservedWorkItem {
29    pub process: ObservedProcess,
30    pub descriptor: ProcessHandleDescriptor,
31    pub events: Vec<ObservedProcessEvent>,
32    pub kind: String,
33    pub label: String,
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub struct ObservedProcess {
38    pub process_id: ProcessId,
39    pub graph_key: String,
40    pub lifecycle: ProcessLifecycleStatus,
41    pub status_label: String,
42    pub terminal: bool,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub error: Option<String>,
45    pub created_at_ms: u64,
46    pub updated_at_ms: u64,
47    pub input: ProcessInput,
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub external_ref: Option<ProcessExternalRef>,
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub child_session_id: Option<String>,
52    pub label: String,
53}
54
55#[derive(Clone, Debug, Serialize, Deserialize)]
56pub struct ObservedProcessEvent {
57    pub sequence: u64,
58    pub event_type: String,
59    pub occurred_at_ms: u64,
60    pub payload: serde_json::Value,
61}
62
63impl ProcessWorkObserver {
64    pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
65        Self { registry }
66    }
67
68    pub async fn snapshot_for_session(
69        &self,
70        session_id: impl Into<String>,
71    ) -> Result<ProcessWorkSnapshot, PluginError> {
72        let session_id = session_id.into();
73        let owner_scope = ProcessScope::new(session_id.clone());
74        let entries = self.registry.list_handle_grants(&owner_scope).await?;
75        let mut items = Vec::with_capacity(entries.len());
76        for (grant, record) in entries {
77            let events = self
78                .registry
79                .events_after(&record.id, 0)
80                .await?
81                .into_iter()
82                .map(ObservedProcessEvent::from)
83                .collect();
84            let descriptor = grant.descriptor;
85            let process = ObservedProcess::from_record(record);
86            let kind = descriptor
87                .kind
88                .clone()
89                .unwrap_or_else(|| stable_kind(&process.input).to_string());
90            let label = typed_label(&process.input)
91                .or_else(|| descriptor.label.clone())
92                .unwrap_or_else(|| kind.clone());
93            items.push(ObservedWorkItem {
94                process,
95                descriptor,
96                events,
97                kind,
98                label,
99            });
100        }
101        items.sort_by(|left, right| {
102            right
103                .process
104                .updated_at_ms
105                .cmp(&left.process.updated_at_ms)
106                .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
107                .then_with(|| left.process.process_id.cmp(&right.process.process_id))
108        });
109        let visible_process_ids = items
110            .iter()
111            .map(|item| item.process.process_id.clone())
112            .collect();
113        Ok(ProcessWorkSnapshot {
114            session_id,
115            visible_process_ids,
116            items,
117        })
118    }
119
120    pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
121        self.registry
122            .get_process(process_id)
123            .await
124            .map(ObservedProcess::from_record)
125    }
126}
127
128impl ObservedProcess {
129    fn from_record(record: ProcessRecord) -> Self {
130        let lifecycle = ProcessLifecycleStatus::from(&record.status);
131        let input = record.input.as_ref().clone();
132        let label = typed_label(&input).unwrap_or_else(|| stable_kind(&input).to_string());
133        let process_id = record.id;
134        Self {
135            graph_key: format!("process:{process_id}"),
136            process_id,
137            lifecycle,
138            status_label: lifecycle.label().to_string(),
139            terminal: lifecycle.is_terminal(),
140            error: terminal_error(&record.status),
141            created_at_ms: record.created_at_ms,
142            updated_at_ms: record.updated_at_ms,
143            external_ref: record.external_ref,
144            child_session_id: child_session_id(&input),
145            input,
146            label,
147        }
148    }
149}
150
151impl From<ProcessEvent> for ObservedProcessEvent {
152    fn from(event: ProcessEvent) -> Self {
153        Self {
154            sequence: event.sequence,
155            event_type: event.event_type,
156            occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
157            payload: event.payload,
158        }
159    }
160}
161
162fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
163    match status.await_output()? {
164        ProcessAwaitOutput::Failure { message, .. }
165        | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
166        ProcessAwaitOutput::Success { .. } => None,
167    }
168}
169
170fn child_session_id(input: &ProcessInput) -> Option<String> {
171    match input {
172        ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
173        ProcessInput::ToolCall { .. }
174        | ProcessInput::LashlangProcess { .. }
175        | ProcessInput::External { .. } => None,
176    }
177}
178
179fn typed_label(input: &ProcessInput) -> Option<String> {
180    match input {
181        ProcessInput::ToolCall { call } => Some(call.tool_name.clone()),
182        ProcessInput::LashlangProcess { process_name, .. } => Some(process_name.clone()),
183        ProcessInput::SessionTurn { create_request, .. } => create_request
184            .subagent
185            .as_ref()
186            .map(|subagent| subagent.capability.clone())
187            .or_else(|| create_request.usage_source.clone())
188            .or_else(|| create_request.session_id.clone()),
189        ProcessInput::External { metadata } => metadata
190            .get("label")
191            .or_else(|| metadata.get("name"))
192            .or_else(|| metadata.get("title"))
193            .and_then(serde_json::Value::as_str)
194            .map(str::to_string),
195    }
196}
197
198fn stable_kind(input: &ProcessInput) -> &'static str {
199    match input {
200        ProcessInput::ToolCall { .. } => "tool",
201        ProcessInput::LashlangProcess { .. } => "lashlang",
202        ProcessInput::SessionTurn { .. } => "session_turn",
203        ProcessInput::External { .. } => "external",
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use std::sync::Arc;
210    use std::time::Duration;
211
212    use serde_json::json;
213
214    use super::*;
215    use crate::{
216        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest, ProcessRegistration,
217        ProcessScope, SessionCreateRequest, SessionStartPoint, SubagentSessionContext,
218        ToolFailureClass, ToolOutputContract, TurnInput,
219    };
220
221    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
222        ProcessWorkObserver::new(registry)
223    }
224
225    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
226        ProcessRegistration::new(
227            process_id,
228            ProcessInput::External {
229                metadata: json!({ "label": label }),
230            },
231        )
232    }
233
234    async fn register_visible(
235        registry: &Arc<dyn ProcessRegistry>,
236        scope: &ProcessScope,
237        registration: ProcessRegistration,
238        descriptor: ProcessHandleDescriptor,
239    ) {
240        let process_id = registration.id.clone();
241        registry
242            .register_process(registration)
243            .await
244            .expect("register process");
245        registry
246            .grant_handle(scope, &process_id, descriptor)
247            .await
248            .expect("grant process handle");
249    }
250
251    #[tokio::test]
252    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
253        let registry =
254            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
255        let visible_scope = ProcessScope::new("visible");
256        register_visible(
257            &registry,
258            &visible_scope,
259            external_registration("visible-process", "Visible"),
260            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
261        )
262        .await;
263        register_visible(
264            &registry,
265            &ProcessScope::new("other"),
266            external_registration("hidden-process", "Hidden"),
267            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
268        )
269        .await;
270        registry
271            .append_event(
272                "visible-process",
273                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
274                    .with_replay_key("visible-process:cancel-requested"),
275            )
276            .await
277            .expect("append event");
278
279        let snapshot = observer(Arc::clone(&registry))
280            .snapshot_for_session("visible")
281            .await
282            .expect("snapshot");
283
284        assert_eq!(snapshot.session_id, "visible");
285        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
286        assert_eq!(snapshot.items.len(), 1);
287        assert_eq!(snapshot.items[0].events.len(), 1);
288        assert_eq!(
289            snapshot.items[0].events[0].event_type,
290            "process.cancel_requested"
291        );
292        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
293    }
294
295    #[tokio::test]
296    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
297        let registry =
298            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
299        let scope = ProcessScope::new("sort");
300        register_visible(
301            &registry,
302            &scope,
303            external_registration("older", "Older"),
304            ProcessHandleDescriptor::new(None::<String>, None::<String>),
305        )
306        .await;
307        tokio::time::sleep(Duration::from_millis(2)).await;
308        register_visible(
309            &registry,
310            &scope,
311            external_registration("newer", "Newer"),
312            ProcessHandleDescriptor::new(None::<String>, None::<String>),
313        )
314        .await;
315        tokio::time::sleep(Duration::from_millis(2)).await;
316        registry
317            .append_event(
318                "older",
319                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
320                    .with_replay_key("older:cancel-requested"),
321            )
322            .await
323            .expect("update older process");
324
325        let snapshot = observer(Arc::clone(&registry))
326            .snapshot_for_session("sort")
327            .await
328            .expect("snapshot");
329
330        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
331    }
332
333    #[tokio::test]
334    async fn observed_process_reports_terminal_status_and_error_messages() {
335        let registry =
336            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
337        for process_id in ["failed", "cancelled"] {
338            registry
339                .register_process(external_registration(process_id, process_id))
340                .await
341                .expect("register");
342        }
343        registry
344            .complete_process(
345                "failed",
346                ProcessAwaitOutput::Failure {
347                    class: ToolFailureClass::External,
348                    code: "boom".to_string(),
349                    message: "failed loudly".to_string(),
350                    raw: None,
351                    control: None,
352                },
353            )
354            .await
355            .expect("fail process");
356        registry
357            .complete_process(
358                "cancelled",
359                ProcessAwaitOutput::Cancelled {
360                    message: "cancelled intentionally".to_string(),
361                    raw: None,
362                    control: None,
363                },
364            )
365            .await
366            .expect("cancel process");
367
368        let observer = observer(Arc::clone(&registry));
369        let failed = observer.process("failed").await.expect("failed process");
370        let cancelled = observer
371            .process("cancelled")
372            .await
373            .expect("cancelled process");
374
375        assert_eq!(failed.status_label, "failed");
376        assert!(failed.terminal);
377        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
378        assert_eq!(cancelled.status_label, "cancelled");
379        assert!(cancelled.terminal);
380        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
381    }
382
383    #[tokio::test]
384    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
385        let registry =
386            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
387        let scope = ProcessScope::new("labels");
388        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
389        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
390        let process_ref = lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 1);
391        let mut child_request = SessionCreateRequest::child_session(
392            "labels",
393            SessionStartPoint::Empty,
394            PluginOptions::default(),
395        )
396        .with_session_id("child-session");
397        child_request.subagent = Some(SubagentSessionContext {
398            parent_session_id: "labels".to_string(),
399            capability: "researcher".to_string(),
400            depth: 1,
401            max_depth: 4,
402        });
403        let cases = [
404            (
405                "tool",
406                ProcessInput::ToolCall {
407                    call: PreparedToolCall::from_parts(
408                        "call-1",
409                        "shell.run",
410                        json!({}),
411                        None,
412                        serde_json::Value::Null,
413                    ),
414                },
415                "tool",
416                "shell.run",
417                None,
418            ),
419            (
420                "lashlang",
421                ProcessInput::LashlangProcess {
422                    module_ref,
423                    process_ref,
424                    required_surface_ref: surface_ref,
425                    process_name: "remember".to_string(),
426                    args: serde_json::Map::new(),
427                },
428                "lashlang",
429                "remember",
430                None,
431            ),
432            (
433                "session",
434                ProcessInput::SessionTurn {
435                    create_request: Box::new(child_request),
436                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
437                    output_contract: ToolOutputContract::Static,
438                },
439                "session_turn",
440                "researcher",
441                Some("child-session"),
442            ),
443            (
444                "external",
445                ProcessInput::External {
446                    metadata: json!({ "label": "external job" }),
447                },
448                "external",
449                "external job",
450                None,
451            ),
452        ];
453        for (process_id, input, _kind, _label, _child_session_id) in cases {
454            register_visible(
455                &registry,
456                &scope,
457                ProcessRegistration::new(process_id, input),
458                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
459            )
460            .await;
461        }
462
463        let snapshot = observer(Arc::clone(&registry))
464            .snapshot_for_session("labels")
465            .await
466            .expect("snapshot");
467        let by_id = snapshot
468            .items
469            .iter()
470            .map(|item| (item.process.process_id.as_str(), item))
471            .collect::<std::collections::BTreeMap<_, _>>();
472
473        assert_eq!(by_id["tool"].label, "shell.run");
474        assert_eq!(by_id["lashlang"].label, "remember");
475        assert_eq!(by_id["session"].label, "researcher");
476        assert_eq!(
477            by_id["session"].process.child_session_id.as_deref(),
478            Some("child-session")
479        );
480        assert_eq!(by_id["external"].label, "external job");
481    }
482
483    #[tokio::test]
484    async fn observed_process_missing_lookup_returns_none() {
485        let registry =
486            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
487
488        assert!(observer(registry).process("missing").await.is_none());
489    }
490}