// harn_vm/orchestration/records.rs

//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10    CapabilityPolicy,
11};
12use crate::event_log::{active_event_log, EventLog, LogEvent as EventLogRecord, Topic};
13use crate::llm::vm_value_to_json;
14use crate::triggers::{SignatureStatus, TriggerEvent};
15use crate::value::{VmError, VmValue};
16
// Action-graph node kinds: the string tags written to
// `RunActionGraphNodeRecord::kind`. Only RUN, TRIGGER, PREDICATE and STAGE are
// referenced in the portion of this file reviewed here; the producers of the
// remaining kinds are not visible from this chunk.

/// Kind of the root node that represents the run itself.
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
/// Kind of nodes that represent a trigger event (current or replayed-from).
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
/// Kind assigned to stages whose `kind` is `"condition"`.
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
/// Kind assigned to every stage that is not a `"condition"` stage.
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE: &str = "worker_enqueue";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";
28
// Action-graph edge kinds: the string tags written to
// `RunActionGraphEdgeRecord::kind`. Only REPLAY_CHAIN is referenced in the
// portion of this file reviewed here; the producers of the other kinds are not
// visible from this chunk.

pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
/// Edge from the replayed-from trigger node to the current trigger node.
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
38
39#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
40#[serde(default)]
41pub struct LlmUsageRecord {
42    pub input_tokens: i64,
43    pub output_tokens: i64,
44    pub total_duration_ms: i64,
45    pub call_count: i64,
46    pub total_cost: f64,
47    pub models: Vec<String>,
48}
49
50#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
51#[serde(default)]
52pub struct RunStageRecord {
53    pub id: String,
54    pub node_id: String,
55    pub kind: String,
56    pub status: String,
57    pub outcome: String,
58    pub branch: Option<String>,
59    pub started_at: String,
60    pub finished_at: Option<String>,
61    pub visible_text: Option<String>,
62    pub private_reasoning: Option<String>,
63    pub transcript: Option<serde_json::Value>,
64    pub verification: Option<serde_json::Value>,
65    pub usage: Option<LlmUsageRecord>,
66    pub artifacts: Vec<ArtifactRecord>,
67    pub consumed_artifact_ids: Vec<String>,
68    pub produced_artifact_ids: Vec<String>,
69    pub attempts: Vec<RunStageAttemptRecord>,
70    pub metadata: BTreeMap<String, serde_json::Value>,
71}
72
73#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
74#[serde(default)]
75pub struct RunStageAttemptRecord {
76    pub attempt: usize,
77    pub status: String,
78    pub outcome: String,
79    pub branch: Option<String>,
80    pub error: Option<String>,
81    pub verification: Option<serde_json::Value>,
82    pub started_at: String,
83    pub finished_at: Option<String>,
84}
85
86#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
87#[serde(default)]
88pub struct RunTransitionRecord {
89    pub id: String,
90    pub from_stage_id: Option<String>,
91    pub from_node_id: Option<String>,
92    pub to_node_id: String,
93    pub branch: Option<String>,
94    pub timestamp: String,
95    pub consumed_artifact_ids: Vec<String>,
96    pub produced_artifact_ids: Vec<String>,
97}
98
99#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
100#[serde(default)]
101pub struct RunCheckpointRecord {
102    pub id: String,
103    pub ready_nodes: Vec<String>,
104    pub completed_nodes: Vec<String>,
105    pub last_stage_id: Option<String>,
106    pub persisted_at: String,
107    pub reason: String,
108}
109
110#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
111#[serde(default)]
112pub struct ReplayFixture {
113    #[serde(rename = "_type")]
114    pub type_name: String,
115    pub id: String,
116    pub source_run_id: String,
117    pub workflow_id: String,
118    pub workflow_name: Option<String>,
119    pub created_at: String,
120    pub expected_status: String,
121    pub stage_assertions: Vec<ReplayStageAssertion>,
122}
123
124#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
125#[serde(default)]
126pub struct ReplayStageAssertion {
127    pub node_id: String,
128    pub expected_status: String,
129    pub expected_outcome: String,
130    pub expected_branch: Option<String>,
131    pub required_artifact_kinds: Vec<String>,
132    pub visible_text_contains: Option<String>,
133}
134
135#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
136#[serde(default)]
137pub struct ReplayEvalReport {
138    pub pass: bool,
139    pub failures: Vec<String>,
140    pub stage_count: usize,
141}
142
143#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
144#[serde(default)]
145pub struct ReplayEvalCaseReport {
146    pub run_id: String,
147    pub workflow_id: String,
148    pub label: Option<String>,
149    pub pass: bool,
150    pub failures: Vec<String>,
151    pub stage_count: usize,
152    pub source_path: Option<String>,
153    pub comparison: Option<RunDiffReport>,
154}
155
156#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
157#[serde(default)]
158pub struct ReplayEvalSuiteReport {
159    pub pass: bool,
160    pub total: usize,
161    pub passed: usize,
162    pub failed: usize,
163    pub cases: Vec<ReplayEvalCaseReport>,
164}
165
166#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
167#[serde(default)]
168pub struct RunDeliverableSummaryRecord {
169    pub id: String,
170    pub text: String,
171    pub status: String,
172    pub note: Option<String>,
173}
174
175#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
176#[serde(default)]
177pub struct RunTaskLedgerSummaryRecord {
178    pub root_task: String,
179    pub rationale: String,
180    pub deliverables: Vec<RunDeliverableSummaryRecord>,
181    pub observations: Vec<String>,
182    pub blocking_count: usize,
183}
184
185#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
186#[serde(default)]
187pub struct RunPlannerRoundRecord {
188    pub stage_id: String,
189    pub node_id: String,
190    pub stage_kind: String,
191    pub status: String,
192    pub outcome: String,
193    pub iteration_count: usize,
194    pub llm_call_count: usize,
195    pub tool_execution_count: usize,
196    pub tool_rejection_count: usize,
197    pub intervention_count: usize,
198    pub compaction_count: usize,
199    pub native_text_tool_fallback_count: usize,
200    pub native_text_tool_fallback_rejection_count: usize,
201    pub empty_completion_retry_count: usize,
202    pub tools_used: Vec<String>,
203    pub successful_tools: Vec<String>,
204    pub ledger_done_rejections: usize,
205    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
206    pub research_facts: Vec<String>,
207}
208
209#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
210#[serde(default)]
211pub struct RunWorkerLineageRecord {
212    pub worker_id: String,
213    pub worker_name: String,
214    pub parent_stage_id: Option<String>,
215    pub task: String,
216    pub status: String,
217    pub session_id: Option<String>,
218    pub parent_session_id: Option<String>,
219    pub run_id: Option<String>,
220    pub run_path: Option<String>,
221    pub snapshot_path: Option<String>,
222}
223
224#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
225#[serde(default)]
226pub struct RunActionGraphNodeRecord {
227    pub id: String,
228    pub label: String,
229    pub kind: String,
230    pub status: String,
231    pub outcome: String,
232    pub trace_id: Option<String>,
233    pub stage_id: Option<String>,
234    pub node_id: Option<String>,
235    pub worker_id: Option<String>,
236    pub run_id: Option<String>,
237    pub run_path: Option<String>,
238    pub metadata: BTreeMap<String, serde_json::Value>,
239}
240
241#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
242#[serde(default)]
243pub struct RunActionGraphEdgeRecord {
244    pub from_id: String,
245    pub to_id: String,
246    pub kind: String,
247    pub label: Option<String>,
248}
249
250#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
251#[serde(default)]
252pub struct RunVerificationOutcomeRecord {
253    pub stage_id: String,
254    pub node_id: String,
255    pub status: String,
256    pub passed: Option<bool>,
257    pub summary: Option<String>,
258}
259
260#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
261#[serde(default)]
262pub struct RunTranscriptPointerRecord {
263    pub id: String,
264    pub label: String,
265    pub kind: String,
266    pub location: String,
267    pub path: Option<String>,
268    pub available: bool,
269}
270
271#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
272#[serde(default)]
273pub struct CompactionEventRecord {
274    pub id: String,
275    pub transcript_id: Option<String>,
276    pub stage_id: Option<String>,
277    pub node_id: Option<String>,
278    pub mode: String,
279    pub strategy: String,
280    pub archived_messages: usize,
281    pub estimated_tokens_before: usize,
282    pub estimated_tokens_after: usize,
283    pub snapshot_asset_id: Option<String>,
284    pub snapshot_location: String,
285    pub snapshot_path: Option<String>,
286    pub available: bool,
287}
288
289#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
290#[serde(rename_all = "snake_case")]
291pub enum DaemonEventKindRecord {
292    #[default]
293    Spawned,
294    Triggered,
295    Snapshotted,
296    Resumed,
297    Stopped,
298}
299
300#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
301#[serde(default)]
302pub struct DaemonEventRecord {
303    pub daemon_id: String,
304    pub name: String,
305    pub kind: DaemonEventKindRecord,
306    pub timestamp: String,
307    pub persist_path: String,
308    pub payload_summary: Option<String>,
309}
310
311#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
312#[serde(default)]
313pub struct RunObservabilityRecord {
314    pub schema_version: usize,
315    pub planner_rounds: Vec<RunPlannerRoundRecord>,
316    pub research_fact_count: usize,
317    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
318    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
319    pub worker_lineage: Vec<RunWorkerLineageRecord>,
320    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
321    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
322    pub compaction_events: Vec<CompactionEventRecord>,
323    pub daemon_events: Vec<DaemonEventRecord>,
324}
325
326#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
327#[serde(default)]
328pub struct RunStageDiffRecord {
329    pub node_id: String,
330    pub change: String,
331    pub details: Vec<String>,
332}
333
334#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
335#[serde(default)]
336pub struct ToolCallDiffRecord {
337    pub tool_name: String,
338    pub args_hash: String,
339    pub result_changed: bool,
340    pub left_result: Option<String>,
341    pub right_result: Option<String>,
342}
343
344#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
345#[serde(default)]
346pub struct RunObservabilityDiffRecord {
347    pub section: String,
348    pub label: String,
349    pub details: Vec<String>,
350}
351
352#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
353#[serde(default)]
354pub struct RunDiffReport {
355    pub left_run_id: String,
356    pub right_run_id: String,
357    pub identical: bool,
358    pub status_changed: bool,
359    pub left_status: String,
360    pub right_status: String,
361    pub stage_diffs: Vec<RunStageDiffRecord>,
362    pub tool_diffs: Vec<ToolCallDiffRecord>,
363    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
364    pub transition_count_delta: isize,
365    pub artifact_count_delta: isize,
366    pub checkpoint_count_delta: isize,
367}
368
369#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
370#[serde(default)]
371pub struct EvalSuiteManifest {
372    #[serde(rename = "_type")]
373    pub type_name: String,
374    pub id: String,
375    pub name: Option<String>,
376    pub base_dir: Option<String>,
377    pub cases: Vec<EvalSuiteCase>,
378}
379
380#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
381#[serde(default)]
382pub struct EvalSuiteCase {
383    pub label: Option<String>,
384    pub run_path: String,
385    pub fixture_path: Option<String>,
386    pub compare_to: Option<String>,
387}
388
389#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
390#[serde(default)]
391pub struct RunRecord {
392    #[serde(rename = "_type")]
393    pub type_name: String,
394    pub id: String,
395    pub workflow_id: String,
396    pub workflow_name: Option<String>,
397    pub task: String,
398    pub status: String,
399    pub started_at: String,
400    pub finished_at: Option<String>,
401    pub parent_run_id: Option<String>,
402    pub root_run_id: Option<String>,
403    pub stages: Vec<RunStageRecord>,
404    pub transitions: Vec<RunTransitionRecord>,
405    pub checkpoints: Vec<RunCheckpointRecord>,
406    pub pending_nodes: Vec<String>,
407    pub completed_nodes: Vec<String>,
408    pub child_runs: Vec<RunChildRecord>,
409    pub artifacts: Vec<ArtifactRecord>,
410    pub policy: CapabilityPolicy,
411    pub execution: Option<RunExecutionRecord>,
412    pub transcript: Option<serde_json::Value>,
413    pub usage: Option<LlmUsageRecord>,
414    pub replay_fixture: Option<ReplayFixture>,
415    pub observability: Option<RunObservabilityRecord>,
416    pub trace_spans: Vec<RunTraceSpanRecord>,
417    pub tool_recordings: Vec<ToolCallRecord>,
418    pub metadata: BTreeMap<String, serde_json::Value>,
419    pub persisted_path: Option<String>,
420}
421
422#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
423#[serde(default)]
424pub struct ToolCallRecord {
425    pub tool_name: String,
426    pub tool_use_id: String,
427    pub args_hash: String,
428    pub result: String,
429    pub is_rejected: bool,
430    pub duration_ms: u64,
431    pub iteration: usize,
432    pub timestamp: String,
433}
434
435/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
436pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
437    use std::hash::{Hash, Hasher};
438    let mut hasher = std::collections::hash_map::DefaultHasher::new();
439    tool_name.hash(&mut hasher);
440    let args_str = serde_json::to_string(args).unwrap_or_default();
441    args_str.hash(&mut hasher);
442    format!("{:016x}", hasher.finish())
443}
444
445#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
446#[serde(default)]
447pub struct RunTraceSpanRecord {
448    pub span_id: u64,
449    pub parent_id: Option<u64>,
450    pub kind: String,
451    pub name: String,
452    pub start_ms: u64,
453    pub duration_ms: u64,
454    pub metadata: BTreeMap<String, serde_json::Value>,
455}
456
457#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
458#[serde(default)]
459pub struct RunChildRecord {
460    pub worker_id: String,
461    pub worker_name: String,
462    pub parent_stage_id: Option<String>,
463    pub session_id: Option<String>,
464    pub parent_session_id: Option<String>,
465    pub mutation_scope: Option<String>,
466    pub approval_policy: Option<super::ToolApprovalPolicy>,
467    pub task: String,
468    pub request: Option<serde_json::Value>,
469    pub provenance: Option<serde_json::Value>,
470    pub status: String,
471    pub started_at: String,
472    pub finished_at: Option<String>,
473    pub run_id: Option<String>,
474    pub run_path: Option<String>,
475    pub snapshot_path: Option<String>,
476    pub execution: Option<RunExecutionRecord>,
477}
478
479#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
480#[serde(default)]
481pub struct RunExecutionRecord {
482    pub cwd: Option<String>,
483    pub source_dir: Option<String>,
484    pub env: BTreeMap<String, String>,
485    pub adapter: Option<String>,
486    pub repo_path: Option<String>,
487    pub worktree_path: Option<String>,
488    pub branch: Option<String>,
489    pub base_ref: Option<String>,
490    pub cleanup: Option<String>,
491}
492
493fn compact_json_value(value: &serde_json::Value) -> String {
494    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
495}
496
497fn signature_status_label(status: &SignatureStatus) -> &'static str {
498    match status {
499        SignatureStatus::Verified => "verified",
500        SignatureStatus::Unsigned => "unsigned",
501        SignatureStatus::Failed { .. } => "failed",
502    }
503}
504
505fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
506    run.metadata
507        .get("trigger_event")
508        .cloned()
509        .and_then(|value| serde_json::from_value(value).ok())
510}
511
512fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
513    trigger_event
514        .map(|event| event.trace_id.0.clone())
515        .or_else(|| {
516            run.metadata
517                .get("trace_id")
518                .and_then(|value| value.as_str())
519                .map(str::to_string)
520        })
521}
522
523fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
524    run.metadata
525        .get("replay_of_event_id")
526        .and_then(|value| value.as_str())
527        .map(str::to_string)
528}
529
530fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
531    if stage.kind == "condition" {
532        ACTION_GRAPH_NODE_KIND_PREDICATE
533    } else {
534        ACTION_GRAPH_NODE_KIND_STAGE
535    }
536}
537
538fn trigger_node_metadata(trigger_event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
539    let mut metadata = BTreeMap::new();
540    metadata.insert(
541        "provider".to_string(),
542        serde_json::json!(trigger_event.provider.as_str()),
543    );
544    metadata.insert(
545        "event_kind".to_string(),
546        serde_json::json!(trigger_event.kind),
547    );
548    metadata.insert(
549        "dedupe_key".to_string(),
550        serde_json::json!(trigger_event.dedupe_key),
551    );
552    metadata.insert(
553        "signature_status".to_string(),
554        serde_json::json!(signature_status_label(&trigger_event.signature_status)),
555    );
556    metadata
557}
558
559fn stage_node_metadata(stage: &RunStageRecord) -> BTreeMap<String, serde_json::Value> {
560    let mut metadata = BTreeMap::new();
561    metadata.insert("stage_kind".to_string(), serde_json::json!(stage.kind));
562    if let Some(branch) = stage.branch.as_ref() {
563        metadata.insert("branch".to_string(), serde_json::json!(branch));
564    }
565    if let Some(worker_id) = stage
566        .metadata
567        .get("worker_id")
568        .and_then(|value| value.as_str())
569    {
570        metadata.insert("worker_id".to_string(), serde_json::json!(worker_id));
571    }
572    metadata
573}
574
575fn append_action_graph_node(
576    nodes: &mut Vec<RunActionGraphNodeRecord>,
577    record: RunActionGraphNodeRecord,
578) {
579    nodes.push(record);
580}
581
582pub async fn append_action_graph_update(
583    headers: BTreeMap<String, String>,
584    payload: serde_json::Value,
585) -> Result<(), crate::event_log::LogError> {
586    let Some(log) = active_event_log() else {
587        return Ok(());
588    };
589    let topic = Topic::new("observability.action_graph")
590        .expect("static observability.action_graph topic should always be valid");
591    let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
592    log.append(&topic, record).await.map(|_| ())
593}
594
595fn publish_action_graph_event(
596    run: &RunRecord,
597    observability: &RunObservabilityRecord,
598    path: &Path,
599) {
600    let trigger_event = trigger_event_from_run(run);
601    let mut headers = BTreeMap::new();
602    headers.insert("run_id".to_string(), run.id.clone());
603    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
604    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
605        headers.insert("trace_id".to_string(), trace_id);
606    }
607    let payload = serde_json::json!({
608        "run_id": run.id,
609        "workflow_id": run.workflow_id,
610        "persisted_path": path.to_string_lossy(),
611        "status": run.status,
612        "observability": observability,
613    });
614    if let Ok(handle) = tokio::runtime::Handle::try_current() {
615        handle.spawn(async move {
616            let _ = append_action_graph_update(headers, payload).await;
617        });
618    } else {
619        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
620    }
621}
622
/// Computes the path of the LLM transcript sidecar that belongs to a run file.
///
/// For `<dir>/<stem>.<ext>` the result is
/// `<dir>/<stem>-llm/llm_transcript.jsonl`. Returns `None` when `run_path` has
/// no file stem or the stem is not valid UTF-8.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem().and_then(|stem| stem.to_str())?;
    let sidecar_relative = format!("{stem}-llm/llm_transcript.jsonl");
    let directory = match run_path.parent() {
        Some(directory) => directory,
        // No parent component: resolve relative to the current directory.
        None => Path::new("."),
    };
    Some(directory.join(sidecar_relative))
}
628
629fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
630    value
631        .and_then(|value| value.as_array())
632        .map(|items| {
633            items
634                .iter()
635                .filter_map(|item| item.as_str().map(str::to_string))
636                .collect::<Vec<_>>()
637        })
638        .unwrap_or_default()
639}
640
641fn json_usize(value: Option<&serde_json::Value>) -> usize {
642    value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
643}
644
645fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
646    value.and_then(|value| value.as_bool())
647}
648
649fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
650    stage
651        .artifacts
652        .iter()
653        .find_map(|artifact| artifact.data.as_ref())
654}
655
656fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
657    let deliverables = value
658        .get("deliverables")
659        .and_then(|raw| raw.as_array())
660        .map(|items| {
661            items
662                .iter()
663                .map(|item| RunDeliverableSummaryRecord {
664                    id: item
665                        .get("id")
666                        .and_then(|value| value.as_str())
667                        .unwrap_or_default()
668                        .to_string(),
669                    text: item
670                        .get("text")
671                        .and_then(|value| value.as_str())
672                        .unwrap_or_default()
673                        .to_string(),
674                    status: item
675                        .get("status")
676                        .and_then(|value| value.as_str())
677                        .unwrap_or_default()
678                        .to_string(),
679                    note: item
680                        .get("note")
681                        .and_then(|value| value.as_str())
682                        .map(str::to_string),
683                })
684                .collect::<Vec<_>>()
685        })
686        .unwrap_or_default();
687    let observations = json_string_array(value.get("observations"));
688    let root_task = value
689        .get("root_task")
690        .and_then(|value| value.as_str())
691        .unwrap_or_default()
692        .to_string();
693    let rationale = value
694        .get("rationale")
695        .and_then(|value| value.as_str())
696        .unwrap_or_default()
697        .to_string();
698    if root_task.is_empty()
699        && rationale.is_empty()
700        && deliverables.is_empty()
701        && observations.is_empty()
702    {
703        return None;
704    }
705    let blocking_count = deliverables
706        .iter()
707        .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
708        .count();
709    Some(RunTaskLedgerSummaryRecord {
710        root_task,
711        rationale,
712        deliverables,
713        observations,
714        blocking_count,
715    })
716}
717
718fn compaction_events_from_transcript(
719    transcript: &serde_json::Value,
720    stage_id: Option<&str>,
721    node_id: Option<&str>,
722    location_prefix: &str,
723    persisted_path: Option<&Path>,
724) -> Vec<CompactionEventRecord> {
725    let transcript_id = transcript
726        .get("id")
727        .and_then(|value| value.as_str())
728        .map(str::to_string);
729    let asset_ids = transcript
730        .get("assets")
731        .and_then(|value| value.as_array())
732        .map(|assets| {
733            assets
734                .iter()
735                .filter_map(|asset| {
736                    asset
737                        .get("id")
738                        .and_then(|value| value.as_str())
739                        .map(str::to_string)
740                })
741                .collect::<BTreeSet<_>>()
742        })
743        .unwrap_or_default();
744    transcript
745        .get("events")
746        .and_then(|value| value.as_array())
747        .map(|events| {
748            events
749                .iter()
750                .filter(|event| {
751                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
752                })
753                .map(|event| {
754                    let metadata = event.get("metadata");
755                    let snapshot_asset_id = metadata
756                        .and_then(|value| value.get("snapshot_asset_id"))
757                        .and_then(|value| value.as_str())
758                        .map(str::to_string);
759                    let available = snapshot_asset_id
760                        .as_ref()
761                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
762                    let snapshot_location = snapshot_asset_id
763                        .as_ref()
764                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
765                        .unwrap_or_else(|| location_prefix.to_string());
766                    CompactionEventRecord {
767                        id: event
768                            .get("id")
769                            .and_then(|value| value.as_str())
770                            .unwrap_or_default()
771                            .to_string(),
772                        transcript_id: transcript_id.clone(),
773                        stage_id: stage_id.map(str::to_string),
774                        node_id: node_id.map(str::to_string),
775                        mode: metadata
776                            .and_then(|value| value.get("mode"))
777                            .and_then(|value| value.as_str())
778                            .unwrap_or_default()
779                            .to_string(),
780                        strategy: metadata
781                            .and_then(|value| value.get("strategy"))
782                            .and_then(|value| value.as_str())
783                            .unwrap_or_default()
784                            .to_string(),
785                        archived_messages: json_usize(
786                            metadata.and_then(|value| value.get("archived_messages")),
787                        ),
788                        estimated_tokens_before: json_usize(
789                            metadata.and_then(|value| value.get("estimated_tokens_before")),
790                        ),
791                        estimated_tokens_after: json_usize(
792                            metadata.and_then(|value| value.get("estimated_tokens_after")),
793                        ),
794                        snapshot_asset_id,
795                        snapshot_location,
796                        snapshot_path: persisted_path
797                            .map(|path| path.to_string_lossy().into_owned()),
798                        available,
799                    }
800                })
801                .collect()
802        })
803        .unwrap_or_default()
804}
805
806fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
807    let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
808        return Vec::new();
809    };
810    let Ok(content) = std::fs::read_to_string(sidecar_path) else {
811        return Vec::new();
812    };
813
814    content
815        .lines()
816        .filter(|line| !line.trim().is_empty())
817        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
818        .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
819        .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
820        .collect()
821}
822
823pub fn derive_run_observability(
824    run: &RunRecord,
825    persisted_path: Option<&Path>,
826) -> RunObservabilityRecord {
827    let mut action_graph_nodes = Vec::new();
828    let mut action_graph_edges = Vec::new();
829    let mut verification_outcomes = Vec::new();
830    let mut planner_rounds = Vec::new();
831    let mut transcript_pointers = Vec::new();
832    let mut compaction_events = Vec::new();
833    let mut daemon_events = Vec::new();
834    let mut research_fact_count = 0usize;
835
836    let root_node_id = format!("run:{}", run.id);
837    let trigger_event = trigger_event_from_run(run);
838    let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
839    append_action_graph_node(
840        &mut action_graph_nodes,
841        RunActionGraphNodeRecord {
842            id: root_node_id.clone(),
843            label: run
844                .workflow_name
845                .clone()
846                .unwrap_or_else(|| run.workflow_id.clone()),
847            kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
848            status: run.status.clone(),
849            outcome: run.status.clone(),
850            trace_id: propagated_trace_id.clone(),
851            stage_id: None,
852            node_id: None,
853            worker_id: None,
854            run_id: Some(run.id.clone()),
855            run_path: run.persisted_path.clone(),
856            metadata: BTreeMap::from([(
857                "workflow_id".to_string(),
858                serde_json::json!(run.workflow_id),
859            )]),
860        },
861    );
862    let mut entry_node_id = root_node_id.clone();
863    if let Some(trigger_event) = trigger_event.as_ref() {
864        if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
865            let replay_source_node_id = format!("trigger:{replay_of_event_id}");
866            append_action_graph_node(
867                &mut action_graph_nodes,
868                RunActionGraphNodeRecord {
869                    id: replay_source_node_id.clone(),
870                    label: format!(
871                        "{}:{} (original {})",
872                        trigger_event.provider.as_str(),
873                        trigger_event.kind,
874                        replay_of_event_id
875                    ),
876                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
877                    status: "historical".to_string(),
878                    outcome: "replayed_from".to_string(),
879                    trace_id: Some(trigger_event.trace_id.0.clone()),
880                    stage_id: None,
881                    node_id: None,
882                    worker_id: None,
883                    run_id: Some(run.id.clone()),
884                    run_path: run.persisted_path.clone(),
885                    metadata: trigger_node_metadata(trigger_event),
886                },
887            );
888            action_graph_edges.push(RunActionGraphEdgeRecord {
889                from_id: replay_source_node_id,
890                to_id: format!("trigger:{}", trigger_event.id.0),
891                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
892                label: Some("replay chain".to_string()),
893            });
894        }
895        let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
896        append_action_graph_node(
897            &mut action_graph_nodes,
898            RunActionGraphNodeRecord {
899                id: trigger_node_id.clone(),
900                label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
901                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
902                status: "received".to_string(),
903                outcome: signature_status_label(&trigger_event.signature_status).to_string(),
904                trace_id: Some(trigger_event.trace_id.0.clone()),
905                stage_id: None,
906                node_id: None,
907                worker_id: None,
908                run_id: Some(run.id.clone()),
909                run_path: run.persisted_path.clone(),
910                metadata: trigger_node_metadata(trigger_event),
911            },
912        );
913        action_graph_edges.push(RunActionGraphEdgeRecord {
914            from_id: root_node_id.clone(),
915            to_id: trigger_node_id.clone(),
916            kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
917            label: Some(trigger_event.id.0.clone()),
918        });
919        entry_node_id = trigger_node_id;
920    }
921
922    let stage_node_ids = run
923        .stages
924        .iter()
925        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
926        .collect::<BTreeMap<_, _>>();
927    let stage_by_id = run
928        .stages
929        .iter()
930        .map(|stage| (stage.id.as_str(), stage))
931        .collect::<BTreeMap<_, _>>();
932    let stage_by_node_id = run
933        .stages
934        .iter()
935        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
936        .collect::<BTreeMap<_, _>>();
937
938    let incoming_nodes = run
939        .transitions
940        .iter()
941        .map(|transition| transition.to_node_id.clone())
942        .collect::<BTreeSet<_>>();
943
944    for stage in &run.stages {
945        let graph_node_id = stage_node_ids
946            .get(&stage.id)
947            .cloned()
948            .unwrap_or_else(|| format!("stage:{}", stage.id));
949        append_action_graph_node(
950            &mut action_graph_nodes,
951            RunActionGraphNodeRecord {
952                id: graph_node_id.clone(),
953                label: stage.node_id.clone(),
954                kind: action_graph_kind_for_stage(stage).to_string(),
955                status: stage.status.clone(),
956                outcome: stage.outcome.clone(),
957                trace_id: propagated_trace_id.clone(),
958                stage_id: Some(stage.id.clone()),
959                node_id: Some(stage.node_id.clone()),
960                worker_id: stage
961                    .metadata
962                    .get("worker_id")
963                    .and_then(|value| value.as_str())
964                    .map(str::to_string),
965                run_id: None,
966                run_path: None,
967                metadata: stage_node_metadata(stage),
968            },
969        );
970        if !incoming_nodes.contains(&stage.node_id) {
971            action_graph_edges.push(RunActionGraphEdgeRecord {
972                from_id: entry_node_id.clone(),
973                to_id: graph_node_id.clone(),
974                kind: if trigger_event.is_some() {
975                    ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
976                } else {
977                    ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
978                },
979                label: None,
980            });
981        }
982
983        if stage.kind == "verify" || stage.verification.is_some() {
984            let passed = json_bool(
985                stage
986                    .verification
987                    .as_ref()
988                    .and_then(|value| value.get("pass")),
989            )
990            .or_else(|| {
991                json_bool(
992                    stage
993                        .verification
994                        .as_ref()
995                        .and_then(|value| value.get("success")),
996                )
997            })
998            .or_else(|| {
999                if stage.status == "completed" && stage.outcome == "success" {
1000                    Some(true)
1001                } else if stage.status == "failed" || stage.outcome == "failed" {
1002                    Some(false)
1003                } else {
1004                    None
1005                }
1006            });
1007            verification_outcomes.push(RunVerificationOutcomeRecord {
1008                stage_id: stage.id.clone(),
1009                node_id: stage.node_id.clone(),
1010                status: stage.status.clone(),
1011                passed,
1012                summary: stage
1013                    .verification
1014                    .as_ref()
1015                    .map(compact_json_value)
1016                    .or_else(|| {
1017                        stage
1018                            .visible_text
1019                            .as_ref()
1020                            .filter(|value| !value.trim().is_empty())
1021                            .cloned()
1022                    }),
1023            });
1024        }
1025
1026        if stage.transcript.is_some() {
1027            transcript_pointers.push(RunTranscriptPointerRecord {
1028                id: format!("stage:{}:transcript", stage.id),
1029                label: format!("Stage {} transcript", stage.node_id),
1030                kind: "embedded_transcript".to_string(),
1031                location: format!("run.stages[{}].transcript", stage.node_id),
1032                path: run.persisted_path.clone(),
1033                available: true,
1034            });
1035            if let Some(transcript) = stage.transcript.as_ref() {
1036                compaction_events.extend(compaction_events_from_transcript(
1037                    transcript,
1038                    Some(&stage.id),
1039                    Some(&stage.node_id),
1040                    &format!("run.stages[{}].transcript", stage.node_id),
1041                    persisted_path,
1042                ));
1043            }
1044        }
1045
1046        if let Some(payload) = stage_result_payload(stage) {
1047            let trace = payload.get("trace");
1048            let task_ledger = payload
1049                .get("task_ledger")
1050                .and_then(task_ledger_summary_from_value);
1051            let research_facts = task_ledger
1052                .as_ref()
1053                .map(|ledger| ledger.observations.clone())
1054                .unwrap_or_default();
1055            research_fact_count += research_facts.len();
1056            let tools_used = json_string_array(
1057                payload
1058                    .get("tools_used")
1059                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1060            );
1061            let successful_tools = json_string_array(payload.get("successful_tools"));
1062            let planner_round = RunPlannerRoundRecord {
1063                stage_id: stage.id.clone(),
1064                node_id: stage.node_id.clone(),
1065                stage_kind: stage.kind.clone(),
1066                status: stage.status.clone(),
1067                outcome: stage.outcome.clone(),
1068                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1069                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1070                tool_execution_count: json_usize(
1071                    trace.and_then(|trace| trace.get("tool_executions")),
1072                ),
1073                tool_rejection_count: json_usize(
1074                    trace.and_then(|trace| trace.get("tool_rejections")),
1075                ),
1076                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1077                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1078                native_text_tool_fallback_count: json_usize(
1079                    trace.and_then(|trace| trace.get("native_text_tool_fallbacks")),
1080                ),
1081                native_text_tool_fallback_rejection_count: json_usize(
1082                    trace.and_then(|trace| trace.get("native_text_tool_fallback_rejections")),
1083                ),
1084                empty_completion_retry_count: json_usize(
1085                    trace.and_then(|trace| trace.get("empty_completion_retries")),
1086                ),
1087                tools_used,
1088                successful_tools,
1089                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1090                task_ledger,
1091                research_facts,
1092            };
1093            let has_agentic_detail = planner_round.iteration_count > 0
1094                || planner_round.llm_call_count > 0
1095                || planner_round.tool_execution_count > 0
1096                || planner_round.native_text_tool_fallback_count > 0
1097                || planner_round.native_text_tool_fallback_rejection_count > 0
1098                || planner_round.empty_completion_retry_count > 0
1099                || planner_round.ledger_done_rejections > 0
1100                || planner_round.task_ledger.is_some()
1101                || !planner_round.tools_used.is_empty()
1102                || !planner_round.successful_tools.is_empty();
1103            if has_agentic_detail {
1104                planner_rounds.push(planner_round);
1105            }
1106        }
1107    }
1108
1109    for transition in &run.transitions {
1110        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1111            continue;
1112        };
1113        let from_stage = transition
1114            .from_stage_id
1115            .as_deref()
1116            .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1117        let from_id = transition
1118            .from_stage_id
1119            .as_ref()
1120            .and_then(|stage_id| stage_node_ids.get(stage_id))
1121            .cloned()
1122            .or_else(|| {
1123                transition
1124                    .from_node_id
1125                    .as_ref()
1126                    .and_then(|node_id| stage_by_node_id.get(node_id))
1127                    .cloned()
1128            })
1129            .unwrap_or_else(|| root_node_id.clone());
1130        action_graph_edges.push(RunActionGraphEdgeRecord {
1131            from_id,
1132            to_id,
1133            kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1134                ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1135            } else {
1136                ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1137            },
1138            label: transition.branch.clone(),
1139        });
1140    }
1141
1142    let worker_lineage = run
1143        .child_runs
1144        .iter()
1145        .map(|child| {
1146            let worker_node_id = format!("worker:{}", child.worker_id);
1147            append_action_graph_node(
1148                &mut action_graph_nodes,
1149                RunActionGraphNodeRecord {
1150                    id: worker_node_id.clone(),
1151                    label: child.worker_name.clone(),
1152                    kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1153                    status: child.status.clone(),
1154                    outcome: child.status.clone(),
1155                    trace_id: propagated_trace_id.clone(),
1156                    stage_id: child.parent_stage_id.clone(),
1157                    node_id: None,
1158                    worker_id: Some(child.worker_id.clone()),
1159                    run_id: child.run_id.clone(),
1160                    run_path: child.run_path.clone(),
1161                    metadata: BTreeMap::from([
1162                        (
1163                            "worker_name".to_string(),
1164                            serde_json::json!(child.worker_name),
1165                        ),
1166                        ("task".to_string(), serde_json::json!(child.task)),
1167                    ]),
1168                },
1169            );
1170            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1171                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1172                    action_graph_edges.push(RunActionGraphEdgeRecord {
1173                        from_id: stage_node_id.clone(),
1174                        to_id: worker_node_id,
1175                        kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1176                        label: Some(child.worker_name.clone()),
1177                    });
1178                }
1179            }
1180            RunWorkerLineageRecord {
1181                worker_id: child.worker_id.clone(),
1182                worker_name: child.worker_name.clone(),
1183                parent_stage_id: child.parent_stage_id.clone(),
1184                task: child.task.clone(),
1185                status: child.status.clone(),
1186                session_id: child.session_id.clone(),
1187                parent_session_id: child.parent_session_id.clone(),
1188                run_id: child.run_id.clone(),
1189                run_path: child.run_path.clone(),
1190                snapshot_path: child.snapshot_path.clone(),
1191            }
1192        })
1193        .collect::<Vec<_>>();
1194
1195    if run.transcript.is_some() {
1196        transcript_pointers.push(RunTranscriptPointerRecord {
1197            id: "run:transcript".to_string(),
1198            label: "Run transcript".to_string(),
1199            kind: "embedded_transcript".to_string(),
1200            location: "run.transcript".to_string(),
1201            path: run.persisted_path.clone(),
1202            available: true,
1203        });
1204        if let Some(transcript) = run.transcript.as_ref() {
1205            compaction_events.extend(compaction_events_from_transcript(
1206                transcript,
1207                None,
1208                None,
1209                "run.transcript",
1210                persisted_path,
1211            ));
1212        }
1213    }
1214
1215    if let Some(path) = persisted_path {
1216        if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1217            transcript_pointers.push(RunTranscriptPointerRecord {
1218                id: "run:llm_transcript".to_string(),
1219                label: "LLM transcript sidecar".to_string(),
1220                kind: "llm_jsonl".to_string(),
1221                location: "run sidecar".to_string(),
1222                path: Some(sidecar_path.to_string_lossy().into_owned()),
1223                available: sidecar_path.exists(),
1224            });
1225        }
1226        daemon_events.extend(daemon_events_from_sidecar(path));
1227    }
1228
1229    RunObservabilityRecord {
1230        schema_version: 4,
1231        planner_rounds,
1232        research_fact_count,
1233        action_graph_nodes,
1234        action_graph_edges,
1235        worker_lineage,
1236        verification_outcomes,
1237        transcript_pointers,
1238        compaction_events,
1239        daemon_events,
1240    }
1241}
1242
1243fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1244    run.observability = Some(derive_run_observability(run, persisted_path));
1245}
1246
1247pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1248    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1249    if run.type_name.is_empty() {
1250        run.type_name = "run_record".to_string();
1251    }
1252    if run.id.is_empty() {
1253        run.id = new_id("run");
1254    }
1255    if run.started_at.is_empty() {
1256        run.started_at = now_rfc3339();
1257    }
1258    if run.status.is_empty() {
1259        run.status = "running".to_string();
1260    }
1261    if run.root_run_id.is_none() {
1262        run.root_run_id = Some(run.id.clone());
1263    }
1264    if run.replay_fixture.is_none() {
1265        run.replay_fixture = Some(replay_fixture_from_run(&run));
1266    }
1267    if run.observability.is_none() {
1268        let persisted_path = run.persisted_path.clone();
1269        let persisted = persisted_path.as_deref().map(Path::new);
1270        refresh_run_observability(&mut run, persisted);
1271    }
1272    Ok(run)
1273}
1274
1275pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1276    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1277    if manifest.type_name.is_empty() {
1278        manifest.type_name = "eval_suite_manifest".to_string();
1279    }
1280    if manifest.id.is_empty() {
1281        manifest.id = new_id("eval_suite");
1282    }
1283    Ok(manifest)
1284}
1285
1286fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1287    let content = std::fs::read_to_string(path)
1288        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1289    serde_json::from_str(&content)
1290        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1291}
1292
/// Resolves a path taken from a manifest.
///
/// A relative `path` is joined onto `base_dir` when one is given; an absolute
/// `path`, or any `path` when there is no `base_dir`, is returned unchanged.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = Path::new(path);
    match base_dir {
        Some(root) if candidate.is_relative() => root.join(candidate),
        _ => candidate.to_path_buf(),
    }
}
1303
1304pub fn evaluate_run_suite_manifest(
1305    manifest: &EvalSuiteManifest,
1306) -> Result<ReplayEvalSuiteReport, VmError> {
1307    let base_dir = manifest.base_dir.as_deref().map(Path::new);
1308    let mut reports = Vec::new();
1309    for case in &manifest.cases {
1310        let run_path = resolve_manifest_path(base_dir, &case.run_path);
1311        let run = load_run_record(&run_path)?;
1312        let fixture = match &case.fixture_path {
1313            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1314            None => run
1315                .replay_fixture
1316                .clone()
1317                .unwrap_or_else(|| replay_fixture_from_run(&run)),
1318        };
1319        let eval = evaluate_run_against_fixture(&run, &fixture);
1320        let mut pass = eval.pass;
1321        let mut failures = eval.failures;
1322        let comparison = match &case.compare_to {
1323            Some(path) => {
1324                let baseline_path = resolve_manifest_path(base_dir, path);
1325                let baseline = load_run_record(&baseline_path)?;
1326                let diff = diff_run_records(&baseline, &run);
1327                if !diff.identical {
1328                    pass = false;
1329                    failures.push(format!(
1330                        "run differs from baseline {} with {} stage changes",
1331                        baseline_path.display(),
1332                        diff.stage_diffs.len()
1333                    ));
1334                }
1335                Some(diff)
1336            }
1337            None => None,
1338        };
1339        reports.push(ReplayEvalCaseReport {
1340            run_id: run.id.clone(),
1341            workflow_id: run.workflow_id.clone(),
1342            label: case.label.clone(),
1343            pass,
1344            failures,
1345            stage_count: eval.stage_count,
1346            source_path: Some(run_path.display().to_string()),
1347            comparison,
1348        });
1349    }
1350    let total = reports.len();
1351    let passed = reports.iter().filter(|report| report.pass).count();
1352    let failed = total.saturating_sub(passed);
1353    Ok(ReplayEvalSuiteReport {
1354        pass: failed == 0,
1355        total,
1356        passed,
1357        failed,
1358        cases: reports,
1359    })
1360}
1361
/// Edit operation in a diff sequence.
///
/// `myers_diff` pairs each operation with a line index; which input that index
/// refers to depends on the variant (see below).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// The line is unchanged; the paired index points into the "before" lines.
    Equal,
    /// The line was removed; the paired index points into the "before" lines.
    Delete,
    /// The line was added; the paired index points into the "after" lines.
    Insert,
}
1369
1370/// Compute the shortest edit script using Myers' O(nd) algorithm.
1371/// Returns a sequence of (DiffOp, line_index_in_before_or_after).
1372/// Time: O(nd) where d = edit distance. Space: O(d * n).
1373pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
1374    let n = a.len() as isize;
1375    let m = b.len() as isize;
1376    if n == 0 && m == 0 {
1377        return Vec::new();
1378    }
1379    if n == 0 {
1380        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
1381    }
1382    if m == 0 {
1383        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
1384    }
1385
1386    let max_d = (n + m) as usize;
1387    let offset = max_d as isize;
1388    let v_size = 2 * max_d + 1;
1389    let mut v = vec![0isize; v_size];
1390    // trace[d] holds the `v` snapshot BEFORE step d ran — required for backtrack.
1391    let mut trace: Vec<Vec<isize>> = Vec::new();
1392
1393    'outer: for d in 0..=max_d as isize {
1394        trace.push(v.clone());
1395        let mut new_v = v.clone();
1396        for k in (-d..=d).step_by(2) {
1397            let ki = (k + offset) as usize;
1398            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
1399                v[ki + 1]
1400            } else {
1401                v[ki - 1] + 1
1402            };
1403            let mut y = x - k;
1404            while x < n && y < m && a[x as usize] == b[y as usize] {
1405                x += 1;
1406                y += 1;
1407            }
1408            new_v[ki] = x;
1409            if x >= n && y >= m {
1410                let _ = new_v;
1411                break 'outer;
1412            }
1413        }
1414        v = new_v;
1415    }
1416
1417    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
1418    let mut x = n;
1419    let mut y = m;
1420    for d in (1..trace.len() as isize).rev() {
1421        let k = x - y;
1422        let v_prev = &trace[d as usize];
1423        let prev_k = if k == -d
1424            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
1425        {
1426            k + 1
1427        } else {
1428            k - 1
1429        };
1430        let prev_x = v_prev[(prev_k + offset) as usize];
1431        let prev_y = prev_x - prev_k;
1432
1433        while x > prev_x && y > prev_y {
1434            x -= 1;
1435            y -= 1;
1436            ops.push((DiffOp::Equal, x as usize));
1437        }
1438        if prev_k < k {
1439            x -= 1;
1440            ops.push((DiffOp::Delete, x as usize));
1441        } else {
1442            y -= 1;
1443            ops.push((DiffOp::Insert, y as usize));
1444        }
1445    }
1446    while x > 0 && y > 0 {
1447        x -= 1;
1448        y -= 1;
1449        ops.push((DiffOp::Equal, x as usize));
1450    }
1451    ops.reverse();
1452    ops
1453}
1454
1455pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1456    let before_lines: Vec<&str> = before.lines().collect();
1457    let after_lines: Vec<&str> = after.lines().collect();
1458    let ops = myers_diff(&before_lines, &after_lines);
1459
1460    let mut diff = String::new();
1461    let file = path.unwrap_or("artifact");
1462    diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1463    for &(op, idx) in &ops {
1464        match op {
1465            DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1466            DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1467            DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1468        }
1469    }
1470    diff
1471}
1472
1473pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1474    let path = path
1475        .map(PathBuf::from)
1476        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1477    let mut materialized = run.clone();
1478    if materialized.replay_fixture.is_none() {
1479        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1480    }
1481    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1482    refresh_run_observability(&mut materialized, Some(&path));
1483    if let Some(parent) = path.parent() {
1484        std::fs::create_dir_all(parent)
1485            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1486    }
1487    let json = serde_json::to_string_pretty(&materialized)
1488        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1489    // Atomic write: .tmp then rename guards against partial writes on kill.
1490    let tmp_path = path.with_extension("json.tmp");
1491    std::fs::write(&tmp_path, &json)
1492        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1493    std::fs::rename(&tmp_path, &path).map_err(|e| {
1494        // Cross-device renames fail on some filesystems; best-effort direct write.
1495        let _ = std::fs::write(&path, &json);
1496        VmError::Runtime(format!("failed to finalize run record: {e}"))
1497    })?;
1498    if let Some(observability) = materialized.observability.as_ref() {
1499        publish_action_graph_event(&materialized, observability, &path);
1500    }
1501    Ok(path.to_string_lossy().into_owned())
1502}
1503
1504pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1505    let content = std::fs::read_to_string(path)
1506        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1507    let mut run: RunRecord = serde_json::from_str(&content)
1508        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1509    if run.replay_fixture.is_none() {
1510        run.replay_fixture = Some(replay_fixture_from_run(&run));
1511    }
1512    run.persisted_path
1513        .get_or_insert_with(|| path.to_string_lossy().into_owned());
1514    refresh_run_observability(&mut run, Some(path));
1515    Ok(run)
1516}
1517
1518pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1519    ReplayFixture {
1520        type_name: "replay_fixture".to_string(),
1521        id: new_id("fixture"),
1522        source_run_id: run.id.clone(),
1523        workflow_id: run.workflow_id.clone(),
1524        workflow_name: run.workflow_name.clone(),
1525        created_at: now_rfc3339(),
1526        expected_status: run.status.clone(),
1527        stage_assertions: run
1528            .stages
1529            .iter()
1530            .map(|stage| ReplayStageAssertion {
1531                node_id: stage.node_id.clone(),
1532                expected_status: stage.status.clone(),
1533                expected_outcome: stage.outcome.clone(),
1534                expected_branch: stage.branch.clone(),
1535                required_artifact_kinds: stage
1536                    .artifacts
1537                    .iter()
1538                    .map(|artifact| artifact.kind.clone())
1539                    .collect(),
1540                visible_text_contains: stage
1541                    .visible_text
1542                    .as_ref()
1543                    .filter(|text| !text.is_empty())
1544                    .map(|text| text.chars().take(80).collect()),
1545            })
1546            .collect(),
1547    }
1548}
1549
1550pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1551    let mut failures = Vec::new();
1552    if run.status != fixture.expected_status {
1553        failures.push(format!(
1554            "run status mismatch: expected {}, got {}",
1555            fixture.expected_status, run.status
1556        ));
1557    }
1558    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1559        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1560    for assertion in &fixture.stage_assertions {
1561        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1562            failures.push(format!("missing stage {}", assertion.node_id));
1563            continue;
1564        };
1565        if stage.status != assertion.expected_status {
1566            failures.push(format!(
1567                "stage {} status mismatch: expected {}, got {}",
1568                assertion.node_id, assertion.expected_status, stage.status
1569            ));
1570        }
1571        if stage.outcome != assertion.expected_outcome {
1572            failures.push(format!(
1573                "stage {} outcome mismatch: expected {}, got {}",
1574                assertion.node_id, assertion.expected_outcome, stage.outcome
1575            ));
1576        }
1577        if stage.branch != assertion.expected_branch {
1578            failures.push(format!(
1579                "stage {} branch mismatch: expected {:?}, got {:?}",
1580                assertion.node_id, assertion.expected_branch, stage.branch
1581            ));
1582        }
1583        for required_kind in &assertion.required_artifact_kinds {
1584            if !stage
1585                .artifacts
1586                .iter()
1587                .any(|artifact| &artifact.kind == required_kind)
1588            {
1589                failures.push(format!(
1590                    "stage {} missing artifact kind {}",
1591                    assertion.node_id, required_kind
1592                ));
1593            }
1594        }
1595        if let Some(snippet) = &assertion.visible_text_contains {
1596            let actual = stage.visible_text.clone().unwrap_or_default();
1597            if !actual.contains(snippet) {
1598                failures.push(format!(
1599                    "stage {} visible text does not contain expected snippet {:?}",
1600                    assertion.node_id, snippet
1601                ));
1602            }
1603        }
1604    }
1605
1606    ReplayEvalReport {
1607        pass: failures.is_empty(),
1608        failures,
1609        stage_count: run.stages.len(),
1610    }
1611}
1612
1613pub fn evaluate_run_suite(
1614    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1615) -> ReplayEvalSuiteReport {
1616    let mut reports = Vec::new();
1617    for (run, fixture, source_path) in cases {
1618        let report = evaluate_run_against_fixture(&run, &fixture);
1619        reports.push(ReplayEvalCaseReport {
1620            run_id: run.id.clone(),
1621            workflow_id: run.workflow_id.clone(),
1622            label: None,
1623            pass: report.pass,
1624            failures: report.failures,
1625            stage_count: report.stage_count,
1626            source_path,
1627            comparison: None,
1628        });
1629    }
1630    let total = reports.len();
1631    let passed = reports.iter().filter(|report| report.pass).count();
1632    let failed = total.saturating_sub(passed);
1633    ReplayEvalSuiteReport {
1634        pass: failed == 0,
1635        total,
1636        passed,
1637        failed,
1638        cases: reports,
1639    }
1640}
1641
1642pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1643    let mut stage_diffs = Vec::new();
1644    let mut all_node_ids = BTreeSet::new();
1645    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1646        .stages
1647        .iter()
1648        .map(|s| (s.node_id.as_str(), s))
1649        .collect();
1650    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1651        .stages
1652        .iter()
1653        .map(|s| (s.node_id.as_str(), s))
1654        .collect();
1655    all_node_ids.extend(left_by_id.keys().copied());
1656    all_node_ids.extend(right_by_id.keys().copied());
1657
1658    for node_id in all_node_ids {
1659        let left_stage = left_by_id.get(node_id).copied();
1660        let right_stage = right_by_id.get(node_id).copied();
1661        match (left_stage, right_stage) {
1662            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1663                node_id: node_id.to_string(),
1664                change: "removed".to_string(),
1665                details: vec!["stage missing from right run".to_string()],
1666            }),
1667            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1668                node_id: node_id.to_string(),
1669                change: "added".to_string(),
1670                details: vec!["stage missing from left run".to_string()],
1671            }),
1672            (Some(left_stage), Some(right_stage)) => {
1673                let mut details = Vec::new();
1674                if left_stage.status != right_stage.status {
1675                    details.push(format!(
1676                        "status: {} -> {}",
1677                        left_stage.status, right_stage.status
1678                    ));
1679                }
1680                if left_stage.outcome != right_stage.outcome {
1681                    details.push(format!(
1682                        "outcome: {} -> {}",
1683                        left_stage.outcome, right_stage.outcome
1684                    ));
1685                }
1686                if left_stage.branch != right_stage.branch {
1687                    details.push(format!(
1688                        "branch: {:?} -> {:?}",
1689                        left_stage.branch, right_stage.branch
1690                    ));
1691                }
1692                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1693                {
1694                    details.push(format!(
1695                        "produced_artifacts: {} -> {}",
1696                        left_stage.produced_artifact_ids.len(),
1697                        right_stage.produced_artifact_ids.len()
1698                    ));
1699                }
1700                if left_stage.artifacts.len() != right_stage.artifacts.len() {
1701                    details.push(format!(
1702                        "artifact_records: {} -> {}",
1703                        left_stage.artifacts.len(),
1704                        right_stage.artifacts.len()
1705                    ));
1706                }
1707                if !details.is_empty() {
1708                    stage_diffs.push(RunStageDiffRecord {
1709                        node_id: node_id.to_string(),
1710                        change: "changed".to_string(),
1711                        details,
1712                    });
1713                }
1714            }
1715            (None, None) => {}
1716        }
1717    }
1718
1719    let mut tool_diffs = Vec::new();
1720    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1721        .tool_recordings
1722        .iter()
1723        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1724        .collect();
1725    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1726        .tool_recordings
1727        .iter()
1728        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1729        .collect();
1730    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1731        .keys()
1732        .chain(right_tools.keys())
1733        .cloned()
1734        .collect();
1735    for key in &all_tool_keys {
1736        let l = left_tools.get(key);
1737        let r = right_tools.get(key);
1738        let result_changed = match (l, r) {
1739            (Some(a), Some(b)) => a.result != b.result,
1740            _ => true,
1741        };
1742        if result_changed {
1743            tool_diffs.push(ToolCallDiffRecord {
1744                tool_name: key.0.clone(),
1745                args_hash: key.1.clone(),
1746                result_changed,
1747                left_result: l.map(|t| t.result.clone()),
1748                right_result: r.map(|t| t.result.clone()),
1749            });
1750        }
1751    }
1752
1753    let left_observability = left.observability.clone().unwrap_or_else(|| {
1754        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1755    });
1756    let right_observability = right.observability.clone().unwrap_or_else(|| {
1757        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1758    });
1759    let mut observability_diffs = Vec::new();
1760
1761    let left_workers = left_observability
1762        .worker_lineage
1763        .iter()
1764        .map(|worker| {
1765            (
1766                worker.worker_id.clone(),
1767                (
1768                    worker.status.clone(),
1769                    worker.run_id.clone(),
1770                    worker.run_path.clone(),
1771                ),
1772            )
1773        })
1774        .collect::<BTreeMap<_, _>>();
1775    let right_workers = right_observability
1776        .worker_lineage
1777        .iter()
1778        .map(|worker| {
1779            (
1780                worker.worker_id.clone(),
1781                (
1782                    worker.status.clone(),
1783                    worker.run_id.clone(),
1784                    worker.run_path.clone(),
1785                ),
1786            )
1787        })
1788        .collect::<BTreeMap<_, _>>();
1789    let worker_ids = left_workers
1790        .keys()
1791        .chain(right_workers.keys())
1792        .cloned()
1793        .collect::<BTreeSet<_>>();
1794    for worker_id in worker_ids {
1795        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
1796            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1797                section: "worker_lineage".to_string(),
1798                label: worker_id,
1799                details: vec!["worker missing from right run".to_string()],
1800            }),
1801            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1802                section: "worker_lineage".to_string(),
1803                label: worker_id,
1804                details: vec!["worker missing from left run".to_string()],
1805            }),
1806            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
1807                let mut details = Vec::new();
1808                if left_worker.0 != right_worker.0 {
1809                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
1810                }
1811                if left_worker.1 != right_worker.1 {
1812                    details.push(format!(
1813                        "run_id: {:?} -> {:?}",
1814                        left_worker.1, right_worker.1
1815                    ));
1816                }
1817                if left_worker.2 != right_worker.2 {
1818                    details.push(format!(
1819                        "run_path: {:?} -> {:?}",
1820                        left_worker.2, right_worker.2
1821                    ));
1822                }
1823                observability_diffs.push(RunObservabilityDiffRecord {
1824                    section: "worker_lineage".to_string(),
1825                    label: worker_id,
1826                    details,
1827                });
1828            }
1829            _ => {}
1830        }
1831    }
1832
1833    let left_rounds = left_observability
1834        .planner_rounds
1835        .iter()
1836        .map(|round| (round.stage_id.clone(), round))
1837        .collect::<BTreeMap<_, _>>();
1838    let right_rounds = right_observability
1839        .planner_rounds
1840        .iter()
1841        .map(|round| (round.stage_id.clone(), round))
1842        .collect::<BTreeMap<_, _>>();
1843    let round_ids = left_rounds
1844        .keys()
1845        .chain(right_rounds.keys())
1846        .cloned()
1847        .collect::<BTreeSet<_>>();
1848    for stage_id in round_ids {
1849        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
1850            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1851                section: "planner_rounds".to_string(),
1852                label: stage_id,
1853                details: vec!["planner summary missing from right run".to_string()],
1854            }),
1855            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1856                section: "planner_rounds".to_string(),
1857                label: stage_id,
1858                details: vec!["planner summary missing from left run".to_string()],
1859            }),
1860            (Some(left_round), Some(right_round)) => {
1861                let mut details = Vec::new();
1862                if left_round.iteration_count != right_round.iteration_count {
1863                    details.push(format!(
1864                        "iterations: {} -> {}",
1865                        left_round.iteration_count, right_round.iteration_count
1866                    ));
1867                }
1868                if left_round.tool_execution_count != right_round.tool_execution_count {
1869                    details.push(format!(
1870                        "tool_executions: {} -> {}",
1871                        left_round.tool_execution_count, right_round.tool_execution_count
1872                    ));
1873                }
1874                if left_round.native_text_tool_fallback_count
1875                    != right_round.native_text_tool_fallback_count
1876                {
1877                    details.push(format!(
1878                        "native_text_tool_fallbacks: {} -> {}",
1879                        left_round.native_text_tool_fallback_count,
1880                        right_round.native_text_tool_fallback_count
1881                    ));
1882                }
1883                if left_round.native_text_tool_fallback_rejection_count
1884                    != right_round.native_text_tool_fallback_rejection_count
1885                {
1886                    details.push(format!(
1887                        "native_text_tool_fallback_rejections: {} -> {}",
1888                        left_round.native_text_tool_fallback_rejection_count,
1889                        right_round.native_text_tool_fallback_rejection_count
1890                    ));
1891                }
1892                if left_round.empty_completion_retry_count
1893                    != right_round.empty_completion_retry_count
1894                {
1895                    details.push(format!(
1896                        "empty_completion_retries: {} -> {}",
1897                        left_round.empty_completion_retry_count,
1898                        right_round.empty_completion_retry_count
1899                    ));
1900                }
1901                if left_round.research_facts != right_round.research_facts {
1902                    details.push(format!(
1903                        "research_facts: {:?} -> {:?}",
1904                        left_round.research_facts, right_round.research_facts
1905                    ));
1906                }
1907                let left_deliverables = left_round
1908                    .task_ledger
1909                    .as_ref()
1910                    .map(|ledger| {
1911                        ledger
1912                            .deliverables
1913                            .iter()
1914                            .map(|item| format!("{}:{}", item.id, item.status))
1915                            .collect::<Vec<_>>()
1916                    })
1917                    .unwrap_or_default();
1918                let right_deliverables = right_round
1919                    .task_ledger
1920                    .as_ref()
1921                    .map(|ledger| {
1922                        ledger
1923                            .deliverables
1924                            .iter()
1925                            .map(|item| format!("{}:{}", item.id, item.status))
1926                            .collect::<Vec<_>>()
1927                    })
1928                    .unwrap_or_default();
1929                if left_deliverables != right_deliverables {
1930                    details.push(format!(
1931                        "deliverables: {:?} -> {:?}",
1932                        left_deliverables, right_deliverables
1933                    ));
1934                }
1935                if left_round.successful_tools != right_round.successful_tools {
1936                    details.push(format!(
1937                        "successful_tools: {:?} -> {:?}",
1938                        left_round.successful_tools, right_round.successful_tools
1939                    ));
1940                }
1941                if !details.is_empty() {
1942                    observability_diffs.push(RunObservabilityDiffRecord {
1943                        section: "planner_rounds".to_string(),
1944                        label: left_round.node_id.clone(),
1945                        details,
1946                    });
1947                }
1948            }
1949            _ => {}
1950        }
1951    }
1952
1953    let left_pointers = left_observability
1954        .transcript_pointers
1955        .iter()
1956        .map(|pointer| {
1957            (
1958                pointer.id.clone(),
1959                (
1960                    pointer.available,
1961                    pointer.path.clone(),
1962                    pointer.location.clone(),
1963                ),
1964            )
1965        })
1966        .collect::<BTreeMap<_, _>>();
1967    let right_pointers = right_observability
1968        .transcript_pointers
1969        .iter()
1970        .map(|pointer| {
1971            (
1972                pointer.id.clone(),
1973                (
1974                    pointer.available,
1975                    pointer.path.clone(),
1976                    pointer.location.clone(),
1977                ),
1978            )
1979        })
1980        .collect::<BTreeMap<_, _>>();
1981    let pointer_ids = left_pointers
1982        .keys()
1983        .chain(right_pointers.keys())
1984        .cloned()
1985        .collect::<BTreeSet<_>>();
1986    for pointer_id in pointer_ids {
1987        match (
1988            left_pointers.get(&pointer_id),
1989            right_pointers.get(&pointer_id),
1990        ) {
1991            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1992                section: "transcript_pointers".to_string(),
1993                label: pointer_id,
1994                details: vec!["pointer missing from right run".to_string()],
1995            }),
1996            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1997                section: "transcript_pointers".to_string(),
1998                label: pointer_id,
1999                details: vec!["pointer missing from left run".to_string()],
2000            }),
2001            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
2002                observability_diffs.push(RunObservabilityDiffRecord {
2003                    section: "transcript_pointers".to_string(),
2004                    label: pointer_id,
2005                    details: vec![format!(
2006                        "pointer: {:?} -> {:?}",
2007                        left_pointer, right_pointer
2008                    )],
2009                });
2010            }
2011            _ => {}
2012        }
2013    }
2014
2015    let left_compactions = left_observability
2016        .compaction_events
2017        .iter()
2018        .map(|event| {
2019            (
2020                event.id.clone(),
2021                (
2022                    event.strategy.clone(),
2023                    event.archived_messages,
2024                    event.snapshot_asset_id.clone(),
2025                    event.available,
2026                ),
2027            )
2028        })
2029        .collect::<BTreeMap<_, _>>();
2030    let right_compactions = right_observability
2031        .compaction_events
2032        .iter()
2033        .map(|event| {
2034            (
2035                event.id.clone(),
2036                (
2037                    event.strategy.clone(),
2038                    event.archived_messages,
2039                    event.snapshot_asset_id.clone(),
2040                    event.available,
2041                ),
2042            )
2043        })
2044        .collect::<BTreeMap<_, _>>();
2045    let compaction_ids = left_compactions
2046        .keys()
2047        .chain(right_compactions.keys())
2048        .cloned()
2049        .collect::<BTreeSet<_>>();
2050    for compaction_id in compaction_ids {
2051        match (
2052            left_compactions.get(&compaction_id),
2053            right_compactions.get(&compaction_id),
2054        ) {
2055            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2056                section: "compaction_events".to_string(),
2057                label: compaction_id,
2058                details: vec!["compaction event missing from right run".to_string()],
2059            }),
2060            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2061                section: "compaction_events".to_string(),
2062                label: compaction_id,
2063                details: vec!["compaction event missing from left run".to_string()],
2064            }),
2065            (Some(left_event), Some(right_event)) if left_event != right_event => {
2066                observability_diffs.push(RunObservabilityDiffRecord {
2067                    section: "compaction_events".to_string(),
2068                    label: compaction_id,
2069                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2070                });
2071            }
2072            _ => {}
2073        }
2074    }
2075
2076    let left_daemons = left_observability
2077        .daemon_events
2078        .iter()
2079        .map(|event| {
2080            (
2081                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2082                (
2083                    event.name.clone(),
2084                    event.persist_path.clone(),
2085                    event.payload_summary.clone(),
2086                ),
2087            )
2088        })
2089        .collect::<BTreeMap<_, _>>();
2090    let right_daemons = right_observability
2091        .daemon_events
2092        .iter()
2093        .map(|event| {
2094            (
2095                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2096                (
2097                    event.name.clone(),
2098                    event.persist_path.clone(),
2099                    event.payload_summary.clone(),
2100                ),
2101            )
2102        })
2103        .collect::<BTreeMap<_, _>>();
2104    let daemon_keys = left_daemons
2105        .keys()
2106        .chain(right_daemons.keys())
2107        .cloned()
2108        .collect::<BTreeSet<_>>();
2109    for daemon_key in daemon_keys {
2110        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
2111        match (
2112            left_daemons.get(&daemon_key),
2113            right_daemons.get(&daemon_key),
2114        ) {
2115            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2116                section: "daemon_events".to_string(),
2117                label,
2118                details: vec!["daemon event missing from right run".to_string()],
2119            }),
2120            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2121                section: "daemon_events".to_string(),
2122                label,
2123                details: vec!["daemon event missing from left run".to_string()],
2124            }),
2125            (Some(left_event), Some(right_event)) if left_event != right_event => {
2126                observability_diffs.push(RunObservabilityDiffRecord {
2127                    section: "daemon_events".to_string(),
2128                    label,
2129                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2130                });
2131            }
2132            _ => {}
2133        }
2134    }
2135
2136    let left_verification = left_observability
2137        .verification_outcomes
2138        .iter()
2139        .map(|item| (item.stage_id.clone(), item))
2140        .collect::<BTreeMap<_, _>>();
2141    let right_verification = right_observability
2142        .verification_outcomes
2143        .iter()
2144        .map(|item| (item.stage_id.clone(), item))
2145        .collect::<BTreeMap<_, _>>();
2146    let verification_ids = left_verification
2147        .keys()
2148        .chain(right_verification.keys())
2149        .cloned()
2150        .collect::<BTreeSet<_>>();
2151    for stage_id in verification_ids {
2152        match (
2153            left_verification.get(&stage_id),
2154            right_verification.get(&stage_id),
2155        ) {
2156            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2157                section: "verification".to_string(),
2158                label: stage_id,
2159                details: vec!["verification missing from right run".to_string()],
2160            }),
2161            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2162                section: "verification".to_string(),
2163                label: stage_id,
2164                details: vec!["verification missing from left run".to_string()],
2165            }),
2166            (Some(left_item), Some(right_item)) if left_item != right_item => {
2167                let mut details = Vec::new();
2168                if left_item.passed != right_item.passed {
2169                    details.push(format!(
2170                        "passed: {:?} -> {:?}",
2171                        left_item.passed, right_item.passed
2172                    ));
2173                }
2174                if left_item.summary != right_item.summary {
2175                    details.push(format!(
2176                        "summary: {:?} -> {:?}",
2177                        left_item.summary, right_item.summary
2178                    ));
2179                }
2180                observability_diffs.push(RunObservabilityDiffRecord {
2181                    section: "verification".to_string(),
2182                    label: left_item.node_id.clone(),
2183                    details,
2184                });
2185            }
2186            _ => {}
2187        }
2188    }
2189
2190    let left_graph = (
2191        left_observability.action_graph_nodes.len(),
2192        left_observability.action_graph_edges.len(),
2193    );
2194    let right_graph = (
2195        right_observability.action_graph_nodes.len(),
2196        right_observability.action_graph_edges.len(),
2197    );
2198    if left_graph != right_graph {
2199        observability_diffs.push(RunObservabilityDiffRecord {
2200            section: "action_graph".to_string(),
2201            label: "shape".to_string(),
2202            details: vec![format!(
2203                "nodes/edges: {}/{} -> {}/{}",
2204                left_graph.0, left_graph.1, right_graph.0, right_graph.1
2205            )],
2206        });
2207    }
2208
2209    let status_changed = left.status != right.status;
2210    let identical = !status_changed
2211        && stage_diffs.is_empty()
2212        && tool_diffs.is_empty()
2213        && observability_diffs.is_empty()
2214        && left.transitions.len() == right.transitions.len()
2215        && left.artifacts.len() == right.artifacts.len()
2216        && left.checkpoints.len() == right.checkpoints.len();
2217
2218    RunDiffReport {
2219        left_run_id: left.id.clone(),
2220        right_run_id: right.id.clone(),
2221        identical,
2222        status_changed,
2223        left_status: left.status.clone(),
2224        right_status: right.status.clone(),
2225        stage_diffs,
2226        tool_diffs,
2227        observability_diffs,
2228        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
2229        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
2230        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
2231    }
2232}