1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::plugin::PluginError;
6
7use super::events::{ProcessAwaitOutput, ProcessEvent};
8use super::model::{
9 ProcessExecutionEnvRef, ProcessExternalRef, ProcessHandleDescriptor, ProcessId, ProcessInput,
10 ProcessLifecycleStatus, ProcessListFilter, ProcessOriginator, ProcessRecord, SessionScope,
11 WaitState,
12};
13use super::registry::ProcessRegistry;
14use super::time::epoch_ms_from_system_time;
15
16#[derive(Clone)]
17pub struct ProcessWorkObserver {
18 registry: Arc<dyn ProcessRegistry>,
19}
20
21#[derive(Clone, Debug, Serialize, Deserialize)]
22pub struct ProcessWorkSnapshot {
23 pub session_id: String,
24 pub visible_process_ids: Vec<ProcessId>,
25 pub items: Vec<ObservedWorkItem>,
26}
27
28#[derive(Clone, Debug, Serialize, Deserialize)]
29pub struct ObservedWorkItem {
30 pub process: ObservedProcess,
31 pub descriptor: ProcessHandleDescriptor,
32 pub events: Vec<ObservedProcessEvent>,
33 pub kind: String,
34 pub label: String,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct ObservedProcess {
39 pub process_id: ProcessId,
40 pub graph_key: String,
41 pub kind: String,
42 pub lifecycle: ProcessLifecycleStatus,
43 pub status_label: String,
44 pub terminal: bool,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub error: Option<String>,
47 pub created_at_ms: u64,
48 pub updated_at_ms: u64,
49 pub input: ProcessInput,
50 pub originator: ProcessOriginator,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub env_ref: Option<ProcessExecutionEnvRef>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub wake_target: Option<SessionScope>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub caused_by: Option<crate::CausalRef>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub external_ref: Option<ProcessExternalRef>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub wait: Option<WaitState>,
61 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub child_session_id: Option<String>,
63 pub label: String,
64}
65
66#[derive(Clone, Debug, Serialize, Deserialize)]
67pub struct ObservedProcessEvent {
68 pub sequence: u64,
69 pub event_type: String,
70 pub occurred_at_ms: u64,
71 pub payload: serde_json::Value,
72}
73
/// Event count passed to `ProcessRegistry::recent_events` for each process
/// when a session snapshot is built.
pub const SNAPSHOT_EVENT_TAIL: usize = 32;
79
80impl ProcessWorkObserver {
81 pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
82 Self { registry }
83 }
84
85 pub async fn snapshot_for_session(
86 &self,
87 session_id: impl Into<String>,
88 ) -> Result<ProcessWorkSnapshot, PluginError> {
89 let session_id = session_id.into();
90 let session_scope = SessionScope::new(session_id.clone());
91 let entries = self.registry.list_handle_grants(&session_scope).await?;
92 let mut items = Vec::with_capacity(entries.len());
93 for (grant, record) in entries {
94 let events = self
95 .registry
96 .recent_events(&record.id, SNAPSHOT_EVENT_TAIL)
97 .await?
98 .into_iter()
99 .map(ObservedProcessEvent::from)
100 .collect();
101 let descriptor = grant.descriptor;
102 let process = ObservedProcess::from_record(record);
103 let kind = descriptor
104 .kind
105 .clone()
106 .unwrap_or_else(|| stable_kind(&process.input).to_string());
107 let label = typed_label(&process.input)
108 .or_else(|| descriptor.label.clone())
109 .unwrap_or_else(|| kind.clone());
110 items.push(ObservedWorkItem {
111 process,
112 descriptor,
113 events,
114 kind,
115 label,
116 });
117 }
118 items.sort_by(|left, right| {
119 right
120 .process
121 .updated_at_ms
122 .cmp(&left.process.updated_at_ms)
123 .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
124 .then_with(|| left.process.process_id.cmp(&right.process.process_id))
125 });
126 let visible_process_ids = items
127 .iter()
128 .map(|item| item.process.process_id.clone())
129 .collect();
130 Ok(ProcessWorkSnapshot {
131 session_id,
132 visible_process_ids,
133 items,
134 })
135 }
136
137 pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
138 self.registry
139 .get_process(process_id)
140 .await
141 .map(ObservedProcess::from_record)
142 }
143
144 pub async fn list(
145 &self,
146 filter: &ProcessListFilter,
147 ) -> Result<Vec<ObservedProcess>, PluginError> {
148 Ok(self
149 .registry
150 .list_processes(filter)
151 .await?
152 .into_iter()
153 .map(ObservedProcess::from_record)
154 .collect())
155 }
156
157 pub async fn events_after(
158 &self,
159 process_id: &str,
160 after_sequence: u64,
161 ) -> Result<Vec<ObservedProcessEvent>, PluginError> {
162 Ok(self
163 .registry
164 .events_after(process_id, after_sequence)
165 .await?
166 .into_iter()
167 .map(ObservedProcessEvent::from)
168 .collect())
169 }
170}
171
172impl ObservedProcess {
173 fn from_record(record: ProcessRecord) -> Self {
174 let lifecycle = ProcessLifecycleStatus::from(&record.status);
175 let input = record.input.as_ref().clone();
176 let kind = stable_kind(&input).to_string();
177 let label = typed_label(&input).unwrap_or_else(|| kind.clone());
178 let process_id = record.id;
179 Self {
180 graph_key: format!("process:{process_id}"),
181 process_id,
182 kind,
183 lifecycle,
184 status_label: lifecycle.label().to_string(),
185 terminal: lifecycle.is_terminal(),
186 error: terminal_error(&record.status),
187 created_at_ms: record.created_at_ms,
188 updated_at_ms: record.updated_at_ms,
189 originator: record.provenance.originator,
190 env_ref: record.env_ref,
191 wake_target: record.wake_target,
192 caused_by: record.provenance.caused_by,
193 external_ref: record.external_ref,
194 wait: record.wait,
195 child_session_id: child_session_id(&input),
196 input,
197 label,
198 }
199 }
200}
201
202impl From<ProcessEvent> for ObservedProcessEvent {
203 fn from(event: ProcessEvent) -> Self {
204 Self {
205 sequence: event.sequence,
206 event_type: event.event_type,
207 occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
208 payload: event.payload,
209 }
210 }
211}
212
213fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
214 match status.await_output()? {
215 ProcessAwaitOutput::Failure { message, .. }
216 | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
217 ProcessAwaitOutput::Success { .. } => None,
218 }
219}
220
221fn child_session_id(input: &ProcessInput) -> Option<String> {
222 match input {
223 ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
224 ProcessInput::ToolCall { .. }
225 | ProcessInput::LashlangProcess { .. }
226 | ProcessInput::External { .. } => None,
227 }
228}
229
230fn typed_label(input: &ProcessInput) -> Option<String> {
231 match input {
232 ProcessInput::ToolCall { call } => Some(call.tool_name.clone()),
233 ProcessInput::LashlangProcess { process_name, .. } => Some(process_name.clone()),
234 ProcessInput::SessionTurn { create_request, .. } => create_request
235 .subagent
236 .as_ref()
237 .map(|subagent| subagent.capability.clone())
238 .or_else(|| create_request.usage_source.clone())
239 .or_else(|| create_request.session_id.clone()),
240 ProcessInput::External { metadata } => metadata
241 .get("label")
242 .or_else(|| metadata.get("name"))
243 .or_else(|| metadata.get("title"))
244 .and_then(serde_json::Value::as_str)
245 .map(str::to_string),
246 }
247}
248
249fn stable_kind(input: &ProcessInput) -> &'static str {
250 match input {
251 ProcessInput::ToolCall { .. } => "tool",
252 ProcessInput::LashlangProcess { .. } => "lashlang",
253 ProcessInput::SessionTurn { .. } => "session_turn",
254 ProcessInput::External { .. } => "external",
255 }
256}
257
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use serde_json::json;

    use super::*;
    use crate::{
        InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest,
        ProcessExecutionEnvRef, ProcessProvenance, ProcessRegistration, SessionCreateRequest,
        SessionScope, SessionStartPoint, SubagentSessionContext, ToolFailureClass,
        ToolOutputContract, TurnInput, WaitKind,
    };

    /// Fresh local test registry behind the trait object the observer expects.
    fn test_registry() -> Arc<dyn ProcessRegistry> {
        Arc::new(super::super::TestLocalProcessRegistry::default())
    }

    fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
        ProcessWorkObserver::new(registry)
    }

    /// Registration for an external process whose metadata carries `label`.
    fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: json!({ "label": label }),
            },
            ProcessProvenance::host("observer-test-host"),
        )
    }

    /// Registers the process and grants `scope` a handle to it.
    async fn register_visible(
        registry: &Arc<dyn ProcessRegistry>,
        scope: &SessionScope,
        registration: ProcessRegistration,
        descriptor: ProcessHandleDescriptor,
    ) {
        let process_id = registration.id.clone();
        registry
            .register_process(registration)
            .await
            .expect("register process");
        registry
            .grant_handle(scope, &process_id, descriptor)
            .await
            .expect("grant process handle");
    }

    #[tokio::test]
    async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
        let registry = test_registry();
        let visible_scope = SessionScope::new("visible");
        register_visible(
            &registry,
            &visible_scope,
            external_registration("visible-process", "Visible"),
            ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
        )
        .await;
        register_visible(
            &registry,
            &SessionScope::new("other"),
            external_registration("hidden-process", "Hidden"),
            ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
        )
        .await;
        registry
            .append_event(
                "visible-process",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
                    .with_replay_key("visible-process:cancel-requested"),
            )
            .await
            .expect("append event");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("visible")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.session_id, "visible");
        assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].events.len(), 1);
        assert_eq!(
            snapshot.items[0].events[0].event_type,
            "process.cancel_requested"
        );
        assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
    }

    #[tokio::test]
    async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
        let registry = test_registry();
        let scope = SessionScope::new("sort");
        register_visible(
            &registry,
            &scope,
            external_registration("older", "Older"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        // Short pauses keep the millisecond timestamps of the steps distinct.
        tokio::time::sleep(Duration::from_millis(2)).await;
        register_visible(
            &registry,
            &scope,
            external_registration("newer", "Newer"),
            ProcessHandleDescriptor::new(None::<String>, None::<String>),
        )
        .await;
        tokio::time::sleep(Duration::from_millis(2)).await;
        // Touching the older process makes it the most recently updated one.
        registry
            .append_event(
                "older",
                ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
                    .with_replay_key("older:cancel-requested"),
            )
            .await
            .expect("update older process");

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("sort")
            .await
            .expect("snapshot");

        assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
    }

    #[tokio::test]
    async fn observed_process_reports_terminal_status_and_error_messages() {
        let registry = test_registry();
        for process_id in ["failed", "cancelled"] {
            registry
                .register_process(external_registration(process_id, process_id))
                .await
                .expect("register");
        }
        registry
            .complete_process(
                "failed",
                ProcessAwaitOutput::Failure {
                    class: ToolFailureClass::External,
                    code: "boom".to_string(),
                    message: "failed loudly".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("fail process");
        registry
            .complete_process(
                "cancelled",
                ProcessAwaitOutput::Cancelled {
                    message: "cancelled intentionally".to_string(),
                    raw: None,
                    control: None,
                },
            )
            .await
            .expect("cancel process");

        let observer = observer(Arc::clone(&registry));
        let failed = observer.process("failed").await.expect("failed process");
        let cancelled = observer
            .process("cancelled")
            .await
            .expect("cancelled process");

        assert_eq!(failed.status_label, "failed");
        assert!(failed.terminal);
        assert_eq!(failed.error.as_deref(), Some("failed loudly"));
        assert_eq!(cancelled.status_label, "cancelled");
        assert!(cancelled.terminal);
        assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
    }

    #[tokio::test]
    async fn observed_process_exposes_current_wait_state() {
        let registry = test_registry();
        let scope = SessionScope::new("wait");
        register_visible(
            &registry,
            &scope,
            external_registration("waiting-process", "Waiting"),
            ProcessHandleDescriptor::new(Some("external"), Some("Waiting")),
        )
        .await;
        let wait = WaitState {
            since_ms: 1234,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting-process:signal.ready:1".to_string(),
                ordinal: 1,
            },
        };
        registry
            .set_process_wait("waiting-process", wait.clone())
            .await
            .expect("set wait");

        let observer = observer(Arc::clone(&registry));
        let observed = observer
            .process("waiting-process")
            .await
            .expect("waiting process");
        let snapshot = observer
            .snapshot_for_session("wait")
            .await
            .expect("snapshot");

        assert_eq!(observed.wait, Some(wait.clone()));
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].process.wait, Some(wait));
    }

    #[tokio::test]
    async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
        let registry = test_registry();
        let scope = SessionScope::new("labels");
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
        let process_ref = lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 1);
        let mut child_request = SessionCreateRequest::child_session(
            "labels",
            SessionStartPoint::Empty,
            PluginOptions::default(),
        )
        .with_session_id("child-session");
        child_request.subagent = Some(SubagentSessionContext {
            parent_session_id: "labels".to_string(),
            capability: "researcher".to_string(),
            depth: 1,
            max_depth: 4,
        });
        // (process id, input, expected process kind, expected label, expected child session)
        let cases = [
            (
                "tool",
                ProcessInput::ToolCall {
                    call: PreparedToolCall::from_parts(
                        "call-1",
                        "shell.run",
                        json!({}),
                        None,
                        serde_json::Value::Null,
                    ),
                },
                "tool",
                "shell.run",
                None,
            ),
            (
                "lashlang",
                ProcessInput::LashlangProcess {
                    module_ref,
                    process_ref,
                    required_surface_ref: surface_ref,
                    process_name: "remember".to_string(),
                    args: serde_json::Map::new(),
                },
                "lashlang",
                "remember",
                None,
            ),
            (
                "session",
                ProcessInput::SessionTurn {
                    create_request: Box::new(child_request),
                    turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
                    output_contract: ToolOutputContract::Static,
                },
                "session_turn",
                "researcher",
                Some("child-session"),
            ),
            (
                "external",
                ProcessInput::External {
                    metadata: json!({ "label": "external job" }),
                },
                "external",
                "external job",
                None,
            ),
        ];

        // The inputs are consumed by registration, so the expected values are
        // kept aside and checked against the snapshot afterwards.
        let mut expectations = Vec::with_capacity(cases.len());
        for (process_id, input, kind, label, child_session_id) in cases {
            let needs_env = matches!(
                input,
                ProcessInput::ToolCall { .. } | ProcessInput::LashlangProcess { .. }
            );
            let mut registration = ProcessRegistration::new(
                process_id,
                input,
                ProcessProvenance::host("observer-test-host"),
            );
            if needs_env {
                registration = registration.with_execution_env_ref(Some(
                    ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}")),
                ));
            }
            register_visible(
                &registry,
                &scope,
                registration,
                ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
            )
            .await;
            expectations.push((process_id, kind, label, child_session_id));
        }

        let snapshot = observer(Arc::clone(&registry))
            .snapshot_for_session("labels")
            .await
            .expect("snapshot");
        let by_id = snapshot
            .items
            .iter()
            .map(|item| (item.process.process_id.as_str(), item))
            .collect::<std::collections::BTreeMap<_, _>>();

        assert_eq!(by_id.len(), expectations.len());
        for (process_id, kind, label, child_session_id) in expectations {
            let item = by_id[process_id];
            assert_eq!(item.process.kind, kind, "process kind of {process_id}");
            // The typed label must win over the descriptor's "Descriptor label".
            assert_eq!(item.label, label, "label of {process_id}");
            assert_eq!(
                item.process.child_session_id.as_deref(),
                child_session_id,
                "child session id of {process_id}"
            );
        }
    }

    #[tokio::test]
    async fn observed_process_missing_lookup_returns_none() {
        let registry = test_registry();

        assert!(observer(registry).process("missing").await.is_none());
    }
}