1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10 CapabilityPolicy,
11};
12use crate::llm::vm_value_to_json;
13use crate::value::{VmError, VmValue};
14
/// Aggregated LLM usage accounting attached to a stage or a whole run.
///
/// All counters are additive totals across the calls they cover; `models`
/// lists the model identifiers that contributed to those totals.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    // Units are producer-defined (presumably USD) — confirm against the
    // component that writes these records.
    pub total_cost: f64,
    pub models: Vec<String>,
}

/// Persistent record of one executed workflow stage.
///
/// `#[serde(default)]` keeps deserialization tolerant of older records that
/// predate newer fields; missing fields fall back to `Default`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    /// Unique stage-instance id (distinct from the workflow graph `node_id`).
    pub id: String,
    /// Workflow graph node this stage executed.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch label taken out of this stage, when the node branches.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    /// Embedded transcript JSON, if the stage kept one (see
    /// `compaction_events_from_transcript` for the expected shape).
    pub transcript: Option<serde_json::Value>,
    /// Free-form verification result; `derive_run_observability` probes its
    /// `pass`/`success` keys when summarizing outcomes.
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Per-retry history; the stage-level fields reflect the final attempt.
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// One retry/attempt of a stage, recorded inside [`RunStageRecord::attempts`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    /// 0- or 1-based attempt ordinal — producer-defined; confirm before relying on it.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
61
/// An edge taken in the workflow graph: control moved from one stage/node to
/// the next, optionally carrying artifact ids along the hop.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    /// Source stage instance, when known (entry transitions have neither source field).
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    /// Branch label that selected this transition, if the source node branched.
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// A resumable snapshot of scheduler state persisted mid-run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    /// Nodes eligible to execute when this checkpoint was taken.
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Why the checkpoint was written (producer-defined string).
    pub reason: String,
}

/// Expected-outcome fixture derived from (or supplied for) a run, used to
/// re-check a run during replay evaluation.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Serialized as `"_type"` — a JSON type tag, not a field name.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Id of the run this fixture was captured from.
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Per-node expectations inside a [`ReplayFixture`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    /// Artifact kinds the stage must have produced.
    pub required_artifact_kinds: Vec<String>,
    /// Substring that must appear in the stage's visible text, if set.
    pub visible_text_contains: Option<String>,
}
110
/// Result of checking a single run against a [`ReplayFixture`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    /// Human-readable descriptions of each failed expectation.
    pub failures: Vec<String>,
    pub stage_count: usize,
}

/// One case's result inside a suite evaluation (see
/// `evaluate_run_suite_manifest`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    /// Label copied from the manifest case, if provided.
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    /// Path the run record was loaded from.
    pub source_path: Option<String>,
    /// Baseline comparison, present only when the case configured `compare_to`.
    pub comparison: Option<RunDiffReport>,
}

/// Aggregate of all cases in an eval-suite run; `pass` is true iff no case failed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}

/// One deliverable row in a task-ledger summary.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    /// Deliverable state; `"open"` and `"blocked"` are counted as blocking
    /// by `task_ledger_summary_from_value`.
    pub status: String,
    pub note: Option<String>,
}

/// Condensed view of a stage's task ledger extracted from its result payload.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    /// Number of deliverables whose status is "open" or "blocked".
    pub blocking_count: usize,
}
160
/// Agentic-execution metrics for one stage, pulled from the stage result
/// payload's `trace` object by `derive_run_observability`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    /// Number of context-compaction passes performed during the stage.
    pub compaction_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    /// Times a premature "done" claim was rejected against the ledger.
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    /// Mirrors the ledger's observations list (see derive_run_observability).
    pub research_facts: Vec<String>,
}

/// Provenance of a delegated child/worker run, flattened for observability.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    /// Stage that spawned the worker, when known.
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}

/// A node in the derived action graph (run root, stage, or worker).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    /// Graph-local id, e.g. "run:<id>", "stage:<id>", "worker:<id>".
    pub id: String,
    pub label: String,
    /// One of "run", "stage", "worker" (as emitted by derive_run_observability).
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
}

/// A directed edge in the derived action graph.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    /// One of "entry", "transition", "delegates" (see derive_run_observability).
    pub kind: String,
    pub label: Option<String>,
}

/// Flattened verification result for a stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    /// None when pass/fail could not be determined from the stage.
    pub passed: Option<bool>,
    pub summary: Option<String>,
}

/// Pointer to a transcript (embedded in the run document or an on-disk sidecar).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    /// "embedded_transcript" or "llm_jsonl" (see derive_run_observability).
    pub kind: String,
    /// Dotted location inside the run document, or "run sidecar".
    pub location: String,
    pub path: Option<String>,
    /// Whether the transcript is actually retrievable at `path`/`location`.
    pub available: bool,
}
241
/// One context-compaction event extracted from a transcript's event stream.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    /// Stage attribution; None for run-level transcripts.
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    /// Number of messages moved out of the live context.
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    /// Asset id of the archived-context snapshot, if one was kept.
    pub snapshot_asset_id: Option<String>,
    /// Dotted location of the snapshot inside the run document.
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    /// True when the referenced snapshot asset exists in the transcript.
    pub available: bool,
}

/// Derived, denormalized observability view of a run; regenerated from the
/// raw [`RunRecord`] by `derive_run_observability` (current schema_version: 2).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
}
273
/// Per-node change entry in a run-vs-run comparison.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Kind of change (producer-defined string, e.g. added/removed/changed —
    /// confirm against diff_run_records).
    pub change: String,
    pub details: Vec<String>,
}

/// Difference in a recorded tool call between two runs, keyed by tool name
/// and argument hash (see `tool_fixture_hash`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}

/// Difference in a derived-observability section between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}

/// Full comparison between two runs; `identical` summarizes all other fields.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    /// right - left count deltas (signed).
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}

/// On-disk manifest describing a batch of replay-eval cases.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Serialized as `"_type"` — JSON type tag.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory against which relative case paths are resolved.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One entry in an [`EvalSuiteManifest`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Path to the run record to evaluate (relative to the manifest base_dir).
    pub run_path: String,
    /// Explicit fixture; when absent the run's own fixture is used/derived.
    pub fixture_path: Option<String>,
    /// Optional baseline run to diff against; any difference fails the case.
    pub compare_to: Option<String>,
}
336
/// The complete persisted record of a workflow run: identity, timing,
/// stage/transition/checkpoint history, artifacts, delegated child runs,
/// and derived observability/replay data.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Serialized as `"_type"`; backfilled to "run_record" by normalize_run_record.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    /// Run lifecycle status; defaults to "running" when normalized empty.
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    /// Topmost ancestor; a run with no parent is its own root.
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    /// Run-level embedded transcript JSON, if any.
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Auto-derived from the run when absent (normalize_run_record).
    pub replay_fixture: Option<ReplayFixture>,
    /// Auto-derived from the run when absent (normalize_run_record).
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Where this record was written on disk, if persisted.
    pub persisted_path: Option<String>,
}

/// A recorded tool invocation, keyed for replay by name + argument hash.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    /// Hash of the serialized arguments (see `tool_fixture_hash`).
    pub args_hash: String,
    pub result: String,
    /// True when the call was rejected (by policy or user) rather than executed.
    pub is_rejected: bool,
    pub duration_ms: u64,
    /// Agent-loop iteration in which the call occurred.
    pub iteration: usize,
    pub timestamp: String,
}
382
383pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
385 use std::hash::{Hash, Hasher};
386 let mut hasher = std::collections::hash_map::DefaultHasher::new();
387 tool_name.hash(&mut hasher);
388 let args_str = serde_json::to_string(args).unwrap_or_default();
389 args_str.hash(&mut hasher);
390 format!("{:016x}", hasher.finish())
391}
392
/// One timing span in the run's execution trace.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    /// Parent span; None for root spans.
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    /// Start offset in milliseconds — epoch vs. run-relative is
    /// producer-defined; confirm against the span emitter.
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// A delegated child (worker) run spawned by a stage of this run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    /// Raw delegation request payload, if recorded.
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Id/path of the child's own RunRecord, once it exists.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}

/// Where and how a run (or child run) executed: working directory,
/// environment, and optional VCS worktree details.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    /// Cleanup policy for the worktree (producer-defined string).
    pub cleanup: Option<String>,
}
440
441fn compact_json_value(value: &serde_json::Value) -> String {
442 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
443}
444
445fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
446 value
447 .and_then(|value| value.as_array())
448 .map(|items| {
449 items
450 .iter()
451 .filter_map(|item| item.as_str().map(str::to_string))
452 .collect::<Vec<_>>()
453 })
454 .unwrap_or_default()
455}
456
457fn json_usize(value: Option<&serde_json::Value>) -> usize {
458 value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
459}
460
461fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
462 value.and_then(|value| value.as_bool())
463}
464
465fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
466 stage
467 .artifacts
468 .iter()
469 .find_map(|artifact| artifact.data.as_ref())
470}
471
472fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
473 let deliverables = value
474 .get("deliverables")
475 .and_then(|raw| raw.as_array())
476 .map(|items| {
477 items
478 .iter()
479 .map(|item| RunDeliverableSummaryRecord {
480 id: item
481 .get("id")
482 .and_then(|value| value.as_str())
483 .unwrap_or_default()
484 .to_string(),
485 text: item
486 .get("text")
487 .and_then(|value| value.as_str())
488 .unwrap_or_default()
489 .to_string(),
490 status: item
491 .get("status")
492 .and_then(|value| value.as_str())
493 .unwrap_or_default()
494 .to_string(),
495 note: item
496 .get("note")
497 .and_then(|value| value.as_str())
498 .map(str::to_string),
499 })
500 .collect::<Vec<_>>()
501 })
502 .unwrap_or_default();
503 let observations = json_string_array(value.get("observations"));
504 let root_task = value
505 .get("root_task")
506 .and_then(|value| value.as_str())
507 .unwrap_or_default()
508 .to_string();
509 let rationale = value
510 .get("rationale")
511 .and_then(|value| value.as_str())
512 .unwrap_or_default()
513 .to_string();
514 if root_task.is_empty()
515 && rationale.is_empty()
516 && deliverables.is_empty()
517 && observations.is_empty()
518 {
519 return None;
520 }
521 let blocking_count = deliverables
522 .iter()
523 .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
524 .count();
525 Some(RunTaskLedgerSummaryRecord {
526 root_task,
527 rationale,
528 deliverables,
529 observations,
530 blocking_count,
531 })
532}
533
/// Extract compaction events from a transcript JSON blob.
///
/// The transcript is expected to be an object with optional `id`, `assets`
/// (array of objects carrying an `id`) and `events` (array of objects)
/// keys; events whose `kind` is `"compaction"` become
/// [`CompactionEventRecord`]s and everything else is ignored. Missing or
/// wrongly-typed fields fall back to empty/default values instead of
/// erroring.
///
/// * `stage_id` / `node_id` — attribution stamped on every produced record
///   (`None` for run-level transcripts).
/// * `location_prefix` — dotted path of this transcript inside the run
///   document (e.g. `"run.transcript"`), used to describe where a snapshot
///   asset lives.
/// * `persisted_path` — on-disk path of the run document, recorded as each
///   event's `snapshot_path`.
fn compaction_events_from_transcript(
    transcript: &serde_json::Value,
    stage_id: Option<&str>,
    node_id: Option<&str>,
    location_prefix: &str,
    persisted_path: Option<&Path>,
) -> Vec<CompactionEventRecord> {
    let transcript_id = transcript
        .get("id")
        .and_then(|value| value.as_str())
        .map(str::to_string);
    // Ids of assets embedded in the transcript — used below to decide
    // whether a compaction snapshot is actually retrievable (`available`).
    let asset_ids = transcript
        .get("assets")
        .and_then(|value| value.as_array())
        .map(|assets| {
            assets
                .iter()
                .filter_map(|asset| {
                    asset
                        .get("id")
                        .and_then(|value| value.as_str())
                        .map(str::to_string)
                })
                .collect::<BTreeSet<_>>()
        })
        .unwrap_or_default();
    transcript
        .get("events")
        .and_then(|value| value.as_array())
        .map(|events| {
            events
                .iter()
                // Only compaction events are of interest here.
                .filter(|event| {
                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
                })
                .map(|event| {
                    let metadata = event.get("metadata");
                    let snapshot_asset_id = metadata
                        .and_then(|value| value.get("snapshot_asset_id"))
                        .and_then(|value| value.as_str())
                        .map(str::to_string);
                    // Available only if the referenced asset is present in
                    // this transcript's asset list.
                    let available = snapshot_asset_id
                        .as_ref()
                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
                    // Without an asset id, fall back to pointing at the
                    // transcript itself.
                    let snapshot_location = snapshot_asset_id
                        .as_ref()
                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
                        .unwrap_or_else(|| location_prefix.to_string());
                    CompactionEventRecord {
                        id: event
                            .get("id")
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        transcript_id: transcript_id.clone(),
                        stage_id: stage_id.map(str::to_string),
                        node_id: node_id.map(str::to_string),
                        mode: metadata
                            .and_then(|value| value.get("mode"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        strategy: metadata
                            .and_then(|value| value.get("strategy"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        archived_messages: json_usize(
                            metadata.and_then(|value| value.get("archived_messages")),
                        ),
                        estimated_tokens_before: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_before")),
                        ),
                        estimated_tokens_after: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_after")),
                        ),
                        snapshot_asset_id,
                        snapshot_location,
                        // NOTE: recorded even when `available` is false —
                        // the path locates the run document, not the asset.
                        snapshot_path: persisted_path
                            .map(|path| path.to_string_lossy().into_owned()),
                        available,
                    }
                })
                .collect()
        })
        .unwrap_or_default()
}
621
/// Derive the denormalized observability view of a run.
///
/// Walks the run's stages, transitions, child runs, and transcripts and
/// produces a [`RunObservabilityRecord`] (schema_version 2) containing:
/// an action graph (run root → stages → workers), verification outcomes,
/// planner-round metrics extracted from stage result payloads, worker
/// lineage, transcript pointers, and compaction events.
///
/// `persisted_path` is the on-disk location of the run document, if any;
/// it is used to locate the LLM transcript sidecar and to stamp snapshot
/// paths on compaction events. The function is read-only with respect to
/// `run` and performs no I/O other than `sidecar_path.exists()`.
pub fn derive_run_observability(
    run: &RunRecord,
    persisted_path: Option<&Path>,
) -> RunObservabilityRecord {
    let mut action_graph_nodes = Vec::new();
    let mut action_graph_edges = Vec::new();
    let mut verification_outcomes = Vec::new();
    let mut planner_rounds = Vec::new();
    let mut transcript_pointers = Vec::new();
    let mut compaction_events = Vec::new();
    let mut research_fact_count = 0usize;

    // Root node of the action graph represents the run itself.
    let root_node_id = format!("run:{}", run.id);
    action_graph_nodes.push(RunActionGraphNodeRecord {
        id: root_node_id.clone(),
        label: run
            .workflow_name
            .clone()
            .unwrap_or_else(|| run.workflow_id.clone()),
        kind: "run".to_string(),
        status: run.status.clone(),
        // The run has no separate outcome; reuse status.
        outcome: run.status.clone(),
        stage_id: None,
        node_id: None,
        worker_id: None,
        run_id: Some(run.id.clone()),
        run_path: run.persisted_path.clone(),
    });

    // Lookup tables: stage id → graph node id, and workflow node id →
    // graph node id (later stages win on duplicate node_ids).
    let stage_node_ids = run
        .stages
        .iter()
        .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
        .collect::<BTreeMap<_, _>>();
    let stage_by_node_id = run
        .stages
        .iter()
        .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
        .collect::<BTreeMap<_, _>>();

    // Nodes that are the target of some transition; stages whose node is
    // never a target are treated as entry points below.
    let incoming_nodes = run
        .transitions
        .iter()
        .map(|transition| transition.to_node_id.clone())
        .collect::<BTreeSet<_>>();

    for stage in &run.stages {
        let graph_node_id = stage_node_ids
            .get(&stage.id)
            .cloned()
            .unwrap_or_else(|| format!("stage:{}", stage.id));
        action_graph_nodes.push(RunActionGraphNodeRecord {
            id: graph_node_id.clone(),
            label: stage.node_id.clone(),
            kind: "stage".to_string(),
            status: stage.status.clone(),
            outcome: stage.outcome.clone(),
            stage_id: Some(stage.id.clone()),
            node_id: Some(stage.node_id.clone()),
            worker_id: stage
                .metadata
                .get("worker_id")
                .and_then(|value| value.as_str())
                .map(str::to_string),
            run_id: None,
            run_path: None,
        });
        // No incoming transition → this stage started the run; connect it
        // to the root node.
        if !incoming_nodes.contains(&stage.node_id) {
            action_graph_edges.push(RunActionGraphEdgeRecord {
                from_id: root_node_id.clone(),
                to_id: graph_node_id.clone(),
                kind: "entry".to_string(),
                label: None,
            });
        }

        // Verification outcome: explicit `pass`/`success` keys in the
        // verification payload win; otherwise fall back to inferring from
        // the stage's status/outcome, or leave undetermined.
        if stage.kind == "verify" || stage.verification.is_some() {
            let passed = json_bool(
                stage
                    .verification
                    .as_ref()
                    .and_then(|value| value.get("pass")),
            )
            .or_else(|| {
                json_bool(
                    stage
                        .verification
                        .as_ref()
                        .and_then(|value| value.get("success")),
                )
            })
            .or_else(|| {
                if stage.status == "completed" && stage.outcome == "success" {
                    Some(true)
                } else if stage.status == "failed" || stage.outcome == "failed" {
                    Some(false)
                } else {
                    None
                }
            });
            verification_outcomes.push(RunVerificationOutcomeRecord {
                stage_id: stage.id.clone(),
                node_id: stage.node_id.clone(),
                status: stage.status.clone(),
                passed,
                // Prefer the raw verification JSON; fall back to non-blank
                // visible text.
                summary: stage
                    .verification
                    .as_ref()
                    .map(compact_json_value)
                    .or_else(|| {
                        stage
                            .visible_text
                            .as_ref()
                            .filter(|value| !value.trim().is_empty())
                            .cloned()
                    }),
            });
        }

        // Embedded stage transcript: record a pointer and harvest any
        // compaction events it contains.
        if stage.transcript.is_some() {
            transcript_pointers.push(RunTranscriptPointerRecord {
                id: format!("stage:{}:transcript", stage.id),
                label: format!("Stage {} transcript", stage.node_id),
                kind: "embedded_transcript".to_string(),
                // NOTE: the location indexes stages by node_id, not by
                // positional index.
                location: format!("run.stages[{}].transcript", stage.node_id),
                path: run.persisted_path.clone(),
                available: true,
            });
            if let Some(transcript) = stage.transcript.as_ref() {
                compaction_events.extend(compaction_events_from_transcript(
                    transcript,
                    Some(&stage.id),
                    Some(&stage.node_id),
                    &format!("run.stages[{}].transcript", stage.node_id),
                    persisted_path,
                ));
            }
        }

        // Planner-round metrics come from the stage's first artifact data
        // payload (its `trace` object plus top-level counters).
        if let Some(payload) = stage_result_payload(stage) {
            let trace = payload.get("trace");
            let task_ledger = payload
                .get("task_ledger")
                .and_then(task_ledger_summary_from_value);
            // Ledger observations double as the stage's research facts.
            let research_facts = task_ledger
                .as_ref()
                .map(|ledger| ledger.observations.clone())
                .unwrap_or_default();
            research_fact_count += research_facts.len();
            // `tools_used` may live at the top level or inside `trace`.
            let tools_used = json_string_array(
                payload
                    .get("tools_used")
                    .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
            );
            let successful_tools = json_string_array(payload.get("successful_tools"));
            let planner_round = RunPlannerRoundRecord {
                stage_id: stage.id.clone(),
                node_id: stage.node_id.clone(),
                stage_kind: stage.kind.clone(),
                status: stage.status.clone(),
                outcome: stage.outcome.clone(),
                iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
                llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
                tool_execution_count: json_usize(
                    trace.and_then(|trace| trace.get("tool_executions")),
                ),
                tool_rejection_count: json_usize(
                    trace.and_then(|trace| trace.get("tool_rejections")),
                ),
                intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
                compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
                tools_used,
                successful_tools,
                ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
                task_ledger,
                research_facts,
            };
            // Only keep rounds that show actual agentic activity —
            // payloads with all-zero counters would just be noise.
            let has_agentic_detail = planner_round.iteration_count > 0
                || planner_round.llm_call_count > 0
                || planner_round.tool_execution_count > 0
                || planner_round.ledger_done_rejections > 0
                || planner_round.task_ledger.is_some()
                || !planner_round.tools_used.is_empty()
                || !planner_round.successful_tools.is_empty();
            if has_agentic_detail {
                planner_rounds.push(planner_round);
            }
        }
    }

    // Transition edges: resolve endpoints to graph node ids; transitions
    // into nodes that never produced a stage are dropped, and unknown
    // sources fall back to the run root.
    for transition in &run.transitions {
        let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
            continue;
        };
        let from_id = transition
            .from_stage_id
            .as_ref()
            .and_then(|stage_id| stage_node_ids.get(stage_id))
            .cloned()
            .or_else(|| {
                transition
                    .from_node_id
                    .as_ref()
                    .and_then(|node_id| stage_by_node_id.get(node_id))
                    .cloned()
            })
            .unwrap_or_else(|| root_node_id.clone());
        action_graph_edges.push(RunActionGraphEdgeRecord {
            from_id,
            to_id,
            kind: "transition".to_string(),
            label: transition.branch.clone(),
        });
    }

    // Child runs become worker nodes (with "delegates" edges from their
    // parent stage, when resolvable) plus flat lineage records.
    let worker_lineage = run
        .child_runs
        .iter()
        .map(|child| {
            let worker_node_id = format!("worker:{}", child.worker_id);
            action_graph_nodes.push(RunActionGraphNodeRecord {
                id: worker_node_id.clone(),
                label: child.worker_name.clone(),
                kind: "worker".to_string(),
                status: child.status.clone(),
                outcome: child.status.clone(),
                stage_id: child.parent_stage_id.clone(),
                node_id: None,
                worker_id: Some(child.worker_id.clone()),
                run_id: child.run_id.clone(),
                run_path: child.run_path.clone(),
            });
            if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
                if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
                    action_graph_edges.push(RunActionGraphEdgeRecord {
                        from_id: stage_node_id.clone(),
                        to_id: worker_node_id,
                        kind: "delegates".to_string(),
                        label: Some(child.worker_name.clone()),
                    });
                }
            }
            RunWorkerLineageRecord {
                worker_id: child.worker_id.clone(),
                worker_name: child.worker_name.clone(),
                parent_stage_id: child.parent_stage_id.clone(),
                task: child.task.clone(),
                status: child.status.clone(),
                session_id: child.session_id.clone(),
                parent_session_id: child.parent_session_id.clone(),
                run_id: child.run_id.clone(),
                run_path: child.run_path.clone(),
                snapshot_path: child.snapshot_path.clone(),
            }
        })
        .collect::<Vec<_>>();

    // Run-level transcript: pointer + compaction events, same as stages.
    if run.transcript.is_some() {
        transcript_pointers.push(RunTranscriptPointerRecord {
            id: "run:transcript".to_string(),
            label: "Run transcript".to_string(),
            kind: "embedded_transcript".to_string(),
            location: "run.transcript".to_string(),
            path: run.persisted_path.clone(),
            available: true,
        });
        if let Some(transcript) = run.transcript.as_ref() {
            compaction_events.extend(compaction_events_from_transcript(
                transcript,
                None,
                None,
                "run.transcript",
                persisted_path,
            ));
        }
    }

    // LLM transcript sidecar: lives next to the persisted run document at
    // "<stem>-llm/llm_transcript.jsonl"; availability is probed on disk.
    if let Some(path) = persisted_path {
        let stem = path
            .file_stem()
            .and_then(|value| value.to_str())
            .unwrap_or_default();
        if !stem.is_empty() {
            let sidecar_path = path
                .parent()
                .unwrap_or_else(|| Path::new("."))
                .join(format!("{stem}-llm/llm_transcript.jsonl"));
            transcript_pointers.push(RunTranscriptPointerRecord {
                id: "run:llm_transcript".to_string(),
                label: "LLM transcript sidecar".to_string(),
                kind: "llm_jsonl".to_string(),
                location: "run sidecar".to_string(),
                path: Some(sidecar_path.to_string_lossy().into_owned()),
                available: sidecar_path.exists(),
            });
        }
    }

    RunObservabilityRecord {
        schema_version: 2,
        planner_rounds,
        research_fact_count,
        action_graph_nodes,
        action_graph_edges,
        worker_lineage,
        verification_outcomes,
        transcript_pointers,
        compaction_events,
    }
}
932
933fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
934 run.observability = Some(derive_run_observability(run, persisted_path));
935}
936
937pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
938 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
939 if run.type_name.is_empty() {
940 run.type_name = "run_record".to_string();
941 }
942 if run.id.is_empty() {
943 run.id = new_id("run");
944 }
945 if run.started_at.is_empty() {
946 run.started_at = now_rfc3339();
947 }
948 if run.status.is_empty() {
949 run.status = "running".to_string();
950 }
951 if run.root_run_id.is_none() {
952 run.root_run_id = Some(run.id.clone());
953 }
954 if run.replay_fixture.is_none() {
955 run.replay_fixture = Some(replay_fixture_from_run(&run));
956 }
957 if run.observability.is_none() {
958 let persisted_path = run.persisted_path.clone();
959 let persisted = persisted_path.as_deref().map(Path::new);
960 refresh_run_observability(&mut run, persisted);
961 }
962 Ok(run)
963}
964
965pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
966 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
967 if manifest.type_name.is_empty() {
968 manifest.type_name = "eval_suite_manifest".to_string();
969 }
970 if manifest.id.is_empty() {
971 manifest.id = new_id("eval_suite");
972 }
973 Ok(manifest)
974}
975
976fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
977 let content = std::fs::read_to_string(path)
978 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
979 serde_json::from_str(&content)
980 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
981}
982
/// Resolve a path string from an eval-suite manifest.
///
/// Absolute paths are returned unchanged; relative paths are joined onto
/// `base_dir` when one is provided, otherwise returned as-is (i.e. relative
/// to the process working directory).
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate,
    }
}
993
994pub fn evaluate_run_suite_manifest(
995 manifest: &EvalSuiteManifest,
996) -> Result<ReplayEvalSuiteReport, VmError> {
997 let base_dir = manifest.base_dir.as_deref().map(Path::new);
998 let mut reports = Vec::new();
999 for case in &manifest.cases {
1000 let run_path = resolve_manifest_path(base_dir, &case.run_path);
1001 let run = load_run_record(&run_path)?;
1002 let fixture = match &case.fixture_path {
1003 Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1004 None => run
1005 .replay_fixture
1006 .clone()
1007 .unwrap_or_else(|| replay_fixture_from_run(&run)),
1008 };
1009 let eval = evaluate_run_against_fixture(&run, &fixture);
1010 let mut pass = eval.pass;
1011 let mut failures = eval.failures;
1012 let comparison = match &case.compare_to {
1013 Some(path) => {
1014 let baseline_path = resolve_manifest_path(base_dir, path);
1015 let baseline = load_run_record(&baseline_path)?;
1016 let diff = diff_run_records(&baseline, &run);
1017 if !diff.identical {
1018 pass = false;
1019 failures.push(format!(
1020 "run differs from baseline {} with {} stage changes",
1021 baseline_path.display(),
1022 diff.stage_diffs.len()
1023 ));
1024 }
1025 Some(diff)
1026 }
1027 None => None,
1028 };
1029 reports.push(ReplayEvalCaseReport {
1030 run_id: run.id.clone(),
1031 workflow_id: run.workflow_id.clone(),
1032 label: case.label.clone(),
1033 pass,
1034 failures,
1035 stage_count: eval.stage_count,
1036 source_path: Some(run_path.display().to_string()),
1037 comparison,
1038 });
1039 }
1040 let total = reports.len();
1041 let passed = reports.iter().filter(|report| report.pass).count();
1042 let failed = total.saturating_sub(passed);
1043 Ok(ReplayEvalSuiteReport {
1044 pass: failed == 0,
1045 total,
1046 passed,
1047 failed,
1048 cases: reports,
1049 })
1050}
1051
/// A primitive operation in the edit script produced by [`myers_diff`].
///
/// Each op is paired with an index: `Equal`/`Delete` carry an index into the
/// left sequence, `Insert` an index into the right sequence.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    Equal,
    Delete,
    Insert,
}
1059
/// Compute a shortest edit script between `a` and `b` using Myers' O((n+m)·d)
/// diff algorithm.
///
/// Returns the script in left-to-right order as `(op, index)` pairs, where
/// `Equal` and `Delete` indices point into `a` and `Insert` indices point
/// into `b`. The implementation follows the classic formulation: a forward
/// pass records the furthest-reaching x for each diagonal `k = x - y` per
/// edit distance `d` (the `trace`), then a backward pass reconstructs the
/// path.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Trivial cases: nothing to diff, pure insertion, pure deletion.
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // `v[k + offset]` holds the furthest x reached on diagonal k; diagonals
    // range over [-(n+m), n+m], hence the offset and 2*(n+m)+1 size.
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // trace[d] is the frontier *before* round d (i.e. after round d-1),
    // which is exactly what the backtracking pass needs.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        // Writes go to new_v so reads within this round always see the
        // previous round's values (equivalent to the in-place variant,
        // since a round only touches diagonals of one parity).
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Step down (insert) from k+1 or right (delete) from k-1,
            // whichever reaches further; the k == ±d guards keep the
            // neighbor indices in bounds.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the "snake": consume the run of equal elements.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                // Reached the end of both sequences; d is the edit distance.
                let _ = new_v;
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backtrack from (n, m) through the recorded frontiers, emitting ops in
    // reverse order.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        // Mirror the forward pass's tie-breaking to find the diagonal we
        // came from.
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Walk back along the snake of equal elements first.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        // Then record the single edit that changed diagonals.
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Any remaining prefix is an equal run on the d = 0 diagonal.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1144
1145pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1146 let before_lines: Vec<&str> = before.lines().collect();
1147 let after_lines: Vec<&str> = after.lines().collect();
1148 let ops = myers_diff(&before_lines, &after_lines);
1149
1150 let mut diff = String::new();
1151 let file = path.unwrap_or("artifact");
1152 diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1153 for &(op, idx) in &ops {
1154 match op {
1155 DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1156 DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1157 DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1158 }
1159 }
1160 diff
1161}
1162
1163pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1164 let path = path
1165 .map(PathBuf::from)
1166 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1167 let mut materialized = run.clone();
1168 if materialized.replay_fixture.is_none() {
1169 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1170 }
1171 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1172 refresh_run_observability(&mut materialized, Some(&path));
1173 if let Some(parent) = path.parent() {
1174 std::fs::create_dir_all(parent)
1175 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1176 }
1177 let json = serde_json::to_string_pretty(&materialized)
1178 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1179 let tmp_path = path.with_extension("json.tmp");
1181 std::fs::write(&tmp_path, &json)
1182 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1183 std::fs::rename(&tmp_path, &path).map_err(|e| {
1184 let _ = std::fs::write(&path, &json);
1186 VmError::Runtime(format!("failed to finalize run record: {e}"))
1187 })?;
1188 Ok(path.to_string_lossy().into_owned())
1189}
1190
1191pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1192 let content = std::fs::read_to_string(path)
1193 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1194 let mut run: RunRecord = serde_json::from_str(&content)
1195 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1196 if run.replay_fixture.is_none() {
1197 run.replay_fixture = Some(replay_fixture_from_run(&run));
1198 }
1199 run.persisted_path
1200 .get_or_insert_with(|| path.to_string_lossy().into_owned());
1201 refresh_run_observability(&mut run, Some(path));
1202 Ok(run)
1203}
1204
1205pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1206 ReplayFixture {
1207 type_name: "replay_fixture".to_string(),
1208 id: new_id("fixture"),
1209 source_run_id: run.id.clone(),
1210 workflow_id: run.workflow_id.clone(),
1211 workflow_name: run.workflow_name.clone(),
1212 created_at: now_rfc3339(),
1213 expected_status: run.status.clone(),
1214 stage_assertions: run
1215 .stages
1216 .iter()
1217 .map(|stage| ReplayStageAssertion {
1218 node_id: stage.node_id.clone(),
1219 expected_status: stage.status.clone(),
1220 expected_outcome: stage.outcome.clone(),
1221 expected_branch: stage.branch.clone(),
1222 required_artifact_kinds: stage
1223 .artifacts
1224 .iter()
1225 .map(|artifact| artifact.kind.clone())
1226 .collect(),
1227 visible_text_contains: stage
1228 .visible_text
1229 .as_ref()
1230 .filter(|text| !text.is_empty())
1231 .map(|text| text.chars().take(80).collect()),
1232 })
1233 .collect(),
1234 }
1235}
1236
1237pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1238 let mut failures = Vec::new();
1239 if run.status != fixture.expected_status {
1240 failures.push(format!(
1241 "run status mismatch: expected {}, got {}",
1242 fixture.expected_status, run.status
1243 ));
1244 }
1245 let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1246 run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1247 for assertion in &fixture.stage_assertions {
1248 let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1249 failures.push(format!("missing stage {}", assertion.node_id));
1250 continue;
1251 };
1252 if stage.status != assertion.expected_status {
1253 failures.push(format!(
1254 "stage {} status mismatch: expected {}, got {}",
1255 assertion.node_id, assertion.expected_status, stage.status
1256 ));
1257 }
1258 if stage.outcome != assertion.expected_outcome {
1259 failures.push(format!(
1260 "stage {} outcome mismatch: expected {}, got {}",
1261 assertion.node_id, assertion.expected_outcome, stage.outcome
1262 ));
1263 }
1264 if stage.branch != assertion.expected_branch {
1265 failures.push(format!(
1266 "stage {} branch mismatch: expected {:?}, got {:?}",
1267 assertion.node_id, assertion.expected_branch, stage.branch
1268 ));
1269 }
1270 for required_kind in &assertion.required_artifact_kinds {
1271 if !stage
1272 .artifacts
1273 .iter()
1274 .any(|artifact| &artifact.kind == required_kind)
1275 {
1276 failures.push(format!(
1277 "stage {} missing artifact kind {}",
1278 assertion.node_id, required_kind
1279 ));
1280 }
1281 }
1282 if let Some(snippet) = &assertion.visible_text_contains {
1283 let actual = stage.visible_text.clone().unwrap_or_default();
1284 if !actual.contains(snippet) {
1285 failures.push(format!(
1286 "stage {} visible text does not contain expected snippet {:?}",
1287 assertion.node_id, snippet
1288 ));
1289 }
1290 }
1291 }
1292
1293 ReplayEvalReport {
1294 pass: failures.is_empty(),
1295 failures,
1296 stage_count: run.stages.len(),
1297 }
1298}
1299
1300pub fn evaluate_run_suite(
1301 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1302) -> ReplayEvalSuiteReport {
1303 let mut reports = Vec::new();
1304 for (run, fixture, source_path) in cases {
1305 let report = evaluate_run_against_fixture(&run, &fixture);
1306 reports.push(ReplayEvalCaseReport {
1307 run_id: run.id.clone(),
1308 workflow_id: run.workflow_id.clone(),
1309 label: None,
1310 pass: report.pass,
1311 failures: report.failures,
1312 stage_count: report.stage_count,
1313 source_path,
1314 comparison: None,
1315 });
1316 }
1317 let total = reports.len();
1318 let passed = reports.iter().filter(|report| report.pass).count();
1319 let failed = total.saturating_sub(passed);
1320 ReplayEvalSuiteReport {
1321 pass: failed == 0,
1322 total,
1323 passed,
1324 failed,
1325 cases: reports,
1326 }
1327}
1328
/// Produces a structural diff between two run records.
///
/// Sections compared: per-stage records keyed by node id, recorded tool
/// calls keyed by (tool name, args hash), and observability data (worker
/// lineage, planner rounds, transcript pointers, compaction events,
/// verification outcomes, action-graph shape). `identical` is true only
/// when every section matches and the transition/artifact/checkpoint counts
/// are equal.
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    // --- Stage diffs over the union of node ids from both runs ---
    let mut stage_diffs = Vec::new();
    let mut all_node_ids = BTreeSet::new();
    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
        .stages
        .iter()
        .map(|s| (s.node_id.as_str(), s))
        .collect();
    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
        .stages
        .iter()
        .map(|s| (s.node_id.as_str(), s))
        .collect();
    all_node_ids.extend(left_by_id.keys().copied());
    all_node_ids.extend(right_by_id.keys().copied());

    for node_id in all_node_ids {
        let left_stage = left_by_id.get(node_id).copied();
        let right_stage = right_by_id.get(node_id).copied();
        match (left_stage, right_stage) {
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id: node_id.to_string(),
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id: node_id.to_string(),
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                // Field-by-field comparison; only emit a record when at
                // least one detail differs.
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                // Artifacts are compared by count only, not by content.
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id: node_id.to_string(),
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            // Unreachable: every id came from one of the two maps.
            (None, None) => {}
        }
    }

    // --- Tool-call diffs keyed by (tool name, argument hash) ---
    let mut tool_diffs = Vec::new();
    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
        .tool_recordings
        .iter()
        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
        .collect();
    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
        .tool_recordings
        .iter()
        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
        .collect();
    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
        .keys()
        .chain(right_tools.keys())
        .cloned()
        .collect();
    for key in &all_tool_keys {
        let l = left_tools.get(key);
        let r = right_tools.get(key);
        // A call present on only one side counts as a changed result.
        let result_changed = match (l, r) {
            (Some(a), Some(b)) => a.result != b.result,
            _ => true,
        };
        if result_changed {
            tool_diffs.push(ToolCallDiffRecord {
                tool_name: key.0.clone(),
                args_hash: key.1.clone(),
                result_changed,
                left_result: l.map(|t| t.result.clone()),
                right_result: r.map(|t| t.result.clone()),
            });
        }
    }

    // --- Observability diffs; derived on the fly when a run has none stored ---
    let left_observability = left.observability.clone().unwrap_or_else(|| {
        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
    });
    let right_observability = right.observability.clone().unwrap_or_else(|| {
        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
    });
    let mut observability_diffs = Vec::new();

    // Worker lineage: compare (status, run_id, run_path) per worker id.
    let left_workers = left_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_workers = right_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let worker_ids = left_workers
        .keys()
        .chain(right_workers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for worker_id in worker_ids {
        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from left run".to_string()],
            }),
            // Guard guarantees at least one of the tuple fields differs.
            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
                let mut details = Vec::new();
                if left_worker.0 != right_worker.0 {
                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
                }
                if left_worker.1 != right_worker.1 {
                    details.push(format!(
                        "run_id: {:?} -> {:?}",
                        left_worker.1, right_worker.1
                    ));
                }
                if left_worker.2 != right_worker.2 {
                    details.push(format!(
                        "run_path: {:?} -> {:?}",
                        left_worker.2, right_worker.2
                    ));
                }
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "worker_lineage".to_string(),
                    label: worker_id,
                    details,
                });
            }
            _ => {}
        }
    }

    // Planner rounds: keyed by stage id; compares iteration/tool counts,
    // research facts, deliverable ledger entries, and successful tools.
    let left_rounds = left_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let right_rounds = right_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let round_ids = left_rounds
        .keys()
        .chain(right_rounds.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in round_ids {
        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from left run".to_string()],
            }),
            (Some(left_round), Some(right_round)) => {
                let mut details = Vec::new();
                if left_round.iteration_count != right_round.iteration_count {
                    details.push(format!(
                        "iterations: {} -> {}",
                        left_round.iteration_count, right_round.iteration_count
                    ));
                }
                if left_round.tool_execution_count != right_round.tool_execution_count {
                    details.push(format!(
                        "tool_executions: {} -> {}",
                        left_round.tool_execution_count, right_round.tool_execution_count
                    ));
                }
                if left_round.research_facts != right_round.research_facts {
                    details.push(format!(
                        "research_facts: {:?} -> {:?}",
                        left_round.research_facts, right_round.research_facts
                    ));
                }
                // Deliverables compared as "id:status" strings, so ledger
                // ordering is significant.
                let left_deliverables = left_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                let right_deliverables = right_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                if left_deliverables != right_deliverables {
                    details.push(format!(
                        "deliverables: {:?} -> {:?}",
                        left_deliverables, right_deliverables
                    ));
                }
                if left_round.successful_tools != right_round.successful_tools {
                    details.push(format!(
                        "successful_tools: {:?} -> {:?}",
                        left_round.successful_tools, right_round.successful_tools
                    ));
                }
                if !details.is_empty() {
                    observability_diffs.push(RunObservabilityDiffRecord {
                        section: "planner_rounds".to_string(),
                        // NOTE(review): other sections label diffs by the map
                        // key (stage_id); this uses node_id — confirm intended.
                        label: left_round.node_id.clone(),
                        details,
                    });
                }
            }
            _ => {}
        }
    }

    // Transcript pointers: compare (available, path, location) per pointer id.
    let left_pointers = left_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_pointers = right_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let pointer_ids = left_pointers
        .keys()
        .chain(right_pointers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for pointer_id in pointer_ids {
        match (
            left_pointers.get(&pointer_id),
            right_pointers.get(&pointer_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from left run".to_string()],
            }),
            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "transcript_pointers".to_string(),
                    label: pointer_id,
                    details: vec![format!(
                        "pointer: {:?} -> {:?}",
                        left_pointer, right_pointer
                    )],
                });
            }
            _ => {}
        }
    }

    // Compaction events: compare strategy/archived-count/snapshot/availability.
    let left_compactions = left_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_compactions = right_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let compaction_ids = left_compactions
        .keys()
        .chain(right_compactions.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for compaction_id in compaction_ids {
        match (
            left_compactions.get(&compaction_id),
            right_compactions.get(&compaction_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from left run".to_string()],
            }),
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "compaction_events".to_string(),
                    label: compaction_id,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            _ => {}
        }
    }

    // Verification outcomes: compare passed flag and summary per stage id.
    let left_verification = left_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let right_verification = right_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let verification_ids = left_verification
        .keys()
        .chain(right_verification.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in verification_ids {
        match (
            left_verification.get(&stage_id),
            right_verification.get(&stage_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from left run".to_string()],
            }),
            (Some(left_item), Some(right_item)) if left_item != right_item => {
                let mut details = Vec::new();
                if left_item.passed != right_item.passed {
                    details.push(format!(
                        "passed: {:?} -> {:?}",
                        left_item.passed, right_item.passed
                    ));
                }
                if left_item.summary != right_item.summary {
                    details.push(format!(
                        "summary: {:?} -> {:?}",
                        left_item.summary, right_item.summary
                    ));
                }
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "verification".to_string(),
                    // NOTE(review): labeled by node_id, not the stage_id key
                    // used for matching — confirm intended.
                    label: left_item.node_id.clone(),
                    details,
                });
            }
            _ => {}
        }
    }

    // Action graph: compared by (node count, edge count) shape only.
    let left_graph = (
        left_observability.action_graph_nodes.len(),
        left_observability.action_graph_edges.len(),
    );
    let right_graph = (
        right_observability.action_graph_nodes.len(),
        right_observability.action_graph_edges.len(),
    );
    if left_graph != right_graph {
        observability_diffs.push(RunObservabilityDiffRecord {
            section: "action_graph".to_string(),
            label: "shape".to_string(),
            details: vec![format!(
                "nodes/edges: {}/{} -> {}/{}",
                left_graph.0, left_graph.1, right_graph.0, right_graph.1
            )],
        });
    }

    // --- Summary: collapse all sections into the final report ---
    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && tool_diffs.is_empty()
        && observability_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        tool_diffs,
        observability_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}