Skip to main content

lash_core/runtime/process/
observation.rs

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8use super::events::{ProcessAwaitOutput, ProcessEvent};
9use super::model::{
10    AbandonRequest, ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId,
11    ProcessIdentity, ProcessInput, ProcessLease, ProcessLifecycleStatus, ProcessListFilter,
12    ProcessOriginator, ProcessRecord, ProcessStarted, ProcessStatusFilter, RecoveryDisposition,
13    SessionScope, WaitState,
14};
15use super::registry::ProcessRegistry;
16use super::time::epoch_ms_from_system_time;
17
18#[derive(Clone)]
19pub struct ProcessWorkObserver {
20    registry: Arc<dyn ProcessRegistry>,
21}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub struct ProcessWorkSnapshot {
25    pub session_id: String,
26    pub visible_process_ids: Vec<ProcessId>,
27    pub items: Vec<ObservedWorkItem>,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct ObservedWorkItem {
32    pub process: ObservedProcess,
33    pub descriptor: ProcessHandleDescriptor,
34    pub events: Vec<ObservedProcessEvent>,
35    pub kind: String,
36    pub label: String,
37}
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
40pub struct ObservedProcess {
41    pub process_id: ProcessId,
42    pub graph_key: String,
43    pub kind: String,
44    pub lifecycle: ProcessLifecycleStatus,
45    pub identity: ProcessIdentity,
46    pub status_label: String,
47    pub terminal: bool,
48    /// Declared recovery contract (ADR 0019). Raw fact; hosts classify.
49    pub disposition: RecoveryDisposition,
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub error: Option<String>,
52    pub created_at_ms: u64,
53    pub updated_at_ms: u64,
54    /// Durable execution-started fact, if the row has begun executing.
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub first_started: Option<ProcessStarted>,
57    /// Current lease holder identity, if the row is leased (ADR 0019). Raw
58    /// fact for host-side staleness classification — no derived "stuck" verdict.
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub lease_holder: Option<crate::LeaseOwnerIdentity>,
61    /// Current lease expiry, paired with `lease_holder`.
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub lease_expires_at_ms: Option<u64>,
64    /// Pending Abandon Request the sweep reconciles once the lease lapses.
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub abandon_request: Option<AbandonRequest>,
67    pub input: ProcessInput,
68    pub originator: ProcessOriginator,
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub env_ref: Option<ProcessExecutionEnvRef>,
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub wake_target: Option<SessionScope>,
73    #[serde(default, skip_serializing_if = "Option::is_none")]
74    pub caused_by: Option<crate::CausalRef>,
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    pub external_ref: Option<ProcessExternalRef>,
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub wait: Option<WaitState>,
79    #[serde(default, skip_serializing_if = "Option::is_none")]
80    pub child_session_id: Option<String>,
81    pub label: String,
82}
83
84#[derive(Clone, Debug, Serialize, Deserialize)]
85pub struct ObservedProcessEvent {
86    pub sequence: u64,
87    pub event_type: String,
88    pub occurred_at_ms: u64,
89    pub payload: serde_json::Value,
90}
91
/// Number of trailing events requested per item when building a session
/// snapshot.
///
/// Docks/UIs poll snapshots, so the cost of one poll has to stay bounded
/// rather than scale with the full event history of a process. Detail views
/// that need more page through `events_after` using a cursor.
pub const SNAPSHOT_EVENT_TAIL: usize = 32;
97
98impl ProcessWorkObserver {
99    pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
100        Self { registry }
101    }
102
103    pub async fn snapshot_for_session(
104        &self,
105        session_id: impl Into<String>,
106    ) -> Result<ProcessWorkSnapshot, PluginError> {
107        let session_id = session_id.into();
108        let session_scope = SessionScope::new(session_id.clone());
109        let entries = self.registry.list_handle_grants(&session_scope).await?;
110        let mut items = Vec::new();
111        let mut seen_process_ids = BTreeSet::new();
112        for (grant, record) in entries {
113            seen_process_ids.insert(record.id.clone());
114            items.push(self.work_item_from_record(record, grant.descriptor).await?);
115        }
116        let visible_records = self
117            .registry
118            .list_processes(&ProcessListFilter {
119                status: ProcessStatusFilter::Any,
120                ..ProcessListFilter::default()
121            })
122            .await?;
123        for record in visible_records {
124            if seen_process_ids.contains(&record.id)
125                || !process_visible_to_session(&record, &session_id)
126            {
127                continue;
128            }
129            seen_process_ids.insert(record.id.clone());
130            let descriptor = descriptor_from_process_identity(&record.identity);
131            items.push(self.work_item_from_record(record, descriptor).await?);
132        }
133        items.sort_by(|left, right| {
134            right
135                .process
136                .updated_at_ms
137                .cmp(&left.process.updated_at_ms)
138                .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
139                .then_with(|| left.process.process_id.cmp(&right.process.process_id))
140        });
141        let visible_process_ids = items
142            .iter()
143            .map(|item| item.process.process_id.clone())
144            .collect();
145        Ok(ProcessWorkSnapshot {
146            session_id,
147            visible_process_ids,
148            items,
149        })
150    }
151
152    async fn work_item_from_record(
153        &self,
154        record: ProcessRecord,
155        descriptor: ProcessHandleDescriptor,
156    ) -> Result<ObservedWorkItem, PluginError> {
157        let events = self
158            .registry
159            .recent_events(&record.id, SNAPSHOT_EVENT_TAIL)
160            .await?
161            .into_iter()
162            .map(ObservedProcessEvent::from)
163            .collect();
164        let lease = self.registry.get_process_lease(&record.id).await?;
165        let process = ObservedProcess::from_record(record, lease);
166        let kind = process.identity.kind.clone();
167        let label = process
168            .identity
169            .label
170            .clone()
171            .or_else(|| descriptor.label.clone())
172            .unwrap_or_else(|| kind.clone());
173        Ok(ObservedWorkItem {
174            process,
175            descriptor,
176            events,
177            kind,
178            label,
179        })
180    }
181
182    pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
183        let record = self.registry.get_process(process_id).await?;
184        let lease = self
185            .registry
186            .get_process_lease(process_id)
187            .await
188            .ok()
189            .flatten();
190        Some(ObservedProcess::from_record(record, lease))
191    }
192
193    pub async fn list(
194        &self,
195        filter: &ProcessListFilter,
196    ) -> Result<Vec<ObservedProcess>, PluginError> {
197        let records = self.registry.list_processes(filter).await?;
198        self.observe_records(records).await
199    }
200
201    /// List processes a session may address — the grant filter (ADR 0019 /
202    /// process design grill). "Granted to" is the security lens: a process is
203    /// visible here only if `scope` holds a handle grant for it. This is the
204    /// single home for the grant-scoped view; the session facade sugar is a thin
205    /// caller of this method, never a parallel implementation.
206    pub async fn list_granted_to(
207        &self,
208        scope: &SessionScope,
209        filter: &ProcessListFilter,
210    ) -> Result<Vec<ObservedProcess>, PluginError> {
211        let entries = self.registry.list_handle_grants(scope).await?;
212        let records = entries
213            .into_iter()
214            .map(|(_, record)| record)
215            .filter(|record| filter.matches_record(record))
216            .collect::<Vec<_>>();
217        self.observe_records(records).await
218    }
219
220    /// List processes a session originated — the provenance filter (ADR 0019 /
221    /// process design grill). "Originated by" is the lineage lens, distinct from
222    /// the grant lens: a process matches when its recorded originator is a
223    /// session whose id equals `scope.session_id` (and its agent frame, when
224    /// `scope` names one), regardless of who currently holds a grant.
225    pub async fn list_originated_by(
226        &self,
227        scope: &SessionScope,
228        filter: &ProcessListFilter,
229    ) -> Result<Vec<ObservedProcess>, PluginError> {
230        let records = self
231            .registry
232            .list_processes(filter)
233            .await?
234            .into_iter()
235            .filter(|record| originator_matches(&record.provenance.originator, scope))
236            .collect::<Vec<_>>();
237        self.observe_records(records).await
238    }
239
240    async fn observe_records(
241        &self,
242        records: Vec<ProcessRecord>,
243    ) -> Result<Vec<ObservedProcess>, PluginError> {
244        let mut observed = Vec::with_capacity(records.len());
245        for record in records {
246            let lease = self.registry.get_process_lease(&record.id).await?;
247            observed.push(ObservedProcess::from_record(record, lease));
248        }
249        Ok(observed)
250    }
251
252    pub async fn events_after(
253        &self,
254        process_id: &str,
255        after_sequence: u64,
256    ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
257        Ok(self
258            .registry
259            .events_after(process_id, after_sequence)
260            .await?
261            .into_iter()
262            .map(ObservedProcessEvent::from)
263            .collect())
264    }
265}
266
267impl ObservedProcess {
268    /// Build a read-side view of a process. `lease` is the current lease row (if
269    /// any), read separately so the observer exposes holder identity and expiry
270    /// as raw facts — no derived "stuck" classification (ADR 0019).
271    fn from_record(record: ProcessRecord, lease: Option<ProcessLease>) -> Self {
272        let lifecycle = ProcessLifecycleStatus::from(&record.status);
273        let input = record.input.as_ref().clone();
274        let identity = record.identity;
275        let kind = identity.kind.clone();
276        let label = identity.label.clone().unwrap_or_else(|| kind.clone());
277        let process_id = record.id;
278        let (lease_holder, lease_expires_at_ms) = match lease {
279            Some(lease) => (Some(lease.owner), Some(lease.expires_at_epoch_ms)),
280            None => (None, None),
281        };
282        Self {
283            graph_key: format!("process:{process_id}"),
284            process_id,
285            kind,
286            lifecycle,
287            identity,
288            status_label: lifecycle.label().to_string(),
289            terminal: lifecycle.is_terminal(),
290            disposition: record.disposition,
291            error: terminal_error(&record.status),
292            created_at_ms: record.created_at_ms,
293            updated_at_ms: record.updated_at_ms,
294            first_started: record.first_started.map(|started| *started),
295            lease_holder,
296            lease_expires_at_ms,
297            abandon_request: record.abandon_request.map(|request| *request),
298            originator: record.provenance.originator,
299            env_ref: record.env_ref,
300            wake_target: record.wake_target,
301            caused_by: record.provenance.caused_by,
302            external_ref: record.external_ref,
303            wait: record.wait,
304            child_session_id: child_session_id(&input),
305            input,
306            label,
307        }
308    }
309}
310
311impl From<ProcessEvent> for ObservedProcessEvent {
312    fn from(event: ProcessEvent) -> Self {
313        Self {
314            sequence: event.sequence,
315            event_type: event.event_type,
316            occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
317            payload: event.payload,
318        }
319    }
320}
321
322fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
323    match status.await_output()? {
324        ProcessAwaitOutput::Failure { message, .. }
325        | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
326        // Abandonment is not a reported failure; the status label conveys it and
327        // the evidence rides the terminal event. No derived error string here.
328        ProcessAwaitOutput::Success { .. } | ProcessAwaitOutput::Abandoned { .. } => None,
329    }
330}
331
332fn child_session_id(input: &ProcessInput) -> Option<String> {
333    match input {
334        ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
335        ProcessInput::ToolCall { .. }
336        | ProcessInput::Engine { .. }
337        | ProcessInput::External { .. } => None,
338    }
339}
340
341/// Whether `originator` names the session (or session+frame) identified by
342/// `scope`. Frame is matched only when `scope` names one, so a session-level
343/// provenance filter captures every frame the session originated.
344fn originator_matches(originator: &ProcessOriginator, scope: &SessionScope) -> bool {
345    match originator {
346        ProcessOriginator::Host => false,
347        ProcessOriginator::Session {
348            scope: origin_scope,
349        } => {
350            origin_scope.session_id == scope.session_id
351                && (scope.agent_frame_id.is_none()
352                    || origin_scope.agent_frame_id == scope.agent_frame_id)
353        }
354    }
355}
356
357fn process_visible_to_session(record: &ProcessRecord, session_id: &str) -> bool {
358    record
359        .wake_target
360        .as_ref()
361        .is_some_and(|scope| scope.session_id == session_id)
362}
363
364fn descriptor_from_process_identity(identity: &ProcessIdentity) -> ProcessHandleDescriptor {
365    ProcessHandleDescriptor::new(Some(identity.kind.clone()), identity.label.clone())
366}
367
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use serde_json::json;

    use super::*;
    use crate::{
        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
        ProcessExecutionEnvRef, ProcessIdentity, ProcessProvenance, ProcessRegistration,
        SessionCreateRequest, SessionScope, SessionStartPoint, SubagentSessionContext,
        ToolFailureClass, ToolOutputContract, TurnInput, WaitKind,
    };

    /// Shorthand for building the observer under test.
    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
        ProcessWorkObserver::new(registry)
    }

    /// Host-originated, externally-owned registration whose metadata carries
    /// `label`.
    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: json!({ "label": label }),
            },
            RecoveryDisposition::ExternallyOwned,
            ProcessProvenance::host(),
        )
    }

    /// Register a process and grant `scope` a handle to it.
    async fn register_visible(
        registry: &Arc<dyn ProcessRegistry>,
        scope: &SessionScope,
        registration: ProcessRegistration,
        descriptor: ProcessHandleDescriptor,
    ) {
        let process_id = registration.id.clone();
        registry
            .register_process(registration)
            .await
            .expect("register process");
        registry
            .grant_handle(scope, &process_id, descriptor)
            .await
            .expect("grant process handle");
    }

    /// Only processes granted to the session appear, and their events carry a
    /// non-zero epoch-millisecond timestamp.
    #[tokio::test]
    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let visible_scope = SessionScope::new("visible");
        register_visible(
            &registry,
            &visible_scope,
            external_registration("visible-process", "Visible"),
            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
        )
        .await;
        register_visible(
            &registry,
            &SessionScope::new("other"),
            external_registration("hidden-process", "Hidden"),
            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
        )
        .await;
        registry
            .append_event(
                "visible-process",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
                    .with_replay_key("visible-process:cancel-requested"),
            )
            .await
            .expect("append event");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.session_id, "visible");
        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].events.len(), 1);
        assert_eq!(
            snapshot.items[0].events[0].event_type,
            "process.cancel_requested"
        );
        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
    }

    /// A wake target on a frame of the session makes a process visible without
    /// a grant; originating from the session alone does not.
    #[tokio::test]
    async fn snapshot_for_session_includes_frame_wake_targets_without_handle_grants() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let frame_scope = SessionScope::for_agent_frame("visible", "frame-a");
        registry
            .register_process(ProcessRegistration::new(
                "frame-originated",
                ProcessInput::External {
                    metadata: json!({ "label": "Frame originated" }),
                },
                RecoveryDisposition::ExternallyOwned,
                ProcessProvenance::session(frame_scope.clone()),
            ))
            .await
            .expect("register frame-originated process");
        registry
            .register_process(
                external_registration("frame-wake-targeted", "Frame wake targeted")
                    .with_wake_target(Some(frame_scope)),
            )
            .await
            .expect("register frame wake-targeted process");
        registry
            .register_process(
                external_registration("hidden-frame", "Hidden")
                    .with_wake_target(Some(SessionScope::for_agent_frame("other", "frame-b"))),
            )
            .await
            .expect("register hidden process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");
        let visible_process_ids = snapshot
            .visible_process_ids
            .iter()
            .cloned()
            .collect::<std::collections::BTreeSet<_>>();

        assert_eq!(
            visible_process_ids,
            std::collections::BTreeSet::from(["frame-wake-targeted".to_string()])
        );
        assert_eq!(snapshot.items.len(), 1);
    }

    /// Without a grant, kind and label of the item and its descriptor come
    /// from the process identity.
    #[tokio::test]
    async fn snapshot_for_session_labels_engine_wake_targets_from_identity_without_handle_grants() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("visible");
        registry
            .register_process(
                ProcessRegistration::new(
                    "engine-wake-targeted",
                    ProcessInput::Engine {
                        kind: "test-engine".to_string(),
                        payload: json!({}),
                    },
                    RecoveryDisposition::Rerunnable,
                    ProcessProvenance::host(),
                )
                .with_identity(
                    ProcessIdentity::new("test-engine").with_label(Some("remember".to_string())),
                )
                .with_execution_env_ref(Some(ProcessExecutionEnvRef::new("process-env:test")))
                .with_wake_target(Some(scope)),
            )
            .await
            .expect("register engine wake-targeted process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].kind, "test-engine");
        assert_eq!(snapshot.items[0].label, "remember");
        assert_eq!(
            snapshot.items[0].descriptor.kind.as_deref(),
            Some("test-engine")
        );
        assert_eq!(
            snapshot.items[0].descriptor.label.as_deref(),
            Some("remember")
        );
        assert_eq!(snapshot.items[0].process.kind, "test-engine");
        assert_eq!(snapshot.items[0].process.label, "remember");
    }

    /// The most recently updated process sorts first, even if it was created
    /// earlier.
    #[tokio::test]
    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("sort");
        register_visible(
            &registry,
            &scope,
            external_registration("older", "Older"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        // Short pauses keep the millisecond timestamps of each step distinct.
        tokio::time::sleep(Duration::from_millis(2)).await;
        register_visible(
            &registry,
            &scope,
            external_registration("newer", "Newer"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        tokio::time::sleep(Duration::from_millis(2)).await;
        registry
            .append_event(
                "older",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
                    .with_replay_key("older:cancel-requested"),
            )
            .await
            .expect("update older process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("sort")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
    }

    /// Failed and cancelled processes are terminal and expose their message as
    /// `error`.
    #[tokio::test]
    async fn observed_process_reports_terminal_status_and_error_messages() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        for process_id in ["failed", "cancelled"] {
            registry
                .register_process(external_registration(process_id, process_id))
                .await
                .expect("register");
        }
        registry
            .complete_process(
                "failed",
                ProcessAwaitOutput::Failure {
                    class: ToolFailureClass::External,
                    code: "boom".to_string(),
                    message: "failed loudly".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("fail process");
        registry
            .complete_process(
                "cancelled",
                ProcessAwaitOutput::Cancelled {
                    message: "cancelled intentionally".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("cancel process");

        let observer = observer(Arc::clone(&registry));
        let failed = observer.process("failed").await.expect("failed process");
        let cancelled = observer
            .process("cancelled")
            .await
            .expect("cancelled process");

        assert_eq!(failed.status_label, "failed");
        assert!(failed.terminal);
        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
        assert_eq!(cancelled.status_label, "cancelled");
        assert!(cancelled.terminal);
        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
    }

    /// The wait state set on a process is visible both through `process` and
    /// through the session snapshot.
    #[tokio::test]
    async fn observed_process_exposes_current_wait_state() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("wait");
        register_visible(
            &registry,
            &scope,
            external_registration("waiting-process", "Waiting"),
            ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
        )
        .await;
        let wait = WaitState {
            since_ms: 1234,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting-process:signal.ready:1".to_string(),
                ordinal: 1,
            },
        };
        registry
            .set_process_wait("waiting-process", wait.clone())
            .await
            .expect("set wait");

        let observer = observer(Arc::clone(&registry));
        let observed = observer
            .process("waiting-process")
            .await
            .expect("waiting process");
        let snapshot = observer
            .snapshot_for_session("wait")
            .await
            .expect("snapshot");

        assert_eq!(observed.wait, Some(wait.clone()));
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].process.wait, Some(wait));
    }

    /// Identity labels win over descriptor labels for every input variant, and
    /// only session-turn inputs report a child session id.
    #[tokio::test]
    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("labels");
        let mut child_request = SessionCreateRequest::child_session(
            "labels",
            SessionStartPoint::Empty,
            PluginOptions::default(),
        )
        .with_session_id("child-session");
        child_request.subagent = Some(SubagentSessionContext {
            parent_session_id: "labels".to_string(),
            capability: "researcher".to_string(),
            depth: 1,
            max_depth: 4,
        });
        // Columns: process id, input, identity kind, identity label, expected
        // child session id.
        let cases = [
            (
                "tool",
                ProcessInput::ToolCall {
                    call: PreparedToolCall::from_parts(
                        "call-1",
                        "tool:shell.run",
                        "shell.run",
                        json!({}),
                        None,
                        serde_json::Value::Null,
                    ),
                },
                "tool",
                "shell.run",
                None,
            ),
            (
                "engine",
                ProcessInput::Engine {
                    kind: "test-engine".to_string(),
                    payload: json!({}),
                },
                "test-engine",
                "remember",
                None,
            ),
            (
                "session",
                ProcessInput::SessionTurn {
                    create_request: Box::new(child_request),
                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
                    output_contract: ToolOutputContract::Static,
                },
                "session_turn",
                "researcher",
                Some("child-session"),
            ),
            (
                "external",
                ProcessInput::External {
                    metadata: json!({ "label": "external job" }),
                },
                "external",
                "external job",
                None,
            ),
        ];
        // Expected (process id, label, child session id) per case, checked
        // against the snapshot once everything is registered.
        let mut expectations = Vec::with_capacity(cases.len());
        for (process_id, input, kind, label, expected_child_session_id) in cases {
            let needs_env = matches!(
                input,
                ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. }
            );
            let disposition = match input {
                ProcessInput::External { .. } => RecoveryDisposition::ExternallyOwned,
                _ => RecoveryDisposition::Rerunnable,
            };
            let mut registration =
                ProcessRegistration::new(process_id, input, disposition, ProcessProvenance::host())
                    .with_identity(ProcessIdentity::new(kind).with_label(Some(label.to_string())));
            if needs_env {
                registration = registration.with_execution_env_ref(Some(
                    ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
                ));
            }
            register_visible(
                &registry,
                &scope,
                registration,
                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
            )
            .await;
            expectations.push((process_id, label, expected_child_session_id));
        }

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("labels")
            .await
            .expect("snapshot");
        let by_id = snapshot
            .items
            .iter()
            .map(|item| (item.process.process_id.as_str(), item))
            .collect::<std::collections::BTreeMap<_, _>>();

        for (process_id, label, expected_child_session_id) in expectations {
            let item = by_id[process_id];
            assert_eq!(item.label, label, "label for {process_id}");
            assert_eq!(
                item.process.child_session_id.as_deref(),
                expected_child_session_id,
                "child session id for {process_id}"
            );
        }
        assert_eq!(by_id["engine"].process.kind, "test-engine");
    }

    /// Looking up an unknown process id yields `None`.
    #[tokio::test]
    async fn observed_process_missing_lookup_returns_none() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;

        assert!(observer(registry).process("missing").await.is_none());
    }
}