1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8use super::events::{ProcessAwaitOutput, ProcessEvent};
9use super::model::{
10 AbandonRequest, ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId,
11 ProcessIdentity, ProcessInput, ProcessLease, ProcessLifecycleStatus, ProcessListFilter,
12 ProcessOriginator, ProcessRecord, ProcessStarted, ProcessStatusFilter, RecoveryDisposition,
13 SessionScope, WaitState,
14};
15use super::registry::ProcessRegistry;
16use super::time::epoch_ms_from_system_time;
17
/// Query facade that projects [`ProcessRegistry`] state into the serializable
/// `Observed*` views defined in this module.
///
/// Cloning only clones the `Arc`, so every clone shares the same registry.
#[derive(Clone)]
pub struct ProcessWorkObserver {
    /// Registry that every lookup and listing is delegated to.
    registry: Arc<dyn ProcessRegistry>,
}
22
/// Result of [`ProcessWorkObserver::snapshot_for_session`]: the work items
/// one session can see, already sorted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessWorkSnapshot {
    /// Session the snapshot was requested for.
    pub session_id: String,
    /// Process ids of `items`, in the same order as `items`.
    pub visible_process_ids: Vec<ProcessId>,
    /// Work items ordered by `updated_at_ms` descending, then `created_at_ms`
    /// descending, then `process_id` ascending.
    pub items: Vec<ObservedWorkItem>,
}
29
/// One entry of a [`ProcessWorkSnapshot`]: a process together with its handle
/// descriptor and a tail of its events.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedWorkItem {
    /// Observed view of the process record (including its lease, if any).
    pub process: ObservedProcess,
    /// Descriptor taken from the session's handle grant, or synthesized from
    /// the process identity when the process is visible without a grant.
    pub descriptor: ProcessHandleDescriptor,
    /// Events returned by the registry's `recent_events` call, requested with
    /// a limit of [`SNAPSHOT_EVENT_TAIL`].
    pub events: Vec<ObservedProcessEvent>,
    /// Copy of `process.identity.kind`.
    pub kind: String,
    /// Identity label if set, otherwise the descriptor label, otherwise `kind`.
    pub label: String,
}
38
/// Serializable view of a single `ProcessRecord`, optionally enriched with its
/// current lease. Built by `ObservedProcess::from_record`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedProcess {
    /// Id of the underlying process record.
    pub process_id: ProcessId,
    /// Always `"process:"` followed by the process id.
    pub graph_key: String,
    /// Copy of `identity.kind`.
    pub kind: String,
    /// Lifecycle status derived from the record's status.
    pub lifecycle: ProcessLifecycleStatus,
    /// Identity carried by the record.
    pub identity: ProcessIdentity,
    /// `lifecycle.label()` rendered as an owned string.
    pub status_label: String,
    /// `lifecycle.is_terminal()`.
    pub terminal: bool,
    /// Recovery disposition copied from the record.
    pub disposition: RecoveryDisposition,
    /// Message of the process's await output when that output is a failure or
    /// a cancellation; `None` in every other case.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Creation timestamp copied from the record.
    pub created_at_ms: u64,
    /// Last-update timestamp copied from the record.
    pub updated_at_ms: u64,
    /// `first_started` value of the record, unboxed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_started: Option<ProcessStarted>,
    /// Owner of the lease supplied to `from_record`, if there was one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub lease_holder: Option<crate::LeaseOwnerIdentity>,
    /// `expires_at_epoch_ms` of that same lease, if there was one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub lease_expires_at_ms: Option<u64>,
    /// `abandon_request` value of the record, unboxed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub abandon_request: Option<AbandonRequest>,
    /// Clone of the record's input.
    pub input: ProcessInput,
    /// Originator taken from the record's provenance.
    pub originator: ProcessOriginator,
    /// Execution environment reference copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Wake target copied from the record. Session snapshots use its
    /// `session_id` to decide visibility for processes without a handle grant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// `caused_by` taken from the record's provenance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub caused_by: Option<crate::CausalRef>,
    /// External reference copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Wait state copied from the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// For `ProcessInput::SessionTurn` inputs, the session id of the create
    /// request; `None` for every other input variant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub child_session_id: Option<String>,
    /// Identity label if set, otherwise `kind`.
    pub label: String,
}
83
/// Serializable view of a `ProcessEvent` (see the `From<ProcessEvent>` impl).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObservedProcessEvent {
    /// Sequence number copied from the event.
    pub sequence: u64,
    /// Event type string copied from the event.
    pub event_type: String,
    /// The event's `occurred_at` converted with `epoch_ms_from_system_time`.
    pub occurred_at_ms: u64,
    /// Event payload, passed through unchanged.
    pub payload: serde_json::Value,
}
91
/// Limit passed to the registry's `recent_events` call for each process when a
/// session snapshot is built.
pub const SNAPSHOT_EVENT_TAIL: usize = 32;
97
98impl ProcessWorkObserver {
99 pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
100 Self { registry }
101 }
102
103 pub async fn snapshot_for_session(
104 &self,
105 session_id: impl Into<String>,
106 ) -> Result<ProcessWorkSnapshot, PluginError> {
107 let session_id = session_id.into();
108 let session_scope = SessionScope::new(session_id.clone());
109 let entries = self.registry.list_handle_grants(&session_scope).await?;
110 let mut items = Vec::new();
111 let mut seen_process_ids = BTreeSet::new();
112 for (grant, record) in entries {
113 seen_process_ids.insert(record.id.clone());
114 items.push(self.work_item_from_record(record, grant.descriptor).await?);
115 }
116 let visible_records = self
117 .registry
118 .list_processes(&ProcessListFilter {
119 status: ProcessStatusFilter::Any,
120 ..ProcessListFilter::default()
121 })
122 .await?;
123 for record in visible_records {
124 if seen_process_ids.contains(&record.id)
125 || !process_visible_to_session(&record, &session_id)
126 {
127 continue;
128 }
129 seen_process_ids.insert(record.id.clone());
130 let descriptor = descriptor_from_process_identity(&record.identity);
131 items.push(self.work_item_from_record(record, descriptor).await?);
132 }
133 items.sort_by(|left, right| {
134 right
135 .process
136 .updated_at_ms
137 .cmp(&left.process.updated_at_ms)
138 .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
139 .then_with(|| left.process.process_id.cmp(&right.process.process_id))
140 });
141 let visible_process_ids = items
142 .iter()
143 .map(|item| item.process.process_id.clone())
144 .collect();
145 Ok(ProcessWorkSnapshot {
146 session_id,
147 visible_process_ids,
148 items,
149 })
150 }
151
152 async fn work_item_from_record(
153 &self,
154 record: ProcessRecord,
155 descriptor: ProcessHandleDescriptor,
156 ) -> Result<ObservedWorkItem, PluginError> {
157 let events = self
158 .registry
159 .recent_events(&record.id, SNAPSHOT_EVENT_TAIL)
160 .await?
161 .into_iter()
162 .map(ObservedProcessEvent::from)
163 .collect();
164 let lease = self.registry.get_process_lease(&record.id).await?;
165 let process = ObservedProcess::from_record(record, lease);
166 let kind = process.identity.kind.clone();
167 let label = process
168 .identity
169 .label
170 .clone()
171 .or_else(|| descriptor.label.clone())
172 .unwrap_or_else(|| kind.clone());
173 Ok(ObservedWorkItem {
174 process,
175 descriptor,
176 events,
177 kind,
178 label,
179 })
180 }
181
182 pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
183 let record = self.registry.get_process(process_id).await?;
184 let lease = self
185 .registry
186 .get_process_lease(process_id)
187 .await
188 .ok()
189 .flatten();
190 Some(ObservedProcess::from_record(record, lease))
191 }
192
193 pub async fn list(
194 &self,
195 filter: &ProcessListFilter,
196 ) -> Result<Vec<ObservedProcess>, PluginError> {
197 let records = self.registry.list_processes(filter).await?;
198 self.observe_records(records).await
199 }
200
201 pub async fn list_granted_to(
207 &self,
208 scope: &SessionScope,
209 filter: &ProcessListFilter,
210 ) -> Result<Vec<ObservedProcess>, PluginError> {
211 let entries = self.registry.list_handle_grants(scope).await?;
212 let records = entries
213 .into_iter()
214 .map(|(_, record)| record)
215 .filter(|record| filter.matches_record(record))
216 .collect::<Vec<_>>();
217 self.observe_records(records).await
218 }
219
220 pub async fn list_originated_by(
226 &self,
227 scope: &SessionScope,
228 filter: &ProcessListFilter,
229 ) -> Result<Vec<ObservedProcess>, PluginError> {
230 let records = self
231 .registry
232 .list_processes(filter)
233 .await?
234 .into_iter()
235 .filter(|record| originator_matches(&record.provenance.originator, scope))
236 .collect::<Vec<_>>();
237 self.observe_records(records).await
238 }
239
240 async fn observe_records(
241 &self,
242 records: Vec<ProcessRecord>,
243 ) -> Result<Vec<ObservedProcess>, PluginError> {
244 let mut observed = Vec::with_capacity(records.len());
245 for record in records {
246 let lease = self.registry.get_process_lease(&record.id).await?;
247 observed.push(ObservedProcess::from_record(record, lease));
248 }
249 Ok(observed)
250 }
251
252 pub async fn events_after(
253 &self,
254 process_id: &str,
255 after_sequence: u64,
256 ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
257 Ok(self
258 .registry
259 .events_after(process_id, after_sequence)
260 .await?
261 .into_iter()
262 .map(ObservedProcessEvent::from)
263 .collect())
264 }
265}
266
267impl ObservedProcess {
268 fn from_record(record: ProcessRecord, lease: Option<ProcessLease>) -> Self {
272 let lifecycle = ProcessLifecycleStatus::from(&record.status);
273 let input = record.input.as_ref().clone();
274 let identity = record.identity;
275 let kind = identity.kind.clone();
276 let label = identity.label.clone().unwrap_or_else(|| kind.clone());
277 let process_id = record.id;
278 let (lease_holder, lease_expires_at_ms) = match lease {
279 Some(lease) => (Some(lease.owner), Some(lease.expires_at_epoch_ms)),
280 None => (None, None),
281 };
282 Self {
283 graph_key: format!("process:{process_id}"),
284 process_id,
285 kind,
286 lifecycle,
287 identity,
288 status_label: lifecycle.label().to_string(),
289 terminal: lifecycle.is_terminal(),
290 disposition: record.disposition,
291 error: terminal_error(&record.status),
292 created_at_ms: record.created_at_ms,
293 updated_at_ms: record.updated_at_ms,
294 first_started: record.first_started.map(|started| *started),
295 lease_holder,
296 lease_expires_at_ms,
297 abandon_request: record.abandon_request.map(|request| *request),
298 originator: record.provenance.originator,
299 env_ref: record.env_ref,
300 wake_target: record.wake_target,
301 caused_by: record.provenance.caused_by,
302 external_ref: record.external_ref,
303 wait: record.wait,
304 child_session_id: child_session_id(&input),
305 input,
306 label,
307 }
308 }
309}
310
311impl From<ProcessEvent> for ObservedProcessEvent {
312 fn from(event: ProcessEvent) -> Self {
313 Self {
314 sequence: event.sequence,
315 event_type: event.event_type,
316 occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
317 payload: event.payload,
318 }
319 }
320}
321
322fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
323 match status.await_output()? {
324 ProcessAwaitOutput::Failure { message, .. }
325 | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
326 ProcessAwaitOutput::Success { .. } | ProcessAwaitOutput::Abandoned { .. } => None,
329 }
330}
331
332fn child_session_id(input: &ProcessInput) -> Option<String> {
333 match input {
334 ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
335 ProcessInput::ToolCall { .. }
336 | ProcessInput::Engine { .. }
337 | ProcessInput::External { .. } => None,
338 }
339}
340
341fn originator_matches(originator: &ProcessOriginator, scope: &SessionScope) -> bool {
345 match originator {
346 ProcessOriginator::Host => false,
347 ProcessOriginator::Session {
348 scope: origin_scope,
349 } => {
350 origin_scope.session_id == scope.session_id
351 && (scope.agent_frame_id.is_none()
352 || origin_scope.agent_frame_id == scope.agent_frame_id)
353 }
354 }
355}
356
357fn process_visible_to_session(record: &ProcessRecord, session_id: &str) -> bool {
358 record
359 .wake_target
360 .as_ref()
361 .is_some_and(|scope| scope.session_id == session_id)
362}
363
364fn descriptor_from_process_identity(identity: &ProcessIdentity) -> ProcessHandleDescriptor {
365 ProcessHandleDescriptor::new(Some(identity.kind.clone()), identity.label.clone())
366}
367
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use serde_json::json;

    use super::*;
    use crate::{
        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
        ProcessExecutionEnvRef, ProcessIdentity, ProcessProvenance, ProcessRegistration,
        SessionCreateRequest, SessionScope, SessionStartPoint, SubagentSessionContext,
        ToolFailureClass, ToolOutputContract, TurnInput, WaitKind,
    };

    /// Builds the observer under test on top of `registry`.
    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
        ProcessWorkObserver::new(registry)
    }

    /// Host-originated, externally owned registration whose metadata carries `label`.
    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: json!({ "label": label }),
            },
            RecoveryDisposition::ExternallyOwned,
            ProcessProvenance::host(),
        )
    }

    /// Registers the process and grants `scope` a handle to it with `descriptor`.
    async fn register_visible(
        registry: &Arc<dyn ProcessRegistry>,
        scope: &SessionScope,
        registration: ProcessRegistration,
        descriptor: ProcessHandleDescriptor,
    ) {
        let process_id = registration.id.clone();
        registry
            .register_process(registration)
            .await
            .expect("register process");
        registry
            .grant_handle(scope, &process_id, descriptor)
            .await
            .expect("grant process handle");
    }

    #[tokio::test]
    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let visible_scope = SessionScope::new("visible");
        register_visible(
            &registry,
            &visible_scope,
            external_registration("visible-process", "Visible"),
            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
        )
        .await;
        register_visible(
            &registry,
            &SessionScope::new("other"),
            external_registration("hidden-process", "Hidden"),
            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
        )
        .await;
        registry
            .append_event(
                "visible-process",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
                    .with_replay_key("visible-process:cancel-requested"),
            )
            .await
            .expect("append event");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.session_id, "visible");
        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].events.len(), 1);
        assert_eq!(
            snapshot.items[0].events[0].event_type,
            "process.cancel_requested"
        );
        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
    }

    #[tokio::test]
    async fn snapshot_for_session_includes_frame_wake_targets_without_handle_grants() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let frame_scope = SessionScope::for_agent_frame("visible", "frame-a");
        registry
            .register_process(ProcessRegistration::new(
                "frame-originated",
                ProcessInput::External {
                    metadata: json!({ "label": "Frame originated" }),
                },
                RecoveryDisposition::ExternallyOwned,
                ProcessProvenance::session(frame_scope.clone()),
            ))
            .await
            .expect("register frame-originated process");
        registry
            .register_process(
                external_registration("frame-wake-targeted", "Frame wake targeted")
                    .with_wake_target(Some(frame_scope)),
            )
            .await
            .expect("register frame wake-targeted process");
        registry
            .register_process(
                external_registration("hidden-frame", "Hidden")
                    .with_wake_target(Some(SessionScope::for_agent_frame("other", "frame-b"))),
            )
            .await
            .expect("register hidden process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");
        let visible_process_ids = snapshot
            .visible_process_ids
            .iter()
            .cloned()
            .collect::<std::collections::BTreeSet<_>>();

        // Only the wake-targeted process is visible; origination alone does not
        // make a process part of the session snapshot.
        assert_eq!(
            visible_process_ids,
            std::collections::BTreeSet::from(["frame-wake-targeted".to_string()])
        );
        assert_eq!(snapshot.items.len(), 1);
    }

    #[tokio::test]
    async fn snapshot_for_session_labels_engine_wake_targets_from_identity_without_handle_grants() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("visible");
        registry
            .register_process(
                ProcessRegistration::new(
                    "engine-wake-targeted",
                    ProcessInput::Engine {
                        kind: "test-engine".to_string(),
                        payload: json!({}),
                    },
                    RecoveryDisposition::Rerunnable,
                    ProcessProvenance::host(),
                )
                .with_identity(
                    ProcessIdentity::new("test-engine").with_label(Some("remember".to_string())),
                )
                .with_execution_env_ref(Some(ProcessExecutionEnvRef::new("process-env:test")))
                .with_wake_target(Some(scope)),
            )
            .await
            .expect("register engine wake-targeted process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].kind, "test-engine");
        assert_eq!(snapshot.items[0].label, "remember");
        assert_eq!(
            snapshot.items[0].descriptor.kind.as_deref(),
            Some("test-engine")
        );
        assert_eq!(
            snapshot.items[0].descriptor.label.as_deref(),
            Some("remember")
        );
        assert_eq!(snapshot.items[0].process.kind, "test-engine");
        assert_eq!(snapshot.items[0].process.label, "remember");
    }

    #[tokio::test]
    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("sort");
        register_visible(
            &registry,
            &scope,
            external_registration("older", "Older"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        // Short pauses keep the millisecond timestamps of the steps distinct.
        tokio::time::sleep(Duration::from_millis(2)).await;
        register_visible(
            &registry,
            &scope,
            external_registration("newer", "Newer"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        tokio::time::sleep(Duration::from_millis(2)).await;
        registry
            .append_event(
                "older",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
                    .with_replay_key("older:cancel-requested"),
            )
            .await
            .expect("update older process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("sort")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
    }

    #[tokio::test]
    async fn observed_process_reports_terminal_status_and_error_messages() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        for process_id in ["failed", "cancelled"] {
            registry
                .register_process(external_registration(process_id, process_id))
                .await
                .expect("register");
        }
        registry
            .complete_process(
                "failed",
                ProcessAwaitOutput::Failure {
                    class: ToolFailureClass::External,
                    code: "boom".to_string(),
                    message: "failed loudly".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("fail process");
        registry
            .complete_process(
                "cancelled",
                ProcessAwaitOutput::Cancelled {
                    message: "cancelled intentionally".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("cancel process");

        let observer = observer(Arc::clone(&registry));
        let failed = observer.process("failed").await.expect("failed process");
        let cancelled = observer
            .process("cancelled")
            .await
            .expect("cancelled process");

        assert_eq!(failed.status_label, "failed");
        assert!(failed.terminal);
        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
        assert_eq!(cancelled.status_label, "cancelled");
        assert!(cancelled.terminal);
        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
    }

    #[tokio::test]
    async fn observed_process_exposes_current_wait_state() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("wait");
        register_visible(
            &registry,
            &scope,
            external_registration("waiting-process", "Waiting"),
            ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
        )
        .await;
        let wait = WaitState {
            since_ms: 1234,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting-process:signal.ready:1".to_string(),
                ordinal: 1,
            },
        };
        registry
            .set_process_wait("waiting-process", wait.clone())
            .await
            .expect("set wait");

        let observer = observer(Arc::clone(&registry));
        let observed = observer
            .process("waiting-process")
            .await
            .expect("waiting process");
        let snapshot = observer
            .snapshot_for_session("wait")
            .await
            .expect("snapshot");

        assert_eq!(observed.wait, Some(wait.clone()));
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].process.wait, Some(wait));
    }

    #[tokio::test]
    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let scope = SessionScope::new("labels");
        let mut child_request = SessionCreateRequest::child_session(
            "labels",
            SessionStartPoint::Empty,
            PluginOptions::default(),
        )
        .with_session_id("child-session");
        child_request.subagent = Some(SubagentSessionContext {
            parent_session_id: "labels".to_string(),
            capability: "researcher".to_string(),
            depth: 1,
            max_depth: 4,
        });
        // Columns: process id, input, identity kind, identity label,
        // expected child session id.
        let cases = [
            (
                "tool",
                ProcessInput::ToolCall {
                    call: PreparedToolCall::from_parts(
                        "call-1",
                        "tool:shell.run",
                        "shell.run",
                        json!({}),
                        None,
                        serde_json::Value::Null,
                    ),
                },
                "tool",
                "shell.run",
                None,
            ),
            (
                "engine",
                ProcessInput::Engine {
                    kind: "test-engine".to_string(),
                    payload: json!({}),
                },
                "test-engine",
                "remember",
                None,
            ),
            (
                "session",
                ProcessInput::SessionTurn {
                    create_request: Box::new(child_request),
                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
                    output_contract: ToolOutputContract::Static,
                },
                "session_turn",
                "researcher",
                Some("child-session"),
            ),
            (
                "external",
                ProcessInput::External {
                    metadata: json!({ "label": "external job" }),
                },
                "external",
                "external job",
                None,
            ),
        ];
        // Remember the expected child session per process so it can be checked
        // against the snapshot after the case table has been consumed.
        let mut expected_child_sessions = Vec::with_capacity(cases.len());
        for (process_id, input, kind, label, expected_child_session) in cases {
            expected_child_sessions.push((process_id, expected_child_session));
            let needs_env = matches!(
                input,
                ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. }
            );
            let disposition = match input {
                ProcessInput::External { .. } => RecoveryDisposition::ExternallyOwned,
                _ => RecoveryDisposition::Rerunnable,
            };
            let mut registration =
                ProcessRegistration::new(process_id, input, disposition, ProcessProvenance::host())
                    .with_identity(ProcessIdentity::new(kind).with_label(Some(label.to_string())));
            if needs_env {
                registration = registration.with_execution_env_ref(Some(
                    ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
                ));
            }
            register_visible(
                &registry,
                &scope,
                registration,
                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
            )
            .await;
        }

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("labels")
            .await
            .expect("snapshot");
        let by_id = snapshot
            .items
            .iter()
            .map(|item| (item.process.process_id.as_str(), item))
            .collect::<std::collections::BTreeMap<_, _>>();

        assert_eq!(by_id["tool"].label, "shell.run");
        assert_eq!(by_id["engine"].label, "remember");
        assert_eq!(by_id["engine"].process.kind, "test-engine");
        assert_eq!(by_id["session"].label, "researcher");
        assert_eq!(
            by_id["session"].process.child_session_id.as_deref(),
            Some("child-session")
        );
        assert_eq!(by_id["external"].label, "external job");
        for (process_id, expected_child_session) in expected_child_sessions {
            assert_eq!(
                by_id[process_id].process.child_session_id.as_deref(),
                expected_child_session,
                "unexpected child session id for process {process_id}"
            );
        }
    }

    #[tokio::test]
    async fn observed_process_missing_lookup_returns_none() {
        let registry =
            Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;

        assert!(observer(registry).process("missing").await.is_none());
    }
}