1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::plugin::PluginError;
6
7use super::events::{ProcessAwaitOutput, ProcessEvent};
8use super::model::{
9 ProcessExternalRef, ProcessHandleDescriptor, ProcessId, ProcessInput, ProcessLifecycleStatus,
10 ProcessRecord, ProcessScope,
11};
12use super::registry::ProcessRegistry;
13use super::time::epoch_ms_from_system_time;
14
15#[derive(Clone)]
16pub struct ProcessWorkObserver {
17 registry: Arc<dyn ProcessRegistry>,
18}
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct ProcessWorkSnapshot {
22 pub session_id: String,
23 pub visible_process_ids: Vec<ProcessId>,
24 pub items: Vec<ObservedWorkItem>,
25}
26
27#[derive(Clone, Debug, Serialize, Deserialize)]
28pub struct ObservedWorkItem {
29 pub process: ObservedProcess,
30 pub descriptor: ProcessHandleDescriptor,
31 pub events: Vec<ObservedProcessEvent>,
32 pub kind: String,
33 pub label: String,
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub struct ObservedProcess {
38 pub process_id: ProcessId,
39 pub graph_key: String,
40 pub lifecycle: ProcessLifecycleStatus,
41 pub status_label: String,
42 pub terminal: bool,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub error: Option<String>,
45 pub created_at_ms: u64,
46 pub updated_at_ms: u64,
47 pub input: ProcessInput,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub external_ref: Option<ProcessExternalRef>,
50 #[serde(default, skip_serializing_if = "Option::is_none")]
51 pub child_session_id: Option<String>,
52 pub label: String,
53}
54
55#[derive(Clone, Debug, Serialize, Deserialize)]
56pub struct ObservedProcessEvent {
57 pub sequence: u64,
58 pub event_type: String,
59 pub occurred_at_ms: u64,
60 pub payload: serde_json::Value,
61}
62
63impl ProcessWorkObserver {
64 pub fn new(registry: Arc<dyn ProcessRegistry>) -> Self {
65 Self { registry }
66 }
67
68 pub async fn snapshot_for_session(
69 &self,
70 session_id: impl Into<String>,
71 ) -> Result<ProcessWorkSnapshot, PluginError> {
72 let session_id = session_id.into();
73 let owner_scope = ProcessScope::new(session_id.clone());
74 let entries = self.registry.list_handle_grants(&owner_scope).await?;
75 let mut items = Vec::with_capacity(entries.len());
76 for (grant, record) in entries {
77 let events = self
78 .registry
79 .events_after(&record.id, 0)
80 .await?
81 .into_iter()
82 .map(ObservedProcessEvent::from)
83 .collect();
84 let descriptor = grant.descriptor;
85 let process = ObservedProcess::from_record(record);
86 let kind = descriptor
87 .kind
88 .clone()
89 .unwrap_or_else(|| stable_kind(&process.input).to_string());
90 let label = typed_label(&process.input)
91 .or_else(|| descriptor.label.clone())
92 .unwrap_or_else(|| kind.clone());
93 items.push(ObservedWorkItem {
94 process,
95 descriptor,
96 events,
97 kind,
98 label,
99 });
100 }
101 items.sort_by(|left, right| {
102 right
103 .process
104 .updated_at_ms
105 .cmp(&left.process.updated_at_ms)
106 .then_with(|| right.process.created_at_ms.cmp(&left.process.created_at_ms))
107 .then_with(|| left.process.process_id.cmp(&right.process.process_id))
108 });
109 let visible_process_ids = items
110 .iter()
111 .map(|item| item.process.process_id.clone())
112 .collect();
113 Ok(ProcessWorkSnapshot {
114 session_id,
115 visible_process_ids,
116 items,
117 })
118 }
119
120 pub async fn process(&self, process_id: &str) -> Option<ObservedProcess> {
121 self.registry
122 .get_process(process_id)
123 .await
124 .map(ObservedProcess::from_record)
125 }
126}
127
128impl ObservedProcess {
129 fn from_record(record: ProcessRecord) -> Self {
130 let lifecycle = ProcessLifecycleStatus::from(&record.status);
131 let input = record.input.as_ref().clone();
132 let label = typed_label(&input).unwrap_or_else(|| stable_kind(&input).to_string());
133 let process_id = record.id;
134 Self {
135 graph_key: format!("process:{process_id}"),
136 process_id,
137 lifecycle,
138 status_label: lifecycle.label().to_string(),
139 terminal: lifecycle.is_terminal(),
140 error: terminal_error(&record.status),
141 created_at_ms: record.created_at_ms,
142 updated_at_ms: record.updated_at_ms,
143 external_ref: record.external_ref,
144 child_session_id: child_session_id(&input),
145 input,
146 label,
147 }
148 }
149}
150
151impl From<ProcessEvent> for ObservedProcessEvent {
152 fn from(event: ProcessEvent) -> Self {
153 Self {
154 sequence: event.sequence,
155 event_type: event.event_type,
156 occurred_at_ms: epoch_ms_from_system_time(event.occurred_at),
157 payload: event.payload,
158 }
159 }
160}
161
162fn terminal_error(status: &super::model::ProcessStatus) -> Option<String> {
163 match status.await_output()? {
164 ProcessAwaitOutput::Failure { message, .. }
165 | ProcessAwaitOutput::Cancelled { message, .. } => Some(message.clone()),
166 ProcessAwaitOutput::Success { .. } => None,
167 }
168}
169
170fn child_session_id(input: &ProcessInput) -> Option<String> {
171 match input {
172 ProcessInput::SessionTurn { create_request, .. } => create_request.session_id.clone(),
173 ProcessInput::ToolCall { .. }
174 | ProcessInput::LashlangProcess { .. }
175 | ProcessInput::External { .. } => None,
176 }
177}
178
179fn typed_label(input: &ProcessInput) -> Option<String> {
180 match input {
181 ProcessInput::ToolCall { call } => Some(call.tool_name.clone()),
182 ProcessInput::LashlangProcess { process_name, .. } => Some(process_name.clone()),
183 ProcessInput::SessionTurn { create_request, .. } => create_request
184 .subagent
185 .as_ref()
186 .map(|subagent| subagent.capability.clone())
187 .or_else(|| create_request.usage_source.clone())
188 .or_else(|| create_request.session_id.clone()),
189 ProcessInput::External { metadata } => metadata
190 .get("label")
191 .or_else(|| metadata.get("name"))
192 .or_else(|| metadata.get("title"))
193 .and_then(serde_json::Value::as_str)
194 .map(str::to_string),
195 }
196}
197
198fn stable_kind(input: &ProcessInput) -> &'static str {
199 match input {
200 ProcessInput::ToolCall { .. } => "tool",
201 ProcessInput::LashlangProcess { .. } => "lashlang",
202 ProcessInput::SessionTurn { .. } => "session_turn",
203 ProcessInput::External { .. } => "external",
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use std::sync::Arc;
210 use std::time::Duration;
211
212 use serde_json::json;
213
214 use super::*;
215 use crate::{
216 InputItem, PluginOptions, PreparedToolCall, ProcessEventAppendRequest, ProcessRegistration,
217 ProcessScope, SessionCreateRequest, SessionStartPoint, SubagentSessionContext,
218 ToolFailureClass, ToolOutputContract, TurnInput,
219 };
220
221 fn observer(registry: Arc<dyn ProcessRegistry>) -> ProcessWorkObserver {
222 ProcessWorkObserver::new(registry)
223 }
224
225 fn external_registration(process_id: &str, label: &str) -> ProcessRegistration {
226 ProcessRegistration::new(
227 process_id,
228 ProcessInput::External {
229 metadata: json!({ "label": label }),
230 },
231 )
232 }
233
234 async fn register_visible(
235 registry: &Arc<dyn ProcessRegistry>,
236 scope: &ProcessScope,
237 registration: ProcessRegistration,
238 descriptor: ProcessHandleDescriptor,
239 ) {
240 let process_id = registration.id.clone();
241 registry
242 .register_process(registration)
243 .await
244 .expect("register process");
245 registry
246 .grant_handle(scope, &process_id, descriptor)
247 .await
248 .expect("grant process handle");
249 }
250
251 #[tokio::test]
252 async fn snapshot_for_session_reads_visible_grants_and_events_as_epoch_ms() {
253 let registry =
254 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
255 let visible_scope = ProcessScope::new("visible");
256 register_visible(
257 ®istry,
258 &visible_scope,
259 external_registration("visible-process", "Visible"),
260 ProcessHandleDescriptor::new(Some("visible-kind"), Some("Visible descriptor")),
261 )
262 .await;
263 register_visible(
264 ®istry,
265 &ProcessScope::new("other"),
266 external_registration("hidden-process", "Hidden"),
267 ProcessHandleDescriptor::new(Some("hidden-kind"), Some("Hidden")),
268 )
269 .await;
270 registry
271 .append_event(
272 "visible-process",
273 ProcessEventAppendRequest::new("process.cancel_requested", json!({"why": "test"}))
274 .with_replay_key("visible-process:cancel-requested"),
275 )
276 .await
277 .expect("append event");
278
279 let snapshot = observer(Arc::clone(®istry))
280 .snapshot_for_session("visible")
281 .await
282 .expect("snapshot");
283
284 assert_eq!(snapshot.session_id, "visible");
285 assert_eq!(snapshot.visible_process_ids, vec!["visible-process"]);
286 assert_eq!(snapshot.items.len(), 1);
287 assert_eq!(snapshot.items[0].events.len(), 1);
288 assert_eq!(
289 snapshot.items[0].events[0].event_type,
290 "process.cancel_requested"
291 );
292 assert!(snapshot.items[0].events[0].occurred_at_ms > 0);
293 }
294
295 #[tokio::test]
296 async fn snapshot_for_session_sorts_work_by_updated_then_created_descending() {
297 let registry =
298 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
299 let scope = ProcessScope::new("sort");
300 register_visible(
301 ®istry,
302 &scope,
303 external_registration("older", "Older"),
304 ProcessHandleDescriptor::new(None::<String>, None::<String>),
305 )
306 .await;
307 tokio::time::sleep(Duration::from_millis(2)).await;
308 register_visible(
309 ®istry,
310 &scope,
311 external_registration("newer", "Newer"),
312 ProcessHandleDescriptor::new(None::<String>, None::<String>),
313 )
314 .await;
315 tokio::time::sleep(Duration::from_millis(2)).await;
316 registry
317 .append_event(
318 "older",
319 ProcessEventAppendRequest::new("process.cancel_requested", json!({}))
320 .with_replay_key("older:cancel-requested"),
321 )
322 .await
323 .expect("update older process");
324
325 let snapshot = observer(Arc::clone(®istry))
326 .snapshot_for_session("sort")
327 .await
328 .expect("snapshot");
329
330 assert_eq!(snapshot.visible_process_ids, vec!["older", "newer"]);
331 }
332
333 #[tokio::test]
334 async fn observed_process_reports_terminal_status_and_error_messages() {
335 let registry =
336 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
337 for process_id in ["failed", "cancelled"] {
338 registry
339 .register_process(external_registration(process_id, process_id))
340 .await
341 .expect("register");
342 }
343 registry
344 .complete_process(
345 "failed",
346 ProcessAwaitOutput::Failure {
347 class: ToolFailureClass::External,
348 code: "boom".to_string(),
349 message: "failed loudly".to_string(),
350 raw: None,
351 control: None,
352 },
353 )
354 .await
355 .expect("fail process");
356 registry
357 .complete_process(
358 "cancelled",
359 ProcessAwaitOutput::Cancelled {
360 message: "cancelled intentionally".to_string(),
361 raw: None,
362 control: None,
363 },
364 )
365 .await
366 .expect("cancel process");
367
368 let observer = observer(Arc::clone(®istry));
369 let failed = observer.process("failed").await.expect("failed process");
370 let cancelled = observer
371 .process("cancelled")
372 .await
373 .expect("cancelled process");
374
375 assert_eq!(failed.status_label, "failed");
376 assert!(failed.terminal);
377 assert_eq!(failed.error.as_deref(), Some("failed loudly"));
378 assert_eq!(cancelled.status_label, "cancelled");
379 assert!(cancelled.terminal);
380 assert_eq!(cancelled.error.as_deref(), Some("cancelled intentionally"));
381 }
382
383 #[tokio::test]
384 async fn snapshot_for_session_prefers_typed_labels_and_extracts_child_session_id() {
385 let registry =
386 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
387 let scope = ProcessScope::new("labels");
388 let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
389 let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
390 let process_ref = lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 1);
391 let mut child_request = SessionCreateRequest::child_session(
392 "labels",
393 SessionStartPoint::Empty,
394 PluginOptions::default(),
395 )
396 .with_session_id("child-session");
397 child_request.subagent = Some(SubagentSessionContext {
398 parent_session_id: "labels".to_string(),
399 capability: "researcher".to_string(),
400 depth: 1,
401 max_depth: 4,
402 });
403 let cases = [
404 (
405 "tool",
406 ProcessInput::ToolCall {
407 call: PreparedToolCall::from_parts(
408 "call-1",
409 "shell.run",
410 json!({}),
411 None,
412 serde_json::Value::Null,
413 ),
414 },
415 "tool",
416 "shell.run",
417 None,
418 ),
419 (
420 "lashlang",
421 ProcessInput::LashlangProcess {
422 module_ref,
423 process_ref,
424 required_surface_ref: surface_ref,
425 process_name: "remember".to_string(),
426 args: serde_json::Map::new(),
427 },
428 "lashlang",
429 "remember",
430 None,
431 ),
432 (
433 "session",
434 ProcessInput::SessionTurn {
435 create_request: Box::new(child_request),
436 turn_input: Box::new(TurnInput::items([InputItem::text("run child")])),
437 output_contract: ToolOutputContract::Static,
438 },
439 "session_turn",
440 "researcher",
441 Some("child-session"),
442 ),
443 (
444 "external",
445 ProcessInput::External {
446 metadata: json!({ "label": "external job" }),
447 },
448 "external",
449 "external job",
450 None,
451 ),
452 ];
453 for (process_id, input, _kind, _label, _child_session_id) in cases {
454 register_visible(
455 ®istry,
456 &scope,
457 ProcessRegistration::new(process_id, input),
458 ProcessHandleDescriptor::new(Some("descriptor-kind"), Some("Descriptor label")),
459 )
460 .await;
461 }
462
463 let snapshot = observer(Arc::clone(®istry))
464 .snapshot_for_session("labels")
465 .await
466 .expect("snapshot");
467 let by_id = snapshot
468 .items
469 .iter()
470 .map(|item| (item.process.process_id.as_str(), item))
471 .collect::<std::collections::BTreeMap<_, _>>();
472
473 assert_eq!(by_id["tool"].label, "shell.run");
474 assert_eq!(by_id["lashlang"].label, "remember");
475 assert_eq!(by_id["session"].label, "researcher");
476 assert_eq!(
477 by_id["session"].process.child_session_id.as_deref(),
478 Some("child-session")
479 );
480 assert_eq!(by_id["external"].label, "external job");
481 }
482
483 #[tokio::test]
484 async fn observed_process_missing_lookup_returns_none() {
485 let registry =
486 Arc::new(super::super::TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
487
488 assert!(observer(registry).process("missing").await.is_none());
489 }
490}