Skip to main content

harn_vm/orchestration/
records.rs

1//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10    CapabilityPolicy,
11};
12use crate::llm::vm_value_to_json;
13use crate::value::{VmError, VmValue};
14
/// LLM usage totals recorded for a whole run (`RunRecord::usage`) or for a
/// single stage (`RunStageRecord::usage`).
///
/// `#[serde(default)]` lets partially populated JSON deserialize: absent
/// fields take their `Default` values. `total_cost` is an `f64`, which is why
/// this type derives `PartialEq` but cannot derive `Eq`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    /// Model identifiers involved in the recorded calls (presumably one entry
    /// per distinct model — TODO confirm against the producer).
    pub models: Vec<String>,
}
25
/// One executed stage of a workflow run, stored in `RunRecord::stages`.
///
/// Missing fields deserialize to their `Default` values (`#[serde(default)]`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    /// Id of this stage execution; its action-graph node id is `stage:<id>`.
    pub id: String,
    /// Workflow node this stage executed. Transitions (`to_node_id` /
    /// `from_node_id`) are matched to stages through this value.
    pub node_id: String,
    /// Stage kind; a `"verify"` stage always yields a verification outcome.
    pub kind: String,
    /// Lifecycle status; `"completed"` / `"failed"` are inspected when a
    /// verification result has to be inferred.
    pub status: String,
    /// Result outcome; `"success"` / `"failed"` are inspected when a
    /// verification result has to be inferred.
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Visible output text; used as the verification summary when there is
    /// no `verification` payload and the text is not blank.
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    /// Embedded transcript JSON; scanned for `"compaction"` events.
    pub transcript: Option<serde_json::Value>,
    /// Verification payload; a boolean `pass` (or, failing that, `success`)
    /// key decides whether verification passed.
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Artifacts attached to the stage; the first one carrying `data` is
    /// treated as the stage's result payload.
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Per-attempt history for this stage.
    pub attempts: Vec<RunStageAttemptRecord>,
    /// Free-form metadata; a string `worker_id` entry is surfaced on the
    /// stage's action-graph node.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
48
/// One attempt at executing a stage, collected in `RunStageRecord::attempts`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    /// Attempt number (presumably 1-based — TODO confirm against the producer).
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    /// Error message for this attempt, presumably set only when it failed.
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
61
/// A transition the run followed into `to_node_id`, stored in
/// `RunRecord::transitions`.
///
/// When the action graph is built, the edge source is resolved from
/// `from_stage_id` first, then `from_node_id`, and finally falls back to the
/// run's root node; `branch` becomes the edge label. Stages whose node is the
/// target of any transition get no `"entry"` edge from the run root.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
74
/// Progress snapshot stored in `RunRecord::checkpoints`: which workflow nodes
/// were ready or completed, and the last stage, at `persisted_at`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Why the checkpoint was written (free-form; TODO confirm the vocabulary).
    pub reason: String,
}
85
/// Replay fixture captured from a recorded run (`source_run_id`): the
/// expected final status plus per-stage assertions.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Type tag, serialized under the JSON key `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    /// Run status a replay is expected to end with.
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
99
/// Expectations for one stage of a replayed run, listed in
/// `ReplayFixture::stage_assertions`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    /// Workflow node the assertion targets (presumably matched against
    /// `RunStageRecord::node_id` — TODO confirm).
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    /// Artifact kinds the stage is expected to produce (TODO confirm matching rules).
    pub required_artifact_kinds: Vec<String>,
    /// Text the stage's visible output is presumably required to contain.
    pub visible_text_contains: Option<String>,
}
110
/// Result of checking a run against a replay fixture.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    /// Overall verdict (presumably `true` exactly when `failures` is empty).
    pub pass: bool,
    /// Human-readable descriptions of failed expectations.
    pub failures: Vec<String>,
    pub stage_count: usize,
}
118
/// Result for one case of a replay eval suite, collected in
/// `ReplayEvalSuiteReport::cases`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    /// Diff against a baseline run, presumably the case's `compare_to` run.
    pub comparison: Option<RunDiffReport>,
}
131
/// Aggregate result of a replay eval suite; `cases` holds one report per case.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    /// Number of cases (presumably `passed + failed` — TODO confirm).
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
141
/// Summary of one task-ledger deliverable, read from the `id`, `text`,
/// `status` and `note` keys of a ledger's `deliverables[]` entries.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    /// Deliverable status; `"open"` and `"blocked"` count towards
    /// `RunTaskLedgerSummaryRecord::blocking_count`.
    pub status: String,
    pub note: Option<String>,
}
150
/// Summary of a stage's `task_ledger` payload, attached to a planner round.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    /// Ledger observations; also reported as the planner round's research facts.
    pub observations: Vec<String>,
    /// Number of deliverables whose status is `"open"` or `"blocked"`.
    pub blocking_count: usize,
}
160
/// Agent-loop statistics for one stage, read from the stage's result payload
/// and its `trace` object.
///
/// A round is only recorded when the payload shows agentic activity: a
/// positive iteration, LLM-call, tool-execution or ledger-rejection count, a
/// task ledger, or non-empty tool lists. Missing fields deserialize to their
/// `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    /// `trace.iterations`
    pub iteration_count: usize,
    /// `trace.llm_calls`
    pub llm_call_count: usize,
    /// `trace.tool_executions`
    pub tool_execution_count: usize,
    /// `trace.tool_rejections`
    pub tool_rejection_count: usize,
    /// `trace.interventions`
    pub intervention_count: usize,
    /// `trace.compactions`
    pub compaction_count: usize,
    /// Payload `tools_used`, falling back to `trace.tools_used`.
    pub tools_used: Vec<String>,
    /// Payload `successful_tools`.
    pub successful_tools: Vec<String>,
    /// Payload `ledger_done_rejections`.
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    /// The task ledger's observations (empty when there is no ledger).
    pub research_facts: Vec<String>,
}
181
/// Lineage entry for a delegated worker, copied from the matching
/// `RunChildRecord` of the run.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
196
/// Node of a run's action graph.
///
/// `derive_run_observability` emits three kinds: `"run"` (id `run:<run id>`),
/// `"stage"` (id `stage:<stage id>`) and `"worker"` (id `worker:<worker id>`).
/// For run and worker nodes, `outcome` mirrors `status`. Missing fields
/// deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    pub id: String,
    /// Display label: workflow name/id, stage node id, or worker name.
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
}
211
/// Directed edge between two `RunActionGraphNodeRecord` ids.
///
/// `derive_run_observability` emits:
/// * `"entry"`: run root → a stage whose node has no incoming transition;
/// * `"transition"`: one `RunTransitionRecord`, labelled with its branch;
/// * `"delegates"`: parent stage → worker, labelled with the worker name.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}
220
/// Verification result for a stage whose kind is `"verify"` or that carries a
/// `verification` payload.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    /// The payload's boolean `pass` or `success` key; otherwise inferred from
    /// the stage status/outcome; `None` when it cannot be determined.
    pub passed: Option<bool>,
    /// Compact JSON of the verification payload, or the stage's non-blank
    /// visible text when there is no payload.
    pub summary: Option<String>,
}
230
/// Locator for a transcript belonging to a run or one of its stages.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    /// Pointer kind, e.g. `"embedded_transcript"` for a transcript stored
    /// inside the run record itself.
    pub kind: String,
    /// Path-like locator, e.g. `run.transcript` or
    /// `run.stages[<node id>].transcript`.
    pub location: String,
    /// File holding the transcript when known (the persisted run file for
    /// embedded transcripts).
    pub path: Option<String>,
    /// Whether the transcript can be loaded; always `true` for embedded
    /// transcripts (TODO confirm how other kinds set it).
    pub available: bool,
}
241
/// A `"compaction"` event found in a run or stage transcript.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    /// `id` of the transcript the event was read from.
    pub transcript_id: Option<String>,
    /// Owning stage; `None` for events from the run-level transcript.
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    // The next five fields are copied from the event's `metadata` object.
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    pub snapshot_asset_id: Option<String>,
    /// `<transcript location>.assets[<snapshot asset id>]`, or just the
    /// transcript location when no snapshot asset id was recorded.
    pub snapshot_location: String,
    /// Persisted run file, when known.
    pub snapshot_path: Option<String>,
    /// `true` when `snapshot_asset_id` names an asset present in the transcript.
    pub available: bool,
}
259
/// Derived observability view of a `RunRecord`, built by
/// `derive_run_observability`: planner rounds, an action graph, worker
/// lineage, verification outcomes, transcript pointers and compaction events.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    /// Format version of this record (TODO confirm where it is set/bumped).
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    /// Total research facts across all planner rounds.
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
}
273
/// Per-node difference between two runs, part of `RunDiffReport::stage_diffs`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Kind of change (TODO confirm the vocabulary, e.g. added/removed/changed).
    pub change: String,
    /// Human-readable descriptions of what differed.
    pub details: Vec<String>,
}
281
/// Difference in a recorded tool call between two runs, part of
/// `RunDiffReport::tool_diffs`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    /// Argument hash identifying the call (presumably from `tool_fixture_hash`).
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
291
/// Difference found in one section of two runs' observability data, part of
/// `RunDiffReport::observability_diffs`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    /// Observability section the difference belongs to (TODO confirm values).
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}
299
/// Comparison of two runs (`left_run_id` vs `right_run_id`).
///
/// The `*_count_delta` fields are signed so they can express decreases;
/// presumably computed as right minus left — TODO confirm. Missing fields
/// deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
316
/// Manifest listing the cases of a replay eval suite.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Type tag, serialized under the JSON key `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory that relative case paths are presumably resolved against —
    /// TODO confirm.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
327
/// One case of an `EvalSuiteManifest`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Path to the recorded run to evaluate.
    pub run_path: String,
    /// Optional path to a replay fixture for this case.
    pub fixture_path: Option<String>,
    /// Optional baseline run to diff against (presumably feeds
    /// `ReplayEvalCaseReport::comparison`).
    pub compare_to: Option<String>,
}
336
/// Top-level record of a workflow run: stages, transitions, checkpoints,
/// child runs and artifacts, plus optional transcript, usage, replay fixture
/// and derived observability data.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Type tag, serialized under the JSON key `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    /// Human-readable workflow name; the action graph's root node falls back
    /// to `workflow_id` when this is `None`.
    pub workflow_name: Option<String>,
    pub task: String,
    /// Run status; also used as status/outcome of the run's action-graph node.
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Parent/root run ids, presumably set when this run was spawned by
    /// another run (TODO confirm).
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    /// Delegated workers; each becomes a `"worker"` action-graph node.
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    /// Run-level transcript JSON; scanned for `"compaction"` events.
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Where this record was persisted, when known; surfaced as `run_path`
    /// and transcript `path` in observability data.
    pub persisted_path: Option<String>,
}
369
/// A recorded tool invocation, stored in `RunRecord::tool_recordings`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    /// Argument hash (presumably produced by `tool_fixture_hash`).
    pub args_hash: String,
    /// Tool result in its recorded string form.
    pub result: String,
    /// Presumably `true` when the call was rejected instead of executed.
    pub is_rejected: bool,
    pub duration_ms: u64,
    /// Agent-loop iteration of the call (presumably — TODO confirm).
    pub iteration: usize,
    pub timestamp: String,
}
382
383/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
384pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
385    use std::hash::{Hash, Hasher};
386    let mut hasher = std::collections::hash_map::DefaultHasher::new();
387    tool_name.hash(&mut hasher);
388    let args_str = serde_json::to_string(args).unwrap_or_default();
389    args_str.hash(&mut hasher);
390    format!("{:016x}", hasher.finish())
391}
392
/// One tracing span captured during a run, stored in `RunRecord::trace_spans`.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    /// Enclosing span, if any.
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    /// Start offset in milliseconds (TODO confirm the reference point).
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
404
/// A delegated worker (child run) spawned during a run, stored in
/// `RunRecord::child_runs`.
///
/// Each child becomes a `"worker"` action-graph node and a worker-lineage
/// entry; when `parent_stage_id` names a stage of the run, a `"delegates"` edge
/// links that stage to the worker. Missing fields deserialize to their
/// `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
426
/// Execution environment of a run or child run: working directory,
/// environment variables and optional repository/worktree details.
///
/// Missing fields deserialize to their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    /// Environment variables; a `BTreeMap` keeps serialization order sorted.
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    /// Cleanup policy for the worktree (free-form; TODO confirm values).
    pub cleanup: Option<String>,
}
440
441fn compact_json_value(value: &serde_json::Value) -> String {
442    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
443}
444
445fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
446    value
447        .and_then(|value| value.as_array())
448        .map(|items| {
449            items
450                .iter()
451                .filter_map(|item| item.as_str().map(str::to_string))
452                .collect::<Vec<_>>()
453        })
454        .unwrap_or_default()
455}
456
457fn json_usize(value: Option<&serde_json::Value>) -> usize {
458    value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
459}
460
461fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
462    value.and_then(|value| value.as_bool())
463}
464
465fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
466    stage
467        .artifacts
468        .iter()
469        .find_map(|artifact| artifact.data.as_ref())
470}
471
472fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
473    let deliverables = value
474        .get("deliverables")
475        .and_then(|raw| raw.as_array())
476        .map(|items| {
477            items
478                .iter()
479                .map(|item| RunDeliverableSummaryRecord {
480                    id: item
481                        .get("id")
482                        .and_then(|value| value.as_str())
483                        .unwrap_or_default()
484                        .to_string(),
485                    text: item
486                        .get("text")
487                        .and_then(|value| value.as_str())
488                        .unwrap_or_default()
489                        .to_string(),
490                    status: item
491                        .get("status")
492                        .and_then(|value| value.as_str())
493                        .unwrap_or_default()
494                        .to_string(),
495                    note: item
496                        .get("note")
497                        .and_then(|value| value.as_str())
498                        .map(str::to_string),
499                })
500                .collect::<Vec<_>>()
501        })
502        .unwrap_or_default();
503    let observations = json_string_array(value.get("observations"));
504    let root_task = value
505        .get("root_task")
506        .and_then(|value| value.as_str())
507        .unwrap_or_default()
508        .to_string();
509    let rationale = value
510        .get("rationale")
511        .and_then(|value| value.as_str())
512        .unwrap_or_default()
513        .to_string();
514    if root_task.is_empty()
515        && rationale.is_empty()
516        && deliverables.is_empty()
517        && observations.is_empty()
518    {
519        return None;
520    }
521    let blocking_count = deliverables
522        .iter()
523        .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
524        .count();
525    Some(RunTaskLedgerSummaryRecord {
526        root_task,
527        rationale,
528        deliverables,
529        observations,
530        blocking_count,
531    })
532}
533
534fn compaction_events_from_transcript(
535    transcript: &serde_json::Value,
536    stage_id: Option<&str>,
537    node_id: Option<&str>,
538    location_prefix: &str,
539    persisted_path: Option<&Path>,
540) -> Vec<CompactionEventRecord> {
541    let transcript_id = transcript
542        .get("id")
543        .and_then(|value| value.as_str())
544        .map(str::to_string);
545    let asset_ids = transcript
546        .get("assets")
547        .and_then(|value| value.as_array())
548        .map(|assets| {
549            assets
550                .iter()
551                .filter_map(|asset| {
552                    asset
553                        .get("id")
554                        .and_then(|value| value.as_str())
555                        .map(str::to_string)
556                })
557                .collect::<BTreeSet<_>>()
558        })
559        .unwrap_or_default();
560    transcript
561        .get("events")
562        .and_then(|value| value.as_array())
563        .map(|events| {
564            events
565                .iter()
566                .filter(|event| {
567                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
568                })
569                .map(|event| {
570                    let metadata = event.get("metadata");
571                    let snapshot_asset_id = metadata
572                        .and_then(|value| value.get("snapshot_asset_id"))
573                        .and_then(|value| value.as_str())
574                        .map(str::to_string);
575                    let available = snapshot_asset_id
576                        .as_ref()
577                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
578                    let snapshot_location = snapshot_asset_id
579                        .as_ref()
580                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
581                        .unwrap_or_else(|| location_prefix.to_string());
582                    CompactionEventRecord {
583                        id: event
584                            .get("id")
585                            .and_then(|value| value.as_str())
586                            .unwrap_or_default()
587                            .to_string(),
588                        transcript_id: transcript_id.clone(),
589                        stage_id: stage_id.map(str::to_string),
590                        node_id: node_id.map(str::to_string),
591                        mode: metadata
592                            .and_then(|value| value.get("mode"))
593                            .and_then(|value| value.as_str())
594                            .unwrap_or_default()
595                            .to_string(),
596                        strategy: metadata
597                            .and_then(|value| value.get("strategy"))
598                            .and_then(|value| value.as_str())
599                            .unwrap_or_default()
600                            .to_string(),
601                        archived_messages: json_usize(
602                            metadata.and_then(|value| value.get("archived_messages")),
603                        ),
604                        estimated_tokens_before: json_usize(
605                            metadata.and_then(|value| value.get("estimated_tokens_before")),
606                        ),
607                        estimated_tokens_after: json_usize(
608                            metadata.and_then(|value| value.get("estimated_tokens_after")),
609                        ),
610                        snapshot_asset_id,
611                        snapshot_location,
612                        snapshot_path: persisted_path
613                            .map(|path| path.to_string_lossy().into_owned()),
614                        available,
615                    }
616                })
617                .collect()
618        })
619        .unwrap_or_default()
620}
621
622pub fn derive_run_observability(
623    run: &RunRecord,
624    persisted_path: Option<&Path>,
625) -> RunObservabilityRecord {
626    let mut action_graph_nodes = Vec::new();
627    let mut action_graph_edges = Vec::new();
628    let mut verification_outcomes = Vec::new();
629    let mut planner_rounds = Vec::new();
630    let mut transcript_pointers = Vec::new();
631    let mut compaction_events = Vec::new();
632    let mut research_fact_count = 0usize;
633
634    let root_node_id = format!("run:{}", run.id);
635    action_graph_nodes.push(RunActionGraphNodeRecord {
636        id: root_node_id.clone(),
637        label: run
638            .workflow_name
639            .clone()
640            .unwrap_or_else(|| run.workflow_id.clone()),
641        kind: "run".to_string(),
642        status: run.status.clone(),
643        outcome: run.status.clone(),
644        stage_id: None,
645        node_id: None,
646        worker_id: None,
647        run_id: Some(run.id.clone()),
648        run_path: run.persisted_path.clone(),
649    });
650
651    let stage_node_ids = run
652        .stages
653        .iter()
654        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
655        .collect::<BTreeMap<_, _>>();
656    let stage_by_node_id = run
657        .stages
658        .iter()
659        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
660        .collect::<BTreeMap<_, _>>();
661
662    let incoming_nodes = run
663        .transitions
664        .iter()
665        .map(|transition| transition.to_node_id.clone())
666        .collect::<BTreeSet<_>>();
667
668    for stage in &run.stages {
669        let graph_node_id = stage_node_ids
670            .get(&stage.id)
671            .cloned()
672            .unwrap_or_else(|| format!("stage:{}", stage.id));
673        action_graph_nodes.push(RunActionGraphNodeRecord {
674            id: graph_node_id.clone(),
675            label: stage.node_id.clone(),
676            kind: "stage".to_string(),
677            status: stage.status.clone(),
678            outcome: stage.outcome.clone(),
679            stage_id: Some(stage.id.clone()),
680            node_id: Some(stage.node_id.clone()),
681            worker_id: stage
682                .metadata
683                .get("worker_id")
684                .and_then(|value| value.as_str())
685                .map(str::to_string),
686            run_id: None,
687            run_path: None,
688        });
689        if !incoming_nodes.contains(&stage.node_id) {
690            action_graph_edges.push(RunActionGraphEdgeRecord {
691                from_id: root_node_id.clone(),
692                to_id: graph_node_id.clone(),
693                kind: "entry".to_string(),
694                label: None,
695            });
696        }
697
698        if stage.kind == "verify" || stage.verification.is_some() {
699            let passed = json_bool(
700                stage
701                    .verification
702                    .as_ref()
703                    .and_then(|value| value.get("pass")),
704            )
705            .or_else(|| {
706                json_bool(
707                    stage
708                        .verification
709                        .as_ref()
710                        .and_then(|value| value.get("success")),
711                )
712            })
713            .or_else(|| {
714                if stage.status == "completed" && stage.outcome == "success" {
715                    Some(true)
716                } else if stage.status == "failed" || stage.outcome == "failed" {
717                    Some(false)
718                } else {
719                    None
720                }
721            });
722            verification_outcomes.push(RunVerificationOutcomeRecord {
723                stage_id: stage.id.clone(),
724                node_id: stage.node_id.clone(),
725                status: stage.status.clone(),
726                passed,
727                summary: stage
728                    .verification
729                    .as_ref()
730                    .map(compact_json_value)
731                    .or_else(|| {
732                        stage
733                            .visible_text
734                            .as_ref()
735                            .filter(|value| !value.trim().is_empty())
736                            .cloned()
737                    }),
738            });
739        }
740
741        if stage.transcript.is_some() {
742            transcript_pointers.push(RunTranscriptPointerRecord {
743                id: format!("stage:{}:transcript", stage.id),
744                label: format!("Stage {} transcript", stage.node_id),
745                kind: "embedded_transcript".to_string(),
746                location: format!("run.stages[{}].transcript", stage.node_id),
747                path: run.persisted_path.clone(),
748                available: true,
749            });
750            if let Some(transcript) = stage.transcript.as_ref() {
751                compaction_events.extend(compaction_events_from_transcript(
752                    transcript,
753                    Some(&stage.id),
754                    Some(&stage.node_id),
755                    &format!("run.stages[{}].transcript", stage.node_id),
756                    persisted_path,
757                ));
758            }
759        }
760
761        if let Some(payload) = stage_result_payload(stage) {
762            let trace = payload.get("trace");
763            let task_ledger = payload
764                .get("task_ledger")
765                .and_then(task_ledger_summary_from_value);
766            let research_facts = task_ledger
767                .as_ref()
768                .map(|ledger| ledger.observations.clone())
769                .unwrap_or_default();
770            research_fact_count += research_facts.len();
771            let tools_used = json_string_array(
772                payload
773                    .get("tools_used")
774                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
775            );
776            let successful_tools = json_string_array(payload.get("successful_tools"));
777            let planner_round = RunPlannerRoundRecord {
778                stage_id: stage.id.clone(),
779                node_id: stage.node_id.clone(),
780                stage_kind: stage.kind.clone(),
781                status: stage.status.clone(),
782                outcome: stage.outcome.clone(),
783                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
784                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
785                tool_execution_count: json_usize(
786                    trace.and_then(|trace| trace.get("tool_executions")),
787                ),
788                tool_rejection_count: json_usize(
789                    trace.and_then(|trace| trace.get("tool_rejections")),
790                ),
791                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
792                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
793                tools_used,
794                successful_tools,
795                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
796                task_ledger,
797                research_facts,
798            };
799            let has_agentic_detail = planner_round.iteration_count > 0
800                || planner_round.llm_call_count > 0
801                || planner_round.tool_execution_count > 0
802                || planner_round.ledger_done_rejections > 0
803                || planner_round.task_ledger.is_some()
804                || !planner_round.tools_used.is_empty()
805                || !planner_round.successful_tools.is_empty();
806            if has_agentic_detail {
807                planner_rounds.push(planner_round);
808            }
809        }
810    }
811
812    for transition in &run.transitions {
813        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
814            continue;
815        };
816        let from_id = transition
817            .from_stage_id
818            .as_ref()
819            .and_then(|stage_id| stage_node_ids.get(stage_id))
820            .cloned()
821            .or_else(|| {
822                transition
823                    .from_node_id
824                    .as_ref()
825                    .and_then(|node_id| stage_by_node_id.get(node_id))
826                    .cloned()
827            })
828            .unwrap_or_else(|| root_node_id.clone());
829        action_graph_edges.push(RunActionGraphEdgeRecord {
830            from_id,
831            to_id,
832            kind: "transition".to_string(),
833            label: transition.branch.clone(),
834        });
835    }
836
837    let worker_lineage = run
838        .child_runs
839        .iter()
840        .map(|child| {
841            let worker_node_id = format!("worker:{}", child.worker_id);
842            action_graph_nodes.push(RunActionGraphNodeRecord {
843                id: worker_node_id.clone(),
844                label: child.worker_name.clone(),
845                kind: "worker".to_string(),
846                status: child.status.clone(),
847                outcome: child.status.clone(),
848                stage_id: child.parent_stage_id.clone(),
849                node_id: None,
850                worker_id: Some(child.worker_id.clone()),
851                run_id: child.run_id.clone(),
852                run_path: child.run_path.clone(),
853            });
854            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
855                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
856                    action_graph_edges.push(RunActionGraphEdgeRecord {
857                        from_id: stage_node_id.clone(),
858                        to_id: worker_node_id,
859                        kind: "delegates".to_string(),
860                        label: Some(child.worker_name.clone()),
861                    });
862                }
863            }
864            RunWorkerLineageRecord {
865                worker_id: child.worker_id.clone(),
866                worker_name: child.worker_name.clone(),
867                parent_stage_id: child.parent_stage_id.clone(),
868                task: child.task.clone(),
869                status: child.status.clone(),
870                session_id: child.session_id.clone(),
871                parent_session_id: child.parent_session_id.clone(),
872                run_id: child.run_id.clone(),
873                run_path: child.run_path.clone(),
874                snapshot_path: child.snapshot_path.clone(),
875            }
876        })
877        .collect::<Vec<_>>();
878
879    if run.transcript.is_some() {
880        transcript_pointers.push(RunTranscriptPointerRecord {
881            id: "run:transcript".to_string(),
882            label: "Run transcript".to_string(),
883            kind: "embedded_transcript".to_string(),
884            location: "run.transcript".to_string(),
885            path: run.persisted_path.clone(),
886            available: true,
887        });
888        if let Some(transcript) = run.transcript.as_ref() {
889            compaction_events.extend(compaction_events_from_transcript(
890                transcript,
891                None,
892                None,
893                "run.transcript",
894                persisted_path,
895            ));
896        }
897    }
898
899    if let Some(path) = persisted_path {
900        let stem = path
901            .file_stem()
902            .and_then(|value| value.to_str())
903            .unwrap_or_default();
904        if !stem.is_empty() {
905            let sidecar_path = path
906                .parent()
907                .unwrap_or_else(|| Path::new("."))
908                .join(format!("{stem}-llm/llm_transcript.jsonl"));
909            transcript_pointers.push(RunTranscriptPointerRecord {
910                id: "run:llm_transcript".to_string(),
911                label: "LLM transcript sidecar".to_string(),
912                kind: "llm_jsonl".to_string(),
913                location: "run sidecar".to_string(),
914                path: Some(sidecar_path.to_string_lossy().into_owned()),
915                available: sidecar_path.exists(),
916            });
917        }
918    }
919
920    RunObservabilityRecord {
921        schema_version: 2,
922        planner_rounds,
923        research_fact_count,
924        action_graph_nodes,
925        action_graph_edges,
926        worker_lineage,
927        verification_outcomes,
928        transcript_pointers,
929        compaction_events,
930    }
931}
932
933fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
934    run.observability = Some(derive_run_observability(run, persisted_path));
935}
936
937pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
938    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
939    if run.type_name.is_empty() {
940        run.type_name = "run_record".to_string();
941    }
942    if run.id.is_empty() {
943        run.id = new_id("run");
944    }
945    if run.started_at.is_empty() {
946        run.started_at = now_rfc3339();
947    }
948    if run.status.is_empty() {
949        run.status = "running".to_string();
950    }
951    if run.root_run_id.is_none() {
952        run.root_run_id = Some(run.id.clone());
953    }
954    if run.replay_fixture.is_none() {
955        run.replay_fixture = Some(replay_fixture_from_run(&run));
956    }
957    if run.observability.is_none() {
958        let persisted_path = run.persisted_path.clone();
959        let persisted = persisted_path.as_deref().map(Path::new);
960        refresh_run_observability(&mut run, persisted);
961    }
962    Ok(run)
963}
964
965pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
966    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
967    if manifest.type_name.is_empty() {
968        manifest.type_name = "eval_suite_manifest".to_string();
969    }
970    if manifest.id.is_empty() {
971        manifest.id = new_id("eval_suite");
972    }
973    Ok(manifest)
974}
975
976fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
977    let content = std::fs::read_to_string(path)
978        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
979    serde_json::from_str(&content)
980        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
981}
982
/// Resolve a path taken from an eval-suite manifest.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto
/// `base_dir` when one is given; otherwise they are left relative, which
/// means relative to the process working directory.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if !candidate.is_absolute() => base.join(candidate),
        _ => candidate,
    }
}
993
994pub fn evaluate_run_suite_manifest(
995    manifest: &EvalSuiteManifest,
996) -> Result<ReplayEvalSuiteReport, VmError> {
997    let base_dir = manifest.base_dir.as_deref().map(Path::new);
998    let mut reports = Vec::new();
999    for case in &manifest.cases {
1000        let run_path = resolve_manifest_path(base_dir, &case.run_path);
1001        let run = load_run_record(&run_path)?;
1002        let fixture = match &case.fixture_path {
1003            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1004            None => run
1005                .replay_fixture
1006                .clone()
1007                .unwrap_or_else(|| replay_fixture_from_run(&run)),
1008        };
1009        let eval = evaluate_run_against_fixture(&run, &fixture);
1010        let mut pass = eval.pass;
1011        let mut failures = eval.failures;
1012        let comparison = match &case.compare_to {
1013            Some(path) => {
1014                let baseline_path = resolve_manifest_path(base_dir, path);
1015                let baseline = load_run_record(&baseline_path)?;
1016                let diff = diff_run_records(&baseline, &run);
1017                if !diff.identical {
1018                    pass = false;
1019                    failures.push(format!(
1020                        "run differs from baseline {} with {} stage changes",
1021                        baseline_path.display(),
1022                        diff.stage_diffs.len()
1023                    ));
1024                }
1025                Some(diff)
1026            }
1027            None => None,
1028        };
1029        reports.push(ReplayEvalCaseReport {
1030            run_id: run.id.clone(),
1031            workflow_id: run.workflow_id.clone(),
1032            label: case.label.clone(),
1033            pass,
1034            failures,
1035            stage_count: eval.stage_count,
1036            source_path: Some(run_path.display().to_string()),
1037            comparison,
1038        });
1039    }
1040    let total = reports.len();
1041    let passed = reports.iter().filter(|report| report.pass).count();
1042    let failed = total.saturating_sub(passed);
1043    Ok(ReplayEvalSuiteReport {
1044        pass: failed == 0,
1045        total,
1046        passed,
1047        failed,
1048        cases: reports,
1049    })
1050}
1051
/// One step of an edit script, as produced by `myers_diff`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum DiffOp {
    /// The line appears unchanged in both inputs.
    Equal,
    /// The line exists only in the "before" input.
    Delete,
    /// The line exists only in the "after" input.
    Insert,
}
1059
/// Compute the shortest edit script between `a` and `b` using Myers' greedy
/// O(ND) algorithm.
///
/// Returns the script in forward order as `(DiffOp, index)` pairs:
/// - For `Equal` and `Delete`, `index` points into `a` (the "before" lines).
/// - For `Insert`, it points into `b` (the "after" lines).
///
/// Time: O((n + m) * D), where D is the edit distance. Each step also clones
/// the full `v` array.
/// Space: O((n + m) * D). One snapshot of `v` (`2 * (n + m) + 1` entries) is
/// kept per step for backtracking.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Trivial cases: nothing to compare, or one side is empty.
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // `v[k + offset]` is the furthest x reached on diagonal k = x - y. The
    // offset lets negative diagonals (-max_d..=max_d) index into a Vec.
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // trace[d] holds the `v` snapshot BEFORE step d ran — required for backtrack.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Reach diagonal k either by stepping down (an insertion) from
            // diagonal k+1, or right (a deletion) from diagonal k-1. Pick
            // whichever neighbour got further in the previous step.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the "snake" of matching lines along the diagonal.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            // Reached the bottom-right corner: the script needs d edits.
            if x >= n && y >= m {
                // No-op. The final `new_v` is not needed, because backtracking
                // only reads the snapshots already pushed to `trace`.
                let _ = new_v;
                break 'outer;
            }
        }
        v = new_v;
    }

    // Walk back from (n, m). Each step d undoes one edit, after first emitting
    // the snake of equal lines that followed it. Ops are collected in reverse.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        // Re-derive which neighbouring diagonal step d came from, using the
        // same rule as the forward pass.
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Equal lines on the snake that follows this step's edit.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        // The edit itself. A move from diagonal k-1 is a deletion from `a`;
        // a move from diagonal k+1 is an insertion from `b`.
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Step 0 has no edit, only the initial snake of equal lines from (0, 0).
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    // Restore forward order.
    ops.reverse();
    ops
}
1144
1145pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1146    let before_lines: Vec<&str> = before.lines().collect();
1147    let after_lines: Vec<&str> = after.lines().collect();
1148    let ops = myers_diff(&before_lines, &after_lines);
1149
1150    let mut diff = String::new();
1151    let file = path.unwrap_or("artifact");
1152    diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1153    for &(op, idx) in &ops {
1154        match op {
1155            DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1156            DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1157            DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1158        }
1159    }
1160    diff
1161}
1162
1163pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1164    let path = path
1165        .map(PathBuf::from)
1166        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1167    let mut materialized = run.clone();
1168    if materialized.replay_fixture.is_none() {
1169        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1170    }
1171    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1172    refresh_run_observability(&mut materialized, Some(&path));
1173    if let Some(parent) = path.parent() {
1174        std::fs::create_dir_all(parent)
1175            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1176    }
1177    let json = serde_json::to_string_pretty(&materialized)
1178        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1179    // Atomic write: .tmp then rename guards against partial writes on kill.
1180    let tmp_path = path.with_extension("json.tmp");
1181    std::fs::write(&tmp_path, &json)
1182        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1183    std::fs::rename(&tmp_path, &path).map_err(|e| {
1184        // Cross-device renames fail on some filesystems; best-effort direct write.
1185        let _ = std::fs::write(&path, &json);
1186        VmError::Runtime(format!("failed to finalize run record: {e}"))
1187    })?;
1188    Ok(path.to_string_lossy().into_owned())
1189}
1190
1191pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1192    let content = std::fs::read_to_string(path)
1193        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1194    let mut run: RunRecord = serde_json::from_str(&content)
1195        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1196    if run.replay_fixture.is_none() {
1197        run.replay_fixture = Some(replay_fixture_from_run(&run));
1198    }
1199    run.persisted_path
1200        .get_or_insert_with(|| path.to_string_lossy().into_owned());
1201    refresh_run_observability(&mut run, Some(path));
1202    Ok(run)
1203}
1204
1205pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1206    ReplayFixture {
1207        type_name: "replay_fixture".to_string(),
1208        id: new_id("fixture"),
1209        source_run_id: run.id.clone(),
1210        workflow_id: run.workflow_id.clone(),
1211        workflow_name: run.workflow_name.clone(),
1212        created_at: now_rfc3339(),
1213        expected_status: run.status.clone(),
1214        stage_assertions: run
1215            .stages
1216            .iter()
1217            .map(|stage| ReplayStageAssertion {
1218                node_id: stage.node_id.clone(),
1219                expected_status: stage.status.clone(),
1220                expected_outcome: stage.outcome.clone(),
1221                expected_branch: stage.branch.clone(),
1222                required_artifact_kinds: stage
1223                    .artifacts
1224                    .iter()
1225                    .map(|artifact| artifact.kind.clone())
1226                    .collect(),
1227                visible_text_contains: stage
1228                    .visible_text
1229                    .as_ref()
1230                    .filter(|text| !text.is_empty())
1231                    .map(|text| text.chars().take(80).collect()),
1232            })
1233            .collect(),
1234    }
1235}
1236
1237pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1238    let mut failures = Vec::new();
1239    if run.status != fixture.expected_status {
1240        failures.push(format!(
1241            "run status mismatch: expected {}, got {}",
1242            fixture.expected_status, run.status
1243        ));
1244    }
1245    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1246        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1247    for assertion in &fixture.stage_assertions {
1248        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1249            failures.push(format!("missing stage {}", assertion.node_id));
1250            continue;
1251        };
1252        if stage.status != assertion.expected_status {
1253            failures.push(format!(
1254                "stage {} status mismatch: expected {}, got {}",
1255                assertion.node_id, assertion.expected_status, stage.status
1256            ));
1257        }
1258        if stage.outcome != assertion.expected_outcome {
1259            failures.push(format!(
1260                "stage {} outcome mismatch: expected {}, got {}",
1261                assertion.node_id, assertion.expected_outcome, stage.outcome
1262            ));
1263        }
1264        if stage.branch != assertion.expected_branch {
1265            failures.push(format!(
1266                "stage {} branch mismatch: expected {:?}, got {:?}",
1267                assertion.node_id, assertion.expected_branch, stage.branch
1268            ));
1269        }
1270        for required_kind in &assertion.required_artifact_kinds {
1271            if !stage
1272                .artifacts
1273                .iter()
1274                .any(|artifact| &artifact.kind == required_kind)
1275            {
1276                failures.push(format!(
1277                    "stage {} missing artifact kind {}",
1278                    assertion.node_id, required_kind
1279                ));
1280            }
1281        }
1282        if let Some(snippet) = &assertion.visible_text_contains {
1283            let actual = stage.visible_text.clone().unwrap_or_default();
1284            if !actual.contains(snippet) {
1285                failures.push(format!(
1286                    "stage {} visible text does not contain expected snippet {:?}",
1287                    assertion.node_id, snippet
1288                ));
1289            }
1290        }
1291    }
1292
1293    ReplayEvalReport {
1294        pass: failures.is_empty(),
1295        failures,
1296        stage_count: run.stages.len(),
1297    }
1298}
1299
1300pub fn evaluate_run_suite(
1301    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1302) -> ReplayEvalSuiteReport {
1303    let mut reports = Vec::new();
1304    for (run, fixture, source_path) in cases {
1305        let report = evaluate_run_against_fixture(&run, &fixture);
1306        reports.push(ReplayEvalCaseReport {
1307            run_id: run.id.clone(),
1308            workflow_id: run.workflow_id.clone(),
1309            label: None,
1310            pass: report.pass,
1311            failures: report.failures,
1312            stage_count: report.stage_count,
1313            source_path,
1314            comparison: None,
1315        });
1316    }
1317    let total = reports.len();
1318    let passed = reports.iter().filter(|report| report.pass).count();
1319    let failed = total.saturating_sub(passed);
1320    ReplayEvalSuiteReport {
1321        pass: failed == 0,
1322        total,
1323        passed,
1324        failed,
1325        cases: reports,
1326    }
1327}
1328
1329pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1330    let mut stage_diffs = Vec::new();
1331    let mut all_node_ids = BTreeSet::new();
1332    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1333        .stages
1334        .iter()
1335        .map(|s| (s.node_id.as_str(), s))
1336        .collect();
1337    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1338        .stages
1339        .iter()
1340        .map(|s| (s.node_id.as_str(), s))
1341        .collect();
1342    all_node_ids.extend(left_by_id.keys().copied());
1343    all_node_ids.extend(right_by_id.keys().copied());
1344
1345    for node_id in all_node_ids {
1346        let left_stage = left_by_id.get(node_id).copied();
1347        let right_stage = right_by_id.get(node_id).copied();
1348        match (left_stage, right_stage) {
1349            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1350                node_id: node_id.to_string(),
1351                change: "removed".to_string(),
1352                details: vec!["stage missing from right run".to_string()],
1353            }),
1354            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1355                node_id: node_id.to_string(),
1356                change: "added".to_string(),
1357                details: vec!["stage missing from left run".to_string()],
1358            }),
1359            (Some(left_stage), Some(right_stage)) => {
1360                let mut details = Vec::new();
1361                if left_stage.status != right_stage.status {
1362                    details.push(format!(
1363                        "status: {} -> {}",
1364                        left_stage.status, right_stage.status
1365                    ));
1366                }
1367                if left_stage.outcome != right_stage.outcome {
1368                    details.push(format!(
1369                        "outcome: {} -> {}",
1370                        left_stage.outcome, right_stage.outcome
1371                    ));
1372                }
1373                if left_stage.branch != right_stage.branch {
1374                    details.push(format!(
1375                        "branch: {:?} -> {:?}",
1376                        left_stage.branch, right_stage.branch
1377                    ));
1378                }
1379                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1380                {
1381                    details.push(format!(
1382                        "produced_artifacts: {} -> {}",
1383                        left_stage.produced_artifact_ids.len(),
1384                        right_stage.produced_artifact_ids.len()
1385                    ));
1386                }
1387                if left_stage.artifacts.len() != right_stage.artifacts.len() {
1388                    details.push(format!(
1389                        "artifact_records: {} -> {}",
1390                        left_stage.artifacts.len(),
1391                        right_stage.artifacts.len()
1392                    ));
1393                }
1394                if !details.is_empty() {
1395                    stage_diffs.push(RunStageDiffRecord {
1396                        node_id: node_id.to_string(),
1397                        change: "changed".to_string(),
1398                        details,
1399                    });
1400                }
1401            }
1402            (None, None) => {}
1403        }
1404    }
1405
1406    let mut tool_diffs = Vec::new();
1407    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1408        .tool_recordings
1409        .iter()
1410        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1411        .collect();
1412    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1413        .tool_recordings
1414        .iter()
1415        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1416        .collect();
1417    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1418        .keys()
1419        .chain(right_tools.keys())
1420        .cloned()
1421        .collect();
1422    for key in &all_tool_keys {
1423        let l = left_tools.get(key);
1424        let r = right_tools.get(key);
1425        let result_changed = match (l, r) {
1426            (Some(a), Some(b)) => a.result != b.result,
1427            _ => true,
1428        };
1429        if result_changed {
1430            tool_diffs.push(ToolCallDiffRecord {
1431                tool_name: key.0.clone(),
1432                args_hash: key.1.clone(),
1433                result_changed,
1434                left_result: l.map(|t| t.result.clone()),
1435                right_result: r.map(|t| t.result.clone()),
1436            });
1437        }
1438    }
1439
1440    let left_observability = left.observability.clone().unwrap_or_else(|| {
1441        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1442    });
1443    let right_observability = right.observability.clone().unwrap_or_else(|| {
1444        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1445    });
1446    let mut observability_diffs = Vec::new();
1447
1448    let left_workers = left_observability
1449        .worker_lineage
1450        .iter()
1451        .map(|worker| {
1452            (
1453                worker.worker_id.clone(),
1454                (
1455                    worker.status.clone(),
1456                    worker.run_id.clone(),
1457                    worker.run_path.clone(),
1458                ),
1459            )
1460        })
1461        .collect::<BTreeMap<_, _>>();
1462    let right_workers = right_observability
1463        .worker_lineage
1464        .iter()
1465        .map(|worker| {
1466            (
1467                worker.worker_id.clone(),
1468                (
1469                    worker.status.clone(),
1470                    worker.run_id.clone(),
1471                    worker.run_path.clone(),
1472                ),
1473            )
1474        })
1475        .collect::<BTreeMap<_, _>>();
1476    let worker_ids = left_workers
1477        .keys()
1478        .chain(right_workers.keys())
1479        .cloned()
1480        .collect::<BTreeSet<_>>();
1481    for worker_id in worker_ids {
1482        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
1483            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1484                section: "worker_lineage".to_string(),
1485                label: worker_id,
1486                details: vec!["worker missing from right run".to_string()],
1487            }),
1488            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1489                section: "worker_lineage".to_string(),
1490                label: worker_id,
1491                details: vec!["worker missing from left run".to_string()],
1492            }),
1493            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
1494                let mut details = Vec::new();
1495                if left_worker.0 != right_worker.0 {
1496                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
1497                }
1498                if left_worker.1 != right_worker.1 {
1499                    details.push(format!(
1500                        "run_id: {:?} -> {:?}",
1501                        left_worker.1, right_worker.1
1502                    ));
1503                }
1504                if left_worker.2 != right_worker.2 {
1505                    details.push(format!(
1506                        "run_path: {:?} -> {:?}",
1507                        left_worker.2, right_worker.2
1508                    ));
1509                }
1510                observability_diffs.push(RunObservabilityDiffRecord {
1511                    section: "worker_lineage".to_string(),
1512                    label: worker_id,
1513                    details,
1514                });
1515            }
1516            _ => {}
1517        }
1518    }
1519
1520    let left_rounds = left_observability
1521        .planner_rounds
1522        .iter()
1523        .map(|round| (round.stage_id.clone(), round))
1524        .collect::<BTreeMap<_, _>>();
1525    let right_rounds = right_observability
1526        .planner_rounds
1527        .iter()
1528        .map(|round| (round.stage_id.clone(), round))
1529        .collect::<BTreeMap<_, _>>();
1530    let round_ids = left_rounds
1531        .keys()
1532        .chain(right_rounds.keys())
1533        .cloned()
1534        .collect::<BTreeSet<_>>();
1535    for stage_id in round_ids {
1536        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
1537            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1538                section: "planner_rounds".to_string(),
1539                label: stage_id,
1540                details: vec!["planner summary missing from right run".to_string()],
1541            }),
1542            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1543                section: "planner_rounds".to_string(),
1544                label: stage_id,
1545                details: vec!["planner summary missing from left run".to_string()],
1546            }),
1547            (Some(left_round), Some(right_round)) => {
1548                let mut details = Vec::new();
1549                if left_round.iteration_count != right_round.iteration_count {
1550                    details.push(format!(
1551                        "iterations: {} -> {}",
1552                        left_round.iteration_count, right_round.iteration_count
1553                    ));
1554                }
1555                if left_round.tool_execution_count != right_round.tool_execution_count {
1556                    details.push(format!(
1557                        "tool_executions: {} -> {}",
1558                        left_round.tool_execution_count, right_round.tool_execution_count
1559                    ));
1560                }
1561                if left_round.research_facts != right_round.research_facts {
1562                    details.push(format!(
1563                        "research_facts: {:?} -> {:?}",
1564                        left_round.research_facts, right_round.research_facts
1565                    ));
1566                }
1567                let left_deliverables = left_round
1568                    .task_ledger
1569                    .as_ref()
1570                    .map(|ledger| {
1571                        ledger
1572                            .deliverables
1573                            .iter()
1574                            .map(|item| format!("{}:{}", item.id, item.status))
1575                            .collect::<Vec<_>>()
1576                    })
1577                    .unwrap_or_default();
1578                let right_deliverables = right_round
1579                    .task_ledger
1580                    .as_ref()
1581                    .map(|ledger| {
1582                        ledger
1583                            .deliverables
1584                            .iter()
1585                            .map(|item| format!("{}:{}", item.id, item.status))
1586                            .collect::<Vec<_>>()
1587                    })
1588                    .unwrap_or_default();
1589                if left_deliverables != right_deliverables {
1590                    details.push(format!(
1591                        "deliverables: {:?} -> {:?}",
1592                        left_deliverables, right_deliverables
1593                    ));
1594                }
1595                if left_round.successful_tools != right_round.successful_tools {
1596                    details.push(format!(
1597                        "successful_tools: {:?} -> {:?}",
1598                        left_round.successful_tools, right_round.successful_tools
1599                    ));
1600                }
1601                if !details.is_empty() {
1602                    observability_diffs.push(RunObservabilityDiffRecord {
1603                        section: "planner_rounds".to_string(),
1604                        label: left_round.node_id.clone(),
1605                        details,
1606                    });
1607                }
1608            }
1609            _ => {}
1610        }
1611    }
1612
1613    let left_pointers = left_observability
1614        .transcript_pointers
1615        .iter()
1616        .map(|pointer| {
1617            (
1618                pointer.id.clone(),
1619                (
1620                    pointer.available,
1621                    pointer.path.clone(),
1622                    pointer.location.clone(),
1623                ),
1624            )
1625        })
1626        .collect::<BTreeMap<_, _>>();
1627    let right_pointers = right_observability
1628        .transcript_pointers
1629        .iter()
1630        .map(|pointer| {
1631            (
1632                pointer.id.clone(),
1633                (
1634                    pointer.available,
1635                    pointer.path.clone(),
1636                    pointer.location.clone(),
1637                ),
1638            )
1639        })
1640        .collect::<BTreeMap<_, _>>();
1641    let pointer_ids = left_pointers
1642        .keys()
1643        .chain(right_pointers.keys())
1644        .cloned()
1645        .collect::<BTreeSet<_>>();
1646    for pointer_id in pointer_ids {
1647        match (
1648            left_pointers.get(&pointer_id),
1649            right_pointers.get(&pointer_id),
1650        ) {
1651            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1652                section: "transcript_pointers".to_string(),
1653                label: pointer_id,
1654                details: vec!["pointer missing from right run".to_string()],
1655            }),
1656            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1657                section: "transcript_pointers".to_string(),
1658                label: pointer_id,
1659                details: vec!["pointer missing from left run".to_string()],
1660            }),
1661            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
1662                observability_diffs.push(RunObservabilityDiffRecord {
1663                    section: "transcript_pointers".to_string(),
1664                    label: pointer_id,
1665                    details: vec![format!(
1666                        "pointer: {:?} -> {:?}",
1667                        left_pointer, right_pointer
1668                    )],
1669                });
1670            }
1671            _ => {}
1672        }
1673    }
1674
1675    let left_compactions = left_observability
1676        .compaction_events
1677        .iter()
1678        .map(|event| {
1679            (
1680                event.id.clone(),
1681                (
1682                    event.strategy.clone(),
1683                    event.archived_messages,
1684                    event.snapshot_asset_id.clone(),
1685                    event.available,
1686                ),
1687            )
1688        })
1689        .collect::<BTreeMap<_, _>>();
1690    let right_compactions = right_observability
1691        .compaction_events
1692        .iter()
1693        .map(|event| {
1694            (
1695                event.id.clone(),
1696                (
1697                    event.strategy.clone(),
1698                    event.archived_messages,
1699                    event.snapshot_asset_id.clone(),
1700                    event.available,
1701                ),
1702            )
1703        })
1704        .collect::<BTreeMap<_, _>>();
1705    let compaction_ids = left_compactions
1706        .keys()
1707        .chain(right_compactions.keys())
1708        .cloned()
1709        .collect::<BTreeSet<_>>();
1710    for compaction_id in compaction_ids {
1711        match (
1712            left_compactions.get(&compaction_id),
1713            right_compactions.get(&compaction_id),
1714        ) {
1715            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1716                section: "compaction_events".to_string(),
1717                label: compaction_id,
1718                details: vec!["compaction event missing from right run".to_string()],
1719            }),
1720            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1721                section: "compaction_events".to_string(),
1722                label: compaction_id,
1723                details: vec!["compaction event missing from left run".to_string()],
1724            }),
1725            (Some(left_event), Some(right_event)) if left_event != right_event => {
1726                observability_diffs.push(RunObservabilityDiffRecord {
1727                    section: "compaction_events".to_string(),
1728                    label: compaction_id,
1729                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
1730                });
1731            }
1732            _ => {}
1733        }
1734    }
1735
1736    let left_verification = left_observability
1737        .verification_outcomes
1738        .iter()
1739        .map(|item| (item.stage_id.clone(), item))
1740        .collect::<BTreeMap<_, _>>();
1741    let right_verification = right_observability
1742        .verification_outcomes
1743        .iter()
1744        .map(|item| (item.stage_id.clone(), item))
1745        .collect::<BTreeMap<_, _>>();
1746    let verification_ids = left_verification
1747        .keys()
1748        .chain(right_verification.keys())
1749        .cloned()
1750        .collect::<BTreeSet<_>>();
1751    for stage_id in verification_ids {
1752        match (
1753            left_verification.get(&stage_id),
1754            right_verification.get(&stage_id),
1755        ) {
1756            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1757                section: "verification".to_string(),
1758                label: stage_id,
1759                details: vec!["verification missing from right run".to_string()],
1760            }),
1761            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1762                section: "verification".to_string(),
1763                label: stage_id,
1764                details: vec!["verification missing from left run".to_string()],
1765            }),
1766            (Some(left_item), Some(right_item)) if left_item != right_item => {
1767                let mut details = Vec::new();
1768                if left_item.passed != right_item.passed {
1769                    details.push(format!(
1770                        "passed: {:?} -> {:?}",
1771                        left_item.passed, right_item.passed
1772                    ));
1773                }
1774                if left_item.summary != right_item.summary {
1775                    details.push(format!(
1776                        "summary: {:?} -> {:?}",
1777                        left_item.summary, right_item.summary
1778                    ));
1779                }
1780                observability_diffs.push(RunObservabilityDiffRecord {
1781                    section: "verification".to_string(),
1782                    label: left_item.node_id.clone(),
1783                    details,
1784                });
1785            }
1786            _ => {}
1787        }
1788    }
1789
1790    let left_graph = (
1791        left_observability.action_graph_nodes.len(),
1792        left_observability.action_graph_edges.len(),
1793    );
1794    let right_graph = (
1795        right_observability.action_graph_nodes.len(),
1796        right_observability.action_graph_edges.len(),
1797    );
1798    if left_graph != right_graph {
1799        observability_diffs.push(RunObservabilityDiffRecord {
1800            section: "action_graph".to_string(),
1801            label: "shape".to_string(),
1802            details: vec![format!(
1803                "nodes/edges: {}/{} -> {}/{}",
1804                left_graph.0, left_graph.1, right_graph.0, right_graph.1
1805            )],
1806        });
1807    }
1808
1809    let status_changed = left.status != right.status;
1810    let identical = !status_changed
1811        && stage_diffs.is_empty()
1812        && tool_diffs.is_empty()
1813        && observability_diffs.is_empty()
1814        && left.transitions.len() == right.transitions.len()
1815        && left.artifacts.len() == right.artifacts.len()
1816        && left.checkpoints.len() == right.checkpoints.len();
1817
1818    RunDiffReport {
1819        left_run_id: left.id.clone(),
1820        right_run_id: right.id.clone(),
1821        identical,
1822        status_changed,
1823        left_status: left.status.clone(),
1824        right_status: right.status.clone(),
1825        stage_diffs,
1826        tool_diffs,
1827        observability_diffs,
1828        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
1829        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
1830        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
1831    }
1832}