1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10 CapabilityPolicy,
11};
12use crate::llm::vm_value_to_json;
13use crate::value::{VmError, VmValue};
14
/// LLM usage counters attached to a whole run (`RunRecord::usage`) or to a single
/// stage (`RunStageRecord::usage`).
///
/// Populated outside this chunk; presumably totals across every LLM call in that
/// scope — confirm with the producer. `#[serde(default)]` lets partially
/// populated JSON deserialize, with missing fields taking their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
25
/// One executed stage of a run (an element of `RunRecord::stages`).
///
/// In this module each stage becomes a `"stage"` node in the action graph
/// (`derive_run_observability`) and a `ReplayStageAssertion` in a replay fixture
/// (`replay_fixture_from_run`). Missing JSON fields fall back to `Default`
/// via `#[serde(default)]`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    /// Stage identifier; the action-graph node id is `"stage:<id>"`.
    pub id: String,
    /// Workflow node this stage ran for. Matched against transition
    /// `to_node_id`/`from_node_id`; several stages may share one (TODO confirm
    /// whether that happens for retries/loops).
    pub node_id: String,
    /// Stage kind; `"verify"` stages produce a verification outcome.
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    /// Embedded transcript; when present a transcript pointer is emitted.
    pub transcript: Option<serde_json::Value>,
    /// Verification payload; its `"pass"`/`"success"` booleans are read, if present.
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// The first artifact carrying `data` is treated as the stage result payload.
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    /// Free-form metadata; a string `"worker_id"` entry is surfaced on the graph node.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
48
/// One attempt of a stage (an element of `RunStageRecord::attempts`).
///
/// Not read anywhere in this chunk; presumably one entry per retry of the stage
/// — confirm with the producer.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
61
/// A move between workflow nodes (an element of `RunRecord::transitions`).
///
/// `derive_run_observability` turns each transition into a `"transition"` edge.
/// The edge source is the stage named by `from_stage_id`, falling back to a
/// stage on `from_node_id`, then to the run root node. Transitions whose
/// `to_node_id` has no stage are skipped.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    /// Used as the edge label in the action graph.
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
74
/// A persisted scheduler checkpoint (an element of `RunRecord::checkpoints`).
///
/// Not read in this chunk beyond the field layout; presumably captures which
/// nodes were ready/completed when the run was checkpointed — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
85
/// Expected outcome of a run, used to re-evaluate later runs of the same workflow.
///
/// Built from an existing run by `replay_fixture_from_run`, which sets `type_name`
/// to `"replay_fixture"`. It can also be loaded from a JSON file via
/// `load_replay_fixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Serialized under the JSON key `"_type"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Id of the run the fixture was captured from.
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    /// Compared against `RunRecord::status`.
    pub expected_status: String,
    /// One assertion per stage of the source run, in stage order.
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
99
/// Per-stage expectation inside a `ReplayFixture`.
///
/// `replay_fixture_from_run` fills it from a stage's node id, status, outcome and
/// branch, plus the kinds of all its artifacts. `visible_text_contains` holds the
/// first 80 characters of a non-empty visible text.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    pub visible_text_contains: Option<String>,
}
110
/// Result of checking one run against one replay fixture
/// (returned by `evaluate_run_against_fixture`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    /// Human-readable descriptions of each mismatch.
    pub failures: Vec<String>,
    pub stage_count: usize,
}
118
/// Per-case result produced by `evaluate_run_suite_manifest`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    /// Copied from the manifest case label.
    pub label: Option<String>,
    /// False when the fixture check failed or the run differs from its baseline.
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    /// Resolved path of the evaluated run record.
    pub source_path: Option<String>,
    /// Diff against the `compare_to` baseline, when the case names one.
    pub comparison: Option<RunDiffReport>,
}
131
/// Aggregate result of an eval suite; `pass` is true iff `failed == 0`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
141
/// One deliverable from a stage's `task_ledger` payload.
///
/// String fields missing from the JSON become `""` (see
/// `task_ledger_summary_from_value`). A `status` of `"open"` or `"blocked"`
/// counts toward `RunTaskLedgerSummaryRecord::blocking_count`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    pub status: String,
    pub note: Option<String>,
}
150
/// Summary of a stage's `task_ledger` payload (built by `task_ledger_summary_from_value`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    /// String entries of the ledger's `"observations"` array; reported as research facts.
    pub observations: Vec<String>,
    /// Number of deliverables whose status is `"open"` or `"blocked"`.
    pub blocking_count: usize,
}
160
/// Agent-loop statistics for one stage, extracted from its result payload by
/// `derive_run_observability`.
///
/// The counters come from the payload's `"trace"` object (`iterations`,
/// `llm_calls`, `tool_executions`, `tool_rejections`, `interventions`,
/// `compactions`); missing or non-integer values read as 0.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    /// Copy of the task ledger's observations (empty when there is no ledger).
    pub research_facts: Vec<String>,
}
181
/// Lineage entry for a delegated worker; one per `RunRecord::child_runs` element,
/// copied field-for-field from the corresponding `RunChildRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
196
/// A node of the run's action graph.
///
/// In `derive_run_observability`, `kind` is one of the following:
/// - `"run"`, with id `"run:<run id>"`
/// - `"stage"`, with id `"stage:<stage id>"`
/// - `"worker"`, with id `"worker:<worker id>"`
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
}
211
/// A directed edge of the run's action graph between two node ids.
///
/// In `derive_run_observability`, `kind` is one of the following:
/// - `"entry"`: root → stage with no incoming transition
/// - `"transition"`
/// - `"delegates"`: stage → worker
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}
220
/// Verification result for a stage that is of kind `"verify"` or carries a
/// `verification` payload.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    /// The payload's `"pass"`, else its `"success"`, else a value inferred from
    /// the stage status/outcome; `None` when none of these decide it.
    pub passed: Option<bool>,
    /// Compact JSON of the verification payload, or the non-blank visible text.
    pub summary: Option<String>,
}
230
/// Pointer to where a transcript for the run or one of its stages can be found.
///
/// `kind` is `"embedded_transcript"` (stored inside the run record) or
/// `"llm_jsonl"` (the sidecar file next to the persisted record).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    /// Human-oriented locator, e.g. `"run.transcript"`.
    pub location: String,
    pub path: Option<String>,
    /// For the sidecar, whether the file existed when observability was derived.
    pub available: bool,
}
241
/// Derived, denormalized view of a run for inspection tooling; produced by
/// `derive_run_observability`, which sets `schema_version` to 1.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    /// Sum of task-ledger observations across all stages.
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
}
254
/// One stage-level difference inside a `RunDiffReport`.
///
/// Produced outside this chunk; `change` presumably names the kind of change
/// (added/removed/changed) — confirm against `diff_run_records`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    pub change: String,
    pub details: Vec<String>,
}
262
/// Difference between recorded tool calls of two runs.
///
/// Produced outside this chunk; `args_hash` presumably matches
/// `tool_fixture_hash` / `ToolCallRecord::args_hash` — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
272
/// Difference between the observability sections of two runs; produced outside
/// this chunk (presumably by `diff_run_records` — confirm).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}
280
/// Comparison of two runs ("left" vs "right").
///
/// `evaluate_run_suite_manifest` diffs a baseline (left) against the evaluated
/// run (right) and fails the case when `identical` is false.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    /// Signed count differences; presumably right minus left — confirm.
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
297
/// A list of run-evaluation cases (see `evaluate_run_suite_manifest`).
///
/// `normalize_eval_suite_manifest` defaults `type_name` to
/// `"eval_suite_manifest"` and generates an `id` when missing.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Serialized under the JSON key `"_type"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory that relative case paths are resolved against.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
308
/// One case of an `EvalSuiteManifest`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Run record to evaluate.
    pub run_path: String,
    /// Fixture file; when absent, the run's own replay fixture is used.
    pub fixture_path: Option<String>,
    /// Baseline run record to diff against; any difference fails the case.
    pub compare_to: Option<String>,
}
317
/// Top-level persisted record of a workflow run.
///
/// `normalize_run_record` fills defaults for `type_name` (`"run_record"`), `id`,
/// `started_at`, `status` (`"running"`), `root_run_id` (its own id),
/// `replay_fixture` and `observability`. `save_run_record` and `load_run_record`
/// (re)derive `observability`. Missing JSON fields fall back to `Default` via
/// `#[serde(default)]`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Serialized under the JSON key `"_type"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    /// Derived data; recomputed on save/load.
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Path the record was last saved to (set by `save_run_record`).
    pub persisted_path: Option<String>,
}
350
/// A recorded tool invocation (an element of `RunRecord::tool_recordings`).
///
/// `args_hash` is presumably produced by `tool_fixture_hash` — confirm with the
/// recorder.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
363
364pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
366 use std::hash::{Hash, Hasher};
367 let mut hasher = std::collections::hash_map::DefaultHasher::new();
368 tool_name.hash(&mut hasher);
369 let args_str = serde_json::to_string(args).unwrap_or_default();
370 args_str.hash(&mut hasher);
371 format!("{:016x}", hasher.finish())
372}
373
/// A timing span attached to a run (an element of `RunRecord::trace_spans`).
///
/// Not read in this chunk; `start_ms` is presumably relative to run start —
/// confirm with the producer.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
385
/// A delegated worker / child run (an element of `RunRecord::child_runs`).
///
/// `derive_run_observability` turns each entry into a `"worker"` graph node, a
/// `"delegates"` edge from its parent stage (when that stage exists), and a
/// `RunWorkerLineageRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
407
/// Execution environment of a run or child run (`RunRecord::execution`,
/// `RunChildRecord::execution`).
///
/// Not read in this chunk; the worktree/branch fields presumably describe a
/// VCS worktree used for isolation — confirm with the producer.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}
421
422fn compact_json_value(value: &serde_json::Value) -> String {
423 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
424}
425
426fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
427 value
428 .and_then(|value| value.as_array())
429 .map(|items| {
430 items
431 .iter()
432 .filter_map(|item| item.as_str().map(str::to_string))
433 .collect::<Vec<_>>()
434 })
435 .unwrap_or_default()
436}
437
438fn json_usize(value: Option<&serde_json::Value>) -> usize {
439 value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
440}
441
442fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
443 value.and_then(|value| value.as_bool())
444}
445
446fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
447 stage
448 .artifacts
449 .iter()
450 .find_map(|artifact| artifact.data.as_ref())
451}
452
453fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
454 let deliverables = value
455 .get("deliverables")
456 .and_then(|raw| raw.as_array())
457 .map(|items| {
458 items
459 .iter()
460 .map(|item| RunDeliverableSummaryRecord {
461 id: item
462 .get("id")
463 .and_then(|value| value.as_str())
464 .unwrap_or_default()
465 .to_string(),
466 text: item
467 .get("text")
468 .and_then(|value| value.as_str())
469 .unwrap_or_default()
470 .to_string(),
471 status: item
472 .get("status")
473 .and_then(|value| value.as_str())
474 .unwrap_or_default()
475 .to_string(),
476 note: item
477 .get("note")
478 .and_then(|value| value.as_str())
479 .map(str::to_string),
480 })
481 .collect::<Vec<_>>()
482 })
483 .unwrap_or_default();
484 let observations = json_string_array(value.get("observations"));
485 let root_task = value
486 .get("root_task")
487 .and_then(|value| value.as_str())
488 .unwrap_or_default()
489 .to_string();
490 let rationale = value
491 .get("rationale")
492 .and_then(|value| value.as_str())
493 .unwrap_or_default()
494 .to_string();
495 if root_task.is_empty()
496 && rationale.is_empty()
497 && deliverables.is_empty()
498 && observations.is_empty()
499 {
500 return None;
501 }
502 let blocking_count = deliverables
503 .iter()
504 .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
505 .count();
506 Some(RunTaskLedgerSummaryRecord {
507 root_task,
508 rationale,
509 deliverables,
510 observations,
511 blocking_count,
512 })
513}
514
/// Builds the observability summary for `run`.
///
/// The summary contains:
/// - an action graph: a root `"run"` node, one `"stage"` node per stage and one
///   `"worker"` node per child run, linked by `"entry"`, `"transition"` and
///   `"delegates"` edges;
/// - planner-round statistics for stages whose result payload shows agent activity;
/// - verification outcomes;
/// - worker lineage;
/// - transcript pointers.
///
/// `persisted_path` is only used to locate the LLM transcript sidecar
/// (`<dir>/<stem>-llm/llm_transcript.jsonl`). The root node's `run_path` and the
/// embedded-transcript pointers' `path` come from `run.persisted_path` instead.
///
/// Side effect: probes the filesystem (`Path::exists`) for that sidecar.
/// The order of nodes, edges and pointers in the output follows the order of
/// `run.stages`, `run.transitions` and `run.child_runs`.
pub fn derive_run_observability(
    run: &RunRecord,
    persisted_path: Option<&Path>,
) -> RunObservabilityRecord {
    let mut action_graph_nodes = Vec::new();
    let mut action_graph_edges = Vec::new();
    let mut verification_outcomes = Vec::new();
    let mut planner_rounds = Vec::new();
    let mut transcript_pointers = Vec::new();
    let mut research_fact_count = 0usize;

    // The run itself is the root of the action graph.
    let root_node_id = format!("run:{}", run.id);
    action_graph_nodes.push(RunActionGraphNodeRecord {
        id: root_node_id.clone(),
        label: run
            .workflow_name
            .clone()
            .unwrap_or_else(|| run.workflow_id.clone()),
        kind: "run".to_string(),
        status: run.status.clone(),
        outcome: run.status.clone(),
        stage_id: None,
        node_id: None,
        worker_id: None,
        run_id: Some(run.id.clone()),
        run_path: run.persisted_path.clone(),
    });

    // stage.id -> graph node id.
    let stage_node_ids = run
        .stages
        .iter()
        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
        .collect::<BTreeMap<_, _>>();
    // workflow node_id -> graph node id.
    // NOTE(review): if several stages share a node_id, only one of them survives
    // the collect, so every transition into that node targets the same stage node.
    let stage_by_node_id = run
        .stages
        .iter()
        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
        .collect::<BTreeMap<_, _>>();

    // Nodes reached by some transition; every other stage gets an "entry" edge from the root.
    let incoming_nodes = run
        .transitions
        .iter()
        .map(|transition| transition.to_node_id.clone())
        .collect::<BTreeSet<_>>();

    for stage in &run.stages {
        // The lookup always hits (the map is built from the same stages); the
        // fallback produces the identical id anyway.
        let graph_node_id = stage_node_ids
            .get(&stage.id)
            .cloned()
            .unwrap_or_else(|| format!("stage:{}", stage.id));
        action_graph_nodes.push(RunActionGraphNodeRecord {
            id: graph_node_id.clone(),
            label: stage.node_id.clone(),
            kind: "stage".to_string(),
            status: stage.status.clone(),
            outcome: stage.outcome.clone(),
            stage_id: Some(stage.id.clone()),
            node_id: Some(stage.node_id.clone()),
            worker_id: stage
                .metadata
                .get("worker_id")
                .and_then(|value| value.as_str())
                .map(str::to_string),
            run_id: None,
            run_path: None,
        });
        if !incoming_nodes.contains(&stage.node_id) {
            action_graph_edges.push(RunActionGraphEdgeRecord {
                from_id: root_node_id.clone(),
                to_id: graph_node_id.clone(),
                kind: "entry".to_string(),
                label: None,
            });
        }

        if stage.kind == "verify" || stage.verification.is_some() {
            // Priority: explicit "pass", then "success", then infer from status/outcome.
            let passed = json_bool(
                stage
                    .verification
                    .as_ref()
                    .and_then(|value| value.get("pass")),
            )
            .or_else(|| {
                json_bool(
                    stage
                        .verification
                        .as_ref()
                        .and_then(|value| value.get("success")),
                )
            })
            .or_else(|| {
                if stage.status == "completed" && stage.outcome == "success" {
                    Some(true)
                } else if stage.status == "failed" || stage.outcome == "failed" {
                    Some(false)
                } else {
                    None
                }
            });
            verification_outcomes.push(RunVerificationOutcomeRecord {
                stage_id: stage.id.clone(),
                node_id: stage.node_id.clone(),
                status: stage.status.clone(),
                passed,
                summary: stage
                    .verification
                    .as_ref()
                    .map(compact_json_value)
                    .or_else(|| {
                        stage
                            .visible_text
                            .as_ref()
                            .filter(|value| !value.trim().is_empty())
                            .cloned()
                    }),
            });
        }

        if stage.transcript.is_some() {
            transcript_pointers.push(RunTranscriptPointerRecord {
                id: format!("stage:{}:transcript", stage.id),
                label: format!("Stage {} transcript", stage.node_id),
                kind: "embedded_transcript".to_string(),
                // NOTE(review): `run.stages` is an array but this locator keys it by
                // node_id rather than by index; it is ambiguous when a node has
                // several stages.
                location: format!("run.stages[{}].transcript", stage.node_id),
                path: run.persisted_path.clone(),
                available: true,
            });
        }

        if let Some(payload) = stage_result_payload(stage) {
            let trace = payload.get("trace");
            let task_ledger = payload
                .get("task_ledger")
                .and_then(task_ledger_summary_from_value);
            let research_facts = task_ledger
                .as_ref()
                .map(|ledger| ledger.observations.clone())
                .unwrap_or_default();
            research_fact_count += research_facts.len();
            // Top-level "tools_used" wins over the one nested under "trace".
            let tools_used = json_string_array(
                payload
                    .get("tools_used")
                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
            );
            let successful_tools = json_string_array(payload.get("successful_tools"));
            let planner_round = RunPlannerRoundRecord {
                stage_id: stage.id.clone(),
                node_id: stage.node_id.clone(),
                stage_kind: stage.kind.clone(),
                status: stage.status.clone(),
                outcome: stage.outcome.clone(),
                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
                tool_execution_count: json_usize(
                    trace.and_then(|trace| trace.get("tool_executions")),
                ),
                tool_rejection_count: json_usize(
                    trace.and_then(|trace| trace.get("tool_rejections")),
                ),
                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
                tools_used,
                successful_tools,
                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
                task_ledger,
                research_facts,
            };
            // Skip plain (non-agentic) stages. Rejection, intervention or
            // compaction counts alone do not qualify a round.
            let has_agentic_detail = planner_round.iteration_count > 0
                || planner_round.llm_call_count > 0
                || planner_round.tool_execution_count > 0
                || planner_round.ledger_done_rejections > 0
                || planner_round.task_ledger.is_some()
                || !planner_round.tools_used.is_empty()
                || !planner_round.successful_tools.is_empty();
            if has_agentic_detail {
                planner_rounds.push(planner_round);
            }
        }
    }

    for transition in &run.transitions {
        // Transitions into nodes that never produced a stage have no graph target.
        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
            continue;
        };
        // Source: explicit stage id, else the stage for from_node_id, else the run root.
        let from_id = transition
            .from_stage_id
            .as_ref()
            .and_then(|stage_id| stage_node_ids.get(stage_id))
            .cloned()
            .or_else(|| {
                transition
                    .from_node_id
                    .as_ref()
                    .and_then(|node_id| stage_by_node_id.get(node_id))
                    .cloned()
            })
            .unwrap_or_else(|| root_node_id.clone());
        action_graph_edges.push(RunActionGraphEdgeRecord {
            from_id,
            to_id,
            kind: "transition".to_string(),
            label: transition.branch.clone(),
        });
    }

    // The closure also appends worker nodes and "delegates" edges as a side effect.
    let worker_lineage = run
        .child_runs
        .iter()
        .map(|child| {
            let worker_node_id = format!("worker:{}", child.worker_id);
            action_graph_nodes.push(RunActionGraphNodeRecord {
                id: worker_node_id.clone(),
                label: child.worker_name.clone(),
                kind: "worker".to_string(),
                status: child.status.clone(),
                outcome: child.status.clone(),
                stage_id: child.parent_stage_id.clone(),
                node_id: None,
                worker_id: Some(child.worker_id.clone()),
                run_id: child.run_id.clone(),
                run_path: child.run_path.clone(),
            });
            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
                    action_graph_edges.push(RunActionGraphEdgeRecord {
                        from_id: stage_node_id.clone(),
                        to_id: worker_node_id,
                        kind: "delegates".to_string(),
                        label: Some(child.worker_name.clone()),
                    });
                }
            }
            RunWorkerLineageRecord {
                worker_id: child.worker_id.clone(),
                worker_name: child.worker_name.clone(),
                parent_stage_id: child.parent_stage_id.clone(),
                task: child.task.clone(),
                status: child.status.clone(),
                session_id: child.session_id.clone(),
                parent_session_id: child.parent_session_id.clone(),
                run_id: child.run_id.clone(),
                run_path: child.run_path.clone(),
                snapshot_path: child.snapshot_path.clone(),
            }
        })
        .collect::<Vec<_>>();

    if run.transcript.is_some() {
        transcript_pointers.push(RunTranscriptPointerRecord {
            id: "run:transcript".to_string(),
            label: "Run transcript".to_string(),
            kind: "embedded_transcript".to_string(),
            location: "run.transcript".to_string(),
            path: run.persisted_path.clone(),
            available: true,
        });
    }

    // The LLM transcript sidecar lives next to the record: `<dir>/<stem>-llm/llm_transcript.jsonl`.
    if let Some(path) = persisted_path {
        let stem = path
            .file_stem()
            .and_then(|value| value.to_str())
            .unwrap_or_default();
        if !stem.is_empty() {
            let sidecar_path = path
                .parent()
                .unwrap_or_else(|| Path::new("."))
                .join(format!("{stem}-llm/llm_transcript.jsonl"));
            transcript_pointers.push(RunTranscriptPointerRecord {
                id: "run:llm_transcript".to_string(),
                label: "LLM transcript sidecar".to_string(),
                kind: "llm_jsonl".to_string(),
                location: "run sidecar".to_string(),
                path: Some(sidecar_path.to_string_lossy().into_owned()),
                available: sidecar_path.exists(),
            });
        }
    }

    RunObservabilityRecord {
        schema_version: 1,
        planner_rounds,
        research_fact_count,
        action_graph_nodes,
        action_graph_edges,
        worker_lineage,
        verification_outcomes,
        transcript_pointers,
    }
}
805
806fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
807 run.observability = Some(derive_run_observability(run, persisted_path));
808}
809
810pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
811 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
812 if run.type_name.is_empty() {
813 run.type_name = "run_record".to_string();
814 }
815 if run.id.is_empty() {
816 run.id = new_id("run");
817 }
818 if run.started_at.is_empty() {
819 run.started_at = now_rfc3339();
820 }
821 if run.status.is_empty() {
822 run.status = "running".to_string();
823 }
824 if run.root_run_id.is_none() {
825 run.root_run_id = Some(run.id.clone());
826 }
827 if run.replay_fixture.is_none() {
828 run.replay_fixture = Some(replay_fixture_from_run(&run));
829 }
830 if run.observability.is_none() {
831 let persisted_path = run.persisted_path.clone();
832 let persisted = persisted_path.as_deref().map(Path::new);
833 refresh_run_observability(&mut run, persisted);
834 }
835 Ok(run)
836}
837
838pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
839 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
840 if manifest.type_name.is_empty() {
841 manifest.type_name = "eval_suite_manifest".to_string();
842 }
843 if manifest.id.is_empty() {
844 manifest.id = new_id("eval_suite");
845 }
846 Ok(manifest)
847}
848
849fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
850 let content = std::fs::read_to_string(path)
851 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
852 serde_json::from_str(&content)
853 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
854}
855
/// Resolves a path that appears in an eval-suite manifest.
///
/// Absolute paths are returned unchanged. A relative path is joined onto
/// `base_dir` when one is given. Otherwise it is returned as-is, so later file
/// I/O resolves it against the process working directory.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = Path::new(path);
    match base_dir {
        Some(base) if !candidate.is_absolute() => base.join(candidate),
        _ => candidate.to_path_buf(),
    }
}
866
867pub fn evaluate_run_suite_manifest(
868 manifest: &EvalSuiteManifest,
869) -> Result<ReplayEvalSuiteReport, VmError> {
870 let base_dir = manifest.base_dir.as_deref().map(Path::new);
871 let mut reports = Vec::new();
872 for case in &manifest.cases {
873 let run_path = resolve_manifest_path(base_dir, &case.run_path);
874 let run = load_run_record(&run_path)?;
875 let fixture = match &case.fixture_path {
876 Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
877 None => run
878 .replay_fixture
879 .clone()
880 .unwrap_or_else(|| replay_fixture_from_run(&run)),
881 };
882 let eval = evaluate_run_against_fixture(&run, &fixture);
883 let mut pass = eval.pass;
884 let mut failures = eval.failures;
885 let comparison = match &case.compare_to {
886 Some(path) => {
887 let baseline_path = resolve_manifest_path(base_dir, path);
888 let baseline = load_run_record(&baseline_path)?;
889 let diff = diff_run_records(&baseline, &run);
890 if !diff.identical {
891 pass = false;
892 failures.push(format!(
893 "run differs from baseline {} with {} stage changes",
894 baseline_path.display(),
895 diff.stage_diffs.len()
896 ));
897 }
898 Some(diff)
899 }
900 None => None,
901 };
902 reports.push(ReplayEvalCaseReport {
903 run_id: run.id.clone(),
904 workflow_id: run.workflow_id.clone(),
905 label: case.label.clone(),
906 pass,
907 failures,
908 stage_count: eval.stage_count,
909 source_path: Some(run_path.display().to_string()),
910 comparison,
911 });
912 }
913 let total = reports.len();
914 let passed = reports.iter().filter(|report| report.pass).count();
915 let failed = total.saturating_sub(passed);
916 Ok(ReplayEvalSuiteReport {
917 pass: failed == 0,
918 total,
919 passed,
920 failed,
921 cases: reports,
922 })
923}
924
/// Kind of edit produced by [`myers_diff`].
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// Line present in both inputs; the index refers to the first (`a`) slice.
    Equal,
    /// Line only in the first slice; the index refers to `a`.
    Delete,
    /// Line only in the second slice; the index refers to `b`.
    Insert,
}

/// Computes a shortest edit script turning `a` into `b` (Myers' O((N+M)·D)
/// greedy algorithm).
///
/// Returns `(op, index)` pairs in output order. For `Equal`/`Delete` the index
/// points into `a`; for `Insert` it points into `b`. Emitting `a[i]` for
/// `Equal`, `b[j]` for `Insert`, and skipping `Delete` reproduces `b`.
///
/// Memory: one frontier snapshot of `2·(N+M)+1` entries is kept per edit-distance
/// step for backtracking, i.e. O((N+M)·D).
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    // Trivial cases (also covers both inputs empty -> empty script).
    if a.is_empty() {
        return (0..b.len()).map(|j| (DiffOp::Insert, j)).collect();
    }
    if b.is_empty() {
        return (0..a.len()).map(|i| (DiffOp::Delete, i)).collect();
    }

    let n = a.len() as isize;
    let m = b.len() as isize;
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    // `v[k + offset]` = furthest x reached so far on diagonal k = x - y.
    let mut v = vec![0isize; 2 * max_d + 1];
    // `trace[d]` is the frontier as it stood before step d (i.e. after step d-1).
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'search: for d in 0..=max_d as isize {
        trace.push(v.clone());
        // Step d writes only diagonals with the parity of d and reads only the
        // opposite parity, so `v` can be updated in place without a second buffer.
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1] // step down from diagonal k+1 (insertion)
            } else {
                v[ki - 1] + 1 // step right from diagonal k-1 (deletion)
            };
            let mut y = x - k;
            // Follow the "snake" of matching lines.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            v[ki] = x;
            if x >= n && y >= m {
                break 'search;
            }
        }
    }

    // Walk back from (n, m), replaying each step's choice recorded in `trace`.
    let mut ops: Vec<(DiffOp, usize)> = Vec::with_capacity(a.len() + b.len());
    let (mut x, mut y) = (n, m);
    for d in (1..trace.len() as isize).rev() {
        let frontier = &trace[d as usize];
        let k = x - y;
        let prev_k = if k == -d
            || (k != d
                && frontier[(k - 1 + offset) as usize] < frontier[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = frontier[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Undo the snake that followed this step's edit.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Whatever remains is the leading snake of step 0.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1017
1018pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1019 let before_lines: Vec<&str> = before.lines().collect();
1020 let after_lines: Vec<&str> = after.lines().collect();
1021 let ops = myers_diff(&before_lines, &after_lines);
1022
1023 let mut diff = String::new();
1024 let file = path.unwrap_or("artifact");
1025 diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1026 for &(op, idx) in &ops {
1027 match op {
1028 DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1029 DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1030 DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1031 }
1032 }
1033 diff
1034}
1035
1036pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1037 let path = path
1038 .map(PathBuf::from)
1039 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1040 let mut materialized = run.clone();
1041 if materialized.replay_fixture.is_none() {
1042 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1043 }
1044 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1045 refresh_run_observability(&mut materialized, Some(&path));
1046 if let Some(parent) = path.parent() {
1047 std::fs::create_dir_all(parent)
1048 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1049 }
1050 let json = serde_json::to_string_pretty(&materialized)
1051 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1052 let tmp_path = path.with_extension("json.tmp");
1054 std::fs::write(&tmp_path, &json)
1055 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1056 std::fs::rename(&tmp_path, &path).map_err(|e| {
1057 let _ = std::fs::write(&path, &json);
1059 VmError::Runtime(format!("failed to finalize run record: {e}"))
1060 })?;
1061 Ok(path.to_string_lossy().into_owned())
1062}
1063
1064pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1065 let content = std::fs::read_to_string(path)
1066 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1067 let mut run: RunRecord = serde_json::from_str(&content)
1068 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1069 if run.replay_fixture.is_none() {
1070 run.replay_fixture = Some(replay_fixture_from_run(&run));
1071 }
1072 run.persisted_path
1073 .get_or_insert_with(|| path.to_string_lossy().into_owned());
1074 refresh_run_observability(&mut run, Some(path));
1075 Ok(run)
1076}
1077
1078pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1079 ReplayFixture {
1080 type_name: "replay_fixture".to_string(),
1081 id: new_id("fixture"),
1082 source_run_id: run.id.clone(),
1083 workflow_id: run.workflow_id.clone(),
1084 workflow_name: run.workflow_name.clone(),
1085 created_at: now_rfc3339(),
1086 expected_status: run.status.clone(),
1087 stage_assertions: run
1088 .stages
1089 .iter()
1090 .map(|stage| ReplayStageAssertion {
1091 node_id: stage.node_id.clone(),
1092 expected_status: stage.status.clone(),
1093 expected_outcome: stage.outcome.clone(),
1094 expected_branch: stage.branch.clone(),
1095 required_artifact_kinds: stage
1096 .artifacts
1097 .iter()
1098 .map(|artifact| artifact.kind.clone())
1099 .collect(),
1100 visible_text_contains: stage
1101 .visible_text
1102 .as_ref()
1103 .filter(|text| !text.is_empty())
1104 .map(|text| text.chars().take(80).collect()),
1105 })
1106 .collect(),
1107 }
1108}
1109
1110pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1111 let mut failures = Vec::new();
1112 if run.status != fixture.expected_status {
1113 failures.push(format!(
1114 "run status mismatch: expected {}, got {}",
1115 fixture.expected_status, run.status
1116 ));
1117 }
1118 let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1119 run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1120 for assertion in &fixture.stage_assertions {
1121 let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1122 failures.push(format!("missing stage {}", assertion.node_id));
1123 continue;
1124 };
1125 if stage.status != assertion.expected_status {
1126 failures.push(format!(
1127 "stage {} status mismatch: expected {}, got {}",
1128 assertion.node_id, assertion.expected_status, stage.status
1129 ));
1130 }
1131 if stage.outcome != assertion.expected_outcome {
1132 failures.push(format!(
1133 "stage {} outcome mismatch: expected {}, got {}",
1134 assertion.node_id, assertion.expected_outcome, stage.outcome
1135 ));
1136 }
1137 if stage.branch != assertion.expected_branch {
1138 failures.push(format!(
1139 "stage {} branch mismatch: expected {:?}, got {:?}",
1140 assertion.node_id, assertion.expected_branch, stage.branch
1141 ));
1142 }
1143 for required_kind in &assertion.required_artifact_kinds {
1144 if !stage
1145 .artifacts
1146 .iter()
1147 .any(|artifact| &artifact.kind == required_kind)
1148 {
1149 failures.push(format!(
1150 "stage {} missing artifact kind {}",
1151 assertion.node_id, required_kind
1152 ));
1153 }
1154 }
1155 if let Some(snippet) = &assertion.visible_text_contains {
1156 let actual = stage.visible_text.clone().unwrap_or_default();
1157 if !actual.contains(snippet) {
1158 failures.push(format!(
1159 "stage {} visible text does not contain expected snippet {:?}",
1160 assertion.node_id, snippet
1161 ));
1162 }
1163 }
1164 }
1165
1166 ReplayEvalReport {
1167 pass: failures.is_empty(),
1168 failures,
1169 stage_count: run.stages.len(),
1170 }
1171}
1172
1173pub fn evaluate_run_suite(
1174 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1175) -> ReplayEvalSuiteReport {
1176 let mut reports = Vec::new();
1177 for (run, fixture, source_path) in cases {
1178 let report = evaluate_run_against_fixture(&run, &fixture);
1179 reports.push(ReplayEvalCaseReport {
1180 run_id: run.id.clone(),
1181 workflow_id: run.workflow_id.clone(),
1182 label: None,
1183 pass: report.pass,
1184 failures: report.failures,
1185 stage_count: report.stage_count,
1186 source_path,
1187 comparison: None,
1188 });
1189 }
1190 let total = reports.len();
1191 let passed = reports.iter().filter(|report| report.pass).count();
1192 let failed = total.saturating_sub(passed);
1193 ReplayEvalSuiteReport {
1194 pass: failed == 0,
1195 total,
1196 passed,
1197 failed,
1198 cases: reports,
1199 }
1200}
1201
/// Compares two run records and reports how `right` differs from `left`.
///
/// The report covers:
/// - Stages, matched by `node_id`: stages present on only one side ("added" / "removed"),
///   and changes in status, outcome, branch, number of produced artifact ids, and number of
///   artifact records ("changed").
/// - Tool recordings, matched by `(tool_name, args_hash)`: entries present on only one side,
///   or whose `result` differs.
/// - Observability data: worker lineage, planner rounds, transcript pointers, verification
///   outcomes, and the node/edge counts of the action graph. A run's stored `observability`
///   is used when present; otherwise it is derived on the fly.
/// - Count deltas (`right - left`) for transitions, artifacts and checkpoints.
///
/// `identical` is true only when the run statuses match, no stage, tool or observability
/// difference was found, and the transition, artifact and checkpoint counts are equal.
///
/// NOTE(review): stages, tool recordings and observability entries are collected into maps.
/// When several entries on one side share a key, only the last one is compared.
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    let mut stage_diffs = Vec::new();
    let mut all_node_ids = BTreeSet::new();
    // Stages keyed by node id. A node that ran more than once keeps only its last stage.
    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
        .stages
        .iter()
        .map(|s| (s.node_id.as_str(), s))
        .collect();
    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
        .stages
        .iter()
        .map(|s| (s.node_id.as_str(), s))
        .collect();
    all_node_ids.extend(left_by_id.keys().copied());
    all_node_ids.extend(right_by_id.keys().copied());

    // Walk the union of node ids in sorted order.
    for node_id in all_node_ids {
        let left_stage = left_by_id.get(node_id).copied();
        let right_stage = right_by_id.get(node_id).copied();
        match (left_stage, right_stage) {
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id: node_id.to_string(),
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id: node_id.to_string(),
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                // Artifacts are compared by count only, not by content.
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                // Stages present on both sides are reported only if something differed.
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id: node_id.to_string(),
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            // Unreachable in practice: every id comes from one of the two maps.
            (None, None) => {}
        }
    }

    // Tool recordings keyed by (tool name, argument hash).
    let mut tool_diffs = Vec::new();
    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
        .tool_recordings
        .iter()
        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
        .collect();
    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
        .tool_recordings
        .iter()
        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
        .collect();
    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
        .keys()
        .chain(right_tools.keys())
        .cloned()
        .collect();
    for key in &all_tool_keys {
        let l = left_tools.get(key);
        let r = right_tools.get(key);
        // A key missing from either side counts as a changed result.
        let result_changed = match (l, r) {
            (Some(a), Some(b)) => a.result != b.result,
            _ => true,
        };
        if result_changed {
            tool_diffs.push(ToolCallDiffRecord {
                tool_name: key.0.clone(),
                args_hash: key.1.clone(),
                result_changed,
                left_result: l.map(|t| t.result.clone()),
                right_result: r.map(|t| t.result.clone()),
            });
        }
    }

    // Use the stored observability summary, or derive one (relative to the persisted path,
    // if any) for runs that were saved without it.
    let left_observability = left.observability.clone().unwrap_or_else(|| {
        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
    });
    let right_observability = right.observability.clone().unwrap_or_else(|| {
        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
    });
    let mut observability_diffs = Vec::new();

    // Worker lineage: worker_id -> (status, run_id, run_path).
    let left_workers = left_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_workers = right_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let worker_ids = left_workers
        .keys()
        .chain(right_workers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for worker_id in worker_ids {
        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from left run".to_string()],
            }),
            // The guard compares the whole tuple, so at least one detail below is pushed.
            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
                let mut details = Vec::new();
                if left_worker.0 != right_worker.0 {
                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
                }
                if left_worker.1 != right_worker.1 {
                    details.push(format!(
                        "run_id: {:?} -> {:?}",
                        left_worker.1, right_worker.1
                    ));
                }
                if left_worker.2 != right_worker.2 {
                    details.push(format!(
                        "run_path: {:?} -> {:?}",
                        left_worker.2, right_worker.2
                    ));
                }
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "worker_lineage".to_string(),
                    label: worker_id,
                    details,
                });
            }
            _ => {}
        }
    }

    // Planner rounds keyed by stage_id. Missing rounds are labelled with the stage id;
    // changed rounds are labelled with the left round's node_id.
    // NOTE(review): if stage ids are generated per run, rounds from two different runs may
    // never share a key — confirm how `stage_id` is assigned.
    let left_rounds = left_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let right_rounds = right_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let round_ids = left_rounds
        .keys()
        .chain(right_rounds.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in round_ids {
        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from left run".to_string()],
            }),
            (Some(left_round), Some(right_round)) => {
                let mut details = Vec::new();
                if left_round.iteration_count != right_round.iteration_count {
                    details.push(format!(
                        "iterations: {} -> {}",
                        left_round.iteration_count, right_round.iteration_count
                    ));
                }
                if left_round.tool_execution_count != right_round.tool_execution_count {
                    details.push(format!(
                        "tool_executions: {} -> {}",
                        left_round.tool_execution_count, right_round.tool_execution_count
                    ));
                }
                if left_round.research_facts != right_round.research_facts {
                    details.push(format!(
                        "research_facts: {:?} -> {:?}",
                        left_round.research_facts, right_round.research_facts
                    ));
                }
                // Deliverables are compared as ordered "id:status" lists. A missing task
                // ledger is treated as an empty list.
                let left_deliverables = left_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                let right_deliverables = right_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                if left_deliverables != right_deliverables {
                    details.push(format!(
                        "deliverables: {:?} -> {:?}",
                        left_deliverables, right_deliverables
                    ));
                }
                if left_round.successful_tools != right_round.successful_tools {
                    details.push(format!(
                        "successful_tools: {:?} -> {:?}",
                        left_round.successful_tools, right_round.successful_tools
                    ));
                }
                if !details.is_empty() {
                    observability_diffs.push(RunObservabilityDiffRecord {
                        section: "planner_rounds".to_string(),
                        label: left_round.node_id.clone(),
                        details,
                    });
                }
            }
            _ => {}
        }
    }

    // Transcript pointers: id -> (available, path, location), reported as a single
    // before/after detail when they differ.
    let left_pointers = left_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_pointers = right_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let pointer_ids = left_pointers
        .keys()
        .chain(right_pointers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for pointer_id in pointer_ids {
        match (
            left_pointers.get(&pointer_id),
            right_pointers.get(&pointer_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from left run".to_string()],
            }),
            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "transcript_pointers".to_string(),
                    label: pointer_id,
                    details: vec![format!(
                        "pointer: {:?} -> {:?}",
                        left_pointer, right_pointer
                    )],
                });
            }
            _ => {}
        }
    }

    // Verification outcomes keyed by stage_id. Changed entries are labelled with the left
    // item's node_id.
    let left_verification = left_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let right_verification = right_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let verification_ids = left_verification
        .keys()
        .chain(right_verification.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in verification_ids {
        match (
            left_verification.get(&stage_id),
            right_verification.get(&stage_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from left run".to_string()],
            }),
            // NOTE(review): the guard compares whole items, but only `passed` and `summary`
            // are itemized. If the items differ only in other fields (if any exist), the
            // record is pushed with empty `details`.
            (Some(left_item), Some(right_item)) if left_item != right_item => {
                let mut details = Vec::new();
                if left_item.passed != right_item.passed {
                    details.push(format!(
                        "passed: {:?} -> {:?}",
                        left_item.passed, right_item.passed
                    ));
                }
                if left_item.summary != right_item.summary {
                    details.push(format!(
                        "summary: {:?} -> {:?}",
                        left_item.summary, right_item.summary
                    ));
                }
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "verification".to_string(),
                    label: left_item.node_id.clone(),
                    details,
                });
            }
            _ => {}
        }
    }

    // The action graph is compared by shape only: node and edge counts.
    let left_graph = (
        left_observability.action_graph_nodes.len(),
        left_observability.action_graph_edges.len(),
    );
    let right_graph = (
        right_observability.action_graph_nodes.len(),
        right_observability.action_graph_edges.len(),
    );
    if left_graph != right_graph {
        observability_diffs.push(RunObservabilityDiffRecord {
            section: "action_graph".to_string(),
            label: "shape".to_string(),
            details: vec![format!(
                "nodes/edges: {}/{} -> {}/{}",
                left_graph.0, left_graph.1, right_graph.0, right_graph.1
            )],
        });
    }

    // Transitions, artifacts and checkpoints only contribute through their counts.
    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && tool_diffs.is_empty()
        && observability_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        tool_diffs,
        observability_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}