Skip to main content

harn_vm/orchestration/
records.rs

1//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10    CapabilityPolicy,
11};
12use crate::event_log::{active_event_log, EventLog, LogEvent as EventLogRecord, Topic};
13use crate::llm::vm_value_to_json;
14use crate::triggers::{SignatureStatus, TriggerEvent};
15use crate::value::{VmError, VmValue};
16
// Node `kind` labels stored in `RunActionGraphNodeRecord::kind`.

/// Kind of the root node that represents the run itself (node id `run:<run_id>`).
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
/// Kind of a node representing a trigger event (node id `trigger:<event_id>`),
/// including the historical source event of a replayed run.
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
/// Kind given to stage nodes whose stage `kind` is `"condition"`.
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
/// Kind given to every stage node that is not a `"condition"` stage.
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";

// Edge `kind` labels stored in `RunActionGraphEdgeRecord::kind`.

/// Edge from the run root node to the trigger node that started the run.
pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
/// Edge from the original trigger event to the trigger event that replayed it.
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
37
/// LLM usage figures (token counts, duration, call count, cost, model names)
/// attached to a whole run (`RunRecord::usage`) or a single stage
/// (`RunStageRecord::usage`).
///
/// `#[serde(default)]` lets partial JSON deserialize; missing fields take their
/// `Default` values. NOTE(review): the unit/currency of `total_cost` is not
/// established in this module — confirm with the producer.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}

/// Persisted record of one stage executed during a run (`RunRecord::stages`).
///
/// `id` identifies this stage record (the action graph uses `stage:<id>` as the
/// node id), while `node_id` names the workflow node that was executed. A stage
/// whose `kind` is `"condition"` is rendered as a predicate node in the graph.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    // The first artifact carrying `data` is treated as the stage's result payload.
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    // Free-form extras; e.g. a string `worker_id` is read when building the action graph.
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// One execution attempt of a stage (`RunStageRecord::attempts`).
///
/// NOTE(review): whether `attempt` numbering starts at 0 or 1 is not visible
/// here — confirm with the code that populates it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
84
/// A transition between workflow nodes recorded on a run (`RunRecord::transitions`).
///
/// `from_*` are optional; presumably they are `None` for the transition into the
/// first node (TODO confirm). `to_node_id` is always set.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// A checkpoint of run progress (`RunRecord::checkpoints`).
///
/// Records which nodes were ready and completed, the last stage, when the
/// checkpoint was persisted, and the reason it was taken.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
108
/// Expected outcome captured from a source run, used to assert on a replay.
///
/// Serialized with a `_type` discriminator field (`type_name`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Per-node expectations inside a [`ReplayFixture`]: status, outcome, optional
/// branch, artifact kinds that must be produced, and optional visible-text
/// substring.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    pub visible_text_contains: Option<String>,
}

/// Result of checking one run against a replay fixture.
///
/// `failures` presumably holds human-readable messages (TODO confirm).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}

/// Result for a single case of an eval suite.
///
/// `comparison` optionally carries a diff against another run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    pub comparison: Option<RunDiffReport>,
}

/// Aggregate result of an eval suite: overall pass flag, counts, and the
/// per-case reports.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
164
/// One deliverable of a task-ledger summary, read from the `deliverables`
/// array of a ledger JSON object (`id`, `text`, `status`, optional `note`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    pub status: String,
    pub note: Option<String>,
}

/// Summary of a task ledger.
///
/// `blocking_count` is the number of deliverables whose `status` is `"open"`
/// or `"blocked"`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    pub blocking_count: usize,
}

/// Per-stage round metrics: iteration, LLM-call, tool execution/rejection,
/// intervention and compaction counts, the tools used, an optional task-ledger
/// summary, and research facts.
///
/// NOTE(review): how these counters are populated is outside this view.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    pub research_facts: Vec<String>,
}

/// Lineage of a worker spawned during a run: which stage and session it came
/// from, plus optional pointers to its own run record and snapshot.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
219
/// A node of the run action graph.
///
/// `id` uses a typed prefix such as `run:<run_id>`, `trigger:<event_id>` or
/// `stage:<stage_id>`; `kind` is one of the `ACTION_GRAPH_NODE_KIND_*` labels.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub trace_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
}

/// A directed edge between two action-graph node ids; `kind` is one of the
/// `ACTION_GRAPH_EDGE_KIND_*` labels.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}

/// Verification result for one stage.
///
/// `passed` is optional; presumably `None` means no pass/fail verdict was
/// recorded (TODO confirm).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    pub passed: Option<bool>,
    pub summary: Option<String>,
}

/// Pointer to a transcript associated with a run.
///
/// `location` is a logical locator and `path` an optional file path.
/// `available` presumably reports whether the target can be resolved
/// (TODO confirm).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub location: String,
    pub path: Option<String>,
    pub available: bool,
}

/// A transcript compaction event, built from transcript events whose `kind` is
/// `"compaction"`.
///
/// `snapshot_location` is `<prefix>.assets[<snapshot_asset_id>]` (or just the
/// prefix when no asset id is recorded). `available` is true when that asset id
/// is present in the transcript's `assets`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    pub snapshot_asset_id: Option<String>,
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    pub available: bool,
}
283
/// Lifecycle event kinds for daemons.
///
/// Serialized in snake_case (`"spawned"`, `"triggered"`, ...). Defaults to
/// `Spawned`.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum DaemonEventKindRecord {
    #[default]
    Spawned,
    Triggered,
    Snapshotted,
    Resumed,
    Stopped,
}

/// A daemon lifecycle event, read from `"type": "daemon_event"` lines of a
/// run's LLM transcript sidecar (JSONL).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonEventRecord {
    pub daemon_id: String,
    pub name: String,
    pub kind: DaemonEventKindRecord,
    pub timestamp: String,
    pub persist_path: String,
    pub payload_summary: Option<String>,
}

/// Observability summary derived from a run (see `derive_run_observability`).
///
/// Holds planner rounds, the action graph (nodes and edges), worker lineage,
/// verification outcomes, transcript pointers, compaction events and daemon
/// events. NOTE(review): the meaning and versioning policy of `schema_version`
/// are not visible here.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
    pub daemon_events: Vec<DaemonEventRecord>,
}
320
/// Difference for one workflow node between two runs.
///
/// `change` is a change label and `details` holds free-form explanations.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    pub change: String,
    pub details: Vec<String>,
}

/// Comparison of a tool call, identified by tool name and argument hash,
/// between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}

/// A difference found in one section of the derived observability data.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}

/// Report comparing a "left" run with a "right" run.
///
/// The `*_count_delta` fields are signed; presumably right minus left
/// (TODO confirm with the diff implementation).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
363
/// Manifest describing an eval suite: an id, optional name, optional base
/// directory, and the cases to evaluate.
///
/// Serialized with a `_type` discriminator field. NOTE(review): `base_dir` is
/// presumably used to resolve relative case paths — confirm with the loader.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One case in an [`EvalSuiteManifest`].
///
/// Holds the run to evaluate, an optional fixture path, and an optional run to
/// compare against.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
383
/// Complete persisted record of a workflow run.
///
/// Serialized with a `_type` discriminator field. `metadata` is free-form; keys
/// read in this module include `trigger_event` (a serialized `TriggerEvent`),
/// `trace_id` and `replay_of_event_id`. `persisted_path` is copied onto
/// action-graph nodes as their `run_path`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}

/// A recorded tool invocation.
///
/// `args_hash` is presumably produced by `tool_fixture_hash` (TODO confirm with
/// the recorder).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
429
430/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
431pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
432    use std::hash::{Hash, Hasher};
433    let mut hasher = std::collections::hash_map::DefaultHasher::new();
434    tool_name.hash(&mut hasher);
435    let args_str = serde_json::to_string(args).unwrap_or_default();
436    args_str.hash(&mut hasher);
437    format!("{:016x}", hasher.finish())
438}
439
/// A trace span recorded during a run (`RunRecord::trace_spans`).
///
/// `parent_id` links to the enclosing span, if any. Times are in milliseconds;
/// NOTE(review): the reference point for `start_ms` is not visible here.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// A child (worker) run spawned from a stage of this run (`RunRecord::child_runs`).
///
/// Captures the parent stage/session, the request and provenance JSON, its
/// status and timing, and optional pointers to the child's own run record,
/// snapshot and execution environment.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}

/// Execution environment of a run or child run.
///
/// Covers the working and source directories, environment variables, adapter,
/// and repository/worktree details (branch, base ref, cleanup mode).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}
487
488fn compact_json_value(value: &serde_json::Value) -> String {
489    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
490}
491
492fn signature_status_label(status: &SignatureStatus) -> &'static str {
493    match status {
494        SignatureStatus::Verified => "verified",
495        SignatureStatus::Unsigned => "unsigned",
496        SignatureStatus::Failed { .. } => "failed",
497    }
498}
499
500fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
501    run.metadata
502        .get("trigger_event")
503        .cloned()
504        .and_then(|value| serde_json::from_value(value).ok())
505}
506
507fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
508    trigger_event
509        .map(|event| event.trace_id.0.clone())
510        .or_else(|| {
511            run.metadata
512                .get("trace_id")
513                .and_then(|value| value.as_str())
514                .map(str::to_string)
515        })
516}
517
518fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
519    run.metadata
520        .get("replay_of_event_id")
521        .and_then(|value| value.as_str())
522        .map(str::to_string)
523}
524
525fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
526    if stage.kind == "condition" {
527        ACTION_GRAPH_NODE_KIND_PREDICATE
528    } else {
529        ACTION_GRAPH_NODE_KIND_STAGE
530    }
531}
532
533fn append_action_graph_node(
534    nodes: &mut Vec<RunActionGraphNodeRecord>,
535    record: RunActionGraphNodeRecord,
536) {
537    nodes.push(record);
538}
539
540pub async fn append_action_graph_update(
541    headers: BTreeMap<String, String>,
542    payload: serde_json::Value,
543) -> Result<(), crate::event_log::LogError> {
544    let Some(log) = active_event_log() else {
545        return Ok(());
546    };
547    let topic = Topic::new("observability.action_graph")
548        .expect("static observability.action_graph topic should always be valid");
549    let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
550    log.append(&topic, record).await.map(|_| ())
551}
552
553fn publish_action_graph_event(
554    run: &RunRecord,
555    observability: &RunObservabilityRecord,
556    path: &Path,
557) {
558    let trigger_event = trigger_event_from_run(run);
559    let mut headers = BTreeMap::new();
560    headers.insert("run_id".to_string(), run.id.clone());
561    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
562    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
563        headers.insert("trace_id".to_string(), trace_id);
564    }
565    let payload = serde_json::json!({
566        "run_id": run.id,
567        "workflow_id": run.workflow_id,
568        "persisted_path": path.to_string_lossy(),
569        "status": run.status,
570        "observability": observability,
571    });
572    if let Ok(handle) = tokio::runtime::Handle::try_current() {
573        handle.spawn(async move {
574            let _ = append_action_graph_update(headers, payload).await;
575        });
576    } else {
577        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
578    }
579}
580
/// Locate the LLM transcript sidecar stored next to a persisted run file.
///
/// For `<dir>/<stem>.<ext>` this returns `<dir>/<stem>-llm/llm_transcript.jsonl`
/// (a bare file name yields a relative `<stem>-llm/llm_transcript.jsonl`).
/// Returns `None` only when `run_path` has no file stem, e.g. `/`, `..` or `""`.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem()?;
    let parent = run_path.parent().unwrap_or_else(|| Path::new("."));
    // Build the directory name as an `OsString` so non-UTF-8 stems still resolve,
    // and join path components instead of embedding a `/` in a format string.
    let mut sidecar_dir = stem.to_os_string();
    sidecar_dir.push("-llm");
    Some(parent.join(sidecar_dir).join("llm_transcript.jsonl"))
}
586
587fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
588    value
589        .and_then(|value| value.as_array())
590        .map(|items| {
591            items
592                .iter()
593                .filter_map(|item| item.as_str().map(str::to_string))
594                .collect::<Vec<_>>()
595        })
596        .unwrap_or_default()
597}
598
599fn json_usize(value: Option<&serde_json::Value>) -> usize {
600    value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
601}
602
603fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
604    value.and_then(|value| value.as_bool())
605}
606
607fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
608    stage
609        .artifacts
610        .iter()
611        .find_map(|artifact| artifact.data.as_ref())
612}
613
614fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
615    let deliverables = value
616        .get("deliverables")
617        .and_then(|raw| raw.as_array())
618        .map(|items| {
619            items
620                .iter()
621                .map(|item| RunDeliverableSummaryRecord {
622                    id: item
623                        .get("id")
624                        .and_then(|value| value.as_str())
625                        .unwrap_or_default()
626                        .to_string(),
627                    text: item
628                        .get("text")
629                        .and_then(|value| value.as_str())
630                        .unwrap_or_default()
631                        .to_string(),
632                    status: item
633                        .get("status")
634                        .and_then(|value| value.as_str())
635                        .unwrap_or_default()
636                        .to_string(),
637                    note: item
638                        .get("note")
639                        .and_then(|value| value.as_str())
640                        .map(str::to_string),
641                })
642                .collect::<Vec<_>>()
643        })
644        .unwrap_or_default();
645    let observations = json_string_array(value.get("observations"));
646    let root_task = value
647        .get("root_task")
648        .and_then(|value| value.as_str())
649        .unwrap_or_default()
650        .to_string();
651    let rationale = value
652        .get("rationale")
653        .and_then(|value| value.as_str())
654        .unwrap_or_default()
655        .to_string();
656    if root_task.is_empty()
657        && rationale.is_empty()
658        && deliverables.is_empty()
659        && observations.is_empty()
660    {
661        return None;
662    }
663    let blocking_count = deliverables
664        .iter()
665        .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
666        .count();
667    Some(RunTaskLedgerSummaryRecord {
668        root_task,
669        rationale,
670        deliverables,
671        observations,
672        blocking_count,
673    })
674}
675
676fn compaction_events_from_transcript(
677    transcript: &serde_json::Value,
678    stage_id: Option<&str>,
679    node_id: Option<&str>,
680    location_prefix: &str,
681    persisted_path: Option<&Path>,
682) -> Vec<CompactionEventRecord> {
683    let transcript_id = transcript
684        .get("id")
685        .and_then(|value| value.as_str())
686        .map(str::to_string);
687    let asset_ids = transcript
688        .get("assets")
689        .and_then(|value| value.as_array())
690        .map(|assets| {
691            assets
692                .iter()
693                .filter_map(|asset| {
694                    asset
695                        .get("id")
696                        .and_then(|value| value.as_str())
697                        .map(str::to_string)
698                })
699                .collect::<BTreeSet<_>>()
700        })
701        .unwrap_or_default();
702    transcript
703        .get("events")
704        .and_then(|value| value.as_array())
705        .map(|events| {
706            events
707                .iter()
708                .filter(|event| {
709                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
710                })
711                .map(|event| {
712                    let metadata = event.get("metadata");
713                    let snapshot_asset_id = metadata
714                        .and_then(|value| value.get("snapshot_asset_id"))
715                        .and_then(|value| value.as_str())
716                        .map(str::to_string);
717                    let available = snapshot_asset_id
718                        .as_ref()
719                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
720                    let snapshot_location = snapshot_asset_id
721                        .as_ref()
722                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
723                        .unwrap_or_else(|| location_prefix.to_string());
724                    CompactionEventRecord {
725                        id: event
726                            .get("id")
727                            .and_then(|value| value.as_str())
728                            .unwrap_or_default()
729                            .to_string(),
730                        transcript_id: transcript_id.clone(),
731                        stage_id: stage_id.map(str::to_string),
732                        node_id: node_id.map(str::to_string),
733                        mode: metadata
734                            .and_then(|value| value.get("mode"))
735                            .and_then(|value| value.as_str())
736                            .unwrap_or_default()
737                            .to_string(),
738                        strategy: metadata
739                            .and_then(|value| value.get("strategy"))
740                            .and_then(|value| value.as_str())
741                            .unwrap_or_default()
742                            .to_string(),
743                        archived_messages: json_usize(
744                            metadata.and_then(|value| value.get("archived_messages")),
745                        ),
746                        estimated_tokens_before: json_usize(
747                            metadata.and_then(|value| value.get("estimated_tokens_before")),
748                        ),
749                        estimated_tokens_after: json_usize(
750                            metadata.and_then(|value| value.get("estimated_tokens_after")),
751                        ),
752                        snapshot_asset_id,
753                        snapshot_location,
754                        snapshot_path: persisted_path
755                            .map(|path| path.to_string_lossy().into_owned()),
756                        available,
757                    }
758                })
759                .collect()
760        })
761        .unwrap_or_default()
762}
763
764fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
765    let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
766        return Vec::new();
767    };
768    let Ok(content) = std::fs::read_to_string(sidecar_path) else {
769        return Vec::new();
770    };
771
772    content
773        .lines()
774        .filter(|line| !line.trim().is_empty())
775        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
776        .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
777        .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
778        .collect()
779}
780
781pub fn derive_run_observability(
782    run: &RunRecord,
783    persisted_path: Option<&Path>,
784) -> RunObservabilityRecord {
785    let mut action_graph_nodes = Vec::new();
786    let mut action_graph_edges = Vec::new();
787    let mut verification_outcomes = Vec::new();
788    let mut planner_rounds = Vec::new();
789    let mut transcript_pointers = Vec::new();
790    let mut compaction_events = Vec::new();
791    let mut daemon_events = Vec::new();
792    let mut research_fact_count = 0usize;
793
794    let root_node_id = format!("run:{}", run.id);
795    let trigger_event = trigger_event_from_run(run);
796    let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
797    append_action_graph_node(
798        &mut action_graph_nodes,
799        RunActionGraphNodeRecord {
800            id: root_node_id.clone(),
801            label: run
802                .workflow_name
803                .clone()
804                .unwrap_or_else(|| run.workflow_id.clone()),
805            kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
806            status: run.status.clone(),
807            outcome: run.status.clone(),
808            trace_id: propagated_trace_id.clone(),
809            stage_id: None,
810            node_id: None,
811            worker_id: None,
812            run_id: Some(run.id.clone()),
813            run_path: run.persisted_path.clone(),
814        },
815    );
816    let mut entry_node_id = root_node_id.clone();
817    if let Some(trigger_event) = trigger_event.as_ref() {
818        if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
819            let replay_source_node_id = format!("trigger:{replay_of_event_id}");
820            append_action_graph_node(
821                &mut action_graph_nodes,
822                RunActionGraphNodeRecord {
823                    id: replay_source_node_id.clone(),
824                    label: format!(
825                        "{}:{} (original {})",
826                        trigger_event.provider.as_str(),
827                        trigger_event.kind,
828                        replay_of_event_id
829                    ),
830                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
831                    status: "historical".to_string(),
832                    outcome: "replayed_from".to_string(),
833                    trace_id: Some(trigger_event.trace_id.0.clone()),
834                    stage_id: None,
835                    node_id: None,
836                    worker_id: None,
837                    run_id: Some(run.id.clone()),
838                    run_path: run.persisted_path.clone(),
839                },
840            );
841            action_graph_edges.push(RunActionGraphEdgeRecord {
842                from_id: replay_source_node_id,
843                to_id: format!("trigger:{}", trigger_event.id.0),
844                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
845                label: Some("replay chain".to_string()),
846            });
847        }
848        let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
849        append_action_graph_node(
850            &mut action_graph_nodes,
851            RunActionGraphNodeRecord {
852                id: trigger_node_id.clone(),
853                label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
854                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
855                status: "received".to_string(),
856                outcome: signature_status_label(&trigger_event.signature_status).to_string(),
857                trace_id: Some(trigger_event.trace_id.0.clone()),
858                stage_id: None,
859                node_id: None,
860                worker_id: None,
861                run_id: Some(run.id.clone()),
862                run_path: run.persisted_path.clone(),
863            },
864        );
865        action_graph_edges.push(RunActionGraphEdgeRecord {
866            from_id: root_node_id.clone(),
867            to_id: trigger_node_id.clone(),
868            kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
869            label: Some(trigger_event.id.0.clone()),
870        });
871        entry_node_id = trigger_node_id;
872    }
873
874    let stage_node_ids = run
875        .stages
876        .iter()
877        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
878        .collect::<BTreeMap<_, _>>();
879    let stage_by_id = run
880        .stages
881        .iter()
882        .map(|stage| (stage.id.as_str(), stage))
883        .collect::<BTreeMap<_, _>>();
884    let stage_by_node_id = run
885        .stages
886        .iter()
887        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
888        .collect::<BTreeMap<_, _>>();
889
890    let incoming_nodes = run
891        .transitions
892        .iter()
893        .map(|transition| transition.to_node_id.clone())
894        .collect::<BTreeSet<_>>();
895
896    for stage in &run.stages {
897        let graph_node_id = stage_node_ids
898            .get(&stage.id)
899            .cloned()
900            .unwrap_or_else(|| format!("stage:{}", stage.id));
901        append_action_graph_node(
902            &mut action_graph_nodes,
903            RunActionGraphNodeRecord {
904                id: graph_node_id.clone(),
905                label: stage.node_id.clone(),
906                kind: action_graph_kind_for_stage(stage).to_string(),
907                status: stage.status.clone(),
908                outcome: stage.outcome.clone(),
909                trace_id: propagated_trace_id.clone(),
910                stage_id: Some(stage.id.clone()),
911                node_id: Some(stage.node_id.clone()),
912                worker_id: stage
913                    .metadata
914                    .get("worker_id")
915                    .and_then(|value| value.as_str())
916                    .map(str::to_string),
917                run_id: None,
918                run_path: None,
919            },
920        );
921        if !incoming_nodes.contains(&stage.node_id) {
922            action_graph_edges.push(RunActionGraphEdgeRecord {
923                from_id: entry_node_id.clone(),
924                to_id: graph_node_id.clone(),
925                kind: if trigger_event.is_some() {
926                    ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
927                } else {
928                    ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
929                },
930                label: None,
931            });
932        }
933
934        if stage.kind == "verify" || stage.verification.is_some() {
935            let passed = json_bool(
936                stage
937                    .verification
938                    .as_ref()
939                    .and_then(|value| value.get("pass")),
940            )
941            .or_else(|| {
942                json_bool(
943                    stage
944                        .verification
945                        .as_ref()
946                        .and_then(|value| value.get("success")),
947                )
948            })
949            .or_else(|| {
950                if stage.status == "completed" && stage.outcome == "success" {
951                    Some(true)
952                } else if stage.status == "failed" || stage.outcome == "failed" {
953                    Some(false)
954                } else {
955                    None
956                }
957            });
958            verification_outcomes.push(RunVerificationOutcomeRecord {
959                stage_id: stage.id.clone(),
960                node_id: stage.node_id.clone(),
961                status: stage.status.clone(),
962                passed,
963                summary: stage
964                    .verification
965                    .as_ref()
966                    .map(compact_json_value)
967                    .or_else(|| {
968                        stage
969                            .visible_text
970                            .as_ref()
971                            .filter(|value| !value.trim().is_empty())
972                            .cloned()
973                    }),
974            });
975        }
976
977        if stage.transcript.is_some() {
978            transcript_pointers.push(RunTranscriptPointerRecord {
979                id: format!("stage:{}:transcript", stage.id),
980                label: format!("Stage {} transcript", stage.node_id),
981                kind: "embedded_transcript".to_string(),
982                location: format!("run.stages[{}].transcript", stage.node_id),
983                path: run.persisted_path.clone(),
984                available: true,
985            });
986            if let Some(transcript) = stage.transcript.as_ref() {
987                compaction_events.extend(compaction_events_from_transcript(
988                    transcript,
989                    Some(&stage.id),
990                    Some(&stage.node_id),
991                    &format!("run.stages[{}].transcript", stage.node_id),
992                    persisted_path,
993                ));
994            }
995        }
996
997        if let Some(payload) = stage_result_payload(stage) {
998            let trace = payload.get("trace");
999            let task_ledger = payload
1000                .get("task_ledger")
1001                .and_then(task_ledger_summary_from_value);
1002            let research_facts = task_ledger
1003                .as_ref()
1004                .map(|ledger| ledger.observations.clone())
1005                .unwrap_or_default();
1006            research_fact_count += research_facts.len();
1007            let tools_used = json_string_array(
1008                payload
1009                    .get("tools_used")
1010                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1011            );
1012            let successful_tools = json_string_array(payload.get("successful_tools"));
1013            let planner_round = RunPlannerRoundRecord {
1014                stage_id: stage.id.clone(),
1015                node_id: stage.node_id.clone(),
1016                stage_kind: stage.kind.clone(),
1017                status: stage.status.clone(),
1018                outcome: stage.outcome.clone(),
1019                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1020                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1021                tool_execution_count: json_usize(
1022                    trace.and_then(|trace| trace.get("tool_executions")),
1023                ),
1024                tool_rejection_count: json_usize(
1025                    trace.and_then(|trace| trace.get("tool_rejections")),
1026                ),
1027                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1028                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1029                tools_used,
1030                successful_tools,
1031                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1032                task_ledger,
1033                research_facts,
1034            };
1035            let has_agentic_detail = planner_round.iteration_count > 0
1036                || planner_round.llm_call_count > 0
1037                || planner_round.tool_execution_count > 0
1038                || planner_round.ledger_done_rejections > 0
1039                || planner_round.task_ledger.is_some()
1040                || !planner_round.tools_used.is_empty()
1041                || !planner_round.successful_tools.is_empty();
1042            if has_agentic_detail {
1043                planner_rounds.push(planner_round);
1044            }
1045        }
1046    }
1047
1048    for transition in &run.transitions {
1049        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1050            continue;
1051        };
1052        let from_stage = transition
1053            .from_stage_id
1054            .as_deref()
1055            .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1056        let from_id = transition
1057            .from_stage_id
1058            .as_ref()
1059            .and_then(|stage_id| stage_node_ids.get(stage_id))
1060            .cloned()
1061            .or_else(|| {
1062                transition
1063                    .from_node_id
1064                    .as_ref()
1065                    .and_then(|node_id| stage_by_node_id.get(node_id))
1066                    .cloned()
1067            })
1068            .unwrap_or_else(|| root_node_id.clone());
1069        action_graph_edges.push(RunActionGraphEdgeRecord {
1070            from_id,
1071            to_id,
1072            kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1073                ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1074            } else {
1075                ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1076            },
1077            label: transition.branch.clone(),
1078        });
1079    }
1080
1081    let worker_lineage = run
1082        .child_runs
1083        .iter()
1084        .map(|child| {
1085            let worker_node_id = format!("worker:{}", child.worker_id);
1086            append_action_graph_node(
1087                &mut action_graph_nodes,
1088                RunActionGraphNodeRecord {
1089                    id: worker_node_id.clone(),
1090                    label: child.worker_name.clone(),
1091                    kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1092                    status: child.status.clone(),
1093                    outcome: child.status.clone(),
1094                    trace_id: propagated_trace_id.clone(),
1095                    stage_id: child.parent_stage_id.clone(),
1096                    node_id: None,
1097                    worker_id: Some(child.worker_id.clone()),
1098                    run_id: child.run_id.clone(),
1099                    run_path: child.run_path.clone(),
1100                },
1101            );
1102            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1103                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1104                    action_graph_edges.push(RunActionGraphEdgeRecord {
1105                        from_id: stage_node_id.clone(),
1106                        to_id: worker_node_id,
1107                        kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1108                        label: Some(child.worker_name.clone()),
1109                    });
1110                }
1111            }
1112            RunWorkerLineageRecord {
1113                worker_id: child.worker_id.clone(),
1114                worker_name: child.worker_name.clone(),
1115                parent_stage_id: child.parent_stage_id.clone(),
1116                task: child.task.clone(),
1117                status: child.status.clone(),
1118                session_id: child.session_id.clone(),
1119                parent_session_id: child.parent_session_id.clone(),
1120                run_id: child.run_id.clone(),
1121                run_path: child.run_path.clone(),
1122                snapshot_path: child.snapshot_path.clone(),
1123            }
1124        })
1125        .collect::<Vec<_>>();
1126
1127    if run.transcript.is_some() {
1128        transcript_pointers.push(RunTranscriptPointerRecord {
1129            id: "run:transcript".to_string(),
1130            label: "Run transcript".to_string(),
1131            kind: "embedded_transcript".to_string(),
1132            location: "run.transcript".to_string(),
1133            path: run.persisted_path.clone(),
1134            available: true,
1135        });
1136        if let Some(transcript) = run.transcript.as_ref() {
1137            compaction_events.extend(compaction_events_from_transcript(
1138                transcript,
1139                None,
1140                None,
1141                "run.transcript",
1142                persisted_path,
1143            ));
1144        }
1145    }
1146
1147    if let Some(path) = persisted_path {
1148        if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1149            transcript_pointers.push(RunTranscriptPointerRecord {
1150                id: "run:llm_transcript".to_string(),
1151                label: "LLM transcript sidecar".to_string(),
1152                kind: "llm_jsonl".to_string(),
1153                location: "run sidecar".to_string(),
1154                path: Some(sidecar_path.to_string_lossy().into_owned()),
1155                available: sidecar_path.exists(),
1156            });
1157        }
1158        daemon_events.extend(daemon_events_from_sidecar(path));
1159    }
1160
1161    RunObservabilityRecord {
1162        schema_version: 4,
1163        planner_rounds,
1164        research_fact_count,
1165        action_graph_nodes,
1166        action_graph_edges,
1167        worker_lineage,
1168        verification_outcomes,
1169        transcript_pointers,
1170        compaction_events,
1171        daemon_events,
1172    }
1173}
1174
1175fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1176    run.observability = Some(derive_run_observability(run, persisted_path));
1177}
1178
1179pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1180    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1181    if run.type_name.is_empty() {
1182        run.type_name = "run_record".to_string();
1183    }
1184    if run.id.is_empty() {
1185        run.id = new_id("run");
1186    }
1187    if run.started_at.is_empty() {
1188        run.started_at = now_rfc3339();
1189    }
1190    if run.status.is_empty() {
1191        run.status = "running".to_string();
1192    }
1193    if run.root_run_id.is_none() {
1194        run.root_run_id = Some(run.id.clone());
1195    }
1196    if run.replay_fixture.is_none() {
1197        run.replay_fixture = Some(replay_fixture_from_run(&run));
1198    }
1199    if run.observability.is_none() {
1200        let persisted_path = run.persisted_path.clone();
1201        let persisted = persisted_path.as_deref().map(Path::new);
1202        refresh_run_observability(&mut run, persisted);
1203    }
1204    Ok(run)
1205}
1206
1207pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1208    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1209    if manifest.type_name.is_empty() {
1210        manifest.type_name = "eval_suite_manifest".to_string();
1211    }
1212    if manifest.id.is_empty() {
1213        manifest.id = new_id("eval_suite");
1214    }
1215    Ok(manifest)
1216}
1217
1218fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1219    let content = std::fs::read_to_string(path)
1220        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1221    serde_json::from_str(&content)
1222        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1223}
1224
/// Resolves a path listed in an eval-suite manifest.
///
/// Absolute paths come back unchanged. A relative path is joined onto `base_dir` when one is
/// given. Otherwise it is returned as-is, i.e. left relative to the process working
/// directory.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = Path::new(path);
    match base_dir {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate.to_path_buf(),
    }
}
1235
1236pub fn evaluate_run_suite_manifest(
1237    manifest: &EvalSuiteManifest,
1238) -> Result<ReplayEvalSuiteReport, VmError> {
1239    let base_dir = manifest.base_dir.as_deref().map(Path::new);
1240    let mut reports = Vec::new();
1241    for case in &manifest.cases {
1242        let run_path = resolve_manifest_path(base_dir, &case.run_path);
1243        let run = load_run_record(&run_path)?;
1244        let fixture = match &case.fixture_path {
1245            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1246            None => run
1247                .replay_fixture
1248                .clone()
1249                .unwrap_or_else(|| replay_fixture_from_run(&run)),
1250        };
1251        let eval = evaluate_run_against_fixture(&run, &fixture);
1252        let mut pass = eval.pass;
1253        let mut failures = eval.failures;
1254        let comparison = match &case.compare_to {
1255            Some(path) => {
1256                let baseline_path = resolve_manifest_path(base_dir, path);
1257                let baseline = load_run_record(&baseline_path)?;
1258                let diff = diff_run_records(&baseline, &run);
1259                if !diff.identical {
1260                    pass = false;
1261                    failures.push(format!(
1262                        "run differs from baseline {} with {} stage changes",
1263                        baseline_path.display(),
1264                        diff.stage_diffs.len()
1265                    ));
1266                }
1267                Some(diff)
1268            }
1269            None => None,
1270        };
1271        reports.push(ReplayEvalCaseReport {
1272            run_id: run.id.clone(),
1273            workflow_id: run.workflow_id.clone(),
1274            label: case.label.clone(),
1275            pass,
1276            failures,
1277            stage_count: eval.stage_count,
1278            source_path: Some(run_path.display().to_string()),
1279            comparison,
1280        });
1281    }
1282    let total = reports.len();
1283    let passed = reports.iter().filter(|report| report.pass).count();
1284    let failed = total.saturating_sub(passed);
1285    Ok(ReplayEvalSuiteReport {
1286        pass: failed == 0,
1287        total,
1288        passed,
1289        failed,
1290        cases: reports,
1291    })
1292}
1293
/// Edit operation in a diff sequence.
///
/// [`myers_diff`] pairs each op with a line index. `Equal` and `Delete` index into the
/// "before" sequence; `Insert` indexes into the "after" sequence.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    Equal,
    Delete,
    Insert,
}

/// Compute a shortest edit script from `a` to `b` using Myers' O(ND) algorithm.
///
/// Returns `(DiffOp, index)` pairs in output order. For `Equal` and `Delete` the index is a
/// line of `a`; for `Insert` it is a line of `b`. Replaying the script reconstructs `b`:
/// - `Equal`: keep `a[i]`;
/// - `Delete`: drop `a[i]`;
/// - `Insert`: emit `b[j]`.
///
/// Time: O((N + M) * D), where D is the edit distance.
///
/// Space: O(N + M) for the working frontier plus O(D^2) for backtracking snapshots. Each
/// snapshot keeps only the diagonals `-d..=d` reachable at step `d`, not the whole frontier.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    if a.is_empty() {
        return (0..b.len()).map(|j| (DiffOp::Insert, j)).collect();
    }
    if b.is_empty() {
        return (0..a.len()).map(|i| (DiffOp::Delete, i)).collect();
    }

    let n = a.len() as isize;
    let m = b.len() as isize;
    let max_d = n + m;
    let offset = max_d;
    // v[k + offset] = furthest x reached so far on diagonal k = x - y.
    let mut v = vec![0isize; (2 * max_d + 1) as usize];
    // trace[d] holds v restricted to diagonals -d..=d, captured BEFORE step d ran.
    // Window index `k + d` holds diagonal k. Backtracking from step d only reads that range.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'search: for d in 0..=max_d {
        trace.push(v[(offset - d) as usize..=(offset + d) as usize].to_vec());
        // Step d writes only diagonals with d's parity and reads only their neighbours
        // (parity d - 1), so updating `v` in place cannot clobber a value still needed.
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            v[ki] = x;
            if x >= n && y >= m {
                break 'search;
            }
        }
    }

    let mut ops: Vec<(DiffOp, usize)> = Vec::with_capacity(a.len() + b.len());
    let (mut x, mut y) = (n, m);
    for d in (1..trace.len() as isize).rev() {
        let window = &trace[d as usize];
        // Every diagonal read here lies in -d..=d, the range the snapshot covers.
        let at = |k: isize| window[(k + d) as usize];
        let k = x - y;
        let prev_k = if k == -d || (k != d && at(k - 1) < at(k + 1)) {
            k + 1
        } else {
            k - 1
        };
        let prev_x = at(prev_k);
        let prev_y = prev_x - prev_k;

        // Walk back along the snake (matching lines) that followed the edit.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        // Coming from diagonal k - 1 means x advanced: a deletion from `a`.
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Leading snake of the 0-edit path.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1386
1387pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1388    let before_lines: Vec<&str> = before.lines().collect();
1389    let after_lines: Vec<&str> = after.lines().collect();
1390    let ops = myers_diff(&before_lines, &after_lines);
1391
1392    let mut diff = String::new();
1393    let file = path.unwrap_or("artifact");
1394    diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1395    for &(op, idx) in &ops {
1396        match op {
1397            DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1398            DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1399            DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1400        }
1401    }
1402    diff
1403}
1404
1405pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1406    let path = path
1407        .map(PathBuf::from)
1408        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1409    let mut materialized = run.clone();
1410    if materialized.replay_fixture.is_none() {
1411        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1412    }
1413    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1414    refresh_run_observability(&mut materialized, Some(&path));
1415    if let Some(parent) = path.parent() {
1416        std::fs::create_dir_all(parent)
1417            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1418    }
1419    let json = serde_json::to_string_pretty(&materialized)
1420        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1421    // Atomic write: .tmp then rename guards against partial writes on kill.
1422    let tmp_path = path.with_extension("json.tmp");
1423    std::fs::write(&tmp_path, &json)
1424        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1425    std::fs::rename(&tmp_path, &path).map_err(|e| {
1426        // Cross-device renames fail on some filesystems; best-effort direct write.
1427        let _ = std::fs::write(&path, &json);
1428        VmError::Runtime(format!("failed to finalize run record: {e}"))
1429    })?;
1430    if let Some(observability) = materialized.observability.as_ref() {
1431        publish_action_graph_event(&materialized, observability, &path);
1432    }
1433    Ok(path.to_string_lossy().into_owned())
1434}
1435
1436pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1437    let content = std::fs::read_to_string(path)
1438        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1439    let mut run: RunRecord = serde_json::from_str(&content)
1440        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1441    if run.replay_fixture.is_none() {
1442        run.replay_fixture = Some(replay_fixture_from_run(&run));
1443    }
1444    run.persisted_path
1445        .get_or_insert_with(|| path.to_string_lossy().into_owned());
1446    refresh_run_observability(&mut run, Some(path));
1447    Ok(run)
1448}
1449
1450pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1451    ReplayFixture {
1452        type_name: "replay_fixture".to_string(),
1453        id: new_id("fixture"),
1454        source_run_id: run.id.clone(),
1455        workflow_id: run.workflow_id.clone(),
1456        workflow_name: run.workflow_name.clone(),
1457        created_at: now_rfc3339(),
1458        expected_status: run.status.clone(),
1459        stage_assertions: run
1460            .stages
1461            .iter()
1462            .map(|stage| ReplayStageAssertion {
1463                node_id: stage.node_id.clone(),
1464                expected_status: stage.status.clone(),
1465                expected_outcome: stage.outcome.clone(),
1466                expected_branch: stage.branch.clone(),
1467                required_artifact_kinds: stage
1468                    .artifacts
1469                    .iter()
1470                    .map(|artifact| artifact.kind.clone())
1471                    .collect(),
1472                visible_text_contains: stage
1473                    .visible_text
1474                    .as_ref()
1475                    .filter(|text| !text.is_empty())
1476                    .map(|text| text.chars().take(80).collect()),
1477            })
1478            .collect(),
1479    }
1480}
1481
1482pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1483    let mut failures = Vec::new();
1484    if run.status != fixture.expected_status {
1485        failures.push(format!(
1486            "run status mismatch: expected {}, got {}",
1487            fixture.expected_status, run.status
1488        ));
1489    }
1490    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1491        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1492    for assertion in &fixture.stage_assertions {
1493        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1494            failures.push(format!("missing stage {}", assertion.node_id));
1495            continue;
1496        };
1497        if stage.status != assertion.expected_status {
1498            failures.push(format!(
1499                "stage {} status mismatch: expected {}, got {}",
1500                assertion.node_id, assertion.expected_status, stage.status
1501            ));
1502        }
1503        if stage.outcome != assertion.expected_outcome {
1504            failures.push(format!(
1505                "stage {} outcome mismatch: expected {}, got {}",
1506                assertion.node_id, assertion.expected_outcome, stage.outcome
1507            ));
1508        }
1509        if stage.branch != assertion.expected_branch {
1510            failures.push(format!(
1511                "stage {} branch mismatch: expected {:?}, got {:?}",
1512                assertion.node_id, assertion.expected_branch, stage.branch
1513            ));
1514        }
1515        for required_kind in &assertion.required_artifact_kinds {
1516            if !stage
1517                .artifacts
1518                .iter()
1519                .any(|artifact| &artifact.kind == required_kind)
1520            {
1521                failures.push(format!(
1522                    "stage {} missing artifact kind {}",
1523                    assertion.node_id, required_kind
1524                ));
1525            }
1526        }
1527        if let Some(snippet) = &assertion.visible_text_contains {
1528            let actual = stage.visible_text.clone().unwrap_or_default();
1529            if !actual.contains(snippet) {
1530                failures.push(format!(
1531                    "stage {} visible text does not contain expected snippet {:?}",
1532                    assertion.node_id, snippet
1533                ));
1534            }
1535        }
1536    }
1537
1538    ReplayEvalReport {
1539        pass: failures.is_empty(),
1540        failures,
1541        stage_count: run.stages.len(),
1542    }
1543}
1544
1545pub fn evaluate_run_suite(
1546    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1547) -> ReplayEvalSuiteReport {
1548    let mut reports = Vec::new();
1549    for (run, fixture, source_path) in cases {
1550        let report = evaluate_run_against_fixture(&run, &fixture);
1551        reports.push(ReplayEvalCaseReport {
1552            run_id: run.id.clone(),
1553            workflow_id: run.workflow_id.clone(),
1554            label: None,
1555            pass: report.pass,
1556            failures: report.failures,
1557            stage_count: report.stage_count,
1558            source_path,
1559            comparison: None,
1560        });
1561    }
1562    let total = reports.len();
1563    let passed = reports.iter().filter(|report| report.pass).count();
1564    let failed = total.saturating_sub(passed);
1565    ReplayEvalSuiteReport {
1566        pass: failed == 0,
1567        total,
1568        passed,
1569        failed,
1570        cases: reports,
1571    }
1572}
1573
1574pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1575    let mut stage_diffs = Vec::new();
1576    let mut all_node_ids = BTreeSet::new();
1577    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1578        .stages
1579        .iter()
1580        .map(|s| (s.node_id.as_str(), s))
1581        .collect();
1582    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1583        .stages
1584        .iter()
1585        .map(|s| (s.node_id.as_str(), s))
1586        .collect();
1587    all_node_ids.extend(left_by_id.keys().copied());
1588    all_node_ids.extend(right_by_id.keys().copied());
1589
1590    for node_id in all_node_ids {
1591        let left_stage = left_by_id.get(node_id).copied();
1592        let right_stage = right_by_id.get(node_id).copied();
1593        match (left_stage, right_stage) {
1594            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1595                node_id: node_id.to_string(),
1596                change: "removed".to_string(),
1597                details: vec!["stage missing from right run".to_string()],
1598            }),
1599            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1600                node_id: node_id.to_string(),
1601                change: "added".to_string(),
1602                details: vec!["stage missing from left run".to_string()],
1603            }),
1604            (Some(left_stage), Some(right_stage)) => {
1605                let mut details = Vec::new();
1606                if left_stage.status != right_stage.status {
1607                    details.push(format!(
1608                        "status: {} -> {}",
1609                        left_stage.status, right_stage.status
1610                    ));
1611                }
1612                if left_stage.outcome != right_stage.outcome {
1613                    details.push(format!(
1614                        "outcome: {} -> {}",
1615                        left_stage.outcome, right_stage.outcome
1616                    ));
1617                }
1618                if left_stage.branch != right_stage.branch {
1619                    details.push(format!(
1620                        "branch: {:?} -> {:?}",
1621                        left_stage.branch, right_stage.branch
1622                    ));
1623                }
1624                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1625                {
1626                    details.push(format!(
1627                        "produced_artifacts: {} -> {}",
1628                        left_stage.produced_artifact_ids.len(),
1629                        right_stage.produced_artifact_ids.len()
1630                    ));
1631                }
1632                if left_stage.artifacts.len() != right_stage.artifacts.len() {
1633                    details.push(format!(
1634                        "artifact_records: {} -> {}",
1635                        left_stage.artifacts.len(),
1636                        right_stage.artifacts.len()
1637                    ));
1638                }
1639                if !details.is_empty() {
1640                    stage_diffs.push(RunStageDiffRecord {
1641                        node_id: node_id.to_string(),
1642                        change: "changed".to_string(),
1643                        details,
1644                    });
1645                }
1646            }
1647            (None, None) => {}
1648        }
1649    }
1650
1651    let mut tool_diffs = Vec::new();
1652    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1653        .tool_recordings
1654        .iter()
1655        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1656        .collect();
1657    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1658        .tool_recordings
1659        .iter()
1660        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1661        .collect();
1662    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1663        .keys()
1664        .chain(right_tools.keys())
1665        .cloned()
1666        .collect();
1667    for key in &all_tool_keys {
1668        let l = left_tools.get(key);
1669        let r = right_tools.get(key);
1670        let result_changed = match (l, r) {
1671            (Some(a), Some(b)) => a.result != b.result,
1672            _ => true,
1673        };
1674        if result_changed {
1675            tool_diffs.push(ToolCallDiffRecord {
1676                tool_name: key.0.clone(),
1677                args_hash: key.1.clone(),
1678                result_changed,
1679                left_result: l.map(|t| t.result.clone()),
1680                right_result: r.map(|t| t.result.clone()),
1681            });
1682        }
1683    }
1684
1685    let left_observability = left.observability.clone().unwrap_or_else(|| {
1686        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1687    });
1688    let right_observability = right.observability.clone().unwrap_or_else(|| {
1689        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1690    });
1691    let mut observability_diffs = Vec::new();
1692
1693    let left_workers = left_observability
1694        .worker_lineage
1695        .iter()
1696        .map(|worker| {
1697            (
1698                worker.worker_id.clone(),
1699                (
1700                    worker.status.clone(),
1701                    worker.run_id.clone(),
1702                    worker.run_path.clone(),
1703                ),
1704            )
1705        })
1706        .collect::<BTreeMap<_, _>>();
1707    let right_workers = right_observability
1708        .worker_lineage
1709        .iter()
1710        .map(|worker| {
1711            (
1712                worker.worker_id.clone(),
1713                (
1714                    worker.status.clone(),
1715                    worker.run_id.clone(),
1716                    worker.run_path.clone(),
1717                ),
1718            )
1719        })
1720        .collect::<BTreeMap<_, _>>();
1721    let worker_ids = left_workers
1722        .keys()
1723        .chain(right_workers.keys())
1724        .cloned()
1725        .collect::<BTreeSet<_>>();
1726    for worker_id in worker_ids {
1727        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
1728            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1729                section: "worker_lineage".to_string(),
1730                label: worker_id,
1731                details: vec!["worker missing from right run".to_string()],
1732            }),
1733            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1734                section: "worker_lineage".to_string(),
1735                label: worker_id,
1736                details: vec!["worker missing from left run".to_string()],
1737            }),
1738            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
1739                let mut details = Vec::new();
1740                if left_worker.0 != right_worker.0 {
1741                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
1742                }
1743                if left_worker.1 != right_worker.1 {
1744                    details.push(format!(
1745                        "run_id: {:?} -> {:?}",
1746                        left_worker.1, right_worker.1
1747                    ));
1748                }
1749                if left_worker.2 != right_worker.2 {
1750                    details.push(format!(
1751                        "run_path: {:?} -> {:?}",
1752                        left_worker.2, right_worker.2
1753                    ));
1754                }
1755                observability_diffs.push(RunObservabilityDiffRecord {
1756                    section: "worker_lineage".to_string(),
1757                    label: worker_id,
1758                    details,
1759                });
1760            }
1761            _ => {}
1762        }
1763    }
1764
1765    let left_rounds = left_observability
1766        .planner_rounds
1767        .iter()
1768        .map(|round| (round.stage_id.clone(), round))
1769        .collect::<BTreeMap<_, _>>();
1770    let right_rounds = right_observability
1771        .planner_rounds
1772        .iter()
1773        .map(|round| (round.stage_id.clone(), round))
1774        .collect::<BTreeMap<_, _>>();
1775    let round_ids = left_rounds
1776        .keys()
1777        .chain(right_rounds.keys())
1778        .cloned()
1779        .collect::<BTreeSet<_>>();
1780    for stage_id in round_ids {
1781        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
1782            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1783                section: "planner_rounds".to_string(),
1784                label: stage_id,
1785                details: vec!["planner summary missing from right run".to_string()],
1786            }),
1787            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1788                section: "planner_rounds".to_string(),
1789                label: stage_id,
1790                details: vec!["planner summary missing from left run".to_string()],
1791            }),
1792            (Some(left_round), Some(right_round)) => {
1793                let mut details = Vec::new();
1794                if left_round.iteration_count != right_round.iteration_count {
1795                    details.push(format!(
1796                        "iterations: {} -> {}",
1797                        left_round.iteration_count, right_round.iteration_count
1798                    ));
1799                }
1800                if left_round.tool_execution_count != right_round.tool_execution_count {
1801                    details.push(format!(
1802                        "tool_executions: {} -> {}",
1803                        left_round.tool_execution_count, right_round.tool_execution_count
1804                    ));
1805                }
1806                if left_round.research_facts != right_round.research_facts {
1807                    details.push(format!(
1808                        "research_facts: {:?} -> {:?}",
1809                        left_round.research_facts, right_round.research_facts
1810                    ));
1811                }
1812                let left_deliverables = left_round
1813                    .task_ledger
1814                    .as_ref()
1815                    .map(|ledger| {
1816                        ledger
1817                            .deliverables
1818                            .iter()
1819                            .map(|item| format!("{}:{}", item.id, item.status))
1820                            .collect::<Vec<_>>()
1821                    })
1822                    .unwrap_or_default();
1823                let right_deliverables = right_round
1824                    .task_ledger
1825                    .as_ref()
1826                    .map(|ledger| {
1827                        ledger
1828                            .deliverables
1829                            .iter()
1830                            .map(|item| format!("{}:{}", item.id, item.status))
1831                            .collect::<Vec<_>>()
1832                    })
1833                    .unwrap_or_default();
1834                if left_deliverables != right_deliverables {
1835                    details.push(format!(
1836                        "deliverables: {:?} -> {:?}",
1837                        left_deliverables, right_deliverables
1838                    ));
1839                }
1840                if left_round.successful_tools != right_round.successful_tools {
1841                    details.push(format!(
1842                        "successful_tools: {:?} -> {:?}",
1843                        left_round.successful_tools, right_round.successful_tools
1844                    ));
1845                }
1846                if !details.is_empty() {
1847                    observability_diffs.push(RunObservabilityDiffRecord {
1848                        section: "planner_rounds".to_string(),
1849                        label: left_round.node_id.clone(),
1850                        details,
1851                    });
1852                }
1853            }
1854            _ => {}
1855        }
1856    }
1857
1858    let left_pointers = left_observability
1859        .transcript_pointers
1860        .iter()
1861        .map(|pointer| {
1862            (
1863                pointer.id.clone(),
1864                (
1865                    pointer.available,
1866                    pointer.path.clone(),
1867                    pointer.location.clone(),
1868                ),
1869            )
1870        })
1871        .collect::<BTreeMap<_, _>>();
1872    let right_pointers = right_observability
1873        .transcript_pointers
1874        .iter()
1875        .map(|pointer| {
1876            (
1877                pointer.id.clone(),
1878                (
1879                    pointer.available,
1880                    pointer.path.clone(),
1881                    pointer.location.clone(),
1882                ),
1883            )
1884        })
1885        .collect::<BTreeMap<_, _>>();
1886    let pointer_ids = left_pointers
1887        .keys()
1888        .chain(right_pointers.keys())
1889        .cloned()
1890        .collect::<BTreeSet<_>>();
1891    for pointer_id in pointer_ids {
1892        match (
1893            left_pointers.get(&pointer_id),
1894            right_pointers.get(&pointer_id),
1895        ) {
1896            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1897                section: "transcript_pointers".to_string(),
1898                label: pointer_id,
1899                details: vec!["pointer missing from right run".to_string()],
1900            }),
1901            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1902                section: "transcript_pointers".to_string(),
1903                label: pointer_id,
1904                details: vec!["pointer missing from left run".to_string()],
1905            }),
1906            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
1907                observability_diffs.push(RunObservabilityDiffRecord {
1908                    section: "transcript_pointers".to_string(),
1909                    label: pointer_id,
1910                    details: vec![format!(
1911                        "pointer: {:?} -> {:?}",
1912                        left_pointer, right_pointer
1913                    )],
1914                });
1915            }
1916            _ => {}
1917        }
1918    }
1919
1920    let left_compactions = left_observability
1921        .compaction_events
1922        .iter()
1923        .map(|event| {
1924            (
1925                event.id.clone(),
1926                (
1927                    event.strategy.clone(),
1928                    event.archived_messages,
1929                    event.snapshot_asset_id.clone(),
1930                    event.available,
1931                ),
1932            )
1933        })
1934        .collect::<BTreeMap<_, _>>();
1935    let right_compactions = right_observability
1936        .compaction_events
1937        .iter()
1938        .map(|event| {
1939            (
1940                event.id.clone(),
1941                (
1942                    event.strategy.clone(),
1943                    event.archived_messages,
1944                    event.snapshot_asset_id.clone(),
1945                    event.available,
1946                ),
1947            )
1948        })
1949        .collect::<BTreeMap<_, _>>();
1950    let compaction_ids = left_compactions
1951        .keys()
1952        .chain(right_compactions.keys())
1953        .cloned()
1954        .collect::<BTreeSet<_>>();
1955    for compaction_id in compaction_ids {
1956        match (
1957            left_compactions.get(&compaction_id),
1958            right_compactions.get(&compaction_id),
1959        ) {
1960            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1961                section: "compaction_events".to_string(),
1962                label: compaction_id,
1963                details: vec!["compaction event missing from right run".to_string()],
1964            }),
1965            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1966                section: "compaction_events".to_string(),
1967                label: compaction_id,
1968                details: vec!["compaction event missing from left run".to_string()],
1969            }),
1970            (Some(left_event), Some(right_event)) if left_event != right_event => {
1971                observability_diffs.push(RunObservabilityDiffRecord {
1972                    section: "compaction_events".to_string(),
1973                    label: compaction_id,
1974                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
1975                });
1976            }
1977            _ => {}
1978        }
1979    }
1980
1981    let left_daemons = left_observability
1982        .daemon_events
1983        .iter()
1984        .map(|event| {
1985            (
1986                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
1987                (
1988                    event.name.clone(),
1989                    event.persist_path.clone(),
1990                    event.payload_summary.clone(),
1991                ),
1992            )
1993        })
1994        .collect::<BTreeMap<_, _>>();
1995    let right_daemons = right_observability
1996        .daemon_events
1997        .iter()
1998        .map(|event| {
1999            (
2000                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2001                (
2002                    event.name.clone(),
2003                    event.persist_path.clone(),
2004                    event.payload_summary.clone(),
2005                ),
2006            )
2007        })
2008        .collect::<BTreeMap<_, _>>();
2009    let daemon_keys = left_daemons
2010        .keys()
2011        .chain(right_daemons.keys())
2012        .cloned()
2013        .collect::<BTreeSet<_>>();
2014    for daemon_key in daemon_keys {
2015        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
2016        match (
2017            left_daemons.get(&daemon_key),
2018            right_daemons.get(&daemon_key),
2019        ) {
2020            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2021                section: "daemon_events".to_string(),
2022                label,
2023                details: vec!["daemon event missing from right run".to_string()],
2024            }),
2025            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2026                section: "daemon_events".to_string(),
2027                label,
2028                details: vec!["daemon event missing from left run".to_string()],
2029            }),
2030            (Some(left_event), Some(right_event)) if left_event != right_event => {
2031                observability_diffs.push(RunObservabilityDiffRecord {
2032                    section: "daemon_events".to_string(),
2033                    label,
2034                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2035                });
2036            }
2037            _ => {}
2038        }
2039    }
2040
2041    let left_verification = left_observability
2042        .verification_outcomes
2043        .iter()
2044        .map(|item| (item.stage_id.clone(), item))
2045        .collect::<BTreeMap<_, _>>();
2046    let right_verification = right_observability
2047        .verification_outcomes
2048        .iter()
2049        .map(|item| (item.stage_id.clone(), item))
2050        .collect::<BTreeMap<_, _>>();
2051    let verification_ids = left_verification
2052        .keys()
2053        .chain(right_verification.keys())
2054        .cloned()
2055        .collect::<BTreeSet<_>>();
2056    for stage_id in verification_ids {
2057        match (
2058            left_verification.get(&stage_id),
2059            right_verification.get(&stage_id),
2060        ) {
2061            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2062                section: "verification".to_string(),
2063                label: stage_id,
2064                details: vec!["verification missing from right run".to_string()],
2065            }),
2066            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2067                section: "verification".to_string(),
2068                label: stage_id,
2069                details: vec!["verification missing from left run".to_string()],
2070            }),
2071            (Some(left_item), Some(right_item)) if left_item != right_item => {
2072                let mut details = Vec::new();
2073                if left_item.passed != right_item.passed {
2074                    details.push(format!(
2075                        "passed: {:?} -> {:?}",
2076                        left_item.passed, right_item.passed
2077                    ));
2078                }
2079                if left_item.summary != right_item.summary {
2080                    details.push(format!(
2081                        "summary: {:?} -> {:?}",
2082                        left_item.summary, right_item.summary
2083                    ));
2084                }
2085                observability_diffs.push(RunObservabilityDiffRecord {
2086                    section: "verification".to_string(),
2087                    label: left_item.node_id.clone(),
2088                    details,
2089                });
2090            }
2091            _ => {}
2092        }
2093    }
2094
2095    let left_graph = (
2096        left_observability.action_graph_nodes.len(),
2097        left_observability.action_graph_edges.len(),
2098    );
2099    let right_graph = (
2100        right_observability.action_graph_nodes.len(),
2101        right_observability.action_graph_edges.len(),
2102    );
2103    if left_graph != right_graph {
2104        observability_diffs.push(RunObservabilityDiffRecord {
2105            section: "action_graph".to_string(),
2106            label: "shape".to_string(),
2107            details: vec![format!(
2108                "nodes/edges: {}/{} -> {}/{}",
2109                left_graph.0, left_graph.1, right_graph.0, right_graph.1
2110            )],
2111        });
2112    }
2113
2114    let status_changed = left.status != right.status;
2115    let identical = !status_changed
2116        && stage_diffs.is_empty()
2117        && tool_diffs.is_empty()
2118        && observability_diffs.is_empty()
2119        && left.transitions.len() == right.transitions.len()
2120        && left.artifacts.len() == right.artifacts.len()
2121        && left.checkpoints.len() == right.checkpoints.len();
2122
2123    RunDiffReport {
2124        left_run_id: left.id.clone(),
2125        right_run_id: right.id.clone(),
2126        identical,
2127        status_changed,
2128        left_status: left.status.clone(),
2129        right_status: right.status.clone(),
2130        stage_diffs,
2131        tool_diffs,
2132        observability_diffs,
2133        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
2134        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
2135        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
2136    }
2137}