Skip to main content

harn_vm/orchestration/
records.rs

1//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, sync_run_handoffs,
10    ArtifactRecord, CapabilityPolicy, HandoffArtifact,
11};
12use crate::event_log::{
13    active_event_log, AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic,
14};
15use crate::llm::vm_value_to_json;
16use crate::triggers::{SignatureStatus, TriggerEvent};
17use crate::value::{VmError, VmValue};
18
// Node kind labels for the run action graph. Presumably these are the values stored in
// `RunActionGraphNodeRecord::kind`; most assignment sites are outside this view — confirm.
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
/// Kind returned by `action_graph_kind_for_stage` for stages whose `kind` is `"condition"`.
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
/// Kind returned by `action_graph_kind_for_stage` for every non-`"condition"` stage.
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE: &str = "worker_enqueue";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";
30
// Edge kind labels for the run action graph. Presumably these are the values stored in
// `RunActionGraphEdgeRecord::kind`; the assignment sites are outside this view — confirm.
pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
40
/// LLM usage summary stored in `RunRecord::usage` and `RunStageRecord::usage`.
///
/// `#[serde(default)]` lets any field missing from the serialized form deserialize to its
/// `Default` value. Only `PartialEq` (not `Eq`) can be derived because `total_cost` is an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
51
/// Record of one stage within a run; stored in `RunRecord::stages`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    pub node_id: String,
    /// Stage kind; `"condition"` maps to `ACTION_GRAPH_NODE_KIND_PREDICATE` in
    /// `action_graph_kind_for_stage`, anything else to `ACTION_GRAPH_NODE_KIND_STAGE`.
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Artifacts; `stage_result_payload` returns the `data` of the first artifact that has one.
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    /// Free-form metadata; a string `worker_id` entry is copied into action-graph node
    /// metadata by `stage_node_metadata`.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
74
/// One attempt of a stage; stored in `RunStageRecord::attempts`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
87
/// A transition between workflow nodes; stored in `RunRecord::transitions`.
///
/// `from_*` fields are optional (presumably absent for the entry transition — confirm).
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
100
/// A checkpoint snapshot of scheduler progress; stored in `RunRecord::checkpoints`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
111
/// Replay fixture derived from a source run; stored in `RunRecord::replay_fixture`.
///
/// `type_name` is (de)serialized under the JSON key `_type`. `#[serde(default)]` lets any
/// missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub eval_kind: Option<String>,
    /// Optional clarifying-question expectations (see `ClarifyingQuestionEvalSpec`).
    pub clarifying_question: Option<ClarifyingQuestionEvalSpec>,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
127
/// Expectations for clarifying questions in a replay fixture.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ClarifyingQuestionEvalSpec {
    pub expected_question: Option<String>,
    pub accepted_questions: Vec<String>,
    pub required_terms: Vec<String>,
    pub forbidden_terms: Vec<String>,
    /// Values below 1 are treated as 1 by `clarifying_min_questions`.
    pub min_questions: usize,
    /// Interpreted (with a default when absent) by `clarifying_max_questions`.
    pub max_questions: Option<usize>,
}
138
/// Per-node expectation listed in `ReplayFixture::stage_assertions`.
///
/// Presumably matched against `RunStageRecord::node_id` when a fixture is evaluated — confirm.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    pub visible_text_contains: Option<String>,
}
149
/// Result of evaluating one run against a replay fixture.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    /// Human-readable failure descriptions (presumably empty when `pass` is true — confirm).
    pub failures: Vec<String>,
    pub stage_count: usize,
}
157
/// Result of one case within an eval suite; collected in `ReplayEvalSuiteReport::cases`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    /// Optional diff against a baseline run (presumably from `EvalSuiteCase::compare_to`).
    pub comparison: Option<RunDiffReport>,
}
170
/// Aggregate result of running an eval suite.
///
/// `total`/`passed`/`failed` presumably summarize `cases` — confirm against the producer.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
180
/// One deliverable from a task ledger.
///
/// `task_ledger_summary_from_value` fills these from the `id`, `text`, `status`, and `note`
/// keys of each `deliverables[]` entry. `#[serde(default)]` lets any missing field
/// deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    pub status: String,
    pub note: Option<String>,
}
189
/// Summary of a planner task ledger, built by `task_ledger_summary_from_value`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    /// Number of deliverables whose status is `"open"` or `"blocked"`.
    pub blocking_count: usize,
}
199
/// Per-stage planner/agent-loop counters; stored in `RunObservabilityRecord::planner_rounds`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub native_text_tool_fallback_count: usize,
    pub native_text_tool_fallback_rejection_count: usize,
    pub empty_completion_retry_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    pub research_facts: Vec<String>,
}
223
/// Lineage entry for a delegated worker; stored in `RunObservabilityRecord::worker_lineage`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
238
/// Node in the run action graph; stored in `RunObservabilityRecord::action_graph_nodes`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    pub id: String,
    pub label: String,
    /// Presumably one of the `ACTION_GRAPH_NODE_KIND_*` constants — confirm.
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub trace_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
255
/// Directed edge between two action-graph nodes (by node id); stored in
/// `RunObservabilityRecord::action_graph_edges`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    /// Presumably one of the `ACTION_GRAPH_EDGE_KIND_*` constants — confirm.
    pub kind: String,
    pub label: Option<String>,
}
264
/// Verification result for a stage; stored in `RunObservabilityRecord::verification_outcomes`.
///
/// `passed` is optional, so a verification may have no definite verdict.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    pub passed: Option<bool>,
    pub summary: Option<String>,
}
274
/// Pointer to a transcript location; stored in `RunObservabilityRecord::transcript_pointers`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub location: String,
    pub path: Option<String>,
    pub available: bool,
}
285
/// One transcript compaction event, as extracted by `compaction_events_from_transcript`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    pub snapshot_asset_id: Option<String>,
    /// `"<prefix>.assets[<asset_id>]"` when a snapshot asset id is present, else the prefix.
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    /// True only when `snapshot_asset_id` names an asset present in the transcript's `assets`.
    pub available: bool,
}
303
/// Daemon lifecycle event kind, serialized in `snake_case` (e.g. `"spawned"`).
///
/// `Spawned` is the `Default` variant.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum DaemonEventKindRecord {
    #[default]
    Spawned,
    Triggered,
    Snapshotted,
    Resumed,
    Stopped,
}
314
/// One daemon lifecycle event; stored in `RunObservabilityRecord::daemon_events`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonEventRecord {
    pub daemon_id: String,
    pub name: String,
    pub kind: DaemonEventKindRecord,
    pub timestamp: String,
    pub persist_path: String,
    pub payload_summary: Option<String>,
}
325
/// Observability snapshot for a run; stored in `RunRecord::observability`.
///
/// `publish_action_graph_event` embeds it under the `observability` key of its event payload.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
    pub daemon_events: Vec<DaemonEventRecord>,
}
340
/// Per-node difference between two runs; stored in `RunDiffReport::stage_diffs`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    pub change: String,
    pub details: Vec<String>,
}
348
/// Difference in a recorded tool call between two runs; stored in `RunDiffReport::tool_diffs`.
///
/// `args_hash` is presumably produced by `tool_fixture_hash` — confirm.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
358
/// Difference in an observability section between two runs; stored in
/// `RunDiffReport::observability_diffs`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}
366
/// Comparison of a "left" and a "right" run.
///
/// The `*_count_delta` fields are signed; presumably computed as right minus left — confirm.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
383
/// Manifest listing the cases of an eval suite.
///
/// `type_name` is (de)serialized under the JSON key `_type`. `base_dir` is presumably used to
/// resolve relative case paths — confirm. `#[serde(default)]` lets any missing field
/// deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
394
/// One case of an `EvalSuiteManifest`: a run to evaluate, with an optional fixture and an
/// optional run to compare against.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
403
/// A human-in-the-loop question asked during a run; stored in `RunRecord::hitl_questions`.
///
/// `merge_hitl_questions_from_active_log` builds these from `hitl.question_asked` events.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunHitlQuestionRecord {
    pub request_id: String,
    pub prompt: String,
    pub agent: String,
    pub trace_id: Option<String>,
    /// Taken from the event payload's `requested_at` field; used as the primary sort key.
    pub asked_at: String,
}
413
/// Record of a workflow run.
///
/// `type_name` is (de)serialized under the JSON key `_type`. Only `PartialEq` is derived
/// because nested `LlmUsageRecord` contains an `f64`. `#[serde(default)]` lets any missing
/// field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub handoffs: Vec<HandoffArtifact>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    /// Merged with `hitl.question_asked` events by `merge_hitl_questions_from_active_log`.
    pub hitl_questions: Vec<RunHitlQuestionRecord>,
    /// Free-form metadata. Keys read in this module: `trigger_event` (deserialized as a
    /// `TriggerEvent`), `trace_id` (fallback trace id), and `replay_of_event_id`.
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}
448
/// One recorded tool invocation; stored in `RunRecord::tool_recordings`.
///
/// `args_hash` is presumably the output of `tool_fixture_hash` — confirm.
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
461
462/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
463pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
464    use std::hash::{Hash, Hasher};
465    let mut hasher = std::collections::hash_map::DefaultHasher::new();
466    tool_name.hash(&mut hasher);
467    let args_str = serde_json::to_string(args).unwrap_or_default();
468    args_str.hash(&mut hasher);
469    format!("{:016x}", hasher.finish())
470}
471
/// One trace span captured for a run; stored in `RunRecord::trace_spans`.
///
/// `parent_id` is optional (presumably `None` for root spans — confirm).
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
483
/// A child worker run spawned from this run; stored in `RunRecord::child_runs`.
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
505
/// Execution environment of a run or child run (`RunRecord::execution`,
/// `RunChildRecord::execution`).
///
/// `#[serde(default)]` lets any missing field deserialize to its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}
519
520fn compact_json_value(value: &serde_json::Value) -> String {
521    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
522}
523
/// Normalizes free-form question text for comparison.
///
/// Letters and digits in any script are lowercased and kept. Every other character,
/// punctuation and whitespace alike, acts as a word separator. Runs of separators collapse
/// into single spaces, with no leading or trailing space.
///
/// Non-ASCII letters are kept deliberately. Treating them as separators would turn `"Café"`
/// into `"caf"`, so questions or required terms in non-English text could match (or fail to
/// match) for the wrong reasons.
fn normalize_question_text(text: &str) -> String {
    let mut folded = String::with_capacity(text.len());
    for ch in text.chars() {
        if ch.is_alphanumeric() {
            // `to_lowercase` may yield several chars (e.g. 'İ'), so extend rather than push.
            folded.extend(ch.to_lowercase());
        } else {
            folded.push(' ');
        }
    }
    folded.split_whitespace().collect::<Vec<_>>().join(" ")
}
538
539fn clarifying_min_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
540    spec.min_questions.max(1)
541}
542
543fn clarifying_max_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
544    spec.max_questions.unwrap_or(1).max(1)
545}
546
547fn read_topic_records(
548    log: &AnyEventLog,
549    topic: &Topic,
550) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
551    let mut from = None;
552    let mut records = Vec::new();
553    loop {
554        let batch =
555            futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
556        if batch.is_empty() {
557            break;
558        }
559        from = batch.last().map(|(event_id, _)| *event_id);
560        records.extend(batch);
561    }
562    records
563}
564
565fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
566    let Some(log) = active_event_log() else {
567        return;
568    };
569    let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
570        .expect("static hitl.questions topic should always be valid");
571    let mut merged = run
572        .hitl_questions
573        .iter()
574        .cloned()
575        .map(|question| (question.request_id.clone(), question))
576        .collect::<BTreeMap<_, _>>();
577
578    for (_, event) in read_topic_records(log.as_ref(), &topic) {
579        if event.kind != "hitl.question_asked" {
580            continue;
581        }
582        let payload = &event.payload;
583        let matches_run = event
584            .headers
585            .get("run_id")
586            .is_some_and(|value| value == &run.id)
587            || payload
588                .get("run_id")
589                .and_then(|value| value.as_str())
590                .is_some_and(|value| value == run.id);
591        if !matches_run {
592            continue;
593        }
594        let request_id = payload
595            .get("request_id")
596            .and_then(|value| value.as_str())
597            .or_else(|| event.headers.get("request_id").map(String::as_str))
598            .unwrap_or_default();
599        let prompt = payload
600            .get("payload")
601            .and_then(|value| value.get("prompt"))
602            .and_then(|value| value.as_str())
603            .unwrap_or_default();
604        if request_id.is_empty() || prompt.is_empty() {
605            continue;
606        }
607        merged.insert(
608            request_id.to_string(),
609            RunHitlQuestionRecord {
610                request_id: request_id.to_string(),
611                prompt: prompt.to_string(),
612                agent: payload
613                    .get("agent")
614                    .and_then(|value| value.as_str())
615                    .unwrap_or_default()
616                    .to_string(),
617                trace_id: payload
618                    .get("trace_id")
619                    .and_then(|value| value.as_str())
620                    .map(str::to_string),
621                asked_at: payload
622                    .get("requested_at")
623                    .and_then(|value| value.as_str())
624                    .unwrap_or_default()
625                    .to_string(),
626            },
627        );
628    }
629
630    run.hitl_questions = merged.into_values().collect();
631    run.hitl_questions.sort_by(|left, right| {
632        (left.asked_at.as_str(), left.request_id.as_str())
633            .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
634    });
635}
636
637fn signature_status_label(status: &SignatureStatus) -> &'static str {
638    match status {
639        SignatureStatus::Verified => "verified",
640        SignatureStatus::Unsigned => "unsigned",
641        SignatureStatus::Failed { .. } => "failed",
642    }
643}
644
645fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
646    run.metadata
647        .get("trigger_event")
648        .cloned()
649        .and_then(|value| serde_json::from_value(value).ok())
650}
651
652fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
653    trigger_event
654        .map(|event| event.trace_id.0.clone())
655        .or_else(|| {
656            run.metadata
657                .get("trace_id")
658                .and_then(|value| value.as_str())
659                .map(str::to_string)
660        })
661}
662
663fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
664    run.metadata
665        .get("replay_of_event_id")
666        .and_then(|value| value.as_str())
667        .map(str::to_string)
668}
669
670fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
671    if stage.kind == "condition" {
672        ACTION_GRAPH_NODE_KIND_PREDICATE
673    } else {
674        ACTION_GRAPH_NODE_KIND_STAGE
675    }
676}
677
678fn trigger_node_metadata(trigger_event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
679    let mut metadata = BTreeMap::new();
680    metadata.insert(
681        "provider".to_string(),
682        serde_json::json!(trigger_event.provider.as_str()),
683    );
684    metadata.insert(
685        "event_kind".to_string(),
686        serde_json::json!(trigger_event.kind),
687    );
688    metadata.insert(
689        "dedupe_key".to_string(),
690        serde_json::json!(trigger_event.dedupe_key),
691    );
692    metadata.insert(
693        "signature_status".to_string(),
694        serde_json::json!(signature_status_label(&trigger_event.signature_status)),
695    );
696    metadata
697}
698
699fn stage_node_metadata(stage: &RunStageRecord) -> BTreeMap<String, serde_json::Value> {
700    let mut metadata = BTreeMap::new();
701    metadata.insert("stage_kind".to_string(), serde_json::json!(stage.kind));
702    if let Some(branch) = stage.branch.as_ref() {
703        metadata.insert("branch".to_string(), serde_json::json!(branch));
704    }
705    if let Some(worker_id) = stage
706        .metadata
707        .get("worker_id")
708        .and_then(|value| value.as_str())
709    {
710        metadata.insert("worker_id".to_string(), serde_json::json!(worker_id));
711    }
712    metadata
713}
714
715fn append_action_graph_node(
716    nodes: &mut Vec<RunActionGraphNodeRecord>,
717    record: RunActionGraphNodeRecord,
718) {
719    nodes.push(record);
720}
721
722pub async fn append_action_graph_update(
723    headers: BTreeMap<String, String>,
724    payload: serde_json::Value,
725) -> Result<(), crate::event_log::LogError> {
726    let Some(log) = active_event_log() else {
727        return Ok(());
728    };
729    let topic = Topic::new("observability.action_graph")
730        .expect("static observability.action_graph topic should always be valid");
731    let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
732    log.append(&topic, record).await.map(|_| ())
733}
734
735fn publish_action_graph_event(
736    run: &RunRecord,
737    observability: &RunObservabilityRecord,
738    path: &Path,
739) {
740    let trigger_event = trigger_event_from_run(run);
741    let mut headers = BTreeMap::new();
742    headers.insert("run_id".to_string(), run.id.clone());
743    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
744    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
745        headers.insert("trace_id".to_string(), trace_id);
746    }
747    let payload = serde_json::json!({
748        "run_id": run.id,
749        "workflow_id": run.workflow_id,
750        "persisted_path": path.to_string_lossy(),
751        "status": run.status,
752        "observability": observability,
753    });
754    if let Ok(handle) = tokio::runtime::Handle::try_current() {
755        handle.spawn(async move {
756            let _ = append_action_graph_update(headers, payload).await;
757        });
758    } else {
759        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
760    }
761}
762
/// Location of the LLM transcript sidecar for a run file:
/// `<parent>/<stem>-llm/llm_transcript.jsonl`.
///
/// Returns `None` only when `run_path` has no file stem (e.g. `/` or an empty path). When
/// `run_path` has no parent at all, `.` is used as the parent.
///
/// The path is built with `Path::join` rather than a `/`-separated format string, so the
/// platform separator is used. The stem stays an `OsStr`, so non-UTF-8 file names still
/// yield a sidecar path instead of `None`.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem()?;
    let parent = run_path.parent().unwrap_or_else(|| Path::new("."));
    let mut sidecar_dir = stem.to_os_string();
    sidecar_dir.push("-llm");
    Some(parent.join(sidecar_dir).join("llm_transcript.jsonl"))
}
768
769fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
770    value
771        .and_then(|value| value.as_array())
772        .map(|items| {
773            items
774                .iter()
775                .filter_map(|item| item.as_str().map(str::to_string))
776                .collect::<Vec<_>>()
777        })
778        .unwrap_or_default()
779}
780
781fn json_usize(value: Option<&serde_json::Value>) -> usize {
782    value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
783}
784
785fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
786    value.and_then(|value| value.as_bool())
787}
788
789fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
790    stage
791        .artifacts
792        .iter()
793        .find_map(|artifact| artifact.data.as_ref())
794}
795
796fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
797    let deliverables = value
798        .get("deliverables")
799        .and_then(|raw| raw.as_array())
800        .map(|items| {
801            items
802                .iter()
803                .map(|item| RunDeliverableSummaryRecord {
804                    id: item
805                        .get("id")
806                        .and_then(|value| value.as_str())
807                        .unwrap_or_default()
808                        .to_string(),
809                    text: item
810                        .get("text")
811                        .and_then(|value| value.as_str())
812                        .unwrap_or_default()
813                        .to_string(),
814                    status: item
815                        .get("status")
816                        .and_then(|value| value.as_str())
817                        .unwrap_or_default()
818                        .to_string(),
819                    note: item
820                        .get("note")
821                        .and_then(|value| value.as_str())
822                        .map(str::to_string),
823                })
824                .collect::<Vec<_>>()
825        })
826        .unwrap_or_default();
827    let observations = json_string_array(value.get("observations"));
828    let root_task = value
829        .get("root_task")
830        .and_then(|value| value.as_str())
831        .unwrap_or_default()
832        .to_string();
833    let rationale = value
834        .get("rationale")
835        .and_then(|value| value.as_str())
836        .unwrap_or_default()
837        .to_string();
838    if root_task.is_empty()
839        && rationale.is_empty()
840        && deliverables.is_empty()
841        && observations.is_empty()
842    {
843        return None;
844    }
845    let blocking_count = deliverables
846        .iter()
847        .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
848        .count();
849    Some(RunTaskLedgerSummaryRecord {
850        root_task,
851        rationale,
852        deliverables,
853        observations,
854        blocking_count,
855    })
856}
857
858fn compaction_events_from_transcript(
859    transcript: &serde_json::Value,
860    stage_id: Option<&str>,
861    node_id: Option<&str>,
862    location_prefix: &str,
863    persisted_path: Option<&Path>,
864) -> Vec<CompactionEventRecord> {
865    let transcript_id = transcript
866        .get("id")
867        .and_then(|value| value.as_str())
868        .map(str::to_string);
869    let asset_ids = transcript
870        .get("assets")
871        .and_then(|value| value.as_array())
872        .map(|assets| {
873            assets
874                .iter()
875                .filter_map(|asset| {
876                    asset
877                        .get("id")
878                        .and_then(|value| value.as_str())
879                        .map(str::to_string)
880                })
881                .collect::<BTreeSet<_>>()
882        })
883        .unwrap_or_default();
884    transcript
885        .get("events")
886        .and_then(|value| value.as_array())
887        .map(|events| {
888            events
889                .iter()
890                .filter(|event| {
891                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
892                })
893                .map(|event| {
894                    let metadata = event.get("metadata");
895                    let snapshot_asset_id = metadata
896                        .and_then(|value| value.get("snapshot_asset_id"))
897                        .and_then(|value| value.as_str())
898                        .map(str::to_string);
899                    let available = snapshot_asset_id
900                        .as_ref()
901                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
902                    let snapshot_location = snapshot_asset_id
903                        .as_ref()
904                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
905                        .unwrap_or_else(|| location_prefix.to_string());
906                    CompactionEventRecord {
907                        id: event
908                            .get("id")
909                            .and_then(|value| value.as_str())
910                            .unwrap_or_default()
911                            .to_string(),
912                        transcript_id: transcript_id.clone(),
913                        stage_id: stage_id.map(str::to_string),
914                        node_id: node_id.map(str::to_string),
915                        mode: metadata
916                            .and_then(|value| value.get("mode"))
917                            .and_then(|value| value.as_str())
918                            .unwrap_or_default()
919                            .to_string(),
920                        strategy: metadata
921                            .and_then(|value| value.get("strategy"))
922                            .and_then(|value| value.as_str())
923                            .unwrap_or_default()
924                            .to_string(),
925                        archived_messages: json_usize(
926                            metadata.and_then(|value| value.get("archived_messages")),
927                        ),
928                        estimated_tokens_before: json_usize(
929                            metadata.and_then(|value| value.get("estimated_tokens_before")),
930                        ),
931                        estimated_tokens_after: json_usize(
932                            metadata.and_then(|value| value.get("estimated_tokens_after")),
933                        ),
934                        snapshot_asset_id,
935                        snapshot_location,
936                        snapshot_path: persisted_path
937                            .map(|path| path.to_string_lossy().into_owned()),
938                        available,
939                    }
940                })
941                .collect()
942        })
943        .unwrap_or_default()
944}
945
946fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
947    let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
948        return Vec::new();
949    };
950    let Ok(content) = std::fs::read_to_string(sidecar_path) else {
951        return Vec::new();
952    };
953
954    content
955        .lines()
956        .filter(|line| !line.trim().is_empty())
957        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
958        .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
959        .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
960        .collect()
961}
962
963pub fn derive_run_observability(
964    run: &RunRecord,
965    persisted_path: Option<&Path>,
966) -> RunObservabilityRecord {
967    let mut action_graph_nodes = Vec::new();
968    let mut action_graph_edges = Vec::new();
969    let mut verification_outcomes = Vec::new();
970    let mut planner_rounds = Vec::new();
971    let mut transcript_pointers = Vec::new();
972    let mut compaction_events = Vec::new();
973    let mut daemon_events = Vec::new();
974    let mut research_fact_count = 0usize;
975
976    let root_node_id = format!("run:{}", run.id);
977    let trigger_event = trigger_event_from_run(run);
978    let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
979    append_action_graph_node(
980        &mut action_graph_nodes,
981        RunActionGraphNodeRecord {
982            id: root_node_id.clone(),
983            label: run
984                .workflow_name
985                .clone()
986                .unwrap_or_else(|| run.workflow_id.clone()),
987            kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
988            status: run.status.clone(),
989            outcome: run.status.clone(),
990            trace_id: propagated_trace_id.clone(),
991            stage_id: None,
992            node_id: None,
993            worker_id: None,
994            run_id: Some(run.id.clone()),
995            run_path: run.persisted_path.clone(),
996            metadata: BTreeMap::from([(
997                "workflow_id".to_string(),
998                serde_json::json!(run.workflow_id),
999            )]),
1000        },
1001    );
1002    let mut entry_node_id = root_node_id.clone();
1003    if let Some(trigger_event) = trigger_event.as_ref() {
1004        if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
1005            let replay_source_node_id = format!("trigger:{replay_of_event_id}");
1006            append_action_graph_node(
1007                &mut action_graph_nodes,
1008                RunActionGraphNodeRecord {
1009                    id: replay_source_node_id.clone(),
1010                    label: format!(
1011                        "{}:{} (original {})",
1012                        trigger_event.provider.as_str(),
1013                        trigger_event.kind,
1014                        replay_of_event_id
1015                    ),
1016                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1017                    status: "historical".to_string(),
1018                    outcome: "replayed_from".to_string(),
1019                    trace_id: Some(trigger_event.trace_id.0.clone()),
1020                    stage_id: None,
1021                    node_id: None,
1022                    worker_id: None,
1023                    run_id: Some(run.id.clone()),
1024                    run_path: run.persisted_path.clone(),
1025                    metadata: trigger_node_metadata(trigger_event),
1026                },
1027            );
1028            action_graph_edges.push(RunActionGraphEdgeRecord {
1029                from_id: replay_source_node_id,
1030                to_id: format!("trigger:{}", trigger_event.id.0),
1031                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1032                label: Some("replay chain".to_string()),
1033            });
1034        }
1035        let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
1036        append_action_graph_node(
1037            &mut action_graph_nodes,
1038            RunActionGraphNodeRecord {
1039                id: trigger_node_id.clone(),
1040                label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
1041                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1042                status: "received".to_string(),
1043                outcome: signature_status_label(&trigger_event.signature_status).to_string(),
1044                trace_id: Some(trigger_event.trace_id.0.clone()),
1045                stage_id: None,
1046                node_id: None,
1047                worker_id: None,
1048                run_id: Some(run.id.clone()),
1049                run_path: run.persisted_path.clone(),
1050                metadata: trigger_node_metadata(trigger_event),
1051            },
1052        );
1053        action_graph_edges.push(RunActionGraphEdgeRecord {
1054            from_id: root_node_id.clone(),
1055            to_id: trigger_node_id.clone(),
1056            kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
1057            label: Some(trigger_event.id.0.clone()),
1058        });
1059        entry_node_id = trigger_node_id;
1060    }
1061
1062    let stage_node_ids = run
1063        .stages
1064        .iter()
1065        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
1066        .collect::<BTreeMap<_, _>>();
1067    let stage_by_id = run
1068        .stages
1069        .iter()
1070        .map(|stage| (stage.id.as_str(), stage))
1071        .collect::<BTreeMap<_, _>>();
1072    let stage_by_node_id = run
1073        .stages
1074        .iter()
1075        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
1076        .collect::<BTreeMap<_, _>>();
1077
1078    let incoming_nodes = run
1079        .transitions
1080        .iter()
1081        .map(|transition| transition.to_node_id.clone())
1082        .collect::<BTreeSet<_>>();
1083
1084    for stage in &run.stages {
1085        let graph_node_id = stage_node_ids
1086            .get(&stage.id)
1087            .cloned()
1088            .unwrap_or_else(|| format!("stage:{}", stage.id));
1089        append_action_graph_node(
1090            &mut action_graph_nodes,
1091            RunActionGraphNodeRecord {
1092                id: graph_node_id.clone(),
1093                label: stage.node_id.clone(),
1094                kind: action_graph_kind_for_stage(stage).to_string(),
1095                status: stage.status.clone(),
1096                outcome: stage.outcome.clone(),
1097                trace_id: propagated_trace_id.clone(),
1098                stage_id: Some(stage.id.clone()),
1099                node_id: Some(stage.node_id.clone()),
1100                worker_id: stage
1101                    .metadata
1102                    .get("worker_id")
1103                    .and_then(|value| value.as_str())
1104                    .map(str::to_string),
1105                run_id: None,
1106                run_path: None,
1107                metadata: stage_node_metadata(stage),
1108            },
1109        );
1110        if !incoming_nodes.contains(&stage.node_id) {
1111            action_graph_edges.push(RunActionGraphEdgeRecord {
1112                from_id: entry_node_id.clone(),
1113                to_id: graph_node_id.clone(),
1114                kind: if trigger_event.is_some() {
1115                    ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
1116                } else {
1117                    ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
1118                },
1119                label: None,
1120            });
1121        }
1122
1123        if stage.kind == "verify" || stage.verification.is_some() {
1124            let passed = json_bool(
1125                stage
1126                    .verification
1127                    .as_ref()
1128                    .and_then(|value| value.get("pass")),
1129            )
1130            .or_else(|| {
1131                json_bool(
1132                    stage
1133                        .verification
1134                        .as_ref()
1135                        .and_then(|value| value.get("success")),
1136                )
1137            })
1138            .or_else(|| {
1139                if stage.status == "completed" && stage.outcome == "success" {
1140                    Some(true)
1141                } else if stage.status == "failed" || stage.outcome == "failed" {
1142                    Some(false)
1143                } else {
1144                    None
1145                }
1146            });
1147            verification_outcomes.push(RunVerificationOutcomeRecord {
1148                stage_id: stage.id.clone(),
1149                node_id: stage.node_id.clone(),
1150                status: stage.status.clone(),
1151                passed,
1152                summary: stage
1153                    .verification
1154                    .as_ref()
1155                    .map(compact_json_value)
1156                    .or_else(|| {
1157                        stage
1158                            .visible_text
1159                            .as_ref()
1160                            .filter(|value| !value.trim().is_empty())
1161                            .cloned()
1162                    }),
1163            });
1164        }
1165
1166        if stage.transcript.is_some() {
1167            transcript_pointers.push(RunTranscriptPointerRecord {
1168                id: format!("stage:{}:transcript", stage.id),
1169                label: format!("Stage {} transcript", stage.node_id),
1170                kind: "embedded_transcript".to_string(),
1171                location: format!("run.stages[{}].transcript", stage.node_id),
1172                path: run.persisted_path.clone(),
1173                available: true,
1174            });
1175            if let Some(transcript) = stage.transcript.as_ref() {
1176                compaction_events.extend(compaction_events_from_transcript(
1177                    transcript,
1178                    Some(&stage.id),
1179                    Some(&stage.node_id),
1180                    &format!("run.stages[{}].transcript", stage.node_id),
1181                    persisted_path,
1182                ));
1183            }
1184        }
1185
1186        if let Some(payload) = stage_result_payload(stage) {
1187            let trace = payload.get("trace");
1188            let task_ledger = payload
1189                .get("task_ledger")
1190                .and_then(task_ledger_summary_from_value);
1191            let research_facts = task_ledger
1192                .as_ref()
1193                .map(|ledger| ledger.observations.clone())
1194                .unwrap_or_default();
1195            research_fact_count += research_facts.len();
1196            let tools_used = json_string_array(
1197                payload
1198                    .get("tools_used")
1199                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1200            );
1201            let successful_tools = json_string_array(payload.get("successful_tools"));
1202            let planner_round = RunPlannerRoundRecord {
1203                stage_id: stage.id.clone(),
1204                node_id: stage.node_id.clone(),
1205                stage_kind: stage.kind.clone(),
1206                status: stage.status.clone(),
1207                outcome: stage.outcome.clone(),
1208                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1209                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1210                tool_execution_count: json_usize(
1211                    trace.and_then(|trace| trace.get("tool_executions")),
1212                ),
1213                tool_rejection_count: json_usize(
1214                    trace.and_then(|trace| trace.get("tool_rejections")),
1215                ),
1216                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1217                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1218                native_text_tool_fallback_count: json_usize(
1219                    trace.and_then(|trace| trace.get("native_text_tool_fallbacks")),
1220                ),
1221                native_text_tool_fallback_rejection_count: json_usize(
1222                    trace.and_then(|trace| trace.get("native_text_tool_fallback_rejections")),
1223                ),
1224                empty_completion_retry_count: json_usize(
1225                    trace.and_then(|trace| trace.get("empty_completion_retries")),
1226                ),
1227                tools_used,
1228                successful_tools,
1229                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1230                task_ledger,
1231                research_facts,
1232            };
1233            let has_agentic_detail = planner_round.iteration_count > 0
1234                || planner_round.llm_call_count > 0
1235                || planner_round.tool_execution_count > 0
1236                || planner_round.native_text_tool_fallback_count > 0
1237                || planner_round.native_text_tool_fallback_rejection_count > 0
1238                || planner_round.empty_completion_retry_count > 0
1239                || planner_round.ledger_done_rejections > 0
1240                || planner_round.task_ledger.is_some()
1241                || !planner_round.tools_used.is_empty()
1242                || !planner_round.successful_tools.is_empty();
1243            if has_agentic_detail {
1244                planner_rounds.push(planner_round);
1245            }
1246        }
1247    }
1248
1249    for transition in &run.transitions {
1250        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1251            continue;
1252        };
1253        let from_stage = transition
1254            .from_stage_id
1255            .as_deref()
1256            .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1257        let from_id = transition
1258            .from_stage_id
1259            .as_ref()
1260            .and_then(|stage_id| stage_node_ids.get(stage_id))
1261            .cloned()
1262            .or_else(|| {
1263                transition
1264                    .from_node_id
1265                    .as_ref()
1266                    .and_then(|node_id| stage_by_node_id.get(node_id))
1267                    .cloned()
1268            })
1269            .unwrap_or_else(|| root_node_id.clone());
1270        action_graph_edges.push(RunActionGraphEdgeRecord {
1271            from_id,
1272            to_id,
1273            kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1274                ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1275            } else {
1276                ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1277            },
1278            label: transition.branch.clone(),
1279        });
1280    }
1281
1282    let worker_lineage = run
1283        .child_runs
1284        .iter()
1285        .map(|child| {
1286            let worker_node_id = format!("worker:{}", child.worker_id);
1287            append_action_graph_node(
1288                &mut action_graph_nodes,
1289                RunActionGraphNodeRecord {
1290                    id: worker_node_id.clone(),
1291                    label: child.worker_name.clone(),
1292                    kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1293                    status: child.status.clone(),
1294                    outcome: child.status.clone(),
1295                    trace_id: propagated_trace_id.clone(),
1296                    stage_id: child.parent_stage_id.clone(),
1297                    node_id: None,
1298                    worker_id: Some(child.worker_id.clone()),
1299                    run_id: child.run_id.clone(),
1300                    run_path: child.run_path.clone(),
1301                    metadata: BTreeMap::from([
1302                        (
1303                            "worker_name".to_string(),
1304                            serde_json::json!(child.worker_name),
1305                        ),
1306                        ("task".to_string(), serde_json::json!(child.task)),
1307                    ]),
1308                },
1309            );
1310            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1311                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1312                    action_graph_edges.push(RunActionGraphEdgeRecord {
1313                        from_id: stage_node_id.clone(),
1314                        to_id: worker_node_id,
1315                        kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1316                        label: Some(child.worker_name.clone()),
1317                    });
1318                }
1319            }
1320            RunWorkerLineageRecord {
1321                worker_id: child.worker_id.clone(),
1322                worker_name: child.worker_name.clone(),
1323                parent_stage_id: child.parent_stage_id.clone(),
1324                task: child.task.clone(),
1325                status: child.status.clone(),
1326                session_id: child.session_id.clone(),
1327                parent_session_id: child.parent_session_id.clone(),
1328                run_id: child.run_id.clone(),
1329                run_path: child.run_path.clone(),
1330                snapshot_path: child.snapshot_path.clone(),
1331            }
1332        })
1333        .collect::<Vec<_>>();
1334
1335    if run.transcript.is_some() {
1336        transcript_pointers.push(RunTranscriptPointerRecord {
1337            id: "run:transcript".to_string(),
1338            label: "Run transcript".to_string(),
1339            kind: "embedded_transcript".to_string(),
1340            location: "run.transcript".to_string(),
1341            path: run.persisted_path.clone(),
1342            available: true,
1343        });
1344        if let Some(transcript) = run.transcript.as_ref() {
1345            compaction_events.extend(compaction_events_from_transcript(
1346                transcript,
1347                None,
1348                None,
1349                "run.transcript",
1350                persisted_path,
1351            ));
1352        }
1353    }
1354
1355    if let Some(path) = persisted_path {
1356        if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1357            transcript_pointers.push(RunTranscriptPointerRecord {
1358                id: "run:llm_transcript".to_string(),
1359                label: "LLM transcript sidecar".to_string(),
1360                kind: "llm_jsonl".to_string(),
1361                location: "run sidecar".to_string(),
1362                path: Some(sidecar_path.to_string_lossy().into_owned()),
1363                available: sidecar_path.exists(),
1364            });
1365        }
1366        daemon_events.extend(daemon_events_from_sidecar(path));
1367    }
1368
1369    RunObservabilityRecord {
1370        schema_version: 4,
1371        planner_rounds,
1372        research_fact_count,
1373        action_graph_nodes,
1374        action_graph_edges,
1375        worker_lineage,
1376        verification_outcomes,
1377        transcript_pointers,
1378        compaction_events,
1379        daemon_events,
1380    }
1381}
1382
1383fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1384    run.observability = Some(derive_run_observability(run, persisted_path));
1385}
1386
1387pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1388    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1389    if run.type_name.is_empty() {
1390        run.type_name = "run_record".to_string();
1391    }
1392    if run.id.is_empty() {
1393        run.id = new_id("run");
1394    }
1395    if run.started_at.is_empty() {
1396        run.started_at = now_rfc3339();
1397    }
1398    if run.status.is_empty() {
1399        run.status = "running".to_string();
1400    }
1401    if run.root_run_id.is_none() {
1402        run.root_run_id = Some(run.id.clone());
1403    }
1404    if run.replay_fixture.is_none() {
1405        run.replay_fixture = Some(replay_fixture_from_run(&run));
1406    }
1407    merge_hitl_questions_from_active_log(&mut run);
1408    sync_run_handoffs(&mut run);
1409    if run.observability.is_none() {
1410        let persisted_path = run.persisted_path.clone();
1411        let persisted = persisted_path.as_deref().map(Path::new);
1412        refresh_run_observability(&mut run, persisted);
1413    }
1414    Ok(run)
1415}
1416
1417pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1418    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1419    if manifest.type_name.is_empty() {
1420        manifest.type_name = "eval_suite_manifest".to_string();
1421    }
1422    if manifest.id.is_empty() {
1423        manifest.id = new_id("eval_suite");
1424    }
1425    Ok(manifest)
1426}
1427
1428fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1429    let content = std::fs::read_to_string(path)
1430        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1431    serde_json::from_str(&content)
1432        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1433}
1434
/// Resolves a path taken from an eval suite manifest.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto
/// `base_dir` when one is given, and returned as-is otherwise.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let requested = Path::new(path);
    match base_dir {
        Some(base) if !requested.is_absolute() => base.join(requested),
        _ => requested.to_path_buf(),
    }
}
1445
1446pub fn evaluate_run_suite_manifest(
1447    manifest: &EvalSuiteManifest,
1448) -> Result<ReplayEvalSuiteReport, VmError> {
1449    let base_dir = manifest.base_dir.as_deref().map(Path::new);
1450    let mut reports = Vec::new();
1451    for case in &manifest.cases {
1452        let run_path = resolve_manifest_path(base_dir, &case.run_path);
1453        let run = load_run_record(&run_path)?;
1454        let fixture = match &case.fixture_path {
1455            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1456            None => run
1457                .replay_fixture
1458                .clone()
1459                .unwrap_or_else(|| replay_fixture_from_run(&run)),
1460        };
1461        let eval = evaluate_run_against_fixture(&run, &fixture);
1462        let mut pass = eval.pass;
1463        let mut failures = eval.failures;
1464        let comparison = match &case.compare_to {
1465            Some(path) => {
1466                let baseline_path = resolve_manifest_path(base_dir, path);
1467                let baseline = load_run_record(&baseline_path)?;
1468                let diff = diff_run_records(&baseline, &run);
1469                if !diff.identical {
1470                    pass = false;
1471                    failures.push(format!(
1472                        "run differs from baseline {} with {} stage changes",
1473                        baseline_path.display(),
1474                        diff.stage_diffs.len()
1475                    ));
1476                }
1477                Some(diff)
1478            }
1479            None => None,
1480        };
1481        reports.push(ReplayEvalCaseReport {
1482            run_id: run.id.clone(),
1483            workflow_id: run.workflow_id.clone(),
1484            label: case.label.clone(),
1485            pass,
1486            failures,
1487            stage_count: eval.stage_count,
1488            source_path: Some(run_path.display().to_string()),
1489            comparison,
1490        });
1491    }
1492    let total = reports.len();
1493    let passed = reports.iter().filter(|report| report.pass).count();
1494    let failed = total.saturating_sub(passed);
1495    Ok(ReplayEvalSuiteReport {
1496        pass: failed == 0,
1497        total,
1498        passed,
1499        failed,
1500        cases: reports,
1501    })
1502}
1503
/// Edit operation in a diff sequence produced by [`myers_diff`].
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// Line present in both inputs; the paired index points into `before`.
    Equal,
    /// Line present only in `before`; the paired index points into `before`.
    Delete,
    /// Line present only in `after`; the paired index points into `after`.
    Insert,
}

/// Computes a shortest edit script between `a` and `b` using Myers' O(ND)
/// algorithm.
///
/// Returns the script in order as `(op, index)` pairs. For [`DiffOp::Equal`]
/// and [`DiffOp::Delete`] the index points into `a`; for [`DiffOp::Insert`]
/// it points into `b`.
///
/// Time: O((n + m) · D) where D is the edit distance (each step snapshots the
/// diagonal table). Space: O((n + m) · D) for the snapshots needed to
/// backtrack.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Trivial scripts; this also covers both inputs being empty.
    if n == 0 {
        return (0..b.len()).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..a.len()).map(|i| (DiffOp::Delete, i)).collect();
    }

    let max_d = a.len() + b.len();
    let offset = max_d as isize;
    // `v[k + offset]` is the furthest x reached so far on diagonal k = x - y.
    let mut v = vec![0isize; 2 * max_d + 1];
    // trace[d] holds the `v` snapshot BEFORE step d ran — required for backtrack.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        // Updating `v` in place is safe: step d only writes diagonals with the
        // parity of d and only reads diagonals k ± 1, which have the opposite
        // parity and therefore still hold step d - 1's values.
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1] // step down (insert) from diagonal k + 1
            } else {
                v[ki - 1] + 1 // step right (delete) from diagonal k - 1
            };
            let mut y = x - k;
            // Follow the "snake" of matching lines.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            v[ki] = x;
            if x >= n && y >= m {
                break 'outer;
            }
        }
    }

    // Walk back from (n, m), replaying each step's decision from its snapshot.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Step 0 is a pure diagonal from the origin.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}

/// Renders `before` → `after` as a line diff under `--- a/<path>` and
/// `+++ b/<path>` headers.
///
/// Every line of both inputs is emitted, prefixed with ` ` (unchanged),
/// `-` (removed) or `+` (added). No `@@` hunk headers or context trimming
/// are produced. `path` defaults to `"artifact"`.
pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
    let before_lines: Vec<&str> = before.lines().collect();
    let after_lines: Vec<&str> = after.lines().collect();
    let ops = myers_diff(&before_lines, &after_lines);

    let file = path.unwrap_or("artifact");
    let mut diff = String::with_capacity(before.len() + after.len() + 2 * file.len() + 16);
    diff.push_str("--- a/");
    diff.push_str(file);
    diff.push_str("\n+++ b/");
    diff.push_str(file);
    diff.push('\n');
    // Append directly instead of allocating a `format!` string per line.
    for &(op, idx) in &ops {
        let (marker, line) = match op {
            DiffOp::Equal => (' ', before_lines[idx]),
            DiffOp::Delete => ('-', before_lines[idx]),
            DiffOp::Insert => ('+', after_lines[idx]),
        };
        diff.push(marker);
        diff.push_str(line);
        diff.push('\n');
    }
    diff
}
1614
1615pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1616    let path = path
1617        .map(PathBuf::from)
1618        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1619    let mut materialized = run.clone();
1620    merge_hitl_questions_from_active_log(&mut materialized);
1621    if materialized.replay_fixture.is_none() {
1622        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1623    }
1624    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1625    sync_run_handoffs(&mut materialized);
1626    refresh_run_observability(&mut materialized, Some(&path));
1627    if let Some(parent) = path.parent() {
1628        std::fs::create_dir_all(parent)
1629            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1630    }
1631    let json = serde_json::to_string_pretty(&materialized)
1632        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1633    // Atomic write: .tmp then rename guards against partial writes on kill.
1634    let tmp_path = path.with_extension("json.tmp");
1635    std::fs::write(&tmp_path, &json)
1636        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1637    std::fs::rename(&tmp_path, &path).map_err(|e| {
1638        // Cross-device renames fail on some filesystems; best-effort direct write.
1639        let _ = std::fs::write(&path, &json);
1640        VmError::Runtime(format!("failed to finalize run record: {e}"))
1641    })?;
1642    if let Some(observability) = materialized.observability.as_ref() {
1643        publish_action_graph_event(&materialized, observability, &path);
1644    }
1645    Ok(path.to_string_lossy().into_owned())
1646}
1647
1648pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1649    let content = std::fs::read_to_string(path)
1650        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1651    let mut run: RunRecord = serde_json::from_str(&content)
1652        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1653    if run.replay_fixture.is_none() {
1654        run.replay_fixture = Some(replay_fixture_from_run(&run));
1655    }
1656    run.persisted_path
1657        .get_or_insert_with(|| path.to_string_lossy().into_owned());
1658    sync_run_handoffs(&mut run);
1659    refresh_run_observability(&mut run, Some(path));
1660    Ok(run)
1661}
1662
1663pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1664    ReplayFixture {
1665        type_name: "replay_fixture".to_string(),
1666        id: new_id("fixture"),
1667        source_run_id: run.id.clone(),
1668        workflow_id: run.workflow_id.clone(),
1669        workflow_name: run.workflow_name.clone(),
1670        created_at: now_rfc3339(),
1671        eval_kind: Some("replay".to_string()),
1672        clarifying_question: None,
1673        expected_status: run.status.clone(),
1674        stage_assertions: run
1675            .stages
1676            .iter()
1677            .map(|stage| ReplayStageAssertion {
1678                node_id: stage.node_id.clone(),
1679                expected_status: stage.status.clone(),
1680                expected_outcome: stage.outcome.clone(),
1681                expected_branch: stage.branch.clone(),
1682                required_artifact_kinds: stage
1683                    .artifacts
1684                    .iter()
1685                    .map(|artifact| artifact.kind.clone())
1686                    .collect(),
1687                visible_text_contains: stage
1688                    .visible_text
1689                    .as_ref()
1690                    .filter(|text| !text.is_empty())
1691                    .map(|text| text.chars().take(80).collect()),
1692            })
1693            .collect(),
1694    }
1695}
1696
1697pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1698    if fixture.eval_kind.as_deref() == Some("clarifying_question") {
1699        return evaluate_clarifying_question(run, fixture);
1700    }
1701    let mut failures = Vec::new();
1702    if run.status != fixture.expected_status {
1703        failures.push(format!(
1704            "run status mismatch: expected {}, got {}",
1705            fixture.expected_status, run.status
1706        ));
1707    }
1708    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1709        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1710    for assertion in &fixture.stage_assertions {
1711        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1712            failures.push(format!("missing stage {}", assertion.node_id));
1713            continue;
1714        };
1715        if stage.status != assertion.expected_status {
1716            failures.push(format!(
1717                "stage {} status mismatch: expected {}, got {}",
1718                assertion.node_id, assertion.expected_status, stage.status
1719            ));
1720        }
1721        if stage.outcome != assertion.expected_outcome {
1722            failures.push(format!(
1723                "stage {} outcome mismatch: expected {}, got {}",
1724                assertion.node_id, assertion.expected_outcome, stage.outcome
1725            ));
1726        }
1727        if stage.branch != assertion.expected_branch {
1728            failures.push(format!(
1729                "stage {} branch mismatch: expected {:?}, got {:?}",
1730                assertion.node_id, assertion.expected_branch, stage.branch
1731            ));
1732        }
1733        for required_kind in &assertion.required_artifact_kinds {
1734            if !stage
1735                .artifacts
1736                .iter()
1737                .any(|artifact| &artifact.kind == required_kind)
1738            {
1739                failures.push(format!(
1740                    "stage {} missing artifact kind {}",
1741                    assertion.node_id, required_kind
1742                ));
1743            }
1744        }
1745        if let Some(snippet) = &assertion.visible_text_contains {
1746            let actual = stage.visible_text.clone().unwrap_or_default();
1747            if !actual.contains(snippet) {
1748                failures.push(format!(
1749                    "stage {} visible text does not contain expected snippet {:?}",
1750                    assertion.node_id, snippet
1751                ));
1752            }
1753        }
1754    }
1755
1756    ReplayEvalReport {
1757        pass: failures.is_empty(),
1758        failures,
1759        stage_count: run.stages.len(),
1760    }
1761}
1762
1763fn evaluate_clarifying_question(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1764    let mut failures = Vec::new();
1765    let spec = fixture.clarifying_question.clone().unwrap_or_default();
1766    let min_questions = clarifying_min_questions(&spec);
1767    let max_questions = clarifying_max_questions(&spec);
1768    let questions = &run.hitl_questions;
1769
1770    if run.status != fixture.expected_status {
1771        failures.push(format!(
1772            "run status mismatch: expected {}, got {}",
1773            fixture.expected_status, run.status
1774        ));
1775    }
1776    if questions.len() < min_questions {
1777        failures.push(format!(
1778            "expected at least {min_questions} clarifying question(s), got {}",
1779            questions.len()
1780        ));
1781    }
1782    if questions.len() > max_questions {
1783        failures.push(format!(
1784            "expected at most {max_questions} clarifying question(s), got {}",
1785            questions.len()
1786        ));
1787    }
1788
1789    let normalized_expected = spec
1790        .expected_question
1791        .as_deref()
1792        .map(normalize_question_text);
1793    let normalized_accepted = spec
1794        .accepted_questions
1795        .iter()
1796        .map(|question| normalize_question_text(question))
1797        .collect::<Vec<_>>();
1798    let required_terms = spec
1799        .required_terms
1800        .iter()
1801        .map(|term| normalize_question_text(term))
1802        .collect::<Vec<_>>();
1803    let forbidden_terms = spec
1804        .forbidden_terms
1805        .iter()
1806        .map(|term| normalize_question_text(term))
1807        .collect::<Vec<_>>();
1808
1809    let matched = questions.iter().any(|question| {
1810        let normalized = normalize_question_text(&question.prompt);
1811        let matches_expected = normalized_expected
1812            .as_ref()
1813            .is_none_or(|expected| &normalized == expected)
1814            && (normalized_accepted.is_empty()
1815                || normalized_accepted
1816                    .iter()
1817                    .any(|candidate| candidate == &normalized));
1818        let has_required_terms = required_terms
1819            .iter()
1820            .all(|term| normalized.contains(term.as_str()));
1821        let avoids_forbidden_terms = forbidden_terms
1822            .iter()
1823            .all(|term| !normalized.contains(term.as_str()));
1824        matches_expected && has_required_terms && avoids_forbidden_terms
1825    });
1826
1827    if !questions.is_empty()
1828        && (!normalized_accepted.is_empty()
1829            || normalized_expected.is_some()
1830            || !required_terms.is_empty()
1831            || !forbidden_terms.is_empty())
1832        && !matched
1833    {
1834        failures.push(format!(
1835            "no clarifying question matched fixture; actual questions: {}",
1836            questions
1837                .iter()
1838                .map(|question| format!("{:?}", question.prompt))
1839                .collect::<Vec<_>>()
1840                .join(", ")
1841        ));
1842    }
1843
1844    ReplayEvalReport {
1845        pass: failures.is_empty(),
1846        failures,
1847        stage_count: run.stages.len(),
1848    }
1849}
1850
1851pub fn evaluate_run_suite(
1852    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1853) -> ReplayEvalSuiteReport {
1854    let mut reports = Vec::new();
1855    for (run, fixture, source_path) in cases {
1856        let report = evaluate_run_against_fixture(&run, &fixture);
1857        reports.push(ReplayEvalCaseReport {
1858            run_id: run.id.clone(),
1859            workflow_id: run.workflow_id.clone(),
1860            label: None,
1861            pass: report.pass,
1862            failures: report.failures,
1863            stage_count: report.stage_count,
1864            source_path,
1865            comparison: None,
1866        });
1867    }
1868    let total = reports.len();
1869    let passed = reports.iter().filter(|report| report.pass).count();
1870    let failed = total.saturating_sub(passed);
1871    ReplayEvalSuiteReport {
1872        pass: failed == 0,
1873        total,
1874        passed,
1875        failed,
1876        cases: reports,
1877    }
1878}
1879
1880pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1881    let mut stage_diffs = Vec::new();
1882    let mut all_node_ids = BTreeSet::new();
1883    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1884        .stages
1885        .iter()
1886        .map(|s| (s.node_id.as_str(), s))
1887        .collect();
1888    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1889        .stages
1890        .iter()
1891        .map(|s| (s.node_id.as_str(), s))
1892        .collect();
1893    all_node_ids.extend(left_by_id.keys().copied());
1894    all_node_ids.extend(right_by_id.keys().copied());
1895
1896    for node_id in all_node_ids {
1897        let left_stage = left_by_id.get(node_id).copied();
1898        let right_stage = right_by_id.get(node_id).copied();
1899        match (left_stage, right_stage) {
1900            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1901                node_id: node_id.to_string(),
1902                change: "removed".to_string(),
1903                details: vec!["stage missing from right run".to_string()],
1904            }),
1905            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1906                node_id: node_id.to_string(),
1907                change: "added".to_string(),
1908                details: vec!["stage missing from left run".to_string()],
1909            }),
1910            (Some(left_stage), Some(right_stage)) => {
1911                let mut details = Vec::new();
1912                if left_stage.status != right_stage.status {
1913                    details.push(format!(
1914                        "status: {} -> {}",
1915                        left_stage.status, right_stage.status
1916                    ));
1917                }
1918                if left_stage.outcome != right_stage.outcome {
1919                    details.push(format!(
1920                        "outcome: {} -> {}",
1921                        left_stage.outcome, right_stage.outcome
1922                    ));
1923                }
1924                if left_stage.branch != right_stage.branch {
1925                    details.push(format!(
1926                        "branch: {:?} -> {:?}",
1927                        left_stage.branch, right_stage.branch
1928                    ));
1929                }
1930                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1931                {
1932                    details.push(format!(
1933                        "produced_artifacts: {} -> {}",
1934                        left_stage.produced_artifact_ids.len(),
1935                        right_stage.produced_artifact_ids.len()
1936                    ));
1937                }
1938                if left_stage.artifacts.len() != right_stage.artifacts.len() {
1939                    details.push(format!(
1940                        "artifact_records: {} -> {}",
1941                        left_stage.artifacts.len(),
1942                        right_stage.artifacts.len()
1943                    ));
1944                }
1945                if !details.is_empty() {
1946                    stage_diffs.push(RunStageDiffRecord {
1947                        node_id: node_id.to_string(),
1948                        change: "changed".to_string(),
1949                        details,
1950                    });
1951                }
1952            }
1953            (None, None) => {}
1954        }
1955    }
1956
1957    let mut tool_diffs = Vec::new();
1958    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1959        .tool_recordings
1960        .iter()
1961        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1962        .collect();
1963    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1964        .tool_recordings
1965        .iter()
1966        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1967        .collect();
1968    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1969        .keys()
1970        .chain(right_tools.keys())
1971        .cloned()
1972        .collect();
1973    for key in &all_tool_keys {
1974        let l = left_tools.get(key);
1975        let r = right_tools.get(key);
1976        let result_changed = match (l, r) {
1977            (Some(a), Some(b)) => a.result != b.result,
1978            _ => true,
1979        };
1980        if result_changed {
1981            tool_diffs.push(ToolCallDiffRecord {
1982                tool_name: key.0.clone(),
1983                args_hash: key.1.clone(),
1984                result_changed,
1985                left_result: l.map(|t| t.result.clone()),
1986                right_result: r.map(|t| t.result.clone()),
1987            });
1988        }
1989    }
1990
1991    let left_observability = left.observability.clone().unwrap_or_else(|| {
1992        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1993    });
1994    let right_observability = right.observability.clone().unwrap_or_else(|| {
1995        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1996    });
1997    let mut observability_diffs = Vec::new();
1998
1999    let left_workers = left_observability
2000        .worker_lineage
2001        .iter()
2002        .map(|worker| {
2003            (
2004                worker.worker_id.clone(),
2005                (
2006                    worker.status.clone(),
2007                    worker.run_id.clone(),
2008                    worker.run_path.clone(),
2009                ),
2010            )
2011        })
2012        .collect::<BTreeMap<_, _>>();
2013    let right_workers = right_observability
2014        .worker_lineage
2015        .iter()
2016        .map(|worker| {
2017            (
2018                worker.worker_id.clone(),
2019                (
2020                    worker.status.clone(),
2021                    worker.run_id.clone(),
2022                    worker.run_path.clone(),
2023                ),
2024            )
2025        })
2026        .collect::<BTreeMap<_, _>>();
2027    let worker_ids = left_workers
2028        .keys()
2029        .chain(right_workers.keys())
2030        .cloned()
2031        .collect::<BTreeSet<_>>();
2032    for worker_id in worker_ids {
2033        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
2034            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2035                section: "worker_lineage".to_string(),
2036                label: worker_id,
2037                details: vec!["worker missing from right run".to_string()],
2038            }),
2039            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2040                section: "worker_lineage".to_string(),
2041                label: worker_id,
2042                details: vec!["worker missing from left run".to_string()],
2043            }),
2044            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
2045                let mut details = Vec::new();
2046                if left_worker.0 != right_worker.0 {
2047                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
2048                }
2049                if left_worker.1 != right_worker.1 {
2050                    details.push(format!(
2051                        "run_id: {:?} -> {:?}",
2052                        left_worker.1, right_worker.1
2053                    ));
2054                }
2055                if left_worker.2 != right_worker.2 {
2056                    details.push(format!(
2057                        "run_path: {:?} -> {:?}",
2058                        left_worker.2, right_worker.2
2059                    ));
2060                }
2061                observability_diffs.push(RunObservabilityDiffRecord {
2062                    section: "worker_lineage".to_string(),
2063                    label: worker_id,
2064                    details,
2065                });
2066            }
2067            _ => {}
2068        }
2069    }
2070
2071    let left_rounds = left_observability
2072        .planner_rounds
2073        .iter()
2074        .map(|round| (round.stage_id.clone(), round))
2075        .collect::<BTreeMap<_, _>>();
2076    let right_rounds = right_observability
2077        .planner_rounds
2078        .iter()
2079        .map(|round| (round.stage_id.clone(), round))
2080        .collect::<BTreeMap<_, _>>();
2081    let round_ids = left_rounds
2082        .keys()
2083        .chain(right_rounds.keys())
2084        .cloned()
2085        .collect::<BTreeSet<_>>();
2086    for stage_id in round_ids {
2087        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
2088            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2089                section: "planner_rounds".to_string(),
2090                label: stage_id,
2091                details: vec!["planner summary missing from right run".to_string()],
2092            }),
2093            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2094                section: "planner_rounds".to_string(),
2095                label: stage_id,
2096                details: vec!["planner summary missing from left run".to_string()],
2097            }),
2098            (Some(left_round), Some(right_round)) => {
2099                let mut details = Vec::new();
2100                if left_round.iteration_count != right_round.iteration_count {
2101                    details.push(format!(
2102                        "iterations: {} -> {}",
2103                        left_round.iteration_count, right_round.iteration_count
2104                    ));
2105                }
2106                if left_round.tool_execution_count != right_round.tool_execution_count {
2107                    details.push(format!(
2108                        "tool_executions: {} -> {}",
2109                        left_round.tool_execution_count, right_round.tool_execution_count
2110                    ));
2111                }
2112                if left_round.native_text_tool_fallback_count
2113                    != right_round.native_text_tool_fallback_count
2114                {
2115                    details.push(format!(
2116                        "native_text_tool_fallbacks: {} -> {}",
2117                        left_round.native_text_tool_fallback_count,
2118                        right_round.native_text_tool_fallback_count
2119                    ));
2120                }
2121                if left_round.native_text_tool_fallback_rejection_count
2122                    != right_round.native_text_tool_fallback_rejection_count
2123                {
2124                    details.push(format!(
2125                        "native_text_tool_fallback_rejections: {} -> {}",
2126                        left_round.native_text_tool_fallback_rejection_count,
2127                        right_round.native_text_tool_fallback_rejection_count
2128                    ));
2129                }
2130                if left_round.empty_completion_retry_count
2131                    != right_round.empty_completion_retry_count
2132                {
2133                    details.push(format!(
2134                        "empty_completion_retries: {} -> {}",
2135                        left_round.empty_completion_retry_count,
2136                        right_round.empty_completion_retry_count
2137                    ));
2138                }
2139                if left_round.research_facts != right_round.research_facts {
2140                    details.push(format!(
2141                        "research_facts: {:?} -> {:?}",
2142                        left_round.research_facts, right_round.research_facts
2143                    ));
2144                }
2145                let left_deliverables = left_round
2146                    .task_ledger
2147                    .as_ref()
2148                    .map(|ledger| {
2149                        ledger
2150                            .deliverables
2151                            .iter()
2152                            .map(|item| format!("{}:{}", item.id, item.status))
2153                            .collect::<Vec<_>>()
2154                    })
2155                    .unwrap_or_default();
2156                let right_deliverables = right_round
2157                    .task_ledger
2158                    .as_ref()
2159                    .map(|ledger| {
2160                        ledger
2161                            .deliverables
2162                            .iter()
2163                            .map(|item| format!("{}:{}", item.id, item.status))
2164                            .collect::<Vec<_>>()
2165                    })
2166                    .unwrap_or_default();
2167                if left_deliverables != right_deliverables {
2168                    details.push(format!(
2169                        "deliverables: {:?} -> {:?}",
2170                        left_deliverables, right_deliverables
2171                    ));
2172                }
2173                if left_round.successful_tools != right_round.successful_tools {
2174                    details.push(format!(
2175                        "successful_tools: {:?} -> {:?}",
2176                        left_round.successful_tools, right_round.successful_tools
2177                    ));
2178                }
2179                if !details.is_empty() {
2180                    observability_diffs.push(RunObservabilityDiffRecord {
2181                        section: "planner_rounds".to_string(),
2182                        label: left_round.node_id.clone(),
2183                        details,
2184                    });
2185                }
2186            }
2187            _ => {}
2188        }
2189    }
2190
2191    let left_pointers = left_observability
2192        .transcript_pointers
2193        .iter()
2194        .map(|pointer| {
2195            (
2196                pointer.id.clone(),
2197                (
2198                    pointer.available,
2199                    pointer.path.clone(),
2200                    pointer.location.clone(),
2201                ),
2202            )
2203        })
2204        .collect::<BTreeMap<_, _>>();
2205    let right_pointers = right_observability
2206        .transcript_pointers
2207        .iter()
2208        .map(|pointer| {
2209            (
2210                pointer.id.clone(),
2211                (
2212                    pointer.available,
2213                    pointer.path.clone(),
2214                    pointer.location.clone(),
2215                ),
2216            )
2217        })
2218        .collect::<BTreeMap<_, _>>();
2219    let pointer_ids = left_pointers
2220        .keys()
2221        .chain(right_pointers.keys())
2222        .cloned()
2223        .collect::<BTreeSet<_>>();
2224    for pointer_id in pointer_ids {
2225        match (
2226            left_pointers.get(&pointer_id),
2227            right_pointers.get(&pointer_id),
2228        ) {
2229            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2230                section: "transcript_pointers".to_string(),
2231                label: pointer_id,
2232                details: vec!["pointer missing from right run".to_string()],
2233            }),
2234            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2235                section: "transcript_pointers".to_string(),
2236                label: pointer_id,
2237                details: vec!["pointer missing from left run".to_string()],
2238            }),
2239            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
2240                observability_diffs.push(RunObservabilityDiffRecord {
2241                    section: "transcript_pointers".to_string(),
2242                    label: pointer_id,
2243                    details: vec![format!(
2244                        "pointer: {:?} -> {:?}",
2245                        left_pointer, right_pointer
2246                    )],
2247                });
2248            }
2249            _ => {}
2250        }
2251    }
2252
2253    let left_compactions = left_observability
2254        .compaction_events
2255        .iter()
2256        .map(|event| {
2257            (
2258                event.id.clone(),
2259                (
2260                    event.strategy.clone(),
2261                    event.archived_messages,
2262                    event.snapshot_asset_id.clone(),
2263                    event.available,
2264                ),
2265            )
2266        })
2267        .collect::<BTreeMap<_, _>>();
2268    let right_compactions = right_observability
2269        .compaction_events
2270        .iter()
2271        .map(|event| {
2272            (
2273                event.id.clone(),
2274                (
2275                    event.strategy.clone(),
2276                    event.archived_messages,
2277                    event.snapshot_asset_id.clone(),
2278                    event.available,
2279                ),
2280            )
2281        })
2282        .collect::<BTreeMap<_, _>>();
2283    let compaction_ids = left_compactions
2284        .keys()
2285        .chain(right_compactions.keys())
2286        .cloned()
2287        .collect::<BTreeSet<_>>();
2288    for compaction_id in compaction_ids {
2289        match (
2290            left_compactions.get(&compaction_id),
2291            right_compactions.get(&compaction_id),
2292        ) {
2293            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2294                section: "compaction_events".to_string(),
2295                label: compaction_id,
2296                details: vec!["compaction event missing from right run".to_string()],
2297            }),
2298            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2299                section: "compaction_events".to_string(),
2300                label: compaction_id,
2301                details: vec!["compaction event missing from left run".to_string()],
2302            }),
2303            (Some(left_event), Some(right_event)) if left_event != right_event => {
2304                observability_diffs.push(RunObservabilityDiffRecord {
2305                    section: "compaction_events".to_string(),
2306                    label: compaction_id,
2307                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2308                });
2309            }
2310            _ => {}
2311        }
2312    }
2313
2314    let left_daemons = left_observability
2315        .daemon_events
2316        .iter()
2317        .map(|event| {
2318            (
2319                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2320                (
2321                    event.name.clone(),
2322                    event.persist_path.clone(),
2323                    event.payload_summary.clone(),
2324                ),
2325            )
2326        })
2327        .collect::<BTreeMap<_, _>>();
2328    let right_daemons = right_observability
2329        .daemon_events
2330        .iter()
2331        .map(|event| {
2332            (
2333                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
2334                (
2335                    event.name.clone(),
2336                    event.persist_path.clone(),
2337                    event.payload_summary.clone(),
2338                ),
2339            )
2340        })
2341        .collect::<BTreeMap<_, _>>();
2342    let daemon_keys = left_daemons
2343        .keys()
2344        .chain(right_daemons.keys())
2345        .cloned()
2346        .collect::<BTreeSet<_>>();
2347    for daemon_key in daemon_keys {
2348        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
2349        match (
2350            left_daemons.get(&daemon_key),
2351            right_daemons.get(&daemon_key),
2352        ) {
2353            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2354                section: "daemon_events".to_string(),
2355                label,
2356                details: vec!["daemon event missing from right run".to_string()],
2357            }),
2358            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2359                section: "daemon_events".to_string(),
2360                label,
2361                details: vec!["daemon event missing from left run".to_string()],
2362            }),
2363            (Some(left_event), Some(right_event)) if left_event != right_event => {
2364                observability_diffs.push(RunObservabilityDiffRecord {
2365                    section: "daemon_events".to_string(),
2366                    label,
2367                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
2368                });
2369            }
2370            _ => {}
2371        }
2372    }
2373
2374    let left_verification = left_observability
2375        .verification_outcomes
2376        .iter()
2377        .map(|item| (item.stage_id.clone(), item))
2378        .collect::<BTreeMap<_, _>>();
2379    let right_verification = right_observability
2380        .verification_outcomes
2381        .iter()
2382        .map(|item| (item.stage_id.clone(), item))
2383        .collect::<BTreeMap<_, _>>();
2384    let verification_ids = left_verification
2385        .keys()
2386        .chain(right_verification.keys())
2387        .cloned()
2388        .collect::<BTreeSet<_>>();
2389    for stage_id in verification_ids {
2390        match (
2391            left_verification.get(&stage_id),
2392            right_verification.get(&stage_id),
2393        ) {
2394            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
2395                section: "verification".to_string(),
2396                label: stage_id,
2397                details: vec!["verification missing from right run".to_string()],
2398            }),
2399            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
2400                section: "verification".to_string(),
2401                label: stage_id,
2402                details: vec!["verification missing from left run".to_string()],
2403            }),
2404            (Some(left_item), Some(right_item)) if left_item != right_item => {
2405                let mut details = Vec::new();
2406                if left_item.passed != right_item.passed {
2407                    details.push(format!(
2408                        "passed: {:?} -> {:?}",
2409                        left_item.passed, right_item.passed
2410                    ));
2411                }
2412                if left_item.summary != right_item.summary {
2413                    details.push(format!(
2414                        "summary: {:?} -> {:?}",
2415                        left_item.summary, right_item.summary
2416                    ));
2417                }
2418                observability_diffs.push(RunObservabilityDiffRecord {
2419                    section: "verification".to_string(),
2420                    label: left_item.node_id.clone(),
2421                    details,
2422                });
2423            }
2424            _ => {}
2425        }
2426    }
2427
2428    let left_graph = (
2429        left_observability.action_graph_nodes.len(),
2430        left_observability.action_graph_edges.len(),
2431    );
2432    let right_graph = (
2433        right_observability.action_graph_nodes.len(),
2434        right_observability.action_graph_edges.len(),
2435    );
2436    if left_graph != right_graph {
2437        observability_diffs.push(RunObservabilityDiffRecord {
2438            section: "action_graph".to_string(),
2439            label: "shape".to_string(),
2440            details: vec![format!(
2441                "nodes/edges: {}/{} -> {}/{}",
2442                left_graph.0, left_graph.1, right_graph.0, right_graph.1
2443            )],
2444        });
2445    }
2446
2447    let status_changed = left.status != right.status;
2448    let identical = !status_changed
2449        && stage_diffs.is_empty()
2450        && tool_diffs.is_empty()
2451        && observability_diffs.is_empty()
2452        && left.transitions.len() == right.transitions.len()
2453        && left.artifacts.len() == right.artifacts.len()
2454        && left.checkpoints.len() == right.checkpoints.len();
2455
2456    RunDiffReport {
2457        left_run_id: left.id.clone(),
2458        right_run_id: right.id.clone(),
2459        identical,
2460        status_changed,
2461        left_status: left.status.clone(),
2462        right_status: right.status.clone(),
2463        stage_diffs,
2464        tool_diffs,
2465        observability_diffs,
2466        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
2467        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
2468        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
2469    }
2470}