1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8use super::events::{ProcessAwaitOutput, ProcessEvent};
9use super::model::{
10 ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId,
11 ProcessIdentity, ProcessInput, ProcessLifecycleStatus, ProcessListFilter, ProcessOriginator,
12 ProcessRecord, ProcessStatusFilter, SessionScope, WaitState,
13};
14use super::registry::ProcessRegistry;
15use super::time::epoch_ms_from_system_time;
16
17#[derive(Clone)]
18pub struct ProcessWorkObserver {
19 registry: Arc<dyn ProcessRegistry>,
20}
21
22#[derive(Clone, Debug, Serialize, Deserialize)]
23pub struct ProcessWorkSnapshot {
24 pub session_id: String,
25 pub visible_process_ids: Vec<ProcessId>,
26 pub items: Vec<ObservedWorkItem>,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct ObservedWorkItem {
31 pub process: ObservedProcess,
32 pub descriptor: ProcessHandleDescriptor,
33 pub events: Vec<ObservedProcessEvent>,
34 pub kind: String,
35 pub label: String,
36}
37
38#[derive(Clone, Debug, Serialize, Deserialize)]
39pub struct ObservedProcess {
40 pub process_id: ProcessId,
41 pub graph_key: String,
42 pub kind: String,
43 pub lifecycle: ProcessLifecycleStatus,
44 pub identity: ProcessIdentity,
45 pub status_label: String,
46 pub terminal: bool,
47 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub error: Option<String>,
49 pub created_at_ms: u64,
50 pub updated_at_ms: u64,
51 pub input: ProcessInput,
52 pub originator: ProcessOriginator,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub env_ref: Option<ProcessExecutionEnvRef>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub wake_target: Option<SessionScope>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub caused_by: Option<crate::CausalRef>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub external_ref: Option<ProcessExternalRef>,
61 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub wait: Option<WaitState>,
63 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub child_session_id: Option<String>,
65 pub label: String,
66}
67
68#[derive(Clone, Debug, Serialize, Deserialize)]
69pub struct ObservedProcessEvent {
70 pub sequence: u64,
71 pub event_type: String,
72 pub occurred_at_ms: u64,
73 pub payload: serde_json::Value,
74}
75
76pub const SNAPSHOT_EVENT_TAIL: usize = 32;
81
82impl ProcessWorkObserver {
83 pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
84 Self { registry }
85 }
86
87 pub async fn snapshot_for_session(
88 &self,
89 session_id: impl Into<String>,
90 ) -> Result<ProcessWorkSnapshot, PluginError> {
91 let session_id = session_id.into();
92 let session_scope = SessionScope::new(session_id.clone());
93 let entries = self.registry.list_handle_grants(&session_scope).await?;
94 let mut items = Vec::new();
95 let mut seen_process_ids = BTreeSet::new();
96 for (grant, record) in entries {
97 seen_process_ids.insert(record.id.clone());
98 items.push(self.work_item_from_record(record, grant.descriptor).await?);
99 }
100 let visible_records = self
101 .registry
102 .list_processes(&ProcessListFilter {
103 status: ProcessStatusFilter::Any,
104 ..ProcessListFilter::default()
105 })
106 .await?;
107 for record in visible_records {
108 if seen_process_ids.contains(&record.id)
109 || !process_visible_to_session(&record, &session_id)
110 {
111 continue;
112 }
113 seen_process_ids.insert(record.id.clone());
114 let descriptor = descriptor_from_process_identity(&record.identity);
115 items.push(self.work_item_from_record(record, descriptor).await?);
116 }
117 items.sort_by(|left, right| {
118 right
119 .process
120 .updated_at_ms
121 .cmp(&left.process.updated_at_ms)
122 .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
123 .then_with(|| left.process.process_id.cmp(&right.process.process_id))
124 });
125 let visible_process_ids = items
126 .iter()
127 .map(|item| item.process.process_id.clone())
128 .collect();
129 Ok(ProcessWorkSnapshot {
130 session_id,
131 visible_process_ids,
132 items,
133 })
134 }
135
136 async fn work_item_from_record(
137 &self,
138 record: ProcessRecord,
139 descriptor: ProcessHandleDescriptor,
140 ) -> Result<ObservedWorkItem, PluginError> {
141 let events = self
142 .registry
143 .recent_events(&record.id, SNAPSHOT_EVENT_TAIL)
144 .await?
145 .into_iter()
146 .map(ObservedProcessEvent::from)
147 .collect();
148 let process = ObservedProcess::from_record(record);
149 let kind = process.identity.kind.clone();
150 let label = process
151 .identity
152 .label
153 .clone()
154 .or_else(|| descriptor.label.clone())
155 .unwrap_or_else(|| kind.clone());
156 Ok(ObservedWorkItem {
157 process,
158 descriptor,
159 events,
160 kind,
161 label,
162 })
163 }
164
165 pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
166 self.registry
167 .get_process(process_id)
168 .await
169 .map(ObservedProcess::from_record)
170 }
171
172 pub async fn list(
173 &self,
174 filter: &ProcessListFilter,
175 ) -> Result<Vec<ObservedProcess>, PluginError> {
176 Ok(self
177 .registry
178 .list_processes(filter)
179 .await?
180 .into_iter()
181 .map(ObservedProcess::from_record)
182 .collect())
183 }
184
185 pub async fn events_after(
186 &self,
187 process_id: &str,
188 after_sequence: u64,
189 ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
190 Ok(self
191 .registry
192 .events_after(process_id, after_sequence)
193 .await?
194 .into_iter()
195 .map(ObservedProcessEvent::from)
196 .collect())
197 }
198}
199
200impl ObservedProcess {
201 fn from_record(record: ProcessRecord) -> Self {
202 let lifecycle = ProcessLifecycleStatus::from(&record.status);
203 let input = record.input.as_ref().clone();
204 let identity = record.identity;
205 let kind = identity.kind.clone();
206 let label = identity.label.clone().unwrap_or_else(|| kind.clone());
207 let process_id = record.id;
208 Self {
209 graph_key: format!("process:{process_id}"),
210 process_id,
211 kind,
212 lifecycle,
213 identity,
214 status_label: lifecycle.label().to_string(),
215 terminal: lifecycle.is_terminal(),
216 error: terminal_error(&record.status),
217 created_at_ms: record.created_at_ms,
218 updated_at_ms: record.updated_at_ms,
219 originator: record.provenance.originator,
220 env_ref: record.env_ref,
221 wake_target: record.wake_target,
222 caused_by: record.provenance.caused_by,
223 external_ref: record.external_ref,
224 wait: record.wait,
225 child_session_id: child_session_id(&input),
226 input,
227 label,
228 }
229 }
230}
231
232impl From<ProcessEvent> for ObservedProcessEvent {
233 fn from(event: ProcessEvent) -> Self {
234 Self {
235 sequence: event.sequence,
236 event_type: event.event_type,
237 occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
238 payload: event.payload,
239 }
240 }
241}
242
243fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
244 match status.await_output()? {
245 ProcessAwaitOutput::Failure { message, .. }
246 | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
247 ProcessAwaitOutput::Success { .. } => None,
248 }
249}
250
251fn child_session_id(input: &ProcessInput) -> Option<String> {
252 match input {
253 ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
254 ProcessInput::ToolCall { .. }
255 | ProcessInput::Engine { .. }
256 | ProcessInput::External { .. } => None,
257 }
258}
259
260fn process_visible_to_session(record: &ProcessRecord, session_id: &str) -> bool {
261 record
262 .wake_target
263 .as_ref()
264 .is_some_and(|scope| scope.session_id == session_id)
265}
266
267fn descriptor_from_process_identity(identity: &ProcessIdentity) -> ProcessHandleDescriptor {
268 ProcessHandleDescriptor::new(Some(identity.kind.clone()), identity.label.clone())
269}
270
271#[cfg(test)]
272mod tests {
273 use std::sync::Arc;
274 use std::time::Duration;
275
276 use serde_json::json;
277
278 use super::*;
279 use crate::{
280 InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
281 ProcessExecutionEnvRef, ProcessIdentity, ProcessProvenance, ProcessRegistration,
282 SessionCreateRequest, SessionScope, SessionStartPoint, SubagentSessionContext,
283 ToolFailureClass, ToolOutputContract, TurnInput, WaitKind,
284 };
285
286 fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
287 ProcessWorkObserver::new(registry)
288 }
289
290 fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
291 ProcessRegistration::new(
292 process_id,
293 ProcessInput::External {
294 metadata: json!({ "label": label }),
295 },
296 ProcessProvenance::host(),
297 )
298 }
299
300 async fn register_visible(
301 registry: &Arc<dyn ProcessRegistry>,
302 scope: &SessionScope,
303 registration: ProcessRegistration,
304 descriptor: ProcessHandleDescriptor,
305 ) {
306 let process_id = registration.id.clone();
307 registry
308 .register_process(registration)
309 .await
310 .expect("register process");
311 registry
312 .grant_handle(scope, &process_id, descriptor)
313 .await
314 .expect("grant process handle");
315 }
316
317 #[tokio::test]
318 async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
319 let registry =
320 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
321 let visible_scope = SessionScope::new("visible");
322 register_visible(
323 ®istry,
324 &visible_scope,
325 external_registration("visible-process", "Visible"),
326 ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
327 )
328 .await;
329 register_visible(
330 ®istry,
331 &SessionScope::new("other"),
332 external_registration("hidden-process", "Hidden"),
333 ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
334 )
335 .await;
336 registry
337 .append_event(
338 "visible-process",
339 ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
340 .with_replay_key("visible-process:cancel-requested"),
341 )
342 .await
343 .expect("append event");
344
345 let snapshot = observer(Arc::clone(®istry))
346 .snapshot_for_session("visible")
347 .await
348 .expect("snapshot");
349
350 assert_eq!(snapshot.session_id, "visible");
351 assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
352 assert_eq!(snapshot.items.len(), 1);
353 assert_eq!(snapshot.items[0].events.len(), 1);
354 assert_eq!(
355 snapshot.items[0].events[0].event_type,
356 "process.cancel_requested"
357 );
358 assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
359 }
360
361 #[tokio::test]
362 async fn snapshot_for_session_includes_frame_wake_targets_without_handle_grants() {
363 let registry =
364 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
365 let frame_scope = SessionScope::for_agent_frame("visible", "frame-a");
366 registry
367 .register_process(ProcessRegistration::new(
368 "frame-originated",
369 ProcessInput::External {
370 metadata: json!({ "label": "Frame originated" }),
371 },
372 ProcessProvenance::session(frame_scope.clone()),
373 ))
374 .await
375 .expect("register frame-originated process");
376 registry
377 .register_process(
378 external_registration("frame-wake-targeted", "Frame wake targeted")
379 .with_wake_target(Some(frame_scope)),
380 )
381 .await
382 .expect("register frame wake-targeted process");
383 registry
384 .register_process(
385 external_registration("hidden-frame", "Hidden")
386 .with_wake_target(Some(SessionScope::for_agent_frame("other", "frame-b"))),
387 )
388 .await
389 .expect("register hidden process");
390
391 let snapshot = observer(Arc::clone(®istry))
392 .snapshot_for_session("visible")
393 .await
394 .expect("snapshot");
395 let visible_process_ids = snapshot
396 .visible_process_ids
397 .iter()
398 .cloned()
399 .collect::<std::collections::BTreeSet<_>>();
400
401 assert_eq!(
402 visible_process_ids,
403 std::collections::BTreeSet::from(["frame-wake-targeted".to_string()])
404 );
405 assert_eq!(snapshot.items.len(), 1);
406 }
407
408 #[tokio::test]
409 async fn snapshot_for_session_labels_engine_wake_targets_from_identity_without_handle_grants() {
410 let registry =
411 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
412 let scope = SessionScope::new("visible");
413 registry
414 .register_process(
415 ProcessRegistration::new(
416 "engine-wake-targeted",
417 ProcessInput::Engine {
418 kind: "test-engine".to_string(),
419 payload: json!({}),
420 },
421 ProcessProvenance::host(),
422 )
423 .with_identity(
424 ProcessIdentity::new("test-engine").with_label(Some("remember".to_string())),
425 )
426 .with_execution_env_ref(Some(ProcessExecutionEnvRef::new("process-env:test")))
427 .with_wake_target(Some(scope)),
428 )
429 .await
430 .expect("register engine wake-targeted process");
431
432 let snapshot = observer(Arc::clone(®istry))
433 .snapshot_for_session("visible")
434 .await
435 .expect("snapshot");
436
437 assert_eq!(snapshot.items.len(), 1);
438 assert_eq!(snapshot.items[0].kind, "test-engine");
439 assert_eq!(snapshot.items[0].label, "remember");
440 assert_eq!(
441 snapshot.items[0].descriptor.kind.as_deref(),
442 Some("test-engine")
443 );
444 assert_eq!(
445 snapshot.items[0].descriptor.label.as_deref(),
446 Some("remember")
447 );
448 assert_eq!(snapshot.items[0].process.kind, "test-engine");
449 assert_eq!(snapshot.items[0].process.label, "remember");
450 }
451
452 #[tokio::test]
453 async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
454 let registry =
455 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
456 let scope = SessionScope::new("sort");
457 register_visible(
458 ®istry,
459 &scope,
460 external_registration("older", "Older"),
461 ProcessHandleDescriptor::new(None::<String>, None::<String>),
462 )
463 .await;
464 tokio::time::sleep(Duration::from_millis(2)).await;
465 register_visible(
466 ®istry,
467 &scope,
468 external_registration("newer", "Newer"),
469 ProcessHandleDescriptor::new(None::<String>, None::<String>),
470 )
471 .await;
472 tokio::time::sleep(Duration::from_millis(2)).await;
473 registry
474 .append_event(
475 "older",
476 ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
477 .with_replay_key("older:cancel-requested"),
478 )
479 .await
480 .expect("update older process");
481
482 let snapshot = observer(Arc::clone(®istry))
483 .snapshot_for_session("sort")
484 .await
485 .expect("snapshot");
486
487 assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
488 }
489
490 #[tokio::test]
491 async fn observed_process_reports_terminal_status_and_error_messages() {
492 let registry =
493 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
494 for process_id in ["failed", "cancelled"] {
495 registry
496 .register_process(external_registration(process_id, process_id))
497 .await
498 .expect("register");
499 }
500 registry
501 .complete_process(
502 "failed",
503 ProcessAwaitOutput::Failure {
504 class: ToolFailureClass::External,
505 code: "boom".to_string(),
506 message: "failed loudly".to_string(),
507 raw: None,
508 control: None,
509 },
510 )
511 .await
512 .expect("fail process");
513 registry
514 .complete_process(
515 "cancelled",
516 ProcessAwaitOutput::Cancelled {
517 message: "cancelled intentionally".to_string(),
518 raw: None,
519 control: None,
520 },
521 )
522 .await
523 .expect("cancel process");
524
525 let observer = observer(Arc::clone(®istry));
526 let failed = observer.process("failed").await.expect("failed process");
527 let cancelled = observer
528 .process("cancelled")
529 .await
530 .expect("cancelled process");
531
532 assert_eq!(failed.status_label, "failed");
533 assert!(failed.terminal);
534 assert_eq!(failed.error.as_deref(), Some("failed loudly"));
535 assert_eq!(cancelled.status_label, "cancelled");
536 assert!(cancelled.terminal);
537 assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
538 }
539
540 #[tokio::test]
541 async fn observed_process_exposes_current_wait_state() {
542 let registry =
543 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
544 let scope = SessionScope::new("wait");
545 register_visible(
546 ®istry,
547 &scope,
548 external_registration("waiting-process", "Waiting"),
549 ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
550 )
551 .await;
552 let wait = WaitState {
553 since_ms: 1234,
554 kind: WaitKind::Signal {
555 name: "ready".to_string(),
556 event_type: "signal.ready".to_string(),
557 key: "process:waiting-process:signal.ready:1".to_string(),
558 ordinal: 1,
559 },
560 };
561 registry
562 .set_process_wait("waiting-process", wait.clone())
563 .await
564 .expect("set wait");
565
566 let observer = observer(Arc::clone(®istry));
567 let observed = observer
568 .process("waiting-process")
569 .await
570 .expect("waiting process");
571 let snapshot = observer
572 .snapshot_for_session("wait")
573 .await
574 .expect("snapshot");
575
576 assert_eq!(observed.wait, Some(wait.clone()));
577 assert_eq!(snapshot.items.len(), 1);
578 assert_eq!(snapshot.items[0].process.wait, Some(wait));
579 }
580
581 #[tokio::test]
582 async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
583 let registry =
584 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
585 let scope = SessionScope::new("labels");
586 let mut child_request = SessionCreateRequest::child_session(
587 "labels",
588 SessionStartPoint::Empty,
589 PluginOptions::default(),
590 )
591 .with_session_id("child-session");
592 child_request.subagent = Some(SubagentSessionContext {
593 parent_session_id: "labels".to_string(),
594 capability: "researcher".to_string(),
595 depth: 1,
596 max_depth: 4,
597 });
598 let cases = [
599 (
600 "tool",
601 ProcessInput::ToolCall {
602 call: PreparedToolCall::from_parts(
603 "call-1",
604 "shell.run",
605 json!({}),
606 None,
607 serde_json::Value::Null,
608 ),
609 },
610 "tool",
611 "shell.run",
612 None,
613 ),
614 (
615 "engine",
616 ProcessInput::Engine {
617 kind: "test-engine".to_string(),
618 payload: json!({}),
619 },
620 "test-engine",
621 "remember",
622 None,
623 ),
624 (
625 "session",
626 ProcessInput::SessionTurn {
627 create_request: Box::new(child_request),
628 turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
629 output_contract: ToolOutputContract::Static,
630 },
631 "session_turn",
632 "researcher",
633 Some("child-session"),
634 ),
635 (
636 "external",
637 ProcessInput::External {
638 metadata: json!({ "label": "external job" }),
639 },
640 "external",
641 "external job",
642 None,
643 ),
644 ];
645 for (process_id, input, kind, label, _child_session_id) in cases {
646 let needs_env = matches!(
647 input,
648 ProcessInput::ToolCall { .. } | ProcessInput::Engine { .. }
649 );
650 let mut registration =
651 ProcessRegistration::new(process_id, input, ProcessProvenance::host())
652 .with_identity(ProcessIdentity::new(kind).with_label(Some(label.to_string())));
653 if needs_env {
654 registration = registration.with_execution_env_ref(Some(
655 ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
656 ));
657 }
658 register_visible(
659 ®istry,
660 &scope,
661 registration,
662 ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
663 )
664 .await;
665 }
666
667 let snapshot = observer(Arc::clone(®istry))
668 .snapshot_for_session("labels")
669 .await
670 .expect("snapshot");
671 let by_id = snapshot
672 .items
673 .iter()
674 .map(|item| (item.process.process_id.as_str(), item))
675 .collect::<std::collections::BTreeMap<_, _>>();
676
677 assert_eq!(by_id["tool"].label, "shell.run");
678 assert_eq!(by_id["engine"].label, "remember");
679 assert_eq!(by_id["engine"].process.kind, "test-engine");
680 assert_eq!(by_id["session"].label, "researcher");
681 assert_eq!(
682 by_id["session"].process.child_session_id.as_deref(),
683 Some("child-session")
684 );
685 assert_eq!(by_id["external"].label, "external job");
686 }
687
688 #[tokio::test]
689 async fn observed_process_missing_lookup_returns_none() {
690 let registry =
691 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
692
693 assert!(observer(registry).process("missing").await.is_none());
694 }
695}