1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::plugin::PluginError;
6
7use super::events::{ProcessAwaitOutput, ProcessEvent};
8use super::model::{
9 ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId, ProcessInput,
10 ProcessLifecycleStatus, ProcessListFilter, ProcessOriginator, ProcessRecord, SessionScope,
11 WaitState,
12};
13use super::registry::ProcessRegistry;
14use super::time::epoch_ms_from_system_time;
15
16#[derive(Clone)]
17pub struct ProcessWorkObserver {
18 registry: Arc<dyn ProcessRegistry>,
19}
20
21#[derive(Clone, Debug, Serialize, Deserialize)]
22pub struct ProcessWorkSnapshot {
23 pub session_id: String,
24 pub visible_process_ids: Vec<ProcessId>,
25 pub items: Vec<ObservedWorkItem>,
26}
27
28#[derive(Clone, Debug, Serialize, Deserialize)]
29pub struct ObservedWorkItem {
30 pub process: ObservedProcess,
31 pub descriptor: ProcessHandleDescriptor,
32 pub events: Vec<ObservedProcessEvent>,
33 pub kind: String,
34 pub label: String,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct ObservedProcess {
39 pub process_id: ProcessId,
40 pub graph_key: String,
41 pub kind: String,
42 pub lifecycle: ProcessLifecycleStatus,
43 pub status_label: String,
44 pub terminal: bool,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub error: Option<String>,
47 pub created_at_ms: u64,
48 pub updated_at_ms: u64,
49 pub input: ProcessInput,
50 pub originator: ProcessOriginator,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub env_ref: Option<ProcessExecutionEnvRef>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub wake_target: Option<SessionScope>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub caused_by: Option<crate::CausalRef>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub external_ref: Option<ProcessExternalRef>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub wait: Option<WaitState>,
61 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub child_session_id: Option<String>,
63 pub label: String,
64}
65
66#[derive(Clone, Debug, Serialize, Deserialize)]
67pub struct ObservedProcessEvent {
68 pub sequence: u64,
69 pub event_type: String,
70 pub occurred_at_ms: u64,
71 pub payload: serde_json::Value,
72}
73
74impl ProcessWorkObserver {
75 pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
76 Self { registry }
77 }
78
79 pub async fn snapshot_for_session(
80 &self,
81 session_id: impl Into<String>,
82 ) -> Result<ProcessWorkSnapshot, PluginError> {
83 let session_id = session_id.into();
84 let session_scope = SessionScope::new(session_id.clone());
85 let entries = self.registry.list_handle_grants(&session_scope).await?;
86 let mut items = Vec::with_capacity(entries.len());
87 for (grant, record) in entries {
88 let events = self
89 .registry
90 .events_after(&record.id, 0)
91 .await?
92 .into_iter()
93 .map(ObservedProcessEvent::from)
94 .collect();
95 let descriptor = grant.descriptor;
96 let process = ObservedProcess::from_record(record);
97 let kind = descriptor
98 .kind
99 .clone()
100 .unwrap_or_else(|| stable_kind(&process.input).to_string());
101 let label = typed_label(&process.input)
102 .or_else(|| descriptor.label.clone())
103 .unwrap_or_else(|| kind.clone());
104 items.push(ObservedWorkItem {
105 process,
106 descriptor,
107 events,
108 kind,
109 label,
110 });
111 }
112 items.sort_by(|left, right| {
113 right
114 .process
115 .updated_at_ms
116 .cmp(&left.process.updated_at_ms)
117 .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
118 .then_with(|| left.process.process_id.cmp(&right.process.process_id))
119 });
120 let visible_process_ids = items
121 .iter()
122 .map(|item| item.process.process_id.clone())
123 .collect();
124 Ok(ProcessWorkSnapshot {
125 session_id,
126 visible_process_ids,
127 items,
128 })
129 }
130
131 pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
132 self.registry
133 .get_process(process_id)
134 .await
135 .map(ObservedProcess::from_record)
136 }
137
138 pub async fn list(
139 &self,
140 filter: &ProcessListFilter,
141 ) -> Result<Vec<ObservedProcess>, PluginError> {
142 Ok(self
143 .registry
144 .list_processes(filter)
145 .await?
146 .into_iter()
147 .map(ObservedProcess::from_record)
148 .collect())
149 }
150
151 pub async fn events_after(
152 &self,
153 process_id: &str,
154 after_sequence: u64,
155 ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
156 Ok(self
157 .registry
158 .events_after(process_id, after_sequence)
159 .await?
160 .into_iter()
161 .map(ObservedProcessEvent::from)
162 .collect())
163 }
164}
165
166impl ObservedProcess {
167 fn from_record(record: ProcessRecord) -> Self {
168 let lifecycle = ProcessLifecycleStatus::from(&record.status);
169 let input = record.input.as_ref().clone();
170 let kind = stable_kind(&input).to_string();
171 let label = typed_label(&input).unwrap_or_else(|| kind.clone());
172 let process_id = record.id;
173 Self {
174 graph_key: format!("process:{process_id}"),
175 process_id,
176 kind,
177 lifecycle,
178 status_label: lifecycle.label().to_string(),
179 terminal: lifecycle.is_terminal(),
180 error: terminal_error(&record.status),
181 created_at_ms: record.created_at_ms,
182 updated_at_ms: record.updated_at_ms,
183 originator: record.provenance.originator,
184 env_ref: record.env_ref,
185 wake_target: record.wake_target,
186 caused_by: record.provenance.caused_by,
187 external_ref: record.external_ref,
188 wait: record.wait,
189 child_session_id: child_session_id(&input),
190 input,
191 label,
192 }
193 }
194}
195
196impl From<ProcessEvent> for ObservedProcessEvent {
197 fn from(event: ProcessEvent) -> Self {
198 Self {
199 sequence: event.sequence,
200 event_type: event.event_type,
201 occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
202 payload: event.payload,
203 }
204 }
205}
206
207fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
208 match status.await_output()? {
209 ProcessAwaitOutput::Failure { message, .. }
210 | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
211 ProcessAwaitOutput::Success { .. } => None,
212 }
213}
214
215fn child_session_id(input: &ProcessInput) -> Option<String> {
216 match input {
217 ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
218 ProcessInput::ToolCall { .. }
219 | ProcessInput::LashlangProcess { .. }
220 | ProcessInput::External { .. } => None,
221 }
222}
223
224fn typed_label(input: &ProcessInput) -> Option<String> {
225 match input {
226 ProcessInput::ToolCall { call } => Some(call.tool_name.clone()),
227 ProcessInput::LashlangProcess { process_name, .. } => Some(process_name.clone()),
228 ProcessInput::SessionTurn { create_request, .. } => create_request
229 .subagent
230 .as_ref()
231 .map(|subagent| subagent.capability.clone())
232 .or_else(|| create_request.usage_source.clone())
233 .or_else(|| create_request.session_id.clone()),
234 ProcessInput::External { metadata } => metadata
235 .get("label")
236 .or_else(|| metadata.get("name"))
237 .or_else(|| metadata.get("title"))
238 .and_then(serde_json::Value::as_str)
239 .map(str::to_string),
240 }
241}
242
243fn stable_kind(input: &ProcessInput) -> &'static str {
244 match input {
245 ProcessInput::ToolCall { .. } => "tool",
246 ProcessInput::LashlangProcess { .. } => "lashlang",
247 ProcessInput::SessionTurn { .. } => "session_turn",
248 ProcessInput::External { .. } => "external",
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use std::sync::Arc;
255 use std::time::Duration;
256
257 use serde_json::json;
258
259 use super::*;
260 use crate::{
261 InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
262 ProcessExecutionEnvRef, ProcessProvenance, ProcessRegistration, SessionCreateRequest,
263 SessionScope, SessionStartPoint, SubagentSessionContext, ToolFailureClass,
264 ToolOutputContract, TurnInput, WaitKind,
265 };
266
267 fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
268 ProcessWorkObserver::new(registry)
269 }
270
271 fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
272 ProcessRegistration::new(
273 process_id,
274 ProcessInput::External {
275 metadata: json!({ "label": label }),
276 },
277 ProcessProvenance::host("observer-test-host"),
278 )
279 }
280
281 async fn register_visible(
282 registry: &Arc<dyn ProcessRegistry>,
283 scope: &SessionScope,
284 registration: ProcessRegistration,
285 descriptor: ProcessHandleDescriptor,
286 ) {
287 let process_id = registration.id.clone();
288 registry
289 .register_process(registration)
290 .await
291 .expect("register process");
292 registry
293 .grant_handle(scope, &process_id, descriptor)
294 .await
295 .expect("grant process handle");
296 }
297
298 #[tokio::test]
299 async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
300 let registry =
301 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
302 let visible_scope = SessionScope::new("visible");
303 register_visible(
304 ®istry,
305 &visible_scope,
306 external_registration("visible-process", "Visible"),
307 ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
308 )
309 .await;
310 register_visible(
311 ®istry,
312 &SessionScope::new("other"),
313 external_registration("hidden-process", "Hidden"),
314 ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
315 )
316 .await;
317 registry
318 .append_event(
319 "visible-process",
320 ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
321 .with_replay_key("visible-process:cancel-requested"),
322 )
323 .await
324 .expect("append event");
325
326 let snapshot = observer(Arc::clone(®istry))
327 .snapshot_for_session("visible")
328 .await
329 .expect("snapshot");
330
331 assert_eq!(snapshot.session_id, "visible");
332 assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
333 assert_eq!(snapshot.items.len(), 1);
334 assert_eq!(snapshot.items[0].events.len(), 1);
335 assert_eq!(
336 snapshot.items[0].events[0].event_type,
337 "process.cancel_requested"
338 );
339 assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
340 }
341
342 #[tokio::test]
343 async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
344 let registry =
345 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
346 let scope = SessionScope::new("sort");
347 register_visible(
348 ®istry,
349 &scope,
350 external_registration("older", "Older"),
351 ProcessHandleDescriptor::new(None::<String>, None::<String>),
352 )
353 .await;
354 tokio::time::sleep(Duration::from_millis(2)).await;
355 register_visible(
356 ®istry,
357 &scope,
358 external_registration("newer", "Newer"),
359 ProcessHandleDescriptor::new(None::<String>, None::<String>),
360 )
361 .await;
362 tokio::time::sleep(Duration::from_millis(2)).await;
363 registry
364 .append_event(
365 "older",
366 ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
367 .with_replay_key("older:cancel-requested"),
368 )
369 .await
370 .expect("update older process");
371
372 let snapshot = observer(Arc::clone(®istry))
373 .snapshot_for_session("sort")
374 .await
375 .expect("snapshot");
376
377 assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
378 }
379
380 #[tokio::test]
381 async fn observed_process_reports_terminal_status_and_error_messages() {
382 let registry =
383 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
384 for process_id in ["failed", "cancelled"] {
385 registry
386 .register_process(external_registration(process_id, process_id))
387 .await
388 .expect("register");
389 }
390 registry
391 .complete_process(
392 "failed",
393 ProcessAwaitOutput::Failure {
394 class: ToolFailureClass::External,
395 code: "boom".to_string(),
396 message: "failed loudly".to_string(),
397 raw: None,
398 control: None,
399 },
400 )
401 .await
402 .expect("fail process");
403 registry
404 .complete_process(
405 "cancelled",
406 ProcessAwaitOutput::Cancelled {
407 message: "cancelled intentionally".to_string(),
408 raw: None,
409 control: None,
410 },
411 )
412 .await
413 .expect("cancel process");
414
415 let observer = observer(Arc::clone(®istry));
416 let failed = observer.process("failed").await.expect("failed process");
417 let cancelled = observer
418 .process("cancelled")
419 .await
420 .expect("cancelled process");
421
422 assert_eq!(failed.status_label, "failed");
423 assert!(failed.terminal);
424 assert_eq!(failed.error.as_deref(), Some("failed loudly"));
425 assert_eq!(cancelled.status_label, "cancelled");
426 assert!(cancelled.terminal);
427 assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
428 }
429
430 #[tokio::test]
431 async fn observed_process_exposes_current_wait_state() {
432 let registry =
433 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
434 let scope = SessionScope::new("wait");
435 register_visible(
436 ®istry,
437 &scope,
438 external_registration("waiting-process", "Waiting"),
439 ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
440 )
441 .await;
442 let wait = WaitState {
443 since_ms: 1234,
444 kind: WaitKind::Signal {
445 name: "ready".to_string(),
446 event_type: "signal.ready".to_string(),
447 key: "process:waiting-process:signal.ready:1".to_string(),
448 ordinal: 1,
449 },
450 };
451 registry
452 .set_process_wait("waiting-process", wait.clone())
453 .await
454 .expect("set wait");
455
456 let observer = observer(Arc::clone(®istry));
457 let observed = observer
458 .process("waiting-process")
459 .await
460 .expect("waiting process");
461 let snapshot = observer
462 .snapshot_for_session("wait")
463 .await
464 .expect("snapshot");
465
466 assert_eq!(observed.wait, Some(wait.clone()));
467 assert_eq!(snapshot.items.len(), 1);
468 assert_eq!(snapshot.items[0].process.wait, Some(wait));
469 }
470
471 #[tokio::test]
472 async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
473 let registry =
474 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
475 let scope = SessionScope::new("labels");
476 let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
477 let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
478 let process_ref = lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 1);
479 let mut child_request = SessionCreateRequest::child_session(
480 "labels",
481 SessionStartPoint::Empty,
482 PluginOptions::default(),
483 )
484 .with_session_id("child-session");
485 child_request.subagent = Some(SubagentSessionContext {
486 parent_session_id: "labels".to_string(),
487 capability: "researcher".to_string(),
488 depth: 1,
489 max_depth: 4,
490 });
491 let cases = [
492 (
493 "tool",
494 ProcessInput::ToolCall {
495 call: PreparedToolCall::from_parts(
496 "call-1",
497 "shell.run",
498 json!({}),
499 None,
500 serde_json::Value::Null,
501 ),
502 },
503 "tool",
504 "shell.run",
505 None,
506 ),
507 (
508 "lashlang",
509 ProcessInput::LashlangProcess {
510 module_ref,
511 process_ref,
512 required_surface_ref: surface_ref,
513 process_name: "remember".to_string(),
514 args: serde_json::Map::new(),
515 },
516 "lashlang",
517 "remember",
518 None,
519 ),
520 (
521 "session",
522 ProcessInput::SessionTurn {
523 create_request: Box::new(child_request),
524 turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
525 output_contract: ToolOutputContract::Static,
526 },
527 "session_turn",
528 "researcher",
529 Some("child-session"),
530 ),
531 (
532 "external",
533 ProcessInput::External {
534 metadata: json!({ "label": "external job" }),
535 },
536 "external",
537 "external job",
538 None,
539 ),
540 ];
541 for (process_id, input, _kind, _label, _child_session_id) in cases {
542 let needs_env = matches!(
543 input,
544 ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. }
545 );
546 let mut registration = ProcessRegistration::new(
547 process_id,
548 input,
549 ProcessProvenance::host("observer-test-host"),
550 );
551 if needs_env {
552 registration = registration.with_execution_env_ref(Some(
553 ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
554 ));
555 }
556 register_visible(
557 ®istry,
558 &scope,
559 registration,
560 ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
561 )
562 .await;
563 }
564
565 let snapshot = observer(Arc::clone(®istry))
566 .snapshot_for_session("labels")
567 .await
568 .expect("snapshot");
569 let by_id = snapshot
570 .items
571 .iter()
572 .map(|item| (item.process.process_id.as_str(), item))
573 .collect::<std::collections::BTreeMap<_, _>>();
574
575 assert_eq!(by_id["tool"].label, "shell.run");
576 assert_eq!(by_id["lashlang"].label, "remember");
577 assert_eq!(by_id["session"].label, "researcher");
578 assert_eq!(
579 by_id["session"].process.child_session_id.as_deref(),
580 Some("child-session")
581 );
582 assert_eq!(by_id["external"].label, "external job");
583 }
584
585 #[tokio::test]
586 async fn observed_process_missing_lookup_returns_none() {
587 let registry =
588 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
589
590 assert!(observer(registry).process("missing").await.is_none());
591 }
592}