1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10 CapabilityPolicy,
11};
12use crate::event_log::{active_event_log, EventLog, LogEvent as EventLogRecord, Topic};
13use crate::llm::vm_value_to_json;
14use crate::triggers::{SignatureStatus, TriggerEvent};
15use crate::value::{VmError, VmValue};
16
/// Node kinds recorded in `RunActionGraphNodeRecord::kind`.
/// `run` is the synthetic root node; `predicate` is used for stages whose
/// kind is "condition", `stage` for all other stages. The remaining kinds
/// are emitted by graph-building code outside this view.
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";
27
/// Edge kinds recorded in `RunActionGraphEdgeRecord::kind`.
/// `entry` connects the run root to its first reachable nodes,
/// `trigger_dispatch` is used instead when a trigger event started the run,
/// `predicate_gate` marks transitions leaving a "condition" stage, and
/// `replay_chain` links a replayed trigger to its original event.
pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
37
/// Aggregated LLM usage totals (tokens, wall time, call count, cost, and the
/// model names involved). All fields default when absent during deserialization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
48
/// One executed stage of a run: status/outcome, timing, optional transcript
/// and verification payloads, artifact linkage, and per-attempt history.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    /// Workflow node this stage executed.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    /// Embedded transcript JSON, when the stage captured one.
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
71
/// A single attempt of a stage (stages may be retried; `attempt` is the index).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
84
/// A recorded transition between workflow nodes during a run.
/// `from_stage_id`/`from_node_id` are `None` for entry transitions.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
97
/// A persisted checkpoint of run progress: which nodes were ready/completed
/// at `persisted_at`, and why the checkpoint was taken (`reason`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
108
/// A replay fixture derived from a source run: expected final status plus
/// per-stage assertions. Serialized with the type tag under `_type`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
122
/// Per-stage expectations checked during replay evaluation of a fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    /// Substring that must appear in the stage's visible text, when set.
    pub visible_text_contains: Option<String>,
}
133
/// Result of evaluating one replay fixture: overall pass/fail, the failure
/// messages, and how many stages were examined.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}
141
/// Per-case result within a replay evaluation suite, optionally including a
/// diff against a comparison run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    pub comparison: Option<RunDiffReport>,
}
154
/// Aggregate result of a replay evaluation suite (counts plus per-case reports).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
164
/// One deliverable extracted from a task-ledger payload
/// (see `task_ledger_summary_from_value`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    /// Status string from the ledger; "open"/"blocked" count as blocking.
    pub status: String,
    pub note: Option<String>,
}
173
/// Summary of a stage's task ledger: root task, rationale, deliverables,
/// observations, and the count of deliverables still open/blocked.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    /// Number of deliverables whose status is "open" or "blocked".
    pub blocking_count: usize,
}
183
/// Agentic-execution metrics for one stage, extracted from the stage's result
/// payload (iterations, LLM calls, tool activity, ledger summary, facts).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    /// Copied from the task ledger's observations when present.
    pub research_facts: Vec<String>,
}
204
/// Lineage entry for a delegated worker: identity, parent stage/session
/// linkage, and pointers to the worker's own run/snapshot files.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
219
/// A node in the derived action graph. `kind` is one of the
/// `ACTION_GRAPH_NODE_KIND_*` constants; the optional back-references point
/// at the stage/worker/run the node was derived from.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    /// Graph-unique id, e.g. "run:{id}", "stage:{id}", "trigger:{id}".
    pub id: String,
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub trace_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
}
235
/// A directed edge in the derived action graph. `kind` is one of the
/// `ACTION_GRAPH_EDGE_KIND_*` constants.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}
244
/// Verification outcome for a stage. `passed` is `None` when neither the
/// verification payload nor the stage status determines pass/fail.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    pub passed: Option<bool>,
    /// Compact verification JSON, or the stage's visible text as fallback.
    pub summary: Option<String>,
}
254
/// Pointer to a transcript: where it lives (`location` is a JSON-path-like
/// string, `path` an on-disk file) and whether it is currently available.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub location: String,
    pub path: Option<String>,
    pub available: bool,
}
265
/// A transcript-compaction event: what was archived, estimated token impact,
/// and where the pre-compaction snapshot asset can be found (if retained).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    pub snapshot_asset_id: Option<String>,
    /// Location string, e.g. "{prefix}.assets[{asset_id}]".
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    /// True when the snapshot asset id was found in the transcript's assets.
    pub available: bool,
}
283
/// Daemon lifecycle event kinds, serialized in snake_case
/// (e.g. `Spawned` -> "spawned"). Defaults to `Spawned`.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum DaemonEventKindRecord {
    #[default]
    Spawned,
    Triggered,
    Snapshotted,
    Resumed,
    Stopped,
}
294
/// One daemon lifecycle event as read from the LLM transcript sidecar file
/// (see `daemon_events_from_sidecar`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonEventRecord {
    pub daemon_id: String,
    pub name: String,
    pub kind: DaemonEventKindRecord,
    pub timestamp: String,
    pub persist_path: String,
    pub payload_summary: Option<String>,
}
305
/// Derived observability view of a run: planner metrics, action graph,
/// worker lineage, verification outcomes, transcript/compaction/daemon data.
/// Built by `derive_run_observability`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
    pub daemon_events: Vec<DaemonEventRecord>,
}
320
/// Stage-level difference between two runs (`change` names the kind of
/// change; `details` holds human-readable specifics).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    pub change: String,
    pub details: Vec<String>,
}
328
/// Difference between matching tool calls (same tool + args hash) across runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
338
/// Difference in one observability section (named by `section`) between runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}
346
/// Full comparison report between two runs: status change, per-stage and
/// per-tool diffs, observability diffs, and count deltas (right minus left
/// — presumably; confirm against the diff builder).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
363
/// Manifest describing an evaluation suite: optional base directory for
/// resolving case paths, plus the list of cases. Tagged via `_type`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
374
/// One evaluation case: the run to evaluate, an optional fixture to assert
/// against, and an optional second run to diff with.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
383
/// Top-level persisted record of a workflow run: identity, status, stages,
/// transitions, checkpoints, child runs, artifacts, policy, and optional
/// replay/observability payloads. Tagged via `_type`; all fields default
/// when absent so older files still deserialize.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    /// Free-form metadata; known keys used here include "trigger_event",
    /// "trace_id", and "replay_of_event_id".
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}
416
/// Recording of one tool invocation: identity, args hash (see
/// `tool_fixture_hash`), result text, rejection flag, and timing.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
429
430pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
432 use std::hash::{Hash, Hasher};
433 let mut hasher = std::collections::hash_map::DefaultHasher::new();
434 tool_name.hash(&mut hasher);
435 let args_str = serde_json::to_string(args).unwrap_or_default();
436 args_str.hash(&mut hasher);
437 format!("{:016x}", hasher.finish())
438}
439
/// A trace span captured during the run: parent linkage, kind/name, and
/// timing in milliseconds (start offset + duration).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
451
/// A delegated child run spawned by a worker: linkage to the parent
/// stage/session, the delegated task, status/timing, and pointers to the
/// child's run/snapshot files.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
473
/// Execution environment for a run: working directory, environment variables,
/// adapter name, and repository/worktree details.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}
487
488fn compact_json_value(value: &serde_json::Value) -> String {
489 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
490}
491
492fn signature_status_label(status: &SignatureStatus) -> &'static str {
493 match status {
494 SignatureStatus::Verified => "verified",
495 SignatureStatus::Unsigned => "unsigned",
496 SignatureStatus::Failed { .. } => "failed",
497 }
498}
499
500fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
501 run.metadata
502 .get("trigger_event")
503 .cloned()
504 .and_then(|value| serde_json::from_value(value).ok())
505}
506
507fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
508 trigger_event
509 .map(|event| event.trace_id.0.clone())
510 .or_else(|| {
511 run.metadata
512 .get("trace_id")
513 .and_then(|value| value.as_str())
514 .map(str::to_string)
515 })
516}
517
518fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
519 run.metadata
520 .get("replay_of_event_id")
521 .and_then(|value| value.as_str())
522 .map(str::to_string)
523}
524
525fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
526 if stage.kind == "condition" {
527 ACTION_GRAPH_NODE_KIND_PREDICATE
528 } else {
529 ACTION_GRAPH_NODE_KIND_STAGE
530 }
531}
532
/// Appends a node record to the action graph being built.
/// Thin wrapper kept as the single mutation point for node insertion.
fn append_action_graph_node(
    nodes: &mut Vec<RunActionGraphNodeRecord>,
    record: RunActionGraphNodeRecord,
) {
    nodes.push(record);
}
539
540pub async fn append_action_graph_update(
541 headers: BTreeMap<String, String>,
542 payload: serde_json::Value,
543) -> Result<(), crate::event_log::LogError> {
544 let Some(log) = active_event_log() else {
545 return Ok(());
546 };
547 let topic = Topic::new("observability.action_graph")
548 .expect("static observability.action_graph topic should always be valid");
549 let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
550 log.append(&topic, record).await.map(|_| ())
551}
552
/// Publishes the run's observability snapshot as an action-graph event.
///
/// Headers carry `run_id`, `workflow_id`, and (when derivable) `trace_id`;
/// the payload embeds the serialized `RunObservabilityRecord` plus the path
/// the run was persisted to. Best-effort: append failures are ignored.
fn publish_action_graph_event(
    run: &RunRecord,
    observability: &RunObservabilityRecord,
    path: &Path,
) {
    let trigger_event = trigger_event_from_run(run);
    let mut headers = BTreeMap::new();
    headers.insert("run_id".to_string(), run.id.clone());
    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
        headers.insert("trace_id".to_string(), trace_id);
    }
    let payload = serde_json::json!({
        "run_id": run.id,
        "workflow_id": run.workflow_id,
        "persisted_path": path.to_string_lossy(),
        "status": run.status,
        "observability": observability,
    });
    // Inside a Tokio runtime: fire-and-forget so persistence isn't blocked.
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            let _ = append_action_graph_update(headers, payload).await;
        });
    } else {
        // No runtime available: perform the append synchronously.
        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
    }
}
580
/// Computes the sidecar transcript path for a run file:
/// `{parent}/{stem}-llm/llm_transcript.jsonl`. Returns `None` when the run
/// path has no UTF-8 file stem; a missing parent is treated as ".".
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem().and_then(|raw| raw.to_str())?;
    let relative = format!("{stem}-llm/llm_transcript.jsonl");
    match run_path.parent() {
        Some(parent) => Some(parent.join(relative)),
        None => Some(Path::new(".").join(relative)),
    }
}
586
587fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
588 value
589 .and_then(|value| value.as_array())
590 .map(|items| {
591 items
592 .iter()
593 .filter_map(|item| item.as_str().map(str::to_string))
594 .collect::<Vec<_>>()
595 })
596 .unwrap_or_default()
597}
598
599fn json_usize(value: Option<&serde_json::Value>) -> usize {
600 value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
601}
602
603fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
604 value.and_then(|value| value.as_bool())
605}
606
607fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
608 stage
609 .artifacts
610 .iter()
611 .find_map(|artifact| artifact.data.as_ref())
612}
613
614fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
615 let deliverables = value
616 .get("deliverables")
617 .and_then(|raw| raw.as_array())
618 .map(|items| {
619 items
620 .iter()
621 .map(|item| RunDeliverableSummaryRecord {
622 id: item
623 .get("id")
624 .and_then(|value| value.as_str())
625 .unwrap_or_default()
626 .to_string(),
627 text: item
628 .get("text")
629 .and_then(|value| value.as_str())
630 .unwrap_or_default()
631 .to_string(),
632 status: item
633 .get("status")
634 .and_then(|value| value.as_str())
635 .unwrap_or_default()
636 .to_string(),
637 note: item
638 .get("note")
639 .and_then(|value| value.as_str())
640 .map(str::to_string),
641 })
642 .collect::<Vec<_>>()
643 })
644 .unwrap_or_default();
645 let observations = json_string_array(value.get("observations"));
646 let root_task = value
647 .get("root_task")
648 .and_then(|value| value.as_str())
649 .unwrap_or_default()
650 .to_string();
651 let rationale = value
652 .get("rationale")
653 .and_then(|value| value.as_str())
654 .unwrap_or_default()
655 .to_string();
656 if root_task.is_empty()
657 && rationale.is_empty()
658 && deliverables.is_empty()
659 && observations.is_empty()
660 {
661 return None;
662 }
663 let blocking_count = deliverables
664 .iter()
665 .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
666 .count();
667 Some(RunTaskLedgerSummaryRecord {
668 root_task,
669 rationale,
670 deliverables,
671 observations,
672 blocking_count,
673 })
674}
675
/// Extracts compaction events from an embedded transcript JSON value.
///
/// Scans `transcript.events` for entries with kind == "compaction" and builds
/// a `CompactionEventRecord` for each, reading counts and ids out of the
/// event's `metadata` object. A snapshot is marked `available` only when its
/// `snapshot_asset_id` appears among `transcript.assets[*].id`.
/// `location_prefix` names where this transcript lives (e.g.
/// "run.stages[node].transcript") and is used to build `snapshot_location`.
fn compaction_events_from_transcript(
    transcript: &serde_json::Value,
    stage_id: Option<&str>,
    node_id: Option<&str>,
    location_prefix: &str,
    persisted_path: Option<&Path>,
) -> Vec<CompactionEventRecord> {
    let transcript_id = transcript
        .get("id")
        .and_then(|value| value.as_str())
        .map(str::to_string);
    // Known asset ids, used to decide snapshot availability below.
    let asset_ids = transcript
        .get("assets")
        .and_then(|value| value.as_array())
        .map(|assets| {
            assets
                .iter()
                .filter_map(|asset| {
                    asset
                        .get("id")
                        .and_then(|value| value.as_str())
                        .map(str::to_string)
                })
                .collect::<BTreeSet<_>>()
        })
        .unwrap_or_default();
    transcript
        .get("events")
        .and_then(|value| value.as_array())
        .map(|events| {
            events
                .iter()
                .filter(|event| {
                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
                })
                .map(|event| {
                    let metadata = event.get("metadata");
                    let snapshot_asset_id = metadata
                        .and_then(|value| value.get("snapshot_asset_id"))
                        .and_then(|value| value.as_str())
                        .map(str::to_string);
                    // Snapshot is retrievable only if its asset still exists.
                    let available = snapshot_asset_id
                        .as_ref()
                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
                    let snapshot_location = snapshot_asset_id
                        .as_ref()
                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
                        .unwrap_or_else(|| location_prefix.to_string());
                    CompactionEventRecord {
                        id: event
                            .get("id")
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        transcript_id: transcript_id.clone(),
                        stage_id: stage_id.map(str::to_string),
                        node_id: node_id.map(str::to_string),
                        mode: metadata
                            .and_then(|value| value.get("mode"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        strategy: metadata
                            .and_then(|value| value.get("strategy"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        archived_messages: json_usize(
                            metadata.and_then(|value| value.get("archived_messages")),
                        ),
                        estimated_tokens_before: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_before")),
                        ),
                        estimated_tokens_after: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_after")),
                        ),
                        snapshot_asset_id,
                        snapshot_location,
                        snapshot_path: persisted_path
                            .map(|path| path.to_string_lossy().into_owned()),
                        available,
                    }
                })
                .collect()
        })
        .unwrap_or_default()
}
763
764fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
765 let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
766 return Vec::new();
767 };
768 let Ok(content) = std::fs::read_to_string(sidecar_path) else {
769 return Vec::new();
770 };
771
772 content
773 .lines()
774 .filter(|line| !line.trim().is_empty())
775 .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
776 .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
777 .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
778 .collect()
779}
780
781pub fn derive_run_observability(
782 run: &RunRecord,
783 persisted_path: Option<&Path>,
784) -> RunObservabilityRecord {
785 let mut action_graph_nodes = Vec::new();
786 let mut action_graph_edges = Vec::new();
787 let mut verification_outcomes = Vec::new();
788 let mut planner_rounds = Vec::new();
789 let mut transcript_pointers = Vec::new();
790 let mut compaction_events = Vec::new();
791 let mut daemon_events = Vec::new();
792 let mut research_fact_count = 0usize;
793
794 let root_node_id = format!("run:{}", run.id);
795 let trigger_event = trigger_event_from_run(run);
796 let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
797 append_action_graph_node(
798 &mut action_graph_nodes,
799 RunActionGraphNodeRecord {
800 id: root_node_id.clone(),
801 label: run
802 .workflow_name
803 .clone()
804 .unwrap_or_else(|| run.workflow_id.clone()),
805 kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
806 status: run.status.clone(),
807 outcome: run.status.clone(),
808 trace_id: propagated_trace_id.clone(),
809 stage_id: None,
810 node_id: None,
811 worker_id: None,
812 run_id: Some(run.id.clone()),
813 run_path: run.persisted_path.clone(),
814 },
815 );
816 let mut entry_node_id = root_node_id.clone();
817 if let Some(trigger_event) = trigger_event.as_ref() {
818 if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
819 let replay_source_node_id = format!("trigger:{replay_of_event_id}");
820 append_action_graph_node(
821 &mut action_graph_nodes,
822 RunActionGraphNodeRecord {
823 id: replay_source_node_id.clone(),
824 label: format!(
825 "{}:{} (original {})",
826 trigger_event.provider.as_str(),
827 trigger_event.kind,
828 replay_of_event_id
829 ),
830 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
831 status: "historical".to_string(),
832 outcome: "replayed_from".to_string(),
833 trace_id: Some(trigger_event.trace_id.0.clone()),
834 stage_id: None,
835 node_id: None,
836 worker_id: None,
837 run_id: Some(run.id.clone()),
838 run_path: run.persisted_path.clone(),
839 },
840 );
841 action_graph_edges.push(RunActionGraphEdgeRecord {
842 from_id: replay_source_node_id,
843 to_id: format!("trigger:{}", trigger_event.id.0),
844 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
845 label: Some("replay chain".to_string()),
846 });
847 }
848 let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
849 append_action_graph_node(
850 &mut action_graph_nodes,
851 RunActionGraphNodeRecord {
852 id: trigger_node_id.clone(),
853 label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
854 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
855 status: "received".to_string(),
856 outcome: signature_status_label(&trigger_event.signature_status).to_string(),
857 trace_id: Some(trigger_event.trace_id.0.clone()),
858 stage_id: None,
859 node_id: None,
860 worker_id: None,
861 run_id: Some(run.id.clone()),
862 run_path: run.persisted_path.clone(),
863 },
864 );
865 action_graph_edges.push(RunActionGraphEdgeRecord {
866 from_id: root_node_id.clone(),
867 to_id: trigger_node_id.clone(),
868 kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
869 label: Some(trigger_event.id.0.clone()),
870 });
871 entry_node_id = trigger_node_id;
872 }
873
874 let stage_node_ids = run
875 .stages
876 .iter()
877 .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
878 .collect::<BTreeMap<_, _>>();
879 let stage_by_id = run
880 .stages
881 .iter()
882 .map(|stage| (stage.id.as_str(), stage))
883 .collect::<BTreeMap<_, _>>();
884 let stage_by_node_id = run
885 .stages
886 .iter()
887 .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
888 .collect::<BTreeMap<_, _>>();
889
890 let incoming_nodes = run
891 .transitions
892 .iter()
893 .map(|transition| transition.to_node_id.clone())
894 .collect::<BTreeSet<_>>();
895
896 for stage in &run.stages {
897 let graph_node_id = stage_node_ids
898 .get(&stage.id)
899 .cloned()
900 .unwrap_or_else(|| format!("stage:{}", stage.id));
901 append_action_graph_node(
902 &mut action_graph_nodes,
903 RunActionGraphNodeRecord {
904 id: graph_node_id.clone(),
905 label: stage.node_id.clone(),
906 kind: action_graph_kind_for_stage(stage).to_string(),
907 status: stage.status.clone(),
908 outcome: stage.outcome.clone(),
909 trace_id: propagated_trace_id.clone(),
910 stage_id: Some(stage.id.clone()),
911 node_id: Some(stage.node_id.clone()),
912 worker_id: stage
913 .metadata
914 .get("worker_id")
915 .and_then(|value| value.as_str())
916 .map(str::to_string),
917 run_id: None,
918 run_path: None,
919 },
920 );
921 if !incoming_nodes.contains(&stage.node_id) {
922 action_graph_edges.push(RunActionGraphEdgeRecord {
923 from_id: entry_node_id.clone(),
924 to_id: graph_node_id.clone(),
925 kind: if trigger_event.is_some() {
926 ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
927 } else {
928 ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
929 },
930 label: None,
931 });
932 }
933
934 if stage.kind == "verify" || stage.verification.is_some() {
935 let passed = json_bool(
936 stage
937 .verification
938 .as_ref()
939 .and_then(|value| value.get("pass")),
940 )
941 .or_else(|| {
942 json_bool(
943 stage
944 .verification
945 .as_ref()
946 .and_then(|value| value.get("success")),
947 )
948 })
949 .or_else(|| {
950 if stage.status == "completed" && stage.outcome == "success" {
951 Some(true)
952 } else if stage.status == "failed" || stage.outcome == "failed" {
953 Some(false)
954 } else {
955 None
956 }
957 });
958 verification_outcomes.push(RunVerificationOutcomeRecord {
959 stage_id: stage.id.clone(),
960 node_id: stage.node_id.clone(),
961 status: stage.status.clone(),
962 passed,
963 summary: stage
964 .verification
965 .as_ref()
966 .map(compact_json_value)
967 .or_else(|| {
968 stage
969 .visible_text
970 .as_ref()
971 .filter(|value| !value.trim().is_empty())
972 .cloned()
973 }),
974 });
975 }
976
977 if stage.transcript.is_some() {
978 transcript_pointers.push(RunTranscriptPointerRecord {
979 id: format!("stage:{}:transcript", stage.id),
980 label: format!("Stage {} transcript", stage.node_id),
981 kind: "embedded_transcript".to_string(),
982 location: format!("run.stages[{}].transcript", stage.node_id),
983 path: run.persisted_path.clone(),
984 available: true,
985 });
986 if let Some(transcript) = stage.transcript.as_ref() {
987 compaction_events.extend(compaction_events_from_transcript(
988 transcript,
989 Some(&stage.id),
990 Some(&stage.node_id),
991 &format!("run.stages[{}].transcript", stage.node_id),
992 persisted_path,
993 ));
994 }
995 }
996
997 if let Some(payload) = stage_result_payload(stage) {
998 let trace = payload.get("trace");
999 let task_ledger = payload
1000 .get("task_ledger")
1001 .and_then(task_ledger_summary_from_value);
1002 let research_facts = task_ledger
1003 .as_ref()
1004 .map(|ledger| ledger.observations.clone())
1005 .unwrap_or_default();
1006 research_fact_count += research_facts.len();
1007 let tools_used = json_string_array(
1008 payload
1009 .get("tools_used")
1010 .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1011 );
1012 let successful_tools = json_string_array(payload.get("successful_tools"));
1013 let planner_round = RunPlannerRoundRecord {
1014 stage_id: stage.id.clone(),
1015 node_id: stage.node_id.clone(),
1016 stage_kind: stage.kind.clone(),
1017 status: stage.status.clone(),
1018 outcome: stage.outcome.clone(),
1019 iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1020 llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1021 tool_execution_count: json_usize(
1022 trace.and_then(|trace| trace.get("tool_executions")),
1023 ),
1024 tool_rejection_count: json_usize(
1025 trace.and_then(|trace| trace.get("tool_rejections")),
1026 ),
1027 intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1028 compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1029 tools_used,
1030 successful_tools,
1031 ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1032 task_ledger,
1033 research_facts,
1034 };
1035 let has_agentic_detail = planner_round.iteration_count > 0
1036 || planner_round.llm_call_count > 0
1037 || planner_round.tool_execution_count > 0
1038 || planner_round.ledger_done_rejections > 0
1039 || planner_round.task_ledger.is_some()
1040 || !planner_round.tools_used.is_empty()
1041 || !planner_round.successful_tools.is_empty();
1042 if has_agentic_detail {
1043 planner_rounds.push(planner_round);
1044 }
1045 }
1046 }
1047
1048 for transition in &run.transitions {
1049 let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1050 continue;
1051 };
1052 let from_stage = transition
1053 .from_stage_id
1054 .as_deref()
1055 .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1056 let from_id = transition
1057 .from_stage_id
1058 .as_ref()
1059 .and_then(|stage_id| stage_node_ids.get(stage_id))
1060 .cloned()
1061 .or_else(|| {
1062 transition
1063 .from_node_id
1064 .as_ref()
1065 .and_then(|node_id| stage_by_node_id.get(node_id))
1066 .cloned()
1067 })
1068 .unwrap_or_else(|| root_node_id.clone());
1069 action_graph_edges.push(RunActionGraphEdgeRecord {
1070 from_id,
1071 to_id,
1072 kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1073 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1074 } else {
1075 ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1076 },
1077 label: transition.branch.clone(),
1078 });
1079 }
1080
1081 let worker_lineage = run
1082 .child_runs
1083 .iter()
1084 .map(|child| {
1085 let worker_node_id = format!("worker:{}", child.worker_id);
1086 append_action_graph_node(
1087 &mut action_graph_nodes,
1088 RunActionGraphNodeRecord {
1089 id: worker_node_id.clone(),
1090 label: child.worker_name.clone(),
1091 kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1092 status: child.status.clone(),
1093 outcome: child.status.clone(),
1094 trace_id: propagated_trace_id.clone(),
1095 stage_id: child.parent_stage_id.clone(),
1096 node_id: None,
1097 worker_id: Some(child.worker_id.clone()),
1098 run_id: child.run_id.clone(),
1099 run_path: child.run_path.clone(),
1100 },
1101 );
1102 if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1103 if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1104 action_graph_edges.push(RunActionGraphEdgeRecord {
1105 from_id: stage_node_id.clone(),
1106 to_id: worker_node_id,
1107 kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1108 label: Some(child.worker_name.clone()),
1109 });
1110 }
1111 }
1112 RunWorkerLineageRecord {
1113 worker_id: child.worker_id.clone(),
1114 worker_name: child.worker_name.clone(),
1115 parent_stage_id: child.parent_stage_id.clone(),
1116 task: child.task.clone(),
1117 status: child.status.clone(),
1118 session_id: child.session_id.clone(),
1119 parent_session_id: child.parent_session_id.clone(),
1120 run_id: child.run_id.clone(),
1121 run_path: child.run_path.clone(),
1122 snapshot_path: child.snapshot_path.clone(),
1123 }
1124 })
1125 .collect::<Vec<_>>();
1126
1127 if run.transcript.is_some() {
1128 transcript_pointers.push(RunTranscriptPointerRecord {
1129 id: "run:transcript".to_string(),
1130 label: "Run transcript".to_string(),
1131 kind: "embedded_transcript".to_string(),
1132 location: "run.transcript".to_string(),
1133 path: run.persisted_path.clone(),
1134 available: true,
1135 });
1136 if let Some(transcript) = run.transcript.as_ref() {
1137 compaction_events.extend(compaction_events_from_transcript(
1138 transcript,
1139 None,
1140 None,
1141 "run.transcript",
1142 persisted_path,
1143 ));
1144 }
1145 }
1146
1147 if let Some(path) = persisted_path {
1148 if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1149 transcript_pointers.push(RunTranscriptPointerRecord {
1150 id: "run:llm_transcript".to_string(),
1151 label: "LLM transcript sidecar".to_string(),
1152 kind: "llm_jsonl".to_string(),
1153 location: "run sidecar".to_string(),
1154 path: Some(sidecar_path.to_string_lossy().into_owned()),
1155 available: sidecar_path.exists(),
1156 });
1157 }
1158 daemon_events.extend(daemon_events_from_sidecar(path));
1159 }
1160
1161 RunObservabilityRecord {
1162 schema_version: 4,
1163 planner_rounds,
1164 research_fact_count,
1165 action_graph_nodes,
1166 action_graph_edges,
1167 worker_lineage,
1168 verification_outcomes,
1169 transcript_pointers,
1170 compaction_events,
1171 daemon_events,
1172 }
1173}
1174
1175fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1176 run.observability = Some(derive_run_observability(run, persisted_path));
1177}
1178
1179pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1180 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1181 if run.type_name.is_empty() {
1182 run.type_name = "run_record".to_string();
1183 }
1184 if run.id.is_empty() {
1185 run.id = new_id("run");
1186 }
1187 if run.started_at.is_empty() {
1188 run.started_at = now_rfc3339();
1189 }
1190 if run.status.is_empty() {
1191 run.status = "running".to_string();
1192 }
1193 if run.root_run_id.is_none() {
1194 run.root_run_id = Some(run.id.clone());
1195 }
1196 if run.replay_fixture.is_none() {
1197 run.replay_fixture = Some(replay_fixture_from_run(&run));
1198 }
1199 if run.observability.is_none() {
1200 let persisted_path = run.persisted_path.clone();
1201 let persisted = persisted_path.as_deref().map(Path::new);
1202 refresh_run_observability(&mut run, persisted);
1203 }
1204 Ok(run)
1205}
1206
1207pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1208 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1209 if manifest.type_name.is_empty() {
1210 manifest.type_name = "eval_suite_manifest".to_string();
1211 }
1212 if manifest.id.is_empty() {
1213 manifest.id = new_id("eval_suite");
1214 }
1215 Ok(manifest)
1216}
1217
1218fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1219 let content = std::fs::read_to_string(path)
1220 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1221 serde_json::from_str(&content)
1222 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1223}
1224
/// Resolves a manifest-relative path string.
///
/// Absolute paths pass through untouched; relative paths are joined onto
/// `base_dir` when one is provided and returned as-is otherwise.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if !candidate.is_absolute() => base.join(candidate),
        _ => candidate,
    }
}
1235
1236pub fn evaluate_run_suite_manifest(
1237 manifest: &EvalSuiteManifest,
1238) -> Result<ReplayEvalSuiteReport, VmError> {
1239 let base_dir = manifest.base_dir.as_deref().map(Path::new);
1240 let mut reports = Vec::new();
1241 for case in &manifest.cases {
1242 let run_path = resolve_manifest_path(base_dir, &case.run_path);
1243 let run = load_run_record(&run_path)?;
1244 let fixture = match &case.fixture_path {
1245 Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1246 None => run
1247 .replay_fixture
1248 .clone()
1249 .unwrap_or_else(|| replay_fixture_from_run(&run)),
1250 };
1251 let eval = evaluate_run_against_fixture(&run, &fixture);
1252 let mut pass = eval.pass;
1253 let mut failures = eval.failures;
1254 let comparison = match &case.compare_to {
1255 Some(path) => {
1256 let baseline_path = resolve_manifest_path(base_dir, path);
1257 let baseline = load_run_record(&baseline_path)?;
1258 let diff = diff_run_records(&baseline, &run);
1259 if !diff.identical {
1260 pass = false;
1261 failures.push(format!(
1262 "run differs from baseline {} with {} stage changes",
1263 baseline_path.display(),
1264 diff.stage_diffs.len()
1265 ));
1266 }
1267 Some(diff)
1268 }
1269 None => None,
1270 };
1271 reports.push(ReplayEvalCaseReport {
1272 run_id: run.id.clone(),
1273 workflow_id: run.workflow_id.clone(),
1274 label: case.label.clone(),
1275 pass,
1276 failures,
1277 stage_count: eval.stage_count,
1278 source_path: Some(run_path.display().to_string()),
1279 comparison,
1280 });
1281 }
1282 let total = reports.len();
1283 let passed = reports.iter().filter(|report| report.pass).count();
1284 let failed = total.saturating_sub(passed);
1285 Ok(ReplayEvalSuiteReport {
1286 pass: failed == 0,
1287 total,
1288 passed,
1289 failed,
1290 cases: reports,
1291 })
1292}
1293
/// Edit operation emitted by `myers_diff`, always paired with an index:
/// `Equal` and `Delete` index into the left (`a`) slice, `Insert` indexes
/// into the right (`b`) slice.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    Equal,
    Delete,
    Insert,
}
1301
/// Computes a shortest edit script between `a` and `b` using Myers'
/// greedy O((N+M)·D) diff algorithm ("An O(ND) Difference Algorithm and
/// Its Variations", 1986).
///
/// Returns `(op, index)` pairs in forward order; `Equal`/`Delete` carry an
/// index into `a`, `Insert` an index into `b`.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Trivial cases: nothing to diff, pure insertion, or pure deletion.
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // `v[k + offset]` holds the furthest x reached on diagonal k = x - y.
    // Diagonals range over [-(n+m), n+m], hence the offset shift.
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // Snapshot of `v` taken at the start of each round d; consumed by the
    // backtracking pass below.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    // Forward pass: find the smallest d whose edit path reaches (n, m).
    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Step down (insert) from diagonal k+1 or right (delete) from
            // k-1, whichever reached further. Reads come from the previous
            // round's `v`: k-1/k+1 have opposite parity to k, so their
            // entries were last written in round d-1.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the "snake": free diagonal moves across matching lines.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                // Reached the end; `new_v` is no longer needed.
                let _ = new_v;
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backtrack from (n, m) through the snapshots to recover the script
    // (collected back-to-front, reversed at the end).
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        // `trace[d]` is the frontier as of the end of round d-1.
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Re-walk the snake backwards, emitting Equal ops.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        // The single edit that moved the path from diagonal prev_k to k.
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Any remaining prefix is a common snake at d = 0.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1386
1387pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1388 let before_lines: Vec<&str> = before.lines().collect();
1389 let after_lines: Vec<&str> = after.lines().collect();
1390 let ops = myers_diff(&before_lines, &after_lines);
1391
1392 let mut diff = String::new();
1393 let file = path.unwrap_or("artifact");
1394 diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1395 for &(op, idx) in &ops {
1396 match op {
1397 DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1398 DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1399 DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1400 }
1401 }
1402 diff
1403}
1404
1405pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1406 let path = path
1407 .map(PathBuf::from)
1408 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1409 let mut materialized = run.clone();
1410 if materialized.replay_fixture.is_none() {
1411 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1412 }
1413 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1414 refresh_run_observability(&mut materialized, Some(&path));
1415 if let Some(parent) = path.parent() {
1416 std::fs::create_dir_all(parent)
1417 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1418 }
1419 let json = serde_json::to_string_pretty(&materialized)
1420 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1421 let tmp_path = path.with_extension("json.tmp");
1423 std::fs::write(&tmp_path, &json)
1424 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1425 std::fs::rename(&tmp_path, &path).map_err(|e| {
1426 let _ = std::fs::write(&path, &json);
1428 VmError::Runtime(format!("failed to finalize run record: {e}"))
1429 })?;
1430 if let Some(observability) = materialized.observability.as_ref() {
1431 publish_action_graph_event(&materialized, observability, &path);
1432 }
1433 Ok(path.to_string_lossy().into_owned())
1434}
1435
1436pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1437 let content = std::fs::read_to_string(path)
1438 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1439 let mut run: RunRecord = serde_json::from_str(&content)
1440 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1441 if run.replay_fixture.is_none() {
1442 run.replay_fixture = Some(replay_fixture_from_run(&run));
1443 }
1444 run.persisted_path
1445 .get_or_insert_with(|| path.to_string_lossy().into_owned());
1446 refresh_run_observability(&mut run, Some(path));
1447 Ok(run)
1448}
1449
1450pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1451 ReplayFixture {
1452 type_name: "replay_fixture".to_string(),
1453 id: new_id("fixture"),
1454 source_run_id: run.id.clone(),
1455 workflow_id: run.workflow_id.clone(),
1456 workflow_name: run.workflow_name.clone(),
1457 created_at: now_rfc3339(),
1458 expected_status: run.status.clone(),
1459 stage_assertions: run
1460 .stages
1461 .iter()
1462 .map(|stage| ReplayStageAssertion {
1463 node_id: stage.node_id.clone(),
1464 expected_status: stage.status.clone(),
1465 expected_outcome: stage.outcome.clone(),
1466 expected_branch: stage.branch.clone(),
1467 required_artifact_kinds: stage
1468 .artifacts
1469 .iter()
1470 .map(|artifact| artifact.kind.clone())
1471 .collect(),
1472 visible_text_contains: stage
1473 .visible_text
1474 .as_ref()
1475 .filter(|text| !text.is_empty())
1476 .map(|text| text.chars().take(80).collect()),
1477 })
1478 .collect(),
1479 }
1480}
1481
1482pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1483 let mut failures = Vec::new();
1484 if run.status != fixture.expected_status {
1485 failures.push(format!(
1486 "run status mismatch: expected {}, got {}",
1487 fixture.expected_status, run.status
1488 ));
1489 }
1490 let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1491 run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1492 for assertion in &fixture.stage_assertions {
1493 let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1494 failures.push(format!("missing stage {}", assertion.node_id));
1495 continue;
1496 };
1497 if stage.status != assertion.expected_status {
1498 failures.push(format!(
1499 "stage {} status mismatch: expected {}, got {}",
1500 assertion.node_id, assertion.expected_status, stage.status
1501 ));
1502 }
1503 if stage.outcome != assertion.expected_outcome {
1504 failures.push(format!(
1505 "stage {} outcome mismatch: expected {}, got {}",
1506 assertion.node_id, assertion.expected_outcome, stage.outcome
1507 ));
1508 }
1509 if stage.branch != assertion.expected_branch {
1510 failures.push(format!(
1511 "stage {} branch mismatch: expected {:?}, got {:?}",
1512 assertion.node_id, assertion.expected_branch, stage.branch
1513 ));
1514 }
1515 for required_kind in &assertion.required_artifact_kinds {
1516 if !stage
1517 .artifacts
1518 .iter()
1519 .any(|artifact| &artifact.kind == required_kind)
1520 {
1521 failures.push(format!(
1522 "stage {} missing artifact kind {}",
1523 assertion.node_id, required_kind
1524 ));
1525 }
1526 }
1527 if let Some(snippet) = &assertion.visible_text_contains {
1528 let actual = stage.visible_text.clone().unwrap_or_default();
1529 if !actual.contains(snippet) {
1530 failures.push(format!(
1531 "stage {} visible text does not contain expected snippet {:?}",
1532 assertion.node_id, snippet
1533 ));
1534 }
1535 }
1536 }
1537
1538 ReplayEvalReport {
1539 pass: failures.is_empty(),
1540 failures,
1541 stage_count: run.stages.len(),
1542 }
1543}
1544
1545pub fn evaluate_run_suite(
1546 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1547) -> ReplayEvalSuiteReport {
1548 let mut reports = Vec::new();
1549 for (run, fixture, source_path) in cases {
1550 let report = evaluate_run_against_fixture(&run, &fixture);
1551 reports.push(ReplayEvalCaseReport {
1552 run_id: run.id.clone(),
1553 workflow_id: run.workflow_id.clone(),
1554 label: None,
1555 pass: report.pass,
1556 failures: report.failures,
1557 stage_count: report.stage_count,
1558 source_path,
1559 comparison: None,
1560 });
1561 }
1562 let total = reports.len();
1563 let passed = reports.iter().filter(|report| report.pass).count();
1564 let failed = total.saturating_sub(passed);
1565 ReplayEvalSuiteReport {
1566 pass: failed == 0,
1567 total,
1568 passed,
1569 failed,
1570 cases: reports,
1571 }
1572}
1573
1574pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1575 let mut stage_diffs = Vec::new();
1576 let mut all_node_ids = BTreeSet::new();
1577 let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1578 .stages
1579 .iter()
1580 .map(|s| (s.node_id.as_str(), s))
1581 .collect();
1582 let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1583 .stages
1584 .iter()
1585 .map(|s| (s.node_id.as_str(), s))
1586 .collect();
1587 all_node_ids.extend(left_by_id.keys().copied());
1588 all_node_ids.extend(right_by_id.keys().copied());
1589
1590 for node_id in all_node_ids {
1591 let left_stage = left_by_id.get(node_id).copied();
1592 let right_stage = right_by_id.get(node_id).copied();
1593 match (left_stage, right_stage) {
1594 (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1595 node_id: node_id.to_string(),
1596 change: "removed".to_string(),
1597 details: vec!["stage missing from right run".to_string()],
1598 }),
1599 (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1600 node_id: node_id.to_string(),
1601 change: "added".to_string(),
1602 details: vec!["stage missing from left run".to_string()],
1603 }),
1604 (Some(left_stage), Some(right_stage)) => {
1605 let mut details = Vec::new();
1606 if left_stage.status != right_stage.status {
1607 details.push(format!(
1608 "status: {} -> {}",
1609 left_stage.status, right_stage.status
1610 ));
1611 }
1612 if left_stage.outcome != right_stage.outcome {
1613 details.push(format!(
1614 "outcome: {} -> {}",
1615 left_stage.outcome, right_stage.outcome
1616 ));
1617 }
1618 if left_stage.branch != right_stage.branch {
1619 details.push(format!(
1620 "branch: {:?} -> {:?}",
1621 left_stage.branch, right_stage.branch
1622 ));
1623 }
1624 if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1625 {
1626 details.push(format!(
1627 "produced_artifacts: {} -> {}",
1628 left_stage.produced_artifact_ids.len(),
1629 right_stage.produced_artifact_ids.len()
1630 ));
1631 }
1632 if left_stage.artifacts.len() != right_stage.artifacts.len() {
1633 details.push(format!(
1634 "artifact_records: {} -> {}",
1635 left_stage.artifacts.len(),
1636 right_stage.artifacts.len()
1637 ));
1638 }
1639 if !details.is_empty() {
1640 stage_diffs.push(RunStageDiffRecord {
1641 node_id: node_id.to_string(),
1642 change: "changed".to_string(),
1643 details,
1644 });
1645 }
1646 }
1647 (None, None) => {}
1648 }
1649 }
1650
1651 let mut tool_diffs = Vec::new();
1652 let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1653 .tool_recordings
1654 .iter()
1655 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1656 .collect();
1657 let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1658 .tool_recordings
1659 .iter()
1660 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1661 .collect();
1662 let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1663 .keys()
1664 .chain(right_tools.keys())
1665 .cloned()
1666 .collect();
1667 for key in &all_tool_keys {
1668 let l = left_tools.get(key);
1669 let r = right_tools.get(key);
1670 let result_changed = match (l, r) {
1671 (Some(a), Some(b)) => a.result != b.result,
1672 _ => true,
1673 };
1674 if result_changed {
1675 tool_diffs.push(ToolCallDiffRecord {
1676 tool_name: key.0.clone(),
1677 args_hash: key.1.clone(),
1678 result_changed,
1679 left_result: l.map(|t| t.result.clone()),
1680 right_result: r.map(|t| t.result.clone()),
1681 });
1682 }
1683 }
1684
1685 let left_observability = left.observability.clone().unwrap_or_else(|| {
1686 derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1687 });
1688 let right_observability = right.observability.clone().unwrap_or_else(|| {
1689 derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1690 });
1691 let mut observability_diffs = Vec::new();
1692
1693 let left_workers = left_observability
1694 .worker_lineage
1695 .iter()
1696 .map(|worker| {
1697 (
1698 worker.worker_id.clone(),
1699 (
1700 worker.status.clone(),
1701 worker.run_id.clone(),
1702 worker.run_path.clone(),
1703 ),
1704 )
1705 })
1706 .collect::<BTreeMap<_, _>>();
1707 let right_workers = right_observability
1708 .worker_lineage
1709 .iter()
1710 .map(|worker| {
1711 (
1712 worker.worker_id.clone(),
1713 (
1714 worker.status.clone(),
1715 worker.run_id.clone(),
1716 worker.run_path.clone(),
1717 ),
1718 )
1719 })
1720 .collect::<BTreeMap<_, _>>();
1721 let worker_ids = left_workers
1722 .keys()
1723 .chain(right_workers.keys())
1724 .cloned()
1725 .collect::<BTreeSet<_>>();
1726 for worker_id in worker_ids {
1727 match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
1728 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1729 section: "worker_lineage".to_string(),
1730 label: worker_id,
1731 details: vec!["worker missing from right run".to_string()],
1732 }),
1733 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1734 section: "worker_lineage".to_string(),
1735 label: worker_id,
1736 details: vec!["worker missing from left run".to_string()],
1737 }),
1738 (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
1739 let mut details = Vec::new();
1740 if left_worker.0 != right_worker.0 {
1741 details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
1742 }
1743 if left_worker.1 != right_worker.1 {
1744 details.push(format!(
1745 "run_id: {:?} -> {:?}",
1746 left_worker.1, right_worker.1
1747 ));
1748 }
1749 if left_worker.2 != right_worker.2 {
1750 details.push(format!(
1751 "run_path: {:?} -> {:?}",
1752 left_worker.2, right_worker.2
1753 ));
1754 }
1755 observability_diffs.push(RunObservabilityDiffRecord {
1756 section: "worker_lineage".to_string(),
1757 label: worker_id,
1758 details,
1759 });
1760 }
1761 _ => {}
1762 }
1763 }
1764
1765 let left_rounds = left_observability
1766 .planner_rounds
1767 .iter()
1768 .map(|round| (round.stage_id.clone(), round))
1769 .collect::<BTreeMap<_, _>>();
1770 let right_rounds = right_observability
1771 .planner_rounds
1772 .iter()
1773 .map(|round| (round.stage_id.clone(), round))
1774 .collect::<BTreeMap<_, _>>();
1775 let round_ids = left_rounds
1776 .keys()
1777 .chain(right_rounds.keys())
1778 .cloned()
1779 .collect::<BTreeSet<_>>();
1780 for stage_id in round_ids {
1781 match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
1782 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1783 section: "planner_rounds".to_string(),
1784 label: stage_id,
1785 details: vec!["planner summary missing from right run".to_string()],
1786 }),
1787 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1788 section: "planner_rounds".to_string(),
1789 label: stage_id,
1790 details: vec!["planner summary missing from left run".to_string()],
1791 }),
1792 (Some(left_round), Some(right_round)) => {
1793 let mut details = Vec::new();
1794 if left_round.iteration_count != right_round.iteration_count {
1795 details.push(format!(
1796 "iterations: {} -> {}",
1797 left_round.iteration_count, right_round.iteration_count
1798 ));
1799 }
1800 if left_round.tool_execution_count != right_round.tool_execution_count {
1801 details.push(format!(
1802 "tool_executions: {} -> {}",
1803 left_round.tool_execution_count, right_round.tool_execution_count
1804 ));
1805 }
1806 if left_round.research_facts != right_round.research_facts {
1807 details.push(format!(
1808 "research_facts: {:?} -> {:?}",
1809 left_round.research_facts, right_round.research_facts
1810 ));
1811 }
1812 let left_deliverables = left_round
1813 .task_ledger
1814 .as_ref()
1815 .map(|ledger| {
1816 ledger
1817 .deliverables
1818 .iter()
1819 .map(|item| format!("{}:{}", item.id, item.status))
1820 .collect::<Vec<_>>()
1821 })
1822 .unwrap_or_default();
1823 let right_deliverables = right_round
1824 .task_ledger
1825 .as_ref()
1826 .map(|ledger| {
1827 ledger
1828 .deliverables
1829 .iter()
1830 .map(|item| format!("{}:{}", item.id, item.status))
1831 .collect::<Vec<_>>()
1832 })
1833 .unwrap_or_default();
1834 if left_deliverables != right_deliverables {
1835 details.push(format!(
1836 "deliverables: {:?} -> {:?}",
1837 left_deliverables, right_deliverables
1838 ));
1839 }
1840 if left_round.successful_tools != right_round.successful_tools {
1841 details.push(format!(
1842 "successful_tools: {:?} -> {:?}",
1843 left_round.successful_tools, right_round.successful_tools
1844 ));
1845 }
1846 if !details.is_empty() {
1847 observability_diffs.push(RunObservabilityDiffRecord {
1848 section: "planner_rounds".to_string(),
1849 label: left_round.node_id.clone(),
1850 details,
1851 });
1852 }
1853 }
1854 _ => {}
1855 }
1856 }
1857
1858 let left_pointers = left_observability
1859 .transcript_pointers
1860 .iter()
1861 .map(|pointer| {
1862 (
1863 pointer.id.clone(),
1864 (
1865 pointer.available,
1866 pointer.path.clone(),
1867 pointer.location.clone(),
1868 ),
1869 )
1870 })
1871 .collect::<BTreeMap<_, _>>();
1872 let right_pointers = right_observability
1873 .transcript_pointers
1874 .iter()
1875 .map(|pointer| {
1876 (
1877 pointer.id.clone(),
1878 (
1879 pointer.available,
1880 pointer.path.clone(),
1881 pointer.location.clone(),
1882 ),
1883 )
1884 })
1885 .collect::<BTreeMap<_, _>>();
1886 let pointer_ids = left_pointers
1887 .keys()
1888 .chain(right_pointers.keys())
1889 .cloned()
1890 .collect::<BTreeSet<_>>();
1891 for pointer_id in pointer_ids {
1892 match (
1893 left_pointers.get(&pointer_id),
1894 right_pointers.get(&pointer_id),
1895 ) {
1896 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1897 section: "transcript_pointers".to_string(),
1898 label: pointer_id,
1899 details: vec!["pointer missing from right run".to_string()],
1900 }),
1901 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1902 section: "transcript_pointers".to_string(),
1903 label: pointer_id,
1904 details: vec!["pointer missing from left run".to_string()],
1905 }),
1906 (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
1907 observability_diffs.push(RunObservabilityDiffRecord {
1908 section: "transcript_pointers".to_string(),
1909 label: pointer_id,
1910 details: vec![format!(
1911 "pointer: {:?} -> {:?}",
1912 left_pointer, right_pointer
1913 )],
1914 });
1915 }
1916 _ => {}
1917 }
1918 }
1919
1920 let left_compactions = left_observability
1921 .compaction_events
1922 .iter()
1923 .map(|event| {
1924 (
1925 event.id.clone(),
1926 (
1927 event.strategy.clone(),
1928 event.archived_messages,
1929 event.snapshot_asset_id.clone(),
1930 event.available,
1931 ),
1932 )
1933 })
1934 .collect::<BTreeMap<_, _>>();
1935 let right_compactions = right_observability
1936 .compaction_events
1937 .iter()
1938 .map(|event| {
1939 (
1940 event.id.clone(),
1941 (
1942 event.strategy.clone(),
1943 event.archived_messages,
1944 event.snapshot_asset_id.clone(),
1945 event.available,
1946 ),
1947 )
1948 })
1949 .collect::<BTreeMap<_, _>>();
1950 let compaction_ids = left_compactions
1951 .keys()
1952 .chain(right_compactions.keys())
1953 .cloned()
1954 .collect::<BTreeSet<_>>();
1955 for compaction_id in compaction_ids {
1956 match (
1957 left_compactions.get(&compaction_id),
1958 right_compactions.get(&compaction_id),
1959 ) {
1960 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1961 section: "compaction_events".to_string(),
1962 label: compaction_id,
1963 details: vec!["compaction event missing from right run".to_string()],
1964 }),
1965 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1966 section: "compaction_events".to_string(),
1967 label: compaction_id,
1968 details: vec!["compaction event missing from left run".to_string()],
1969 }),
1970 (Some(left_event), Some(right_event)) if left_event != right_event => {
1971 observability_diffs.push(RunObservabilityDiffRecord {
1972 section: "compaction_events".to_string(),
1973 label: compaction_id,
1974 details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
1975 });
1976 }
1977 _ => {}
1978 }
1979 }
1980
1981 let left_daemons = left_observability
1982 .daemon_events
1983 .iter()
1984 .map(|event| {
1985 (
1986 (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
1987 (
1988 event.name.clone(),
1989 event.persist_path.clone(),
1990 event.payload_summary.clone(),
1991 ),
1992 )
1993 })
1994 .collect::<BTreeMap<_, _>>();
// --- Daemon-event diff ---------------------------------------------------
// Index the right run's daemon events by (daemon_id, kind, timestamp); the
// mapped value holds the remaining fields that are compared below.
// NOTE(review): events sharing the same (daemon_id, kind, timestamp) key
// silently collapse to the last one inserted — confirm keys are unique
// per run, otherwise duplicates are dropped from the diff.
1995 let right_daemons = right_observability
1996 .daemon_events
1997 .iter()
1998 .map(|event| {
1999 (
2000 (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2001 (
2002 event.name.clone(),
2003 event.persist_path.clone(),
2004 event.payload_summary.clone(),
2005 ),
2006 )
2007 })
2008 .collect::<BTreeMap<_, _>>();
// Union of keys from both sides so events present in only one run are
// still reported. BTreeSet keeps the iteration order deterministic.
2009 let daemon_keys = left_daemons
2010 .keys()
2011 .chain(right_daemons.keys())
2012 .cloned()
2013 .collect::<BTreeSet<_>>();
2014 for daemon_key in daemon_keys {
// Human-readable label: "daemon_id:Kind:timestamp" (kind via Debug).
2015 let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
2016 match (
2017 left_daemons.get(&daemon_key),
2018 right_daemons.get(&daemon_key),
2019 ) {
// Present only on the left → missing from the right run.
2020 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2021 section: "daemon_events".to_string(),
2022 label,
2023 details: vec!["daemon event missing from right run".to_string()],
2024 }),
// Present only on the right → missing from the left run.
2025 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2026 section: "daemon_events".to_string(),
2027 label,
2028 details: vec!["daemon event missing from left run".to_string()],
2029 }),
// Present on both sides but with differing payloads.
2030 (Some(left_event), Some(right_event)) if left_event != right_event => {
2031 observability_diffs.push(RunObservabilityDiffRecord {
2032 section: "daemon_events".to_string(),
2033 label,
2034 details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2035 });
2036 }
// Equal on both sides — nothing to report.
2037 _ => {}
2038 }
2039 }
2040
// --- Verification-outcome diff, keyed by stage_id ------------------------
2041 let left_verification = left_observability
2042 .verification_outcomes
2043 .iter()
2044 .map(|item| (item.stage_id.clone(), item))
2045 .collect::<BTreeMap<_, _>>();
2046 let right_verification = right_observability
2047 .verification_outcomes
2048 .iter()
2049 .map(|item| (item.stage_id.clone(), item))
2050 .collect::<BTreeMap<_, _>>();
// Union of stage ids from both runs, in deterministic order.
2051 let verification_ids = left_verification
2052 .keys()
2053 .chain(right_verification.keys())
2054 .cloned()
2055 .collect::<BTreeSet<_>>();
2056 for stage_id in verification_ids {
2057 match (
2058 left_verification.get(&stage_id),
2059 right_verification.get(&stage_id),
2060 ) {
2061 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2062 section: "verification".to_string(),
2063 label: stage_id,
2064 details: vec!["verification missing from right run".to_string()],
2065 }),
2066 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2067 section: "verification".to_string(),
2068 label: stage_id,
2069 details: vec!["verification missing from left run".to_string()],
2070 }),
// Both present but unequal: report field-level changes for the
// `passed` and `summary` fields.
2071 (Some(left_item), Some(right_item)) if left_item != right_item => {
2072 let mut details = Vec::new();
2073 if left_item.passed != right_item.passed {
2074 details.push(format!(
2075 "passed: {:?} -> {:?}",
2076 left_item.passed, right_item.passed
2077 ));
2078 }
2079 if left_item.summary != right_item.summary {
2080 details.push(format!(
2081 "summary: {:?} -> {:?}",
2082 left_item.summary, right_item.summary
2083 ));
2084 }
// NOTE(review): `details` stays empty when the items differ only in
// fields other than `passed`/`summary`; consider a generic fallback
// so the record always explains what changed.
// NOTE(review): this branch labels the diff with `node_id`, while the
// missing-left/right branches above label with `stage_id` — confirm
// the mismatch is intentional; it looks inconsistent.
2085 observability_diffs.push(RunObservabilityDiffRecord {
2086 section: "verification".to_string(),
2087 label: left_item.node_id.clone(),
2088 details,
2089 });
2090 }
// Both present and equal — nothing to report.
2091 _ => {}
2092 }
2093 }
2094
// --- Action-graph shape diff ---------------------------------------------
// Compares only node/edge counts, not graph contents; two structurally
// different graphs with identical counts are treated as equal here.
2095 let left_graph = (
2096 left_observability.action_graph_nodes.len(),
2097 left_observability.action_graph_edges.len(),
2098 );
2099 let right_graph = (
2100 right_observability.action_graph_nodes.len(),
2101 right_observability.action_graph_edges.len(),
2102 );
2103 if left_graph != right_graph {
2104 observability_diffs.push(RunObservabilityDiffRecord {
2105 section: "action_graph".to_string(),
2106 label: "shape".to_string(),
2107 details: vec![format!(
2108 "nodes/edges: {}/{} -> {}/{}",
2109 left_graph.0, left_graph.1, right_graph.0, right_graph.1
2110 )],
2111 });
2112 }
2113
// --- Final verdict and report assembly -----------------------------------
2114 let status_changed = left.status != right.status;
// `identical` is a shallow equivalence: no status change, no collected
// diffs, and matching *counts* (not contents) of transitions, artifacts,
// and checkpoints.
2115 let identical = !status_changed
2116 && stage_diffs.is_empty()
2117 && tool_diffs.is_empty()
2118 && observability_diffs.is_empty()
2119 && left.transitions.len() == right.transitions.len()
2120 && left.artifacts.len() == right.artifacts.len()
2121 && left.checkpoints.len() == right.checkpoints.len();
2122
// Assemble the report; count deltas are signed (right minus left).
2123 RunDiffReport {
2124 left_run_id: left.id.clone(),
2125 right_run_id: right.id.clone(),
2126 identical,
2127 status_changed,
2128 left_status: left.status.clone(),
2129 right_status: right.status.clone(),
2130 stage_diffs,
2131 tool_diffs,
2132 observability_diffs,
2133 transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
2134 artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
2135 checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
2136 }
2137}