Skip to main content

lash_core/runtime/process/
observation.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::plugin::PluginError;
6
7use super::events::{ProcessAwaitOutput, ProcessEvent};
8use super::model::{
9    ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId, ProcessInput,
10    ProcessLifecycleStatus, ProcessListFilter, ProcessOriginator, ProcessRecord, SessionScope,
11    WaitState,
12};
13use super::registry::ProcessRegistry;
14use super::time::epoch_ms_from_system_time;
15
/// Observer that turns data fetched from a [`ProcessRegistry`] into the serializable
/// `Observed*` types defined in this module.
#[derive(Clone)]
pub struct ProcessWorkObserver {
    /// Shared registry handle that every query in this type is delegated to.
    registry: Arc<dyn ProcessRegistry>,
}
20
/// Result of [`ProcessWorkObserver::snapshot_for_session`]: the work items a session
/// holds handle grants for.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessWorkSnapshot {
    /// Session id the snapshot was requested for.
    pub session_id: String,
    /// Process ids of `items`, in the same order as `items`.
    pub visible_process_ids: Vec<ProcessId>,
    /// One entry per handle grant, sorted by `updated_at_ms` descending, then
    /// `created_at_ms` descending, then process id ascending.
    pub items: Vec<ObservedWorkItem>,
}
27
/// A single granted process together with its handle descriptor and events.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedWorkItem {
    /// Observation built from the process record.
    pub process: ObservedProcess,
    /// Handle descriptor taken from the grant.
    pub descriptor: ProcessHandleDescriptor,
    /// Events the registry returned for this process after sequence `0`.
    pub events: Vec<ObservedProcessEvent>,
    /// The descriptor's kind when it has one, otherwise the kind derived from the
    /// process input variant.
    pub kind: String,
    /// Label derived from the process input when available, otherwise the
    /// descriptor's label, otherwise `kind`.
    pub label: String,
}
36
/// Serializable view of a [`ProcessRecord`], built by `ObservedProcess::from_record`.
///
/// Optional fields are omitted from the serialized form when they are `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedProcess {
    /// Id of the underlying process record.
    pub process_id: ProcessId,
    /// The string `process:` followed by the process id.
    pub graph_key: String,
    /// Kind derived from the input variant: `tool`, `lashlang`, `session_turn`
    /// or `external`.
    pub kind: String,
    /// Lifecycle status converted from the record's status.
    pub lifecycle: ProcessLifecycleStatus,
    /// Text returned by `lifecycle.label()`.
    pub status_label: String,
    /// Value returned by `lifecycle.is_terminal()`.
    pub terminal: bool,
    /// Message of the awaited output when it is a failure or a cancellation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Creation timestamp copied from the record.
    pub created_at_ms: u64,
    /// Last-update timestamp copied from the record.
    pub updated_at_ms: u64,
    /// Clone of the record's input.
    pub input: ProcessInput,
    /// Originator taken from the record's provenance.
    pub originator: ProcessOriginator,
    /// Execution environment reference copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Wake target copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Causal reference taken from the record's provenance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub caused_by: Option<crate::CausalRef>,
    /// External reference copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Wait state copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// Session id of the create request for `SessionTurn` inputs; `None` for every
    /// other input variant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub child_session_id: Option<String>,
    /// Label derived from the input, falling back to `kind`.
    pub label: String,
}
65
/// Serializable view of a [`ProcessEvent`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedProcessEvent {
    /// Sequence number copied from the event.
    pub sequence: u64,
    /// Event type string copied from the event.
    pub event_type: String,
    /// The event's `occurred_at` converted with `epoch_ms_from_system_time`.
    pub occurred_at_ms: u64,
    /// JSON payload copied from the event.
    pub payload: serde_json::Value,
}
73
74impl ProcessWorkObserver {
75    pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
76        Self { registry }
77    }
78
79    pub async fn snapshot_for_session(
80        &self,
81        session_id: impl Into<String>,
82    ) -> Result<ProcessWorkSnapshot, PluginError> {
83        let session_id = session_id.into();
84        let session_scope = SessionScope::new(session_id.clone());
85        let entries = self.registry.list_handle_grants(&session_scope).await?;
86        let mut items = Vec::with_capacity(entries.len());
87        for (grant, record) in entries {
88            let events = self
89                .registry
90                .events_after(&record.id, 0)
91                .await?
92                .into_iter()
93                .map(ObservedProcessEvent::from)
94                .collect();
95            let descriptor = grant.descriptor;
96            let process = ObservedProcess::from_record(record);
97            let kind = descriptor
98                .kind
99                .clone()
100                .unwrap_or_else(|| stable_kind(&process.input).to_string());
101            let label = typed_label(&process.input)
102                .or_else(|| descriptor.label.clone())
103                .unwrap_or_else(|| kind.clone());
104            items.push(ObservedWorkItem {
105                process,
106                descriptor,
107                events,
108                kind,
109                label,
110            });
111        }
112        items.sort_by(|left, right| {
113            right
114                .process
115                .updated_at_ms
116                .cmp(&left.process.updated_at_ms)
117                .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
118                .then_with(|| left.process.process_id.cmp(&right.process.process_id))
119        });
120        let visible_process_ids = items
121            .iter()
122            .map(|item| item.process.process_id.clone())
123            .collect();
124        Ok(ProcessWorkSnapshot {
125            session_id,
126            visible_process_ids,
127            items,
128        })
129    }
130
131    pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
132        self.registry
133            .get_process(process_id)
134            .await
135            .map(ObservedProcess::from_record)
136    }
137
138    pub async fn list(
139        &self,
140        filter: &ProcessListFilter,
141    ) -> Result<Vec<ObservedProcess>, PluginError> {
142        Ok(self
143            .registry
144            .list_processes(filter)
145            .await?
146            .into_iter()
147            .map(ObservedProcess::from_record)
148            .collect())
149    }
150
151    pub async fn events_after(
152        &self,
153        process_id: &str,
154        after_sequence: u64,
155    ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
156        Ok(self
157            .registry
158            .events_after(process_id, after_sequence)
159            .await?
160            .into_iter()
161            .map(ObservedProcessEvent::from)
162            .collect())
163    }
164}
165
166impl ObservedProcess {
167    fn from_record(record: ProcessRecord) -> Self {
168        let lifecycle = ProcessLifecycleStatus::from(&record.status);
169        let input = record.input.as_ref().clone();
170        let kind = stable_kind(&input).to_string();
171        let label = typed_label(&input).unwrap_or_else(|| kind.clone());
172        let process_id = record.id;
173        Self {
174            graph_key: format!("process:{process_id}"),
175            process_id,
176            kind,
177            lifecycle,
178            status_label: lifecycle.label().to_string(),
179            terminal: lifecycle.is_terminal(),
180            error: terminal_error(&record.status),
181            created_at_ms: record.created_at_ms,
182            updated_at_ms: record.updated_at_ms,
183            originator: record.provenance.originator,
184            env_ref: record.env_ref,
185            wake_target: record.wake_target,
186            caused_by: record.provenance.caused_by,
187            external_ref: record.external_ref,
188            wait: record.wait,
189            child_session_id: child_session_id(&input),
190            input,
191            label,
192        }
193    }
194}
195
196impl From<ProcessEvent> for ObservedProcessEvent {
197    fn from(event: ProcessEvent) -> Self {
198        Self {
199            sequence: event.sequence,
200            event_type: event.event_type,
201            occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
202            payload: event.payload,
203        }
204    }
205}
206
207fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
208    match status.await_output()? {
209        ProcessAwaitOutput::Failure { message, .. }
210        | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
211        ProcessAwaitOutput::Success { .. } => None,
212    }
213}
214
215fn child_session_id(input: &ProcessInput) -> Option<String> {
216    match input {
217        ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
218        ProcessInput::ToolCall { .. }
219        | ProcessInput::LashlangProcess { .. }
220        | ProcessInput::External { .. } => None,
221    }
222}
223
224fn typed_label(input: &ProcessInput) -> Option<String> {
225    match input {
226        ProcessInput::ToolCall { call } => Some(call.tool_name.clone()),
227        ProcessInput::LashlangProcess { process_name, .. } => Some(process_name.clone()),
228        ProcessInput::SessionTurn { create_request, .. } => create_request
229            .subagent
230            .as_ref()
231            .map(|subagent| subagent.capability.clone())
232            .or_else(|| create_request.usage_source.clone())
233            .or_else(|| create_request.session_id.clone()),
234        ProcessInput::External { metadata } => metadata
235            .get("label")
236            .or_else(|| metadata.get("name"))
237            .or_else(|| metadata.get("title"))
238            .and_then(serde_json::Value::as_str)
239            .map(str::to_string),
240    }
241}
242
243fn stable_kind(input: &ProcessInput) -> &'static str {
244    match input {
245        ProcessInput::ToolCall { .. } => "tool",
246        ProcessInput::LashlangProcess { .. } => "lashlang",
247        ProcessInput::SessionTurn { .. } => "session_turn",
248        ProcessInput::External { .. } => "external",
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use std::sync::Arc;
255    use std::time::Duration;
256
257    use serde_json::json;
258
259    use super::*;
260    use crate::{
261        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
262        ProcessExecutionEnvRef, ProcessProvenance, ProcessRegistration, SessionCreateRequest,
263        SessionScope, SessionStartPoint, SubagentSessionContext, ToolFailureClass,
264        ToolOutputContract, TurnInput, WaitKind,
265    };
266
    /// Builds the observer under test on top of `registry`.
    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
        ProcessWorkObserver::new(registry)
    }
270
271    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
272        ProcessRegistration::new(
273            process_id,
274            ProcessInput::External {
275                metadata: json!({ "label": label }),
276            },
277            ProcessProvenance::host("observer-test-host"),
278        )
279    }
280
281    async fn register_visible(
282        registry: &Arc<dyn ProcessRegistry>,
283        scope: &SessionScope,
284        registration: ProcessRegistration,
285        descriptor: ProcessHandleDescriptor,
286    ) {
287        let process_id = registration.id.clone();
288        registry
289            .register_process(registration)
290            .await
291            .expect("register process");
292        registry
293            .grant_handle(scope, &process_id, descriptor)
294            .await
295            .expect("grant process handle");
296    }
297
    /// A snapshot contains only the processes granted to the requested session,
    /// along with their events and a non-zero event timestamp.
    #[tokio::test]
    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let visible_scope = SessionScope::new("visible");
        register_visible(
            &registry,
            &visible_scope,
            external_registration("visible-process", "Visible"),
            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
        )
        .await;
        // Granted to a different session, so it must not show up in the snapshot.
        register_visible(
            &registry,
            &SessionScope::new("other"),
            external_registration("hidden-process", "Hidden"),
            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
        )
        .await;
        registry
            .append_event(
                "visible-process",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
                    .with_replay_key("visible-process:cancel-requested"),
            )
            .await
            .expect("append event");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.session_id, "visible");
        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].events.len(), 1);
        assert_eq!(
            snapshot.items[0].events[0].event_type,
            "process.cancel_requested"
        );
        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
    }
341
    /// A process registered first but touched last is listed before a process
    /// registered after it.
    #[tokio::test]
    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("sort");
        register_visible(
            &registry,
            &scope,
            external_registration("older", "Older"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        // Short pauses separate the three registry operations in time; presumably
        // this keeps their millisecond timestamps distinct — confirm against the
        // test registry's clock.
        tokio::time::sleep(Duration::from_millis(2)).await;
        register_visible(
            &registry,
            &scope,
            external_registration("newer", "Newer"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        tokio::time::sleep(Duration::from_millis(2)).await;
        // Appending an event to "older" is the last operation on either process.
        registry
            .append_event(
                "older",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
                    .with_replay_key("older:cancel-requested"),
            )
            .await
            .expect("update older process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("sort")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
    }
379
    /// Processes completed with a failure or a cancellation are reported as
    /// terminal, with the matching status label and the output's message as `error`.
    #[tokio::test]
    async fn observed_process_reports_terminal_status_and_error_messages() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        // No handle grants are needed: the lookups below go through `process`, by id.
        for process_id in ["failed", "cancelled"] {
            registry
                .register_process(external_registration(process_id, process_id))
                .await
                .expect("register");
        }
        registry
            .complete_process(
                "failed",
                ProcessAwaitOutput::Failure {
                    class: ToolFailureClass::External,
                    code: "boom".to_string(),
                    message: "failed loudly".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("fail process");
        registry
            .complete_process(
                "cancelled",
                ProcessAwaitOutput::Cancelled {
                    message: "cancelled intentionally".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("cancel process");

        let observer = observer(Arc::clone(&registry));
        let failed = observer.process("failed").await.expect("failed process");
        let cancelled = observer
            .process("cancelled")
            .await
            .expect("cancelled process");

        assert_eq!(failed.status_label, "failed");
        assert!(failed.terminal);
        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
        assert_eq!(cancelled.status_label, "cancelled");
        assert!(cancelled.terminal);
        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
    }
429
    /// A wait state set on the registry is visible both through a direct process
    /// lookup and through the session snapshot.
    #[tokio::test]
    async fn observed_process_exposes_current_wait_state() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("wait");
        register_visible(
            &registry,
            &scope,
            external_registration("waiting-process", "Waiting"),
            ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
        )
        .await;
        let wait = WaitState {
            since_ms: 1234,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting-process:signal.ready:1".to_string(),
                ordinal: 1,
            },
        };
        registry
            .set_process_wait("waiting-process", wait.clone())
            .await
            .expect("set wait");

        let observer = observer(Arc::clone(&registry));
        let observed = observer
            .process("waiting-process")
            .await
            .expect("waiting process");
        let snapshot = observer
            .snapshot_for_session("wait")
            .await
            .expect("snapshot");

        // Both read paths must report exactly the wait state that was stored.
        assert_eq!(observed.wait, Some(wait.clone()));
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].process.wait, Some(wait));
    }
470
471    #[tokio::test]
472    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
473        let registry =
474            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
475        let scope = SessionScope::new("labels");
476        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
477        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
478        let process_ref = lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 1);
479        let mut child_request = SessionCreateRequest::child_session(
480            "labels",
481            SessionStartPoint::Empty,
482            PluginOptions::default(),
483        )
484        .with_session_id("child-session");
485        child_request.subagent = Some(SubagentSessionContext {
486            parent_session_id: "labels".to_string(),
487            capability: "researcher".to_string(),
488            depth: 1,
489            max_depth: 4,
490        });
491        let cases = [
492            (
493                "tool",
494                ProcessInput::ToolCall {
495                    call: PreparedToolCall::from_parts(
496                        "call-1",
497                        "shell.run",
498                        json!({}),
499                        None,
500                        serde_json::Value::Null,
501                    ),
502                },
503                "tool",
504                "shell.run",
505                None,
506            ),
507            (
508                "lashlang",
509                ProcessInput::LashlangProcess {
510                    module_ref,
511                    process_ref,
512                    required_surface_ref: surface_ref,
513                    process_name: "remember".to_string(),
514                    args: serde_json::Map::new(),
515                },
516                "lashlang",
517                "remember",
518                None,
519            ),
520            (
521                "session",
522                ProcessInput::SessionTurn {
523                    create_request: Box::new(child_request),
524                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
525                    output_contract: ToolOutputContract::Static,
526                },
527                "session_turn",
528                "researcher",
529                Some("child-session"),
530            ),
531            (
532                "external",
533                ProcessInput::External {
534                    metadata: json!({ "label": "external job" }),
535                },
536                "external",
537                "external job",
538                None,
539            ),
540        ];
541        for (process_id, input, _kind, _label, _child_session_id) in cases {
542            let needs_env = matches!(
543                input,
544                ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. }
545            );
546            let mut registration = ProcessRegistration::new(
547                process_id,
548                input,
549                ProcessProvenance::host("observer-test-host"),
550            );
551            if needs_env {
552                registration = registration.with_execution_env_ref(Some(
553                    ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
554                ));
555            }
556            register_visible(
557                &registry,
558                &scope,
559                registration,
560                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
561            )
562            .await;
563        }
564
565        let snapshot = observer(Arc::clone(&registry))
566            .snapshot_for_session("labels")
567            .await
568            .expect("snapshot");
569        let by_id = snapshot
570            .items
571            .iter()
572            .map(|item| (item.process.process_id.as_str(), item))
573            .collect::<std::collections::BTreeMap<_, _>>();
574
575        assert_eq!(by_id["tool"].label, "shell.run");
576        assert_eq!(by_id["lashlang"].label, "remember");
577        assert_eq!(by_id["session"].label, "researcher");
578        assert_eq!(
579            by_id["session"].process.child_session_id.as_deref(),
580            Some("child-session")
581        );
582        assert_eq!(by_id["external"].label, "external job");
583    }
584
    /// Looking up an id that was never registered yields `None`.
    #[tokio::test]
    async fn observed_process_missing_lookup_returns_none() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;

        assert!(observer(registry).process("missing").await.is_none());
    }
592}