Skip to main content

harn_vm/orchestration/
records.rs

1//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, sync_run_handoffs,
10    ArtifactRecord, CapabilityPolicy, HandoffArtifact,
11};
12use crate::event_log::{
13    active_event_log, AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic,
14};
15use crate::llm::vm_value_to_json;
16use crate::triggers::{SignatureStatus, TriggerEvent};
17use crate::value::{VmError, VmValue};
18
// String labels for the `kind` of an action-graph node.
// Within this part of the file, `action_graph_kind_for_stage` returns either
// the PREDICATE or the STAGE label; the remaining labels are presumably
// assigned where the graph is assembled — confirm against those call sites.
19pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
20pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
21pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
22pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
23pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
24pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
25pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
26pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
27pub const ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE: &str = "worker_enqueue";
28pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
29pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";
30
// String labels for the `kind` of an action-graph edge.
// NOTE(review): none of these are referenced in the visible part of the file;
// presumably they populate `RunActionGraphEdgeRecord::kind` — confirm.
31pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
32pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
33pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
34pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
35pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
36pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
37pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
38pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
39pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
40
/// LLM usage counters: token counts, total duration, call count, total cost,
/// and a list of model names.
///
/// `#[serde(default)]` lets any field missing from the input fall back to its
/// `Default` value. `Eq` is not derived because `total_cost` is an `f64`.
41#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
42#[serde(default)]
43pub struct LlmUsageRecord {
44    pub input_tokens: i64,
45    pub output_tokens: i64,
46    pub total_duration_ms: i64,
47    pub call_count: i64,
48    pub total_cost: f64,
49    pub models: Vec<String>,
50}
51
/// Record of one stage inside a run (stored in `RunRecord::stages`).
///
/// Carries the stage identity (`id`, `node_id`, `kind`), its status/outcome,
/// optional text, transcript, verification and usage payloads, the artifacts
/// attached to it, per-attempt history, and free-form `metadata`.
/// Fields missing from the input deserialize to their `Default` value.
52#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
53#[serde(default)]
54pub struct RunStageRecord {
55    pub id: String,
56    pub node_id: String,
    // Compared against "condition" by `action_graph_kind_for_stage`.
57    pub kind: String,
58    pub status: String,
59    pub outcome: String,
60    pub branch: Option<String>,
61    pub started_at: String,
62    pub finished_at: Option<String>,
63    pub visible_text: Option<String>,
64    pub private_reasoning: Option<String>,
65    pub transcript: Option<serde_json::Value>,
66    pub verification: Option<serde_json::Value>,
67    pub usage: Option<LlmUsageRecord>,
    // `stage_result_payload` returns the first `data` payload found here.
68    pub artifacts: Vec<ArtifactRecord>,
69    pub consumed_artifact_ids: Vec<String>,
70    pub produced_artifact_ids: Vec<String>,
71    pub attempts: Vec<RunStageAttemptRecord>,
    // `stage_node_metadata` reads an optional string "worker_id" from this map.
72    pub metadata: BTreeMap<String, serde_json::Value>,
73}
74
/// One attempt at executing a stage (stored in `RunStageRecord::attempts`).
///
/// Holds the attempt number, its status/outcome/branch, an optional error
/// string and verification payload, and start/finish timestamps as strings.
75#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
76#[serde(default)]
77pub struct RunStageAttemptRecord {
78    pub attempt: usize,
79    pub status: String,
80    pub outcome: String,
81    pub branch: Option<String>,
82    pub error: Option<String>,
83    pub verification: Option<serde_json::Value>,
84    pub started_at: String,
85    pub finished_at: Option<String>,
86}
87
/// A transition between nodes of a run (stored in `RunRecord::transitions`).
///
/// The source stage/node are optional while the destination node is always
/// present; the record also lists consumed and produced artifact ids.
88#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
89#[serde(default)]
90pub struct RunTransitionRecord {
91    pub id: String,
92    pub from_stage_id: Option<String>,
93    pub from_node_id: Option<String>,
94    pub to_node_id: String,
95    pub branch: Option<String>,
96    pub timestamp: String,
97    pub consumed_artifact_ids: Vec<String>,
98    pub produced_artifact_ids: Vec<String>,
99}
100
/// A checkpoint entry for a run (stored in `RunRecord::checkpoints`).
///
/// Lists ready and completed node ids, the last stage id if any, the time
/// the checkpoint was persisted, and a textual reason.
101#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
102#[serde(default)]
103pub struct RunCheckpointRecord {
104    pub id: String,
105    pub ready_nodes: Vec<String>,
106    pub completed_nodes: Vec<String>,
107    pub last_stage_id: Option<String>,
108    pub persisted_at: String,
109    pub reason: String,
110}
111
/// Replay fixture: expected run status plus per-stage assertions, optionally
/// extended with a clarifying-question eval spec.
///
/// `type_name` is serialized under the key `_type`. Fields missing from the
/// input deserialize to their `Default` value.
112#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
113#[serde(default)]
114pub struct ReplayFixture {
115    #[serde(rename = "_type")]
116    pub type_name: String,
117    pub id: String,
118    pub source_run_id: String,
119    pub workflow_id: String,
120    pub workflow_name: Option<String>,
121    pub created_at: String,
122    pub eval_kind: Option<String>,
123    pub clarifying_question: Option<ClarifyingQuestionEvalSpec>,
124    pub expected_status: String,
125    pub stage_assertions: Vec<ReplayStageAssertion>,
126}
127
/// Expectations for a clarifying-question eval
/// (see `ReplayFixture::clarifying_question`).
128#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
129#[serde(default)]
130pub struct ClarifyingQuestionEvalSpec {
131    pub expected_question: Option<String>,
132    pub accepted_questions: Vec<String>,
133    pub required_terms: Vec<String>,
134    pub forbidden_terms: Vec<String>,
    // `clarifying_min_questions` raises this to at least 1 when it is read.
135    pub min_questions: usize,
    // `clarifying_max_questions` treats `None` as 1 and never returns less than 1.
136    pub max_questions: Option<usize>,
137}
138
/// Expected status, outcome, branch, artifact kinds and visible-text
/// substring for one node (stored in `ReplayFixture::stage_assertions`).
139#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
140#[serde(default)]
141pub struct ReplayStageAssertion {
142    pub node_id: String,
143    pub expected_status: String,
144    pub expected_outcome: String,
145    pub expected_branch: Option<String>,
146    pub required_artifact_kinds: Vec<String>,
147    pub visible_text_contains: Option<String>,
148}
149
/// Result of a replay eval: pass flag, failure messages, and stage count.
150#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
151#[serde(default)]
152pub struct ReplayEvalReport {
153    pub pass: bool,
154    pub failures: Vec<String>,
155    pub stage_count: usize,
156}
157
/// Per-case eval result (stored in `ReplayEvalSuiteReport::cases`): run and
/// workflow ids, optional label and source path, pass/failure details, and an
/// optional `RunDiffReport` comparison.
158#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
159#[serde(default)]
160pub struct ReplayEvalCaseReport {
161    pub run_id: String,
162    pub workflow_id: String,
163    pub label: Option<String>,
164    pub pass: bool,
165    pub failures: Vec<String>,
166    pub stage_count: usize,
167    pub source_path: Option<String>,
168    pub comparison: Option<RunDiffReport>,
169}
170
/// Suite-level eval result: overall pass flag, total/passed/failed counters,
/// and the individual case reports.
171#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
172#[serde(default)]
173pub struct ReplayEvalSuiteReport {
174    pub pass: bool,
175    pub total: usize,
176    pub passed: usize,
177    pub failed: usize,
178    pub cases: Vec<ReplayEvalCaseReport>,
179}
180
/// One deliverable entry in a task-ledger summary.
///
/// `task_ledger_summary_from_value` fills this from the JSON keys `id`,
/// `text`, `status` and `note`; missing string keys become empty strings
/// (`note` becomes `None`).
181#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
182#[serde(default)]
183pub struct RunDeliverableSummaryRecord {
184    pub id: String,
185    pub text: String,
    // Statuses "open" and "blocked" count toward the ledger's `blocking_count`.
186    pub status: String,
187    pub note: Option<String>,
188}
189
/// Summary of a task ledger, as built by `task_ledger_summary_from_value`.
190#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
191#[serde(default)]
192pub struct RunTaskLedgerSummaryRecord {
193    pub root_task: String,
194    pub rationale: String,
195    pub deliverables: Vec<RunDeliverableSummaryRecord>,
196    pub observations: Vec<String>,
    // Number of deliverables whose status is "open" or "blocked".
197    pub blocking_count: usize,
198}
199
/// Per-stage planner statistics (stored in
/// `RunObservabilityRecord::planner_rounds`): stage identity, status/outcome,
/// a set of counters, tool names, an optional task-ledger summary, and
/// research facts.
///
/// NOTE(review): the code that populates the counters is outside the visible
/// part of the file; field meanings beyond their names are not established here.
200#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
201#[serde(default)]
202pub struct RunPlannerRoundRecord {
203    pub stage_id: String,
204    pub node_id: String,
205    pub stage_kind: String,
206    pub status: String,
207    pub outcome: String,
208    pub iteration_count: usize,
209    pub llm_call_count: usize,
210    pub tool_execution_count: usize,
211    pub tool_rejection_count: usize,
212    pub intervention_count: usize,
213    pub compaction_count: usize,
214    pub native_text_tool_fallback_count: usize,
215    pub native_text_tool_fallback_rejection_count: usize,
216    pub empty_completion_retry_count: usize,
217    pub tools_used: Vec<String>,
218    pub successful_tools: Vec<String>,
219    pub ledger_done_rejections: usize,
220    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
221    pub research_facts: Vec<String>,
222}
223
/// Lineage entry for a worker (stored in
/// `RunObservabilityRecord::worker_lineage`): worker identity, parent stage,
/// task, status, session ids, and optional run/snapshot locations.
224#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
225#[serde(default)]
226pub struct RunWorkerLineageRecord {
227    pub worker_id: String,
228    pub worker_name: String,
229    pub parent_stage_id: Option<String>,
230    pub task: String,
231    pub status: String,
232    pub session_id: Option<String>,
233    pub parent_session_id: Option<String>,
234    pub run_id: Option<String>,
235    pub run_path: Option<String>,
236    pub snapshot_path: Option<String>,
237}
238
/// A node of the action graph (stored in
/// `RunObservabilityRecord::action_graph_nodes`).
///
/// `kind` is presumably one of the `ACTION_GRAPH_NODE_KIND_*` labels — confirm
/// at the construction sites. The optional ids link the node to a trace,
/// stage, workflow node, worker or run; `metadata` holds free-form JSON.
239#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
240#[serde(default)]
241pub struct RunActionGraphNodeRecord {
242    pub id: String,
243    pub label: String,
244    pub kind: String,
245    pub status: String,
246    pub outcome: String,
247    pub trace_id: Option<String>,
248    pub stage_id: Option<String>,
249    pub node_id: Option<String>,
250    pub worker_id: Option<String>,
251    pub run_id: Option<String>,
252    pub run_path: Option<String>,
253    pub metadata: BTreeMap<String, serde_json::Value>,
254}
255
/// A directed edge of the action graph, from `from_id` to `to_id`, with a
/// `kind` (presumably an `ACTION_GRAPH_EDGE_KIND_*` label — confirm) and an
/// optional display label.
256#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
257#[serde(default)]
258pub struct RunActionGraphEdgeRecord {
259    pub from_id: String,
260    pub to_id: String,
261    pub kind: String,
262    pub label: Option<String>,
263}
264
/// Verification outcome for one stage (stored in
/// `RunObservabilityRecord::verification_outcomes`): stage/node ids, status,
/// an optional pass flag, and an optional summary.
265#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
266#[serde(default)]
267pub struct RunVerificationOutcomeRecord {
268    pub stage_id: String,
269    pub node_id: String,
270    pub status: String,
271    pub passed: Option<bool>,
272    pub summary: Option<String>,
273}
274
/// Pointer to a transcript (stored in
/// `RunObservabilityRecord::transcript_pointers`): id, label, kind, a location
/// string, an optional path, and an availability flag.
275#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
276#[serde(default)]
277pub struct RunTranscriptPointerRecord {
278    pub id: String,
279    pub label: String,
280    pub kind: String,
281    pub location: String,
282    pub path: Option<String>,
283    pub available: bool,
284}
285
/// A transcript compaction event, as extracted by
/// `compaction_events_from_transcript` from transcript events whose `kind` is
/// "compaction".
286#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
287#[serde(default)]
288pub struct CompactionEventRecord {
289    pub id: String,
290    pub transcript_id: Option<String>,
291    pub stage_id: Option<String>,
292    pub node_id: Option<String>,
293    pub mode: String,
294    pub strategy: String,
295    pub archived_messages: usize,
296    pub estimated_tokens_before: usize,
297    pub estimated_tokens_after: usize,
298    pub snapshot_asset_id: Option<String>,
    // "<prefix>.assets[<asset id>]" when an asset id is known, else just the prefix.
299    pub snapshot_location: String,
300    pub snapshot_path: Option<String>,
    // True only when `snapshot_asset_id` appears among the transcript's asset ids.
301    pub available: bool,
302}
303
/// Kind of a daemon event. Serialized in snake_case (e.g. `"spawned"`);
/// `Spawned` is the `Default` variant.
304#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
305#[serde(rename_all = "snake_case")]
306pub enum DaemonEventKindRecord {
307    #[default]
308    Spawned,
309    Triggered,
310    Snapshotted,
311    Resumed,
312    Stopped,
313}
314
/// A daemon event (stored in `RunObservabilityRecord::daemon_events`):
/// daemon id and name, event kind, timestamp, persist path, and an optional
/// payload summary.
315#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
316#[serde(default)]
317pub struct DaemonEventRecord {
318    pub daemon_id: String,
319    pub name: String,
320    pub kind: DaemonEventKindRecord,
321    pub timestamp: String,
322    pub persist_path: String,
323    pub payload_summary: Option<String>,
324}
325
/// Observability summary attached to a run (`RunRecord::observability`).
///
/// Groups planner rounds, the action graph (nodes and edges), worker lineage,
/// verification outcomes, transcript pointers, compaction events and daemon
/// events under one `schema_version`. `publish_action_graph_event` embeds this
/// record in the payload it appends to the event log.
326#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
327#[serde(default)]
328pub struct RunObservabilityRecord {
329    pub schema_version: usize,
330    pub planner_rounds: Vec<RunPlannerRoundRecord>,
331    pub research_fact_count: usize,
332    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
333    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
334    pub worker_lineage: Vec<RunWorkerLineageRecord>,
335    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
336    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
337    pub compaction_events: Vec<CompactionEventRecord>,
338    pub daemon_events: Vec<DaemonEventRecord>,
339}
340
/// Difference recorded for one node when comparing two runs
/// (stored in `RunDiffReport::stage_diffs`).
341#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
342#[serde(default)]
343pub struct RunStageDiffRecord {
344    pub node_id: String,
345    pub change: String,
346    pub details: Vec<String>,
347}
348
/// Difference recorded for one tool call when comparing two runs
/// (stored in `RunDiffReport::tool_diffs`): tool name, argument hash, whether
/// the result changed, and the optional left/right results.
349#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
350#[serde(default)]
351pub struct ToolCallDiffRecord {
352    pub tool_name: String,
353    pub args_hash: String,
354    pub result_changed: bool,
355    pub left_result: Option<String>,
356    pub right_result: Option<String>,
357}
358
/// Difference recorded for one observability section when comparing two runs
/// (stored in `RunDiffReport::observability_diffs`).
359#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
360#[serde(default)]
361pub struct RunObservabilityDiffRecord {
362    pub section: String,
363    pub label: String,
364    pub details: Vec<String>,
365}
366
/// Comparison of two runs ("left" and "right"): status change, per-stage,
/// per-tool and observability differences, and signed deltas of transition,
/// artifact and checkpoint counts.
///
/// NOTE(review): the sign convention of the `*_delta` fields is set by the
/// diff code, which is outside the visible part of the file.
367#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
368#[serde(default)]
369pub struct RunDiffReport {
370    pub left_run_id: String,
371    pub right_run_id: String,
372    pub identical: bool,
373    pub status_changed: bool,
374    pub left_status: String,
375    pub right_status: String,
376    pub stage_diffs: Vec<RunStageDiffRecord>,
377    pub tool_diffs: Vec<ToolCallDiffRecord>,
378    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
379    pub transition_count_delta: isize,
380    pub artifact_count_delta: isize,
381    pub checkpoint_count_delta: isize,
382}
383
/// Manifest describing an eval suite: id, optional name and base directory,
/// and the list of cases. `type_name` is serialized under the key `_type`.
384#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
385#[serde(default)]
386pub struct EvalSuiteManifest {
387    #[serde(rename = "_type")]
388    pub type_name: String,
389    pub id: String,
390    pub name: Option<String>,
391    pub base_dir: Option<String>,
392    pub cases: Vec<EvalSuiteCase>,
393}
394
/// One case of an eval suite: the run path, plus an optional label, fixture
/// path and comparison target.
395#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
396#[serde(default)]
397pub struct EvalSuiteCase {
398    pub label: Option<String>,
399    pub run_path: String,
400    pub fixture_path: Option<String>,
401    pub compare_to: Option<String>,
402}
403
/// A human-in-the-loop question asked during a run
/// (stored in `RunRecord::hitl_questions`).
///
/// `merge_hitl_questions_from_active_log` builds these from
/// `hitl.question_asked` events, keyed by `request_id`.
404#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
405#[serde(default)]
406pub struct RunHitlQuestionRecord {
407    pub request_id: String,
408    pub prompt: String,
409    pub agent: String,
410    pub trace_id: Option<String>,
    // Filled from the event payload's `requested_at` value.
411    pub asked_at: String,
412}
413
/// Top-level record of one workflow run.
///
/// Aggregates identity and status, stages, transitions, checkpoints, child
/// runs, artifacts, handoffs, the capability policy, execution context,
/// transcript, usage, replay fixture, observability summary, trace spans,
/// tool recordings, HITL questions and free-form `metadata`.
/// `type_name` is serialized under the key `_type`; fields missing from the
/// input deserialize to their `Default` value.
414#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
415#[serde(default)]
416pub struct RunRecord {
417    #[serde(rename = "_type")]
418    pub type_name: String,
419    pub id: String,
420    pub workflow_id: String,
421    pub workflow_name: Option<String>,
422    pub task: String,
423    pub status: String,
424    pub started_at: String,
425    pub finished_at: Option<String>,
426    pub parent_run_id: Option<String>,
427    pub root_run_id: Option<String>,
428    pub stages: Vec<RunStageRecord>,
429    pub transitions: Vec<RunTransitionRecord>,
430    pub checkpoints: Vec<RunCheckpointRecord>,
431    pub pending_nodes: Vec<String>,
432    pub completed_nodes: Vec<String>,
433    pub child_runs: Vec<RunChildRecord>,
434    pub artifacts: Vec<ArtifactRecord>,
435    pub handoffs: Vec<HandoffArtifact>,
436    pub policy: CapabilityPolicy,
437    pub execution: Option<RunExecutionRecord>,
438    pub transcript: Option<serde_json::Value>,
439    pub usage: Option<LlmUsageRecord>,
440    pub replay_fixture: Option<ReplayFixture>,
441    pub observability: Option<RunObservabilityRecord>,
442    pub trace_spans: Vec<RunTraceSpanRecord>,
443    pub tool_recordings: Vec<ToolCallRecord>,
444    pub hitl_questions: Vec<RunHitlQuestionRecord>,
    // Helpers in this file read the keys "trigger_event", "trace_id" and
    // "replay_of_event_id" from this map.
445    pub metadata: BTreeMap<String, serde_json::Value>,
446    pub persisted_path: Option<String>,
447}
448
/// Recording of one tool call (stored in `RunRecord::tool_recordings`):
/// tool name, tool-use id, argument hash, result text, rejection flag,
/// duration, iteration number, and timestamp.
449#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
450#[serde(default)]
451pub struct ToolCallRecord {
452    pub tool_name: String,
453    pub tool_use_id: String,
    // Presumably produced by `tool_fixture_hash` — confirm at the recording site.
454    pub args_hash: String,
455    pub result: String,
456    pub is_rejected: bool,
457    pub duration_ms: u64,
458    pub iteration: usize,
459    pub timestamp: String,
460}
461
462/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
463pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
464    use std::hash::{Hash, Hasher};
465    let mut hasher = std::collections::hash_map::DefaultHasher::new();
466    tool_name.hash(&mut hasher);
467    let args_str = serde_json::to_string(args).unwrap_or_default();
468    args_str.hash(&mut hasher);
469    format!("{:016x}", hasher.finish())
470}
471
/// A trace span (stored in `RunRecord::trace_spans`): span id, optional parent
/// span id, kind, name, start and duration in milliseconds, and free-form
/// JSON metadata.
472#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
473#[serde(default)]
474pub struct RunTraceSpanRecord {
475    pub span_id: u64,
476    pub parent_id: Option<u64>,
477    pub kind: String,
478    pub name: String,
479    pub start_ms: u64,
480    pub duration_ms: u64,
481    pub metadata: BTreeMap<String, serde_json::Value>,
482}
483
/// A child run delegated to a worker (stored in `RunRecord::child_runs`).
///
/// Holds the worker identity, parent stage and session links, optional
/// mutation scope and approval policy, the task with its request/provenance
/// payloads, status and timestamps, and where the child's run, snapshot and
/// execution context can be found.
484#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
485#[serde(default)]
486pub struct RunChildRecord {
487    pub worker_id: String,
488    pub worker_name: String,
489    pub parent_stage_id: Option<String>,
490    pub session_id: Option<String>,
491    pub parent_session_id: Option<String>,
492    pub mutation_scope: Option<String>,
493    pub approval_policy: Option<super::ToolApprovalPolicy>,
494    pub task: String,
495    pub request: Option<serde_json::Value>,
496    pub provenance: Option<serde_json::Value>,
497    pub status: String,
498    pub started_at: String,
499    pub finished_at: Option<String>,
500    pub run_id: Option<String>,
501    pub run_path: Option<String>,
502    pub snapshot_path: Option<String>,
503    pub execution: Option<RunExecutionRecord>,
504}
505
/// Execution context of a run or child run: working and source directories,
/// environment variables, adapter name, repository/worktree paths, branch and
/// base ref, and a cleanup setting. Everything except `env` is optional.
506#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
507#[serde(default)]
508pub struct RunExecutionRecord {
509    pub cwd: Option<String>,
510    pub source_dir: Option<String>,
511    pub env: BTreeMap<String, String>,
512    pub adapter: Option<String>,
513    pub repo_path: Option<String>,
514    pub worktree_path: Option<String>,
515    pub branch: Option<String>,
516    pub base_ref: Option<String>,
517    pub cleanup: Option<String>,
518}
519
520fn compact_json_value(value: &serde_json::Value) -> String {
521    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
522}
523
/// Normalizes question text for comparison.
///
/// ASCII letters are lowercased, ASCII digits are kept, and every other
/// non-whitespace character (punctuation as well as non-ASCII letters) is
/// turned into a space. Runs of whitespace then collapse to a single space
/// and leading/trailing whitespace is dropped.
fn normalize_question_text(text: &str) -> String {
    let mut cleaned = String::with_capacity(text.len());
    for ch in text.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch.is_whitespace();
        cleaned.push(if keep { ch.to_ascii_lowercase() } else { ' ' });
    }

    let words: Vec<&str> = cleaned.split_whitespace().collect();
    words.join(" ")
}
538
539fn clarifying_min_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
540    spec.min_questions.max(1)
541}
542
543fn clarifying_max_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
544    spec.max_questions.unwrap_or(1).max(1)
545}
546
547fn read_topic_records(
548    log: &AnyEventLog,
549    topic: &Topic,
550) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
551    let mut from = None;
552    let mut records = Vec::new();
553    loop {
554        let batch =
555            futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
556        if batch.is_empty() {
557            break;
558        }
559        from = batch.last().map(|(event_id, _)| *event_id);
560        records.extend(batch);
561    }
562    records
563}
564
565fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
566    let Some(log) = active_event_log() else {
567        return;
568    };
569    let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
570        .expect("static hitl.questions topic should always be valid");
571    let mut merged = run
572        .hitl_questions
573        .iter()
574        .cloned()
575        .map(|question| (question.request_id.clone(), question))
576        .collect::<BTreeMap<_, _>>();
577
578    for (_, event) in read_topic_records(log.as_ref(), &topic) {
579        if event.kind != "hitl.question_asked" {
580            continue;
581        }
582        let payload = &event.payload;
583        let matches_run = event
584            .headers
585            .get("run_id")
586            .is_some_and(|value| value == &run.id)
587            || payload
588                .get("run_id")
589                .and_then(|value| value.as_str())
590                .is_some_and(|value| value == run.id);
591        if !matches_run {
592            continue;
593        }
594        let request_id = payload
595            .get("request_id")
596            .and_then(|value| value.as_str())
597            .or_else(|| event.headers.get("request_id").map(String::as_str))
598            .unwrap_or_default();
599        let prompt = payload
600            .get("payload")
601            .and_then(|value| value.get("prompt"))
602            .and_then(|value| value.as_str())
603            .unwrap_or_default();
604        if request_id.is_empty() || prompt.is_empty() {
605            continue;
606        }
607        merged.insert(
608            request_id.to_string(),
609            RunHitlQuestionRecord {
610                request_id: request_id.to_string(),
611                prompt: prompt.to_string(),
612                agent: payload
613                    .get("agent")
614                    .and_then(|value| value.as_str())
615                    .unwrap_or_default()
616                    .to_string(),
617                trace_id: payload
618                    .get("trace_id")
619                    .and_then(|value| value.as_str())
620                    .map(str::to_string),
621                asked_at: payload
622                    .get("requested_at")
623                    .and_then(|value| value.as_str())
624                    .unwrap_or_default()
625                    .to_string(),
626            },
627        );
628    }
629
630    run.hitl_questions = merged.into_values().collect();
631    run.hitl_questions.sort_by(|left, right| {
632        (left.asked_at.as_str(), left.request_id.as_str())
633            .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
634    });
635}
636
637fn signature_status_label(status: &SignatureStatus) -> &'static str {
638    match status {
639        SignatureStatus::Verified => "verified",
640        SignatureStatus::Unsigned => "unsigned",
641        SignatureStatus::Failed { .. } => "failed",
642    }
643}
644
645fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
646    run.metadata
647        .get("trigger_event")
648        .cloned()
649        .and_then(|value| serde_json::from_value(value).ok())
650}
651
652fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
653    trigger_event
654        .map(|event| event.trace_id.0.clone())
655        .or_else(|| {
656            run.metadata
657                .get("trace_id")
658                .and_then(|value| value.as_str())
659                .map(str::to_string)
660        })
661}
662
663fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
664    run.metadata
665        .get("replay_of_event_id")
666        .and_then(|value| value.as_str())
667        .map(str::to_string)
668}
669
670fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
671    if stage.kind == "condition" {
672        ACTION_GRAPH_NODE_KIND_PREDICATE
673    } else {
674        ACTION_GRAPH_NODE_KIND_STAGE
675    }
676}
677
678fn trigger_node_metadata(trigger_event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
679    let mut metadata = BTreeMap::new();
680    metadata.insert(
681        "provider".to_string(),
682        serde_json::json!(trigger_event.provider.as_str()),
683    );
684    metadata.insert(
685        "event_kind".to_string(),
686        serde_json::json!(trigger_event.kind),
687    );
688    metadata.insert(
689        "dedupe_key".to_string(),
690        serde_json::json!(trigger_event.dedupe_key),
691    );
692    metadata.insert(
693        "signature_status".to_string(),
694        serde_json::json!(signature_status_label(&trigger_event.signature_status)),
695    );
696    metadata
697}
698
699fn stage_node_metadata(stage: &RunStageRecord) -> BTreeMap<String, serde_json::Value> {
700    let mut metadata = BTreeMap::new();
701    metadata.insert("stage_kind".to_string(), serde_json::json!(stage.kind));
702    if let Some(branch) = stage.branch.as_ref() {
703        metadata.insert("branch".to_string(), serde_json::json!(branch));
704    }
705    if let Some(worker_id) = stage
706        .metadata
707        .get("worker_id")
708        .and_then(|value| value.as_str())
709    {
710        metadata.insert("worker_id".to_string(), serde_json::json!(worker_id));
711    }
712    metadata
713}
714
715fn append_action_graph_node(
716    nodes: &mut Vec<RunActionGraphNodeRecord>,
717    record: RunActionGraphNodeRecord,
718) {
719    nodes.push(record);
720}
721
722pub async fn append_action_graph_update(
723    headers: BTreeMap<String, String>,
724    payload: serde_json::Value,
725) -> Result<(), crate::event_log::LogError> {
726    let Some(log) = active_event_log() else {
727        return Ok(());
728    };
729    let topic = Topic::new("observability.action_graph")
730        .expect("static observability.action_graph topic should always be valid");
731    let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
732    log.append(&topic, record).await.map(|_| ())
733}
734
735fn publish_action_graph_event(
736    run: &RunRecord,
737    observability: &RunObservabilityRecord,
738    path: &Path,
739) {
740    let trigger_event = trigger_event_from_run(run);
741    let mut headers = BTreeMap::new();
742    headers.insert("run_id".to_string(), run.id.clone());
743    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
744    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
745        headers.insert("trace_id".to_string(), trace_id);
746    }
747    let payload = serde_json::json!({
748        "run_id": run.id,
749        "workflow_id": run.workflow_id,
750        "persisted_path": path.to_string_lossy(),
751        "status": run.status,
752        "observability": observability,
753    });
754    if let Ok(handle) = tokio::runtime::Handle::try_current() {
755        handle.spawn(async move {
756            let _ = append_action_graph_update(headers, payload).await;
757        });
758    } else {
759        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
760    }
761}
762
/// Computes the LLM transcript sidecar path for a run file.
///
/// For `<dir>/<stem>.<ext>` the result is
/// `<dir>/<stem>-llm/llm_transcript.jsonl`. Returns `None` when the path has
/// no file stem or the stem is not valid UTF-8.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem().and_then(|stem| stem.to_str())?;
    let dir = match run_path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let mut sidecar = PathBuf::from(dir);
    sidecar.push(format!("{stem}-llm/llm_transcript.jsonl"));
    Some(sidecar)
}
768
769fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
770    value
771        .and_then(|value| value.as_array())
772        .map(|items| {
773            items
774                .iter()
775                .filter_map(|item| item.as_str().map(str::to_string))
776                .collect::<Vec<_>>()
777        })
778        .unwrap_or_default()
779}
780
781fn json_usize(value: Option<&serde_json::Value>) -> usize {
782    value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
783}
784
785fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
786    value.and_then(|value| value.as_bool())
787}
788
789fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
790    stage
791        .artifacts
792        .iter()
793        .find_map(|artifact| artifact.data.as_ref())
794}
795
796fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
797    let deliverables = value
798        .get("deliverables")
799        .and_then(|raw| raw.as_array())
800        .map(|items| {
801            items
802                .iter()
803                .map(|item| RunDeliverableSummaryRecord {
804                    id: item
805                        .get("id")
806                        .and_then(|value| value.as_str())
807                        .unwrap_or_default()
808                        .to_string(),
809                    text: item
810                        .get("text")
811                        .and_then(|value| value.as_str())
812                        .unwrap_or_default()
813                        .to_string(),
814                    status: item
815                        .get("status")
816                        .and_then(|value| value.as_str())
817                        .unwrap_or_default()
818                        .to_string(),
819                    note: item
820                        .get("note")
821                        .and_then(|value| value.as_str())
822                        .map(str::to_string),
823                })
824                .collect::<Vec<_>>()
825        })
826        .unwrap_or_default();
827    let observations = json_string_array(value.get("observations"));
828    let root_task = value
829        .get("root_task")
830        .and_then(|value| value.as_str())
831        .unwrap_or_default()
832        .to_string();
833    let rationale = value
834        .get("rationale")
835        .and_then(|value| value.as_str())
836        .unwrap_or_default()
837        .to_string();
838    if root_task.is_empty()
839        && rationale.is_empty()
840        && deliverables.is_empty()
841        && observations.is_empty()
842    {
843        return None;
844    }
845    let blocking_count = deliverables
846        .iter()
847        .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
848        .count();
849    Some(RunTaskLedgerSummaryRecord {
850        root_task,
851        rationale,
852        deliverables,
853        observations,
854        blocking_count,
855    })
856}
857
858fn compaction_events_from_transcript(
859    transcript: &serde_json::Value,
860    stage_id: Option<&str>,
861    node_id: Option<&str>,
862    location_prefix: &str,
863    persisted_path: Option<&Path>,
864) -> Vec<CompactionEventRecord> {
865    let transcript_id = transcript
866        .get("id")
867        .and_then(|value| value.as_str())
868        .map(str::to_string);
869    let asset_ids = transcript
870        .get("assets")
871        .and_then(|value| value.as_array())
872        .map(|assets| {
873            assets
874                .iter()
875                .filter_map(|asset| {
876                    asset
877                        .get("id")
878                        .and_then(|value| value.as_str())
879                        .map(str::to_string)
880                })
881                .collect::<BTreeSet<_>>()
882        })
883        .unwrap_or_default();
884    transcript
885        .get("events")
886        .and_then(|value| value.as_array())
887        .map(|events| {
888            events
889                .iter()
890                .filter(|event| {
891                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
892                })
893                .map(|event| {
894                    let metadata = event.get("metadata");
895                    let snapshot_asset_id = metadata
896                        .and_then(|value| value.get("snapshot_asset_id"))
897                        .and_then(|value| value.as_str())
898                        .map(str::to_string);
899                    let available = snapshot_asset_id
900                        .as_ref()
901                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
902                    let snapshot_location = snapshot_asset_id
903                        .as_ref()
904                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
905                        .unwrap_or_else(|| location_prefix.to_string());
906                    CompactionEventRecord {
907                        id: event
908                            .get("id")
909                            .and_then(|value| value.as_str())
910                            .unwrap_or_default()
911                            .to_string(),
912                        transcript_id: transcript_id.clone(),
913                        stage_id: stage_id.map(str::to_string),
914                        node_id: node_id.map(str::to_string),
915                        mode: metadata
916                            .and_then(|value| value.get("mode"))
917                            .and_then(|value| value.as_str())
918                            .unwrap_or_default()
919                            .to_string(),
920                        strategy: metadata
921                            .and_then(|value| value.get("strategy"))
922                            .and_then(|value| value.as_str())
923                            .unwrap_or_default()
924                            .to_string(),
925                        archived_messages: json_usize(
926                            metadata.and_then(|value| value.get("archived_messages")),
927                        ),
928                        estimated_tokens_before: json_usize(
929                            metadata.and_then(|value| value.get("estimated_tokens_before")),
930                        ),
931                        estimated_tokens_after: json_usize(
932                            metadata.and_then(|value| value.get("estimated_tokens_after")),
933                        ),
934                        snapshot_asset_id,
935                        snapshot_location,
936                        snapshot_path: persisted_path
937                            .map(|path| path.to_string_lossy().into_owned()),
938                        available,
939                    }
940                })
941                .collect()
942        })
943        .unwrap_or_default()
944}
945
946fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
947    let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
948        return Vec::new();
949    };
950    let Ok(content) = std::fs::read_to_string(sidecar_path) else {
951        return Vec::new();
952    };
953
954    content
955        .lines()
956        .filter(|line| !line.trim().is_empty())
957        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
958        .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
959        .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
960        .collect()
961}
962
963pub fn derive_run_observability(
964    run: &RunRecord,
965    persisted_path: Option<&Path>,
966) -> RunObservabilityRecord {
967    let mut action_graph_nodes = Vec::new();
968    let mut action_graph_edges = Vec::new();
969    let mut verification_outcomes = Vec::new();
970    let mut planner_rounds = Vec::new();
971    let mut transcript_pointers = Vec::new();
972    let mut compaction_events = Vec::new();
973    let mut daemon_events = Vec::new();
974    let mut research_fact_count = 0usize;
975
976    let root_node_id = format!("run:{}", run.id);
977    let trigger_event = trigger_event_from_run(run);
978    let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
979    append_action_graph_node(
980        &mut action_graph_nodes,
981        RunActionGraphNodeRecord {
982            id: root_node_id.clone(),
983            label: run
984                .workflow_name
985                .clone()
986                .unwrap_or_else(|| run.workflow_id.clone()),
987            kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
988            status: run.status.clone(),
989            outcome: run.status.clone(),
990            trace_id: propagated_trace_id.clone(),
991            stage_id: None,
992            node_id: None,
993            worker_id: None,
994            run_id: Some(run.id.clone()),
995            run_path: run.persisted_path.clone(),
996            metadata: BTreeMap::from([(
997                "workflow_id".to_string(),
998                serde_json::json!(run.workflow_id),
999            )]),
1000        },
1001    );
1002    let mut entry_node_id = root_node_id.clone();
1003    if let Some(trigger_event) = trigger_event.as_ref() {
1004        if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
1005            let replay_source_node_id = format!("trigger:{replay_of_event_id}");
1006            append_action_graph_node(
1007                &mut action_graph_nodes,
1008                RunActionGraphNodeRecord {
1009                    id: replay_source_node_id.clone(),
1010                    label: format!(
1011                        "{}:{} (original {})",
1012                        trigger_event.provider.as_str(),
1013                        trigger_event.kind,
1014                        replay_of_event_id
1015                    ),
1016                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1017                    status: "historical".to_string(),
1018                    outcome: "replayed_from".to_string(),
1019                    trace_id: Some(trigger_event.trace_id.0.clone()),
1020                    stage_id: None,
1021                    node_id: None,
1022                    worker_id: None,
1023                    run_id: Some(run.id.clone()),
1024                    run_path: run.persisted_path.clone(),
1025                    metadata: trigger_node_metadata(trigger_event),
1026                },
1027            );
1028            action_graph_edges.push(RunActionGraphEdgeRecord {
1029                from_id: replay_source_node_id,
1030                to_id: format!("trigger:{}", trigger_event.id.0),
1031                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1032                label: Some("replay chain".to_string()),
1033            });
1034        }
1035        let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
1036        append_action_graph_node(
1037            &mut action_graph_nodes,
1038            RunActionGraphNodeRecord {
1039                id: trigger_node_id.clone(),
1040                label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
1041                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1042                status: "received".to_string(),
1043                outcome: signature_status_label(&trigger_event.signature_status).to_string(),
1044                trace_id: Some(trigger_event.trace_id.0.clone()),
1045                stage_id: None,
1046                node_id: None,
1047                worker_id: None,
1048                run_id: Some(run.id.clone()),
1049                run_path: run.persisted_path.clone(),
1050                metadata: trigger_node_metadata(trigger_event),
1051            },
1052        );
1053        action_graph_edges.push(RunActionGraphEdgeRecord {
1054            from_id: root_node_id.clone(),
1055            to_id: trigger_node_id.clone(),
1056            kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
1057            label: Some(trigger_event.id.0.clone()),
1058        });
1059        entry_node_id = trigger_node_id;
1060    }
1061
1062    let stage_node_ids = run
1063        .stages
1064        .iter()
1065        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
1066        .collect::<BTreeMap<_, _>>();
1067    let stage_by_id = run
1068        .stages
1069        .iter()
1070        .map(|stage| (stage.id.as_str(), stage))
1071        .collect::<BTreeMap<_, _>>();
1072    let stage_by_node_id = run
1073        .stages
1074        .iter()
1075        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
1076        .collect::<BTreeMap<_, _>>();
1077
1078    let incoming_nodes = run
1079        .transitions
1080        .iter()
1081        .map(|transition| transition.to_node_id.clone())
1082        .collect::<BTreeSet<_>>();
1083
1084    for stage in &run.stages {
1085        let graph_node_id = stage_node_ids
1086            .get(&stage.id)
1087            .cloned()
1088            .unwrap_or_else(|| format!("stage:{}", stage.id));
1089        append_action_graph_node(
1090            &mut action_graph_nodes,
1091            RunActionGraphNodeRecord {
1092                id: graph_node_id.clone(),
1093                label: stage.node_id.clone(),
1094                kind: action_graph_kind_for_stage(stage).to_string(),
1095                status: stage.status.clone(),
1096                outcome: stage.outcome.clone(),
1097                trace_id: propagated_trace_id.clone(),
1098                stage_id: Some(stage.id.clone()),
1099                node_id: Some(stage.node_id.clone()),
1100                worker_id: stage
1101                    .metadata
1102                    .get("worker_id")
1103                    .and_then(|value| value.as_str())
1104                    .map(str::to_string),
1105                run_id: None,
1106                run_path: None,
1107                metadata: stage_node_metadata(stage),
1108            },
1109        );
1110        if !incoming_nodes.contains(&stage.node_id) {
1111            action_graph_edges.push(RunActionGraphEdgeRecord {
1112                from_id: entry_node_id.clone(),
1113                to_id: graph_node_id.clone(),
1114                kind: if trigger_event.is_some() {
1115                    ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
1116                } else {
1117                    ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
1118                },
1119                label: None,
1120            });
1121        }
1122
1123        if stage.kind == "verify" || stage.verification.is_some() {
1124            let passed = json_bool(
1125                stage
1126                    .verification
1127                    .as_ref()
1128                    .and_then(|value| value.get("pass")),
1129            )
1130            .or_else(|| {
1131                json_bool(
1132                    stage
1133                        .verification
1134                        .as_ref()
1135                        .and_then(|value| value.get("success")),
1136                )
1137            })
1138            .or_else(|| {
1139                if stage.status == "completed" && stage.outcome == "success" {
1140                    Some(true)
1141                } else if stage.status == "failed" || stage.outcome == "failed" {
1142                    Some(false)
1143                } else {
1144                    None
1145                }
1146            });
1147            verification_outcomes.push(RunVerificationOutcomeRecord {
1148                stage_id: stage.id.clone(),
1149                node_id: stage.node_id.clone(),
1150                status: stage.status.clone(),
1151                passed,
1152                summary: stage
1153                    .verification
1154                    .as_ref()
1155                    .map(compact_json_value)
1156                    .or_else(|| {
1157                        stage
1158                            .visible_text
1159                            .as_ref()
1160                            .filter(|value| !value.trim().is_empty())
1161                            .cloned()
1162                    }),
1163            });
1164        }
1165
1166        if stage.transcript.is_some() {
1167            transcript_pointers.push(RunTranscriptPointerRecord {
1168                id: format!("stage:{}:transcript", stage.id),
1169                label: format!("Stage {} transcript", stage.node_id),
1170                kind: "embedded_transcript".to_string(),
1171                location: format!("run.stages[{}].transcript", stage.node_id),
1172                path: run.persisted_path.clone(),
1173                available: true,
1174            });
1175            if let Some(transcript) = stage.transcript.as_ref() {
1176                compaction_events.extend(compaction_events_from_transcript(
1177                    transcript,
1178                    Some(&stage.id),
1179                    Some(&stage.node_id),
1180                    &format!("run.stages[{}].transcript", stage.node_id),
1181                    persisted_path,
1182                ));
1183            }
1184        }
1185
1186        if let Some(payload) = stage_result_payload(stage) {
1187            let trace = payload.get("trace");
1188            let task_ledger = payload
1189                .get("task_ledger")
1190                .and_then(task_ledger_summary_from_value);
1191            let research_facts = task_ledger
1192                .as_ref()
1193                .map(|ledger| ledger.observations.clone())
1194                .unwrap_or_default();
1195            research_fact_count += research_facts.len();
1196            let tools_payload = payload.get("tools");
1197            let tools_used = json_string_array(
1198                tools_payload
1199                    .and_then(|tools| tools.get("calls"))
1200                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1201            );
1202            let successful_tools =
1203                json_string_array(tools_payload.and_then(|tools| tools.get("successful")));
1204            let planner_round = RunPlannerRoundRecord {
1205                stage_id: stage.id.clone(),
1206                node_id: stage.node_id.clone(),
1207                stage_kind: stage.kind.clone(),
1208                status: stage.status.clone(),
1209                outcome: stage.outcome.clone(),
1210                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1211                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1212                tool_execution_count: json_usize(
1213                    trace.and_then(|trace| trace.get("tool_executions")),
1214                ),
1215                tool_rejection_count: json_usize(
1216                    trace.and_then(|trace| trace.get("tool_rejections")),
1217                ),
1218                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1219                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1220                native_text_tool_fallback_count: json_usize(
1221                    trace.and_then(|trace| trace.get("native_text_tool_fallbacks")),
1222                ),
1223                native_text_tool_fallback_rejection_count: json_usize(
1224                    trace.and_then(|trace| trace.get("native_text_tool_fallback_rejections")),
1225                ),
1226                empty_completion_retry_count: json_usize(
1227                    trace.and_then(|trace| trace.get("empty_completion_retries")),
1228                ),
1229                tools_used,
1230                successful_tools,
1231                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1232                task_ledger,
1233                research_facts,
1234            };
1235            let has_agentic_detail = planner_round.iteration_count > 0
1236                || planner_round.llm_call_count > 0
1237                || planner_round.tool_execution_count > 0
1238                || planner_round.native_text_tool_fallback_count > 0
1239                || planner_round.native_text_tool_fallback_rejection_count > 0
1240                || planner_round.empty_completion_retry_count > 0
1241                || planner_round.ledger_done_rejections > 0
1242                || planner_round.task_ledger.is_some()
1243                || !planner_round.tools_used.is_empty()
1244                || !planner_round.successful_tools.is_empty();
1245            if has_agentic_detail {
1246                planner_rounds.push(planner_round);
1247            }
1248        }
1249    }
1250
1251    for transition in &run.transitions {
1252        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1253            continue;
1254        };
1255        let from_stage = transition
1256            .from_stage_id
1257            .as_deref()
1258            .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1259        let from_id = transition
1260            .from_stage_id
1261            .as_ref()
1262            .and_then(|stage_id| stage_node_ids.get(stage_id))
1263            .cloned()
1264            .or_else(|| {
1265                transition
1266                    .from_node_id
1267                    .as_ref()
1268                    .and_then(|node_id| stage_by_node_id.get(node_id))
1269                    .cloned()
1270            })
1271            .unwrap_or_else(|| root_node_id.clone());
1272        action_graph_edges.push(RunActionGraphEdgeRecord {
1273            from_id,
1274            to_id,
1275            kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1276                ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1277            } else {
1278                ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1279            },
1280            label: transition.branch.clone(),
1281        });
1282    }
1283
1284    let worker_lineage = run
1285        .child_runs
1286        .iter()
1287        .map(|child| {
1288            let worker_node_id = format!("worker:{}", child.worker_id);
1289            append_action_graph_node(
1290                &mut action_graph_nodes,
1291                RunActionGraphNodeRecord {
1292                    id: worker_node_id.clone(),
1293                    label: child.worker_name.clone(),
1294                    kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1295                    status: child.status.clone(),
1296                    outcome: child.status.clone(),
1297                    trace_id: propagated_trace_id.clone(),
1298                    stage_id: child.parent_stage_id.clone(),
1299                    node_id: None,
1300                    worker_id: Some(child.worker_id.clone()),
1301                    run_id: child.run_id.clone(),
1302                    run_path: child.run_path.clone(),
1303                    metadata: BTreeMap::from([
1304                        (
1305                            "worker_name".to_string(),
1306                            serde_json::json!(child.worker_name),
1307                        ),
1308                        ("task".to_string(), serde_json::json!(child.task)),
1309                    ]),
1310                },
1311            );
1312            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1313                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1314                    action_graph_edges.push(RunActionGraphEdgeRecord {
1315                        from_id: stage_node_id.clone(),
1316                        to_id: worker_node_id,
1317                        kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1318                        label: Some(child.worker_name.clone()),
1319                    });
1320                }
1321            }
1322            RunWorkerLineageRecord {
1323                worker_id: child.worker_id.clone(),
1324                worker_name: child.worker_name.clone(),
1325                parent_stage_id: child.parent_stage_id.clone(),
1326                task: child.task.clone(),
1327                status: child.status.clone(),
1328                session_id: child.session_id.clone(),
1329                parent_session_id: child.parent_session_id.clone(),
1330                run_id: child.run_id.clone(),
1331                run_path: child.run_path.clone(),
1332                snapshot_path: child.snapshot_path.clone(),
1333            }
1334        })
1335        .collect::<Vec<_>>();
1336
1337    if run.transcript.is_some() {
1338        transcript_pointers.push(RunTranscriptPointerRecord {
1339            id: "run:transcript".to_string(),
1340            label: "Run transcript".to_string(),
1341            kind: "embedded_transcript".to_string(),
1342            location: "run.transcript".to_string(),
1343            path: run.persisted_path.clone(),
1344            available: true,
1345        });
1346        if let Some(transcript) = run.transcript.as_ref() {
1347            compaction_events.extend(compaction_events_from_transcript(
1348                transcript,
1349                None,
1350                None,
1351                "run.transcript",
1352                persisted_path,
1353            ));
1354        }
1355    }
1356
1357    if let Some(path) = persisted_path {
1358        if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1359            transcript_pointers.push(RunTranscriptPointerRecord {
1360                id: "run:llm_transcript".to_string(),
1361                label: "LLM transcript sidecar".to_string(),
1362                kind: "llm_jsonl".to_string(),
1363                location: "run sidecar".to_string(),
1364                path: Some(sidecar_path.to_string_lossy().into_owned()),
1365                available: sidecar_path.exists(),
1366            });
1367        }
1368        daemon_events.extend(daemon_events_from_sidecar(path));
1369    }
1370
1371    RunObservabilityRecord {
1372        schema_version: 4,
1373        planner_rounds,
1374        research_fact_count,
1375        action_graph_nodes,
1376        action_graph_edges,
1377        worker_lineage,
1378        verification_outcomes,
1379        transcript_pointers,
1380        compaction_events,
1381        daemon_events,
1382    }
1383}
1384
1385fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1386    run.observability = Some(derive_run_observability(run, persisted_path));
1387}
1388
1389pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1390    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1391    if run.type_name.is_empty() {
1392        run.type_name = "run_record".to_string();
1393    }
1394    if run.id.is_empty() {
1395        run.id = new_id("run");
1396    }
1397    if run.started_at.is_empty() {
1398        run.started_at = now_rfc3339();
1399    }
1400    if run.status.is_empty() {
1401        run.status = "running".to_string();
1402    }
1403    if run.root_run_id.is_none() {
1404        run.root_run_id = Some(run.id.clone());
1405    }
1406    if run.replay_fixture.is_none() {
1407        run.replay_fixture = Some(replay_fixture_from_run(&run));
1408    }
1409    merge_hitl_questions_from_active_log(&mut run);
1410    sync_run_handoffs(&mut run);
1411    if run.observability.is_none() {
1412        let persisted_path = run.persisted_path.clone();
1413        let persisted = persisted_path.as_deref().map(Path::new);
1414        refresh_run_observability(&mut run, persisted);
1415    }
1416    Ok(run)
1417}
1418
1419pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1420    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1421    if manifest.type_name.is_empty() {
1422        manifest.type_name = "eval_suite_manifest".to_string();
1423    }
1424    if manifest.id.is_empty() {
1425        manifest.id = new_id("eval_suite");
1426    }
1427    Ok(manifest)
1428}
1429
1430fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1431    let content = std::fs::read_to_string(path)
1432        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1433    serde_json::from_str(&content)
1434        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1435}
1436
/// Resolves a path taken from an eval manifest.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto
/// `base_dir` when one is given and returned as-is otherwise.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = Path::new(path);
    match base_dir {
        Some(root) if !candidate.is_absolute() => root.join(candidate),
        _ => candidate.to_path_buf(),
    }
}
1447
1448pub fn evaluate_run_suite_manifest(
1449    manifest: &EvalSuiteManifest,
1450) -> Result<ReplayEvalSuiteReport, VmError> {
1451    let base_dir = manifest.base_dir.as_deref().map(Path::new);
1452    let mut reports = Vec::new();
1453    for case in &manifest.cases {
1454        let run_path = resolve_manifest_path(base_dir, &case.run_path);
1455        let run = load_run_record(&run_path)?;
1456        let fixture = match &case.fixture_path {
1457            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1458            None => run
1459                .replay_fixture
1460                .clone()
1461                .unwrap_or_else(|| replay_fixture_from_run(&run)),
1462        };
1463        let eval = evaluate_run_against_fixture(&run, &fixture);
1464        let mut pass = eval.pass;
1465        let mut failures = eval.failures;
1466        let comparison = match &case.compare_to {
1467            Some(path) => {
1468                let baseline_path = resolve_manifest_path(base_dir, path);
1469                let baseline = load_run_record(&baseline_path)?;
1470                let diff = diff_run_records(&baseline, &run);
1471                if !diff.identical {
1472                    pass = false;
1473                    failures.push(format!(
1474                        "run differs from baseline {} with {} stage changes",
1475                        baseline_path.display(),
1476                        diff.stage_diffs.len()
1477                    ));
1478                }
1479                Some(diff)
1480            }
1481            None => None,
1482        };
1483        reports.push(ReplayEvalCaseReport {
1484            run_id: run.id.clone(),
1485            workflow_id: run.workflow_id.clone(),
1486            label: case.label.clone(),
1487            pass,
1488            failures,
1489            stage_count: eval.stage_count,
1490            source_path: Some(run_path.display().to_string()),
1491            comparison,
1492        });
1493    }
1494    let total = reports.len();
1495    let passed = reports.iter().filter(|report| report.pass).count();
1496    let failed = total.saturating_sub(passed);
1497    Ok(ReplayEvalSuiteReport {
1498        pass: failed == 0,
1499        total,
1500        passed,
1501        failed,
1502        cases: reports,
1503    })
1504}
1505
/// Edit operation in a diff sequence.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// The line is present in both inputs; the index refers to `before`.
    Equal,
    /// The line exists only in `before`; the index refers to `before`.
    Delete,
    /// The line exists only in `after`; the index refers to `after`.
    Insert,
}

/// Compute the shortest edit script using Myers' O(nd) algorithm.
///
/// Returns the operations in document order as `(DiffOp, index)` pairs, where
/// the index points into `a` for `Equal`/`Delete` and into `b` for `Insert`.
/// Two empty inputs produce an empty script.
///
/// Time: O(nd) where d = edit distance. Space: O(d * (n + m)), because one
/// snapshot of the diagonal frontier is kept per search round for backtracking.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // With one side empty the script is all inserts or all deletes (and empty
    // when both sides are empty).
    if n == 0 {
        return (0..b.len()).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..a.len()).map(|i| (DiffOp::Delete, i)).collect();
    }

    let max_d = (n + m) as usize;
    // Diagonals k range over -max_d..=max_d; `offset` shifts them to indices.
    let offset = max_d as isize;
    // v[k + offset] is the furthest x reached so far on diagonal k.
    let mut v = vec![0isize; 2 * max_d + 1];
    // trace[d] holds the `v` snapshot BEFORE step d ran — required for backtrack.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'search: for d in 0..=max_d as isize {
        trace.push(v.clone());
        // Round d writes only diagonals with the parity of d and reads only
        // their neighbours (parity of d - 1), so `v` can be updated in place
        // without a second per-round copy.
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Short-circuiting keeps `ki - 1` / `ki + 1` in bounds at the
            // extreme diagonals k == -d and k == d.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1] // step down: insert from `b`
            } else {
                v[ki - 1] + 1 // step right: delete from `a`
            };
            let mut y = x - k;
            // Follow the snake of matching lines.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            v[ki] = x;
            if x >= n && y >= m {
                break 'search;
            }
        }
    }

    // Walk the snapshots backwards from (n, m), emitting operations in reverse.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Whatever remains is the initial snake from the origin (round 0).
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1598
1599pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1600    let before_lines: Vec<&str> = before.lines().collect();
1601    let after_lines: Vec<&str> = after.lines().collect();
1602    let ops = myers_diff(&before_lines, &after_lines);
1603
1604    let mut diff = String::new();
1605    let file = path.unwrap_or("artifact");
1606    diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1607    for &(op, idx) in &ops {
1608        match op {
1609            DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1610            DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1611            DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1612        }
1613    }
1614    diff
1615}
1616
1617pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1618    let path = path
1619        .map(PathBuf::from)
1620        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1621    let mut materialized = run.clone();
1622    merge_hitl_questions_from_active_log(&mut materialized);
1623    if materialized.replay_fixture.is_none() {
1624        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1625    }
1626    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1627    sync_run_handoffs(&mut materialized);
1628    refresh_run_observability(&mut materialized, Some(&path));
1629    if let Some(parent) = path.parent() {
1630        std::fs::create_dir_all(parent)
1631            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1632    }
1633    let json = serde_json::to_string_pretty(&materialized)
1634        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1635    // Atomic write: .tmp then rename guards against partial writes on kill.
1636    let tmp_path = path.with_extension("json.tmp");
1637    std::fs::write(&tmp_path, &json)
1638        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1639    std::fs::rename(&tmp_path, &path).map_err(|e| {
1640        // Cross-device renames fail on some filesystems; best-effort direct write.
1641        let _ = std::fs::write(&path, &json);
1642        VmError::Runtime(format!("failed to finalize run record: {e}"))
1643    })?;
1644    if let Some(observability) = materialized.observability.as_ref() {
1645        publish_action_graph_event(&materialized, observability, &path);
1646    }
1647    Ok(path.to_string_lossy().into_owned())
1648}
1649
1650pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1651    let content = std::fs::read_to_string(path)
1652        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1653    let mut run: RunRecord = serde_json::from_str(&content)
1654        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1655    if run.replay_fixture.is_none() {
1656        run.replay_fixture = Some(replay_fixture_from_run(&run));
1657    }
1658    run.persisted_path
1659        .get_or_insert_with(|| path.to_string_lossy().into_owned());
1660    sync_run_handoffs(&mut run);
1661    refresh_run_observability(&mut run, Some(path));
1662    Ok(run)
1663}
1664
1665pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1666    ReplayFixture {
1667        type_name: "replay_fixture".to_string(),
1668        id: new_id("fixture"),
1669        source_run_id: run.id.clone(),
1670        workflow_id: run.workflow_id.clone(),
1671        workflow_name: run.workflow_name.clone(),
1672        created_at: now_rfc3339(),
1673        eval_kind: Some("replay".to_string()),
1674        clarifying_question: None,
1675        expected_status: run.status.clone(),
1676        stage_assertions: run
1677            .stages
1678            .iter()
1679            .map(|stage| ReplayStageAssertion {
1680                node_id: stage.node_id.clone(),
1681                expected_status: stage.status.clone(),
1682                expected_outcome: stage.outcome.clone(),
1683                expected_branch: stage.branch.clone(),
1684                required_artifact_kinds: stage
1685                    .artifacts
1686                    .iter()
1687                    .map(|artifact| artifact.kind.clone())
1688                    .collect(),
1689                visible_text_contains: stage
1690                    .visible_text
1691                    .as_ref()
1692                    .filter(|text| !text.is_empty())
1693                    .map(|text| text.chars().take(80).collect()),
1694            })
1695            .collect(),
1696    }
1697}
1698
1699pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1700    if fixture.eval_kind.as_deref() == Some("clarifying_question") {
1701        return evaluate_clarifying_question(run, fixture);
1702    }
1703    let mut failures = Vec::new();
1704    if run.status != fixture.expected_status {
1705        failures.push(format!(
1706            "run status mismatch: expected {}, got {}",
1707            fixture.expected_status, run.status
1708        ));
1709    }
1710    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1711        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1712    for assertion in &fixture.stage_assertions {
1713        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1714            failures.push(format!("missing stage {}", assertion.node_id));
1715            continue;
1716        };
1717        if stage.status != assertion.expected_status {
1718            failures.push(format!(
1719                "stage {} status mismatch: expected {}, got {}",
1720                assertion.node_id, assertion.expected_status, stage.status
1721            ));
1722        }
1723        if stage.outcome != assertion.expected_outcome {
1724            failures.push(format!(
1725                "stage {} outcome mismatch: expected {}, got {}",
1726                assertion.node_id, assertion.expected_outcome, stage.outcome
1727            ));
1728        }
1729        if stage.branch != assertion.expected_branch {
1730            failures.push(format!(
1731                "stage {} branch mismatch: expected {:?}, got {:?}",
1732                assertion.node_id, assertion.expected_branch, stage.branch
1733            ));
1734        }
1735        for required_kind in &assertion.required_artifact_kinds {
1736            if !stage
1737                .artifacts
1738                .iter()
1739                .any(|artifact| &artifact.kind == required_kind)
1740            {
1741                failures.push(format!(
1742                    "stage {} missing artifact kind {}",
1743                    assertion.node_id, required_kind
1744                ));
1745            }
1746        }
1747        if let Some(snippet) = &assertion.visible_text_contains {
1748            let actual = stage.visible_text.clone().unwrap_or_default();
1749            if !actual.contains(snippet) {
1750                failures.push(format!(
1751                    "stage {} visible text does not contain expected snippet {:?}",
1752                    assertion.node_id, snippet
1753                ));
1754            }
1755        }
1756    }
1757
1758    ReplayEvalReport {
1759        pass: failures.is_empty(),
1760        failures,
1761        stage_count: run.stages.len(),
1762    }
1763}
1764
1765fn evaluate_clarifying_question(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1766    let mut failures = Vec::new();
1767    let spec = fixture.clarifying_question.clone().unwrap_or_default();
1768    let min_questions = clarifying_min_questions(&spec);
1769    let max_questions = clarifying_max_questions(&spec);
1770    let questions = &run.hitl_questions;
1771
1772    if run.status != fixture.expected_status {
1773        failures.push(format!(
1774            "run status mismatch: expected {}, got {}",
1775            fixture.expected_status, run.status
1776        ));
1777    }
1778    if questions.len() < min_questions {
1779        failures.push(format!(
1780            "expected at least {min_questions} clarifying question(s), got {}",
1781            questions.len()
1782        ));
1783    }
1784    if questions.len() > max_questions {
1785        failures.push(format!(
1786            "expected at most {max_questions} clarifying question(s), got {}",
1787            questions.len()
1788        ));
1789    }
1790
1791    let normalized_expected = spec
1792        .expected_question
1793        .as_deref()
1794        .map(normalize_question_text);
1795    let normalized_accepted = spec
1796        .accepted_questions
1797        .iter()
1798        .map(|question| normalize_question_text(question))
1799        .collect::<Vec<_>>();
1800    let required_terms = spec
1801        .required_terms
1802        .iter()
1803        .map(|term| normalize_question_text(term))
1804        .collect::<Vec<_>>();
1805    let forbidden_terms = spec
1806        .forbidden_terms
1807        .iter()
1808        .map(|term| normalize_question_text(term))
1809        .collect::<Vec<_>>();
1810
1811    let matched = questions.iter().any(|question| {
1812        let normalized = normalize_question_text(&question.prompt);
1813        let matches_expected = normalized_expected
1814            .as_ref()
1815            .is_none_or(|expected| &normalized == expected)
1816            && (normalized_accepted.is_empty()
1817                || normalized_accepted
1818                    .iter()
1819                    .any(|candidate| candidate == &normalized));
1820        let has_required_terms = required_terms
1821            .iter()
1822            .all(|term| normalized.contains(term.as_str()));
1823        let avoids_forbidden_terms = forbidden_terms
1824            .iter()
1825            .all(|term| !normalized.contains(term.as_str()));
1826        matches_expected && has_required_terms && avoids_forbidden_terms
1827    });
1828
1829    if !questions.is_empty()
1830        && (!normalized_accepted.is_empty()
1831            || normalized_expected.is_some()
1832            || !required_terms.is_empty()
1833            || !forbidden_terms.is_empty())
1834        && !matched
1835    {
1836        failures.push(format!(
1837            "no clarifying question matched fixture; actual questions: {}",
1838            questions
1839                .iter()
1840                .map(|question| format!("{:?}", question.prompt))
1841                .collect::<Vec<_>>()
1842                .join(", ")
1843        ));
1844    }
1845
1846    ReplayEvalReport {
1847        pass: failures.is_empty(),
1848        failures,
1849        stage_count: run.stages.len(),
1850    }
1851}
1852
1853pub fn evaluate_run_suite(
1854    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1855) -> ReplayEvalSuiteReport {
1856    let mut reports = Vec::new();
1857    for (run, fixture, source_path) in cases {
1858        let report = evaluate_run_against_fixture(&run, &fixture);
1859        reports.push(ReplayEvalCaseReport {
1860            run_id: run.id.clone(),
1861            workflow_id: run.workflow_id.clone(),
1862            label: None,
1863            pass: report.pass,
1864            failures: report.failures,
1865            stage_count: report.stage_count,
1866            source_path,
1867            comparison: None,
1868        });
1869    }
1870    let total = reports.len();
1871    let passed = reports.iter().filter(|report| report.pass).count();
1872    let failed = total.saturating_sub(passed);
1873    ReplayEvalSuiteReport {
1874        pass: failed == 0,
1875        total,
1876        passed,
1877        failed,
1878        cases: reports,
1879    }
1880}
1881
1882pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1883    let mut stage_diffs = Vec::new();
1884    let mut all_node_ids = BTreeSet::new();
1885    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1886        .stages
1887        .iter()
1888        .map(|s| (s.node_id.as_str(), s))
1889        .collect();
1890    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1891        .stages
1892        .iter()
1893        .map(|s| (s.node_id.as_str(), s))
1894        .collect();
1895    all_node_ids.extend(left_by_id.keys().copied());
1896    all_node_ids.extend(right_by_id.keys().copied());
1897
1898    for node_id in all_node_ids {
1899        let left_stage = left_by_id.get(node_id).copied();
1900        let right_stage = right_by_id.get(node_id).copied();
1901        match (left_stage, right_stage) {
1902            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1903                node_id: node_id.to_string(),
1904                change: "removed".to_string(),
1905                details: vec!["stage missing from right run".to_string()],
1906            }),
1907            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1908                node_id: node_id.to_string(),
1909                change: "added".to_string(),
1910                details: vec!["stage missing from left run".to_string()],
1911            }),
1912            (Some(left_stage), Some(right_stage)) => {
1913                let mut details = Vec::new();
1914                if left_stage.status != right_stage.status {
1915                    details.push(format!(
1916                        "status: {} -> {}",
1917                        left_stage.status, right_stage.status
1918                    ));
1919                }
1920                if left_stage.outcome != right_stage.outcome {
1921                    details.push(format!(
1922                        "outcome: {} -> {}",
1923                        left_stage.outcome, right_stage.outcome
1924                    ));
1925                }
1926                if left_stage.branch != right_stage.branch {
1927                    details.push(format!(
1928                        "branch: {:?} -> {:?}",
1929                        left_stage.branch, right_stage.branch
1930                    ));
1931                }
1932                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1933                {
1934                    details.push(format!(
1935                        "produced_artifacts: {} -> {}",
1936                        left_stage.produced_artifact_ids.len(),
1937                        right_stage.produced_artifact_ids.len()
1938                    ));
1939                }
1940                if left_stage.artifacts.len() != right_stage.artifacts.len() {
1941                    details.push(format!(
1942                        "artifact_records: {} -> {}",
1943                        left_stage.artifacts.len(),
1944                        right_stage.artifacts.len()
1945                    ));
1946                }
1947                if !details.is_empty() {
1948                    stage_diffs.push(RunStageDiffRecord {
1949                        node_id: node_id.to_string(),
1950                        change: "changed".to_string(),
1951                        details,
1952                    });
1953                }
1954            }
1955            (None, None) => {}
1956        }
1957    }
1958
1959    let mut tool_diffs = Vec::new();
1960    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1961        .tool_recordings
1962        .iter()
1963        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1964        .collect();
1965    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1966        .tool_recordings
1967        .iter()
1968        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1969        .collect();
1970    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1971        .keys()
1972        .chain(right_tools.keys())
1973        .cloned()
1974        .collect();
1975    for key in &all_tool_keys {
1976        let l = left_tools.get(key);
1977        let r = right_tools.get(key);
1978        let result_changed = match (l, r) {
1979            (Some(a), Some(b)) => a.result != b.result,
1980            _ => true,
1981        };
1982        if result_changed {
1983            tool_diffs.push(ToolCallDiffRecord {
1984                tool_name: key.0.clone(),
1985                args_hash: key.1.clone(),
1986                result_changed,
1987                left_result: l.map(|t| t.result.clone()),
1988                right_result: r.map(|t| t.result.clone()),
1989            });
1990        }
1991    }
1992
1993    let left_observability = left.observability.clone().unwrap_or_else(|| {
1994        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1995    });
1996    let right_observability = right.observability.clone().unwrap_or_else(|| {
1997        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1998    });
1999    let mut observability_diffs = Vec::new();
2000
2001    let left_workers = left_observability
2002        .worker_lineage
2003        .iter()
2004        .map(|worker| {
2005            (
2006                worker.worker_id.clone(),
2007                (
2008                    worker.status.clone(),
2009                    worker.run_id.clone(),
2010                    worker.run_path.clone(),
2011                ),
2012            )
2013        })
2014        .collect::<BTreeMap<_, _>>();
2015    let right_workers = right_observability
2016        .worker_lineage
2017        .iter()
2018        .map(|worker| {
2019            (
2020                worker.worker_id.clone(),
2021                (
2022                    worker.status.clone(),
2023                    worker.run_id.clone(),
2024                    worker.run_path.clone(),
2025                ),
2026            )
2027        })
2028        .collect::<BTreeMap<_, _>>();
2029    let worker_ids = left_workers
2030        .keys()
2031        .chain(right_workers.keys())
2032        .cloned()
2033        .collect::<BTreeSet<_>>();
2034    for worker_id in worker_ids {
2035        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
2036            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2037                section: "worker_lineage".to_string(),
2038                label: worker_id,
2039                details: vec!["worker missing from right run".to_string()],
2040            }),
2041            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2042                section: "worker_lineage".to_string(),
2043                label: worker_id,
2044                details: vec!["worker missing from left run".to_string()],
2045            }),
2046            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
2047                let mut details = Vec::new();
2048                if left_worker.0 != right_worker.0 {
2049                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
2050                }
2051                if left_worker.1 != right_worker.1 {
2052                    details.push(format!(
2053                        "run_id: {:?} -> {:?}",
2054                        left_worker.1, right_worker.1
2055                    ));
2056                }
2057                if left_worker.2 != right_worker.2 {
2058                    details.push(format!(
2059                        "run_path: {:?} -> {:?}",
2060                        left_worker.2, right_worker.2
2061                    ));
2062                }
2063                observability_diffs.push(RunObservabilityDiffRecord {
2064                    section: "worker_lineage".to_string(),
2065                    label: worker_id,
2066                    details,
2067                });
2068            }
2069            _ => {}
2070        }
2071    }
2072
2073    let left_rounds = left_observability
2074        .planner_rounds
2075        .iter()
2076        .map(|round| (round.stage_id.clone(), round))
2077        .collect::<BTreeMap<_, _>>();
2078    let right_rounds = right_observability
2079        .planner_rounds
2080        .iter()
2081        .map(|round| (round.stage_id.clone(), round))
2082        .collect::<BTreeMap<_, _>>();
2083    let round_ids = left_rounds
2084        .keys()
2085        .chain(right_rounds.keys())
2086        .cloned()
2087        .collect::<BTreeSet<_>>();
2088    for stage_id in round_ids {
2089        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
2090            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2091                section: "planner_rounds".to_string(),
2092                label: stage_id,
2093                details: vec!["planner summary missing from right run".to_string()],
2094            }),
2095            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2096                section: "planner_rounds".to_string(),
2097                label: stage_id,
2098                details: vec!["planner summary missing from left run".to_string()],
2099            }),
2100            (Some(left_round), Some(right_round)) => {
2101                let mut details = Vec::new();
2102                if left_round.iteration_count != right_round.iteration_count {
2103                    details.push(format!(
2104                        "iterations: {} -> {}",
2105                        left_round.iteration_count, right_round.iteration_count
2106                    ));
2107                }
2108                if left_round.tool_execution_count != right_round.tool_execution_count {
2109                    details.push(format!(
2110                        "tool_executions: {} -> {}",
2111                        left_round.tool_execution_count, right_round.tool_execution_count
2112                    ));
2113                }
2114                if left_round.native_text_tool_fallback_count
2115                    != right_round.native_text_tool_fallback_count
2116                {
2117                    details.push(format!(
2118                        "native_text_tool_fallbacks: {} -> {}",
2119                        left_round.native_text_tool_fallback_count,
2120                        right_round.native_text_tool_fallback_count
2121                    ));
2122                }
2123                if left_round.native_text_tool_fallback_rejection_count
2124                    != right_round.native_text_tool_fallback_rejection_count
2125                {
2126                    details.push(format!(
2127                        "native_text_tool_fallback_rejections: {} -> {}",
2128                        left_round.native_text_tool_fallback_rejection_count,
2129                        right_round.native_text_tool_fallback_rejection_count
2130                    ));
2131                }
2132                if left_round.empty_completion_retry_count
2133                    != right_round.empty_completion_retry_count
2134                {
2135                    details.push(format!(
2136                        "empty_completion_retries: {} -> {}",
2137                        left_round.empty_completion_retry_count,
2138                        right_round.empty_completion_retry_count
2139                    ));
2140                }
2141                if left_round.research_facts != right_round.research_facts {
2142                    details.push(format!(
2143                        "research_facts: {:?} -> {:?}",
2144                        left_round.research_facts, right_round.research_facts
2145                    ));
2146                }
2147                let left_deliverables = left_round
2148                    .task_ledger
2149                    .as_ref()
2150                    .map(|ledger| {
2151                        ledger
2152                            .deliverables
2153                            .iter()
2154                            .map(|item| format!("{}:{}", item.id, item.status))
2155                            .collect::<Vec<_>>()
2156                    })
2157                    .unwrap_or_default();
2158                let right_deliverables = right_round
2159                    .task_ledger
2160                    .as_ref()
2161                    .map(|ledger| {
2162                        ledger
2163                            .deliverables
2164                            .iter()
2165                            .map(|item| format!("{}:{}", item.id, item.status))
2166                            .collect::<Vec<_>>()
2167                    })
2168                    .unwrap_or_default();
2169                if left_deliverables != right_deliverables {
2170                    details.push(format!(
2171                        "deliverables: {:?} -> {:?}",
2172                        left_deliverables, right_deliverables
2173                    ));
2174                }
2175                if left_round.successful_tools != right_round.successful_tools {
2176                    details.push(format!(
2177                        "successful_tools: {:?} -> {:?}",
2178                        left_round.successful_tools, right_round.successful_tools
2179                    ));
2180                }
2181                if !details.is_empty() {
2182                    observability_diffs.push(RunObservabilityDiffRecord {
2183                        section: "planner_rounds".to_string(),
2184                        label: left_round.node_id.clone(),
2185                        details,
2186                    });
2187                }
2188            }
2189            _ => {}
2190        }
2191    }
2192
2193    let left_pointers = left_observability
2194        .transcript_pointers
2195        .iter()
2196        .map(|pointer| {
2197            (
2198                pointer.id.clone(),
2199                (
2200                    pointer.available,
2201                    pointer.path.clone(),
2202                    pointer.location.clone(),
2203                ),
2204            )
2205        })
2206        .collect::<BTreeMap<_, _>>();
2207    let right_pointers = right_observability
2208        .transcript_pointers
2209        .iter()
2210        .map(|pointer| {
2211            (
2212                pointer.id.clone(),
2213                (
2214                    pointer.available,
2215                    pointer.path.clone(),
2216                    pointer.location.clone(),
2217                ),
2218            )
2219        })
2220        .collect::<BTreeMap<_, _>>();
2221    let pointer_ids = left_pointers
2222        .keys()
2223        .chain(right_pointers.keys())
2224        .cloned()
2225        .collect::<BTreeSet<_>>();
2226    for pointer_id in pointer_ids {
2227        match (
2228            left_pointers.get(&pointer_id),
2229            right_pointers.get(&pointer_id),
2230        ) {
2231            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2232                section: "transcript_pointers".to_string(),
2233                label: pointer_id,
2234                details: vec!["pointer missing from right run".to_string()],
2235            }),
2236            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2237                section: "transcript_pointers".to_string(),
2238                label: pointer_id,
2239                details: vec!["pointer missing from left run".to_string()],
2240            }),
2241            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
2242                observability_diffs.push(RunObservabilityDiffRecord {
2243                    section: "transcript_pointers".to_string(),
2244                    label: pointer_id,
2245                    details: vec![format!(
2246                        "pointer: {:?} -> {:?}",
2247                        left_pointer, right_pointer
2248                    )],
2249                });
2250            }
2251            _ => {}
2252        }
2253    }
2254
2255    let left_compactions = left_observability
2256        .compaction_events
2257        .iter()
2258        .map(|event| {
2259            (
2260                event.id.clone(),
2261                (
2262                    event.strategy.clone(),
2263                    event.archived_messages,
2264                    event.snapshot_asset_id.clone(),
2265                    event.available,
2266                ),
2267            )
2268        })
2269        .collect::<BTreeMap<_, _>>();
2270    let right_compactions = right_observability
2271        .compaction_events
2272        .iter()
2273        .map(|event| {
2274            (
2275                event.id.clone(),
2276                (
2277                    event.strategy.clone(),
2278                    event.archived_messages,
2279                    event.snapshot_asset_id.clone(),
2280                    event.available,
2281                ),
2282            )
2283        })
2284        .collect::<BTreeMap<_, _>>();
2285    let compaction_ids = left_compactions
2286        .keys()
2287        .chain(right_compactions.keys())
2288        .cloned()
2289        .collect::<BTreeSet<_>>();
2290    for compaction_id in compaction_ids {
2291        match (
2292            left_compactions.get(&compaction_id),
2293            right_compactions.get(&compaction_id),
2294        ) {
2295            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2296                section: "compaction_events".to_string(),
2297                label: compaction_id,
2298                details: vec!["compaction event missing from right run".to_string()],
2299            }),
2300            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2301                section: "compaction_events".to_string(),
2302                label: compaction_id,
2303                details: vec!["compaction event missing from left run".to_string()],
2304            }),
2305            (Some(left_event), Some(right_event)) if left_event != right_event => {
2306                observability_diffs.push(RunObservabilityDiffRecord {
2307                    section: "compaction_events".to_string(),
2308                    label: compaction_id,
2309                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2310                });
2311            }
2312            _ => {}
2313        }
2314    }
2315
2316    let left_daemons = left_observability
2317        .daemon_events
2318        .iter()
2319        .map(|event| {
2320            (
2321                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2322                (
2323                    event.name.clone(),
2324                    event.persist_path.clone(),
2325                    event.payload_summary.clone(),
2326                ),
2327            )
2328        })
2329        .collect::<BTreeMap<_, _>>();
2330    let right_daemons = right_observability
2331        .daemon_events
2332        .iter()
2333        .map(|event| {
2334            (
2335                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2336                (
2337                    event.name.clone(),
2338                    event.persist_path.clone(),
2339                    event.payload_summary.clone(),
2340                ),
2341            )
2342        })
2343        .collect::<BTreeMap<_, _>>();
2344    let daemon_keys = left_daemons
2345        .keys()
2346        .chain(right_daemons.keys())
2347        .cloned()
2348        .collect::<BTreeSet<_>>();
2349    for daemon_key in daemon_keys {
2350        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
2351        match (
2352            left_daemons.get(&daemon_key),
2353            right_daemons.get(&daemon_key),
2354        ) {
2355            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2356                section: "daemon_events".to_string(),
2357                label,
2358                details: vec!["daemon event missing from right run".to_string()],
2359            }),
2360            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2361                section: "daemon_events".to_string(),
2362                label,
2363                details: vec!["daemon event missing from left run".to_string()],
2364            }),
2365            (Some(left_event), Some(right_event)) if left_event != right_event => {
2366                observability_diffs.push(RunObservabilityDiffRecord {
2367                    section: "daemon_events".to_string(),
2368                    label,
2369                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2370                });
2371            }
2372            _ => {}
2373        }
2374    }
2375
2376    let left_verification = left_observability
2377        .verification_outcomes
2378        .iter()
2379        .map(|item| (item.stage_id.clone(), item))
2380        .collect::<BTreeMap<_, _>>();
2381    let right_verification = right_observability
2382        .verification_outcomes
2383        .iter()
2384        .map(|item| (item.stage_id.clone(), item))
2385        .collect::<BTreeMap<_, _>>();
2386    let verification_ids = left_verification
2387        .keys()
2388        .chain(right_verification.keys())
2389        .cloned()
2390        .collect::<BTreeSet<_>>();
2391    for stage_id in verification_ids {
2392        match (
2393            left_verification.get(&stage_id),
2394            right_verification.get(&stage_id),
2395        ) {
2396            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2397                section: "verification".to_string(),
2398                label: stage_id,
2399                details: vec!["verification missing from right run".to_string()],
2400            }),
2401            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2402                section: "verification".to_string(),
2403                label: stage_id,
2404                details: vec!["verification missing from left run".to_string()],
2405            }),
2406            (Some(left_item), Some(right_item)) if left_item != right_item => {
2407                let mut details = Vec::new();
2408                if left_item.passed != right_item.passed {
2409                    details.push(format!(
2410                        "passed: {:?} -> {:?}",
2411                        left_item.passed, right_item.passed
2412                    ));
2413                }
2414                if left_item.summary != right_item.summary {
2415                    details.push(format!(
2416                        "summary: {:?} -> {:?}",
2417                        left_item.summary, right_item.summary
2418                    ));
2419                }
2420                observability_diffs.push(RunObservabilityDiffRecord {
2421                    section: "verification".to_string(),
2422                    label: left_item.node_id.clone(),
2423                    details,
2424                });
2425            }
2426            _ => {}
2427        }
2428    }
2429
2430    let left_graph = (
2431        left_observability.action_graph_nodes.len(),
2432        left_observability.action_graph_edges.len(),
2433    );
2434    let right_graph = (
2435        right_observability.action_graph_nodes.len(),
2436        right_observability.action_graph_edges.len(),
2437    );
2438    if left_graph != right_graph {
2439        observability_diffs.push(RunObservabilityDiffRecord {
2440            section: "action_graph".to_string(),
2441            label: "shape".to_string(),
2442            details: vec![format!(
2443                "nodes/edges: {}/{} -> {}/{}",
2444                left_graph.0, left_graph.1, right_graph.0, right_graph.1
2445            )],
2446        });
2447    }
2448
2449    let status_changed = left.status != right.status;
2450    let identical = !status_changed
2451        && stage_diffs.is_empty()
2452        && tool_diffs.is_empty()
2453        && observability_diffs.is_empty()
2454        && left.transitions.len() == right.transitions.len()
2455        && left.artifacts.len() == right.artifacts.len()
2456        && left.checkpoints.len() == right.checkpoints.len();
2457
2458    RunDiffReport {
2459        left_run_id: left.id.clone(),
2460        right_run_id: right.id.clone(),
2461        identical,
2462        status_changed,
2463        left_status: left.status.clone(),
2464        right_status: right.status.clone(),
2465        stage_diffs,
2466        tool_diffs,
2467        observability_diffs,
2468        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
2469        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
2470        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
2471    }
2472}