1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10 CapabilityPolicy,
11};
12use crate::event_log::{active_event_log, EventLog, LogEvent as EventLogRecord, Topic};
13use crate::llm::vm_value_to_json;
14use crate::triggers::{SignatureStatus, TriggerEvent};
15use crate::value::{VmError, VmValue};
16
// Node kinds emitted into `RunActionGraphNodeRecord.kind` when the action
// graph is derived for a run (see `derive_run_observability`).
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE: &str = "worker_enqueue";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";

// Edge kinds emitted into `RunActionGraphEdgeRecord.kind`.
pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
38
/// Aggregated LLM usage (tokens, wall time, cost) for a run or a single
/// stage. All fields fall back to zero/empty when absent (`#[serde(default)]`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    // Model identifiers seen across the recorded calls.
    pub models: Vec<String>,
}

/// Persisted record of one executed workflow stage: status/outcome, timing,
/// captured transcript, artifacts consumed/produced, and per-attempt history.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    // Workflow node this stage instance executed.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    // Embedded transcript JSON, if captured for this stage.
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    // One entry per execution attempt (stages may be retried).
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// A single execution attempt of a stage, kept alongside the stage's final
/// result so retries remain auditable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
85
/// Recorded hand-off from one workflow node to the next during a run,
/// including the artifacts flowing across the transition.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    // `from_*` are `None` for the run's entry transition.
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// Persisted snapshot of run progress (ready vs. completed nodes).
/// NOTE(review): presumably used to resume interrupted runs — confirm with
/// the scheduler that consumes these.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    // Why the checkpoint was taken.
    pub reason: String,
}
109
/// Expectations captured from a source run, used to assert that a replayed
/// run reproduces the same status and per-stage outcomes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    // On-disk type discriminator, serialized as "_type".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Per-node expectation inside a `ReplayFixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    // Substring that must appear in the stage's visible text, when set.
    pub visible_text_contains: Option<String>,
}

/// Result of evaluating one replayed run against its fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}

/// Per-case result within a replay evaluation suite, optionally including a
/// structural diff against a comparison run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    pub comparison: Option<RunDiffReport>,
}

/// Aggregate result across all cases of a replay evaluation suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
165
/// Summary of one deliverable tracked in a task ledger.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    pub status: String,
    pub note: Option<String>,
}

/// Condensed task-ledger state extracted from a stage's result payload
/// (see `task_ledger_summary_from_value`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    // Number of deliverables whose status is "open" or "blocked".
    pub blocking_count: usize,
}
184
/// Counters and tool usage derived from one stage's result payload,
/// summarizing a planner/agent round for observability.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub native_text_tool_fallback_count: usize,
    pub native_text_tool_fallback_rejection_count: usize,
    pub empty_completion_retry_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    pub research_facts: Vec<String>,
}

/// Lineage entry linking a delegated worker back to the stage, session, and
/// (when persisted) the child run that it produced.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
223
/// One node of the derived action graph. `id` is a prefixed identifier such
/// as `run:{run_id}`, `trigger:{event_id}`, or `stage:{stage_id}`; `kind` is
/// one of the `ACTION_GRAPH_NODE_KIND_*` constants.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub trace_id: Option<String>,
    // Back-references into the run record; populated per node kind.
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// Directed edge between two action-graph nodes; `kind` is one of the
/// `ACTION_GRAPH_EDGE_KIND_*` constants.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}
249
/// Verification result derived for a stage. `passed` is tri-state: `None`
/// when neither the verification payload nor the stage status determines it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    pub passed: Option<bool>,
    pub summary: Option<String>,
}

/// Pointer to where a transcript lives (embedded in the run document or in a
/// sidecar file), so tooling can locate it without loading everything.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    // JSON-path-like locator, e.g. "run.stages[{node_id}].transcript".
    pub location: String,
    pub path: Option<String>,
    pub available: bool,
}

/// A context-compaction event extracted from a transcript's `events` array
/// (see `compaction_events_from_transcript`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    pub snapshot_asset_id: Option<String>,
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    // True only when the snapshot asset id is present in the transcript's
    // own asset list.
    pub available: bool,
}
288
/// Lifecycle phase of a daemon event; serialized in snake_case
/// ("spawned", "triggered", ...). Defaults to `Spawned`.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum DaemonEventKindRecord {
    #[default]
    Spawned,
    Triggered,
    Snapshotted,
    Resumed,
    Stopped,
}

/// Daemon lifecycle event parsed from the LLM-transcript sidecar
/// (lines whose `"type"` is `"daemon_event"`; see `daemon_events_from_sidecar`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonEventRecord {
    pub daemon_id: String,
    pub name: String,
    pub kind: DaemonEventKindRecord,
    pub timestamp: String,
    pub persist_path: String,
    pub payload_summary: Option<String>,
}
310
/// Observability bundle derived from a persisted run (action graph, planner
/// rounds, lineage, verification, transcript pointers, compaction and daemon
/// events). Produced by `derive_run_observability`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    // Version of this derived schema; bump when the derivation changes shape.
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
    pub daemon_events: Vec<DaemonEventRecord>,
}
325
/// Per-node difference between two runs of the same workflow.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    // Category of change (added/removed/modified-style label).
    pub change: String,
    pub details: Vec<String>,
}

/// Difference between matching tool calls (same tool + args hash) across
/// two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}

/// Difference within a section of the derived observability records.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}

/// Full structural comparison of two runs ("left" vs. "right"), including
/// stage, tool-call, and observability diffs plus count deltas.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    // Deltas are right-count minus left-count (signed).
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
368
/// Manifest describing a suite of replay-evaluation cases. Relative case
/// paths are presumably resolved against `base_dir` — confirm with the loader.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    // On-disk type discriminator, serialized as "_type".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One case in an eval suite: the run to evaluate, an optional fixture to
/// assert against, and an optional run to diff with.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
388
/// Top-level persisted record of a single workflow run: stages, transitions,
/// checkpoints, artifacts, child runs, policy, and derived observability.
/// This is the on-disk document shape (`_type` discriminator in `type_name`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    // Lineage for runs spawned by other runs (workers / delegation).
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    // Free-form metadata; known keys read here include "trigger_event",
    // "trace_id", and "replay_of_event_id".
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}
421
/// Recorded tool invocation keyed by `args_hash` (see `tool_fixture_hash`).
/// NOTE(review): presumably used to match calls during replay — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
434
435pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
437 use std::hash::{Hash, Hasher};
438 let mut hasher = std::collections::hash_map::DefaultHasher::new();
439 tool_name.hash(&mut hasher);
440 let args_str = serde_json::to_string(args).unwrap_or_default();
441 args_str.hash(&mut hasher);
442 format!("{:016x}", hasher.finish())
443}
444
/// Timing span captured during a run; spans form a tree via `parent_id`.
/// Times are in milliseconds (`start_ms` offset, `duration_ms` length).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    // `None` for root spans.
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
456
/// Record of a child run delegated to a worker, including the request and
/// provenance payloads plus pointers to the child's persisted run/snapshot.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}

/// Execution environment a run (or child run) operated in: working
/// directory, environment variables, and optional git worktree details.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    // Cleanup policy label — semantics defined by the adapter; confirm there.
    pub cleanup: Option<String>,
}
492
493fn compact_json_value(value: &serde_json::Value) -> String {
494 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
495}
496
497fn signature_status_label(status: &SignatureStatus) -> &'static str {
498 match status {
499 SignatureStatus::Verified => "verified",
500 SignatureStatus::Unsigned => "unsigned",
501 SignatureStatus::Failed { .. } => "failed",
502 }
503}
504
505fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
506 run.metadata
507 .get("trigger_event")
508 .cloned()
509 .and_then(|value| serde_json::from_value(value).ok())
510}
511
512fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
513 trigger_event
514 .map(|event| event.trace_id.0.clone())
515 .or_else(|| {
516 run.metadata
517 .get("trace_id")
518 .and_then(|value| value.as_str())
519 .map(str::to_string)
520 })
521}
522
523fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
524 run.metadata
525 .get("replay_of_event_id")
526 .and_then(|value| value.as_str())
527 .map(str::to_string)
528}
529
530fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
531 if stage.kind == "condition" {
532 ACTION_GRAPH_NODE_KIND_PREDICATE
533 } else {
534 ACTION_GRAPH_NODE_KIND_STAGE
535 }
536}
537
538fn trigger_node_metadata(trigger_event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
539 let mut metadata = BTreeMap::new();
540 metadata.insert(
541 "provider".to_string(),
542 serde_json::json!(trigger_event.provider.as_str()),
543 );
544 metadata.insert(
545 "event_kind".to_string(),
546 serde_json::json!(trigger_event.kind),
547 );
548 metadata.insert(
549 "dedupe_key".to_string(),
550 serde_json::json!(trigger_event.dedupe_key),
551 );
552 metadata.insert(
553 "signature_status".to_string(),
554 serde_json::json!(signature_status_label(&trigger_event.signature_status)),
555 );
556 metadata
557}
558
559fn stage_node_metadata(stage: &RunStageRecord) -> BTreeMap<String, serde_json::Value> {
560 let mut metadata = BTreeMap::new();
561 metadata.insert("stage_kind".to_string(), serde_json::json!(stage.kind));
562 if let Some(branch) = stage.branch.as_ref() {
563 metadata.insert("branch".to_string(), serde_json::json!(branch));
564 }
565 if let Some(worker_id) = stage
566 .metadata
567 .get("worker_id")
568 .and_then(|value| value.as_str())
569 {
570 metadata.insert("worker_id".to_string(), serde_json::json!(worker_id));
571 }
572 metadata
573}
574
/// Append a node record to the in-progress action graph.
///
/// NOTE(review): currently a plain `Vec::push`; presumably kept as a single
/// seam for future dedup/validation of graph nodes — confirm intent.
fn append_action_graph_node(
    nodes: &mut Vec<RunActionGraphNodeRecord>,
    record: RunActionGraphNodeRecord,
) {
    nodes.push(record);
}
581
582pub async fn append_action_graph_update(
583 headers: BTreeMap<String, String>,
584 payload: serde_json::Value,
585) -> Result<(), crate::event_log::LogError> {
586 let Some(log) = active_event_log() else {
587 return Ok(());
588 };
589 let topic = Topic::new("observability.action_graph")
590 .expect("static observability.action_graph topic should always be valid");
591 let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
592 log.append(&topic, record).await.map(|_| ())
593}
594
/// Fire-and-forget publication of a run's derived observability record as an
/// `action_graph_update` event. Append errors are deliberately ignored.
fn publish_action_graph_event(
    run: &RunRecord,
    observability: &RunObservabilityRecord,
    path: &Path,
) {
    let trigger_event = trigger_event_from_run(run);
    // Identify the run in the event headers; include a trace id only when
    // one can be derived from the trigger event or run metadata.
    let mut headers = BTreeMap::new();
    headers.insert("run_id".to_string(), run.id.clone());
    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
        headers.insert("trace_id".to_string(), trace_id);
    }
    let payload = serde_json::json!({
        "run_id": run.id,
        "workflow_id": run.workflow_id,
        "persisted_path": path.to_string_lossy(),
        "status": run.status,
        "observability": observability,
    });
    // Inside a tokio runtime: spawn so the caller is never blocked.
    // Outside one: block on the append so the event is still emitted.
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            let _ = append_action_graph_update(headers, payload).await;
        });
    } else {
        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
    }
}
622
/// Derive the LLM-transcript sidecar path for a run file:
/// `<parent>/<stem>-llm/llm_transcript.jsonl`.
///
/// Returns `None` when the run path has no UTF-8 file stem.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem().and_then(|stem| stem.to_str())?;
    // A bare filename still has `Some("")` as its parent; `None` only occurs
    // for paths like "/", which have no stem anyway.
    let base = match run_path.parent() {
        Some(parent) => parent,
        None => Path::new("."),
    };
    Some(base.join(format!("{stem}-llm/llm_transcript.jsonl")))
}
628
629fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
630 value
631 .and_then(|value| value.as_array())
632 .map(|items| {
633 items
634 .iter()
635 .filter_map(|item| item.as_str().map(str::to_string))
636 .collect::<Vec<_>>()
637 })
638 .unwrap_or_default()
639}
640
641fn json_usize(value: Option<&serde_json::Value>) -> usize {
642 value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
643}
644
645fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
646 value.and_then(|value| value.as_bool())
647}
648
649fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
650 stage
651 .artifacts
652 .iter()
653 .find_map(|artifact| artifact.data.as_ref())
654}
655
656fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
657 let deliverables = value
658 .get("deliverables")
659 .and_then(|raw| raw.as_array())
660 .map(|items| {
661 items
662 .iter()
663 .map(|item| RunDeliverableSummaryRecord {
664 id: item
665 .get("id")
666 .and_then(|value| value.as_str())
667 .unwrap_or_default()
668 .to_string(),
669 text: item
670 .get("text")
671 .and_then(|value| value.as_str())
672 .unwrap_or_default()
673 .to_string(),
674 status: item
675 .get("status")
676 .and_then(|value| value.as_str())
677 .unwrap_or_default()
678 .to_string(),
679 note: item
680 .get("note")
681 .and_then(|value| value.as_str())
682 .map(str::to_string),
683 })
684 .collect::<Vec<_>>()
685 })
686 .unwrap_or_default();
687 let observations = json_string_array(value.get("observations"));
688 let root_task = value
689 .get("root_task")
690 .and_then(|value| value.as_str())
691 .unwrap_or_default()
692 .to_string();
693 let rationale = value
694 .get("rationale")
695 .and_then(|value| value.as_str())
696 .unwrap_or_default()
697 .to_string();
698 if root_task.is_empty()
699 && rationale.is_empty()
700 && deliverables.is_empty()
701 && observations.is_empty()
702 {
703 return None;
704 }
705 let blocking_count = deliverables
706 .iter()
707 .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
708 .count();
709 Some(RunTaskLedgerSummaryRecord {
710 root_task,
711 rationale,
712 deliverables,
713 observations,
714 blocking_count,
715 })
716}
717
/// Extract compaction events from a transcript JSON document.
///
/// Scans `transcript.events` for entries whose `kind` is `"compaction"` and
/// converts each into a `CompactionEventRecord`, attributing it to the given
/// stage/node. `location_prefix` is the JSON-path-like locator of the
/// transcript (e.g. `run.stages[node].transcript`); `persisted_path` is the
/// on-disk file recorded as the snapshot path. Returns an empty vec when the
/// transcript has no `events` array.
fn compaction_events_from_transcript(
    transcript: &serde_json::Value,
    stage_id: Option<&str>,
    node_id: Option<&str>,
    location_prefix: &str,
    persisted_path: Option<&Path>,
) -> Vec<CompactionEventRecord> {
    let transcript_id = transcript
        .get("id")
        .and_then(|value| value.as_str())
        .map(str::to_string);
    // Ids of assets embedded in the transcript; used below to decide whether
    // a compaction snapshot is actually retrievable (`available`).
    let asset_ids = transcript
        .get("assets")
        .and_then(|value| value.as_array())
        .map(|assets| {
            assets
                .iter()
                .filter_map(|asset| {
                    asset
                        .get("id")
                        .and_then(|value| value.as_str())
                        .map(str::to_string)
                })
                .collect::<BTreeSet<_>>()
        })
        .unwrap_or_default();
    transcript
        .get("events")
        .and_then(|value| value.as_array())
        .map(|events| {
            events
                .iter()
                .filter(|event| {
                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
                })
                .map(|event| {
                    let metadata = event.get("metadata");
                    let snapshot_asset_id = metadata
                        .and_then(|value| value.get("snapshot_asset_id"))
                        .and_then(|value| value.as_str())
                        .map(str::to_string);
                    // Snapshot is available only if its asset id is present
                    // in the transcript's own asset list.
                    let available = snapshot_asset_id
                        .as_ref()
                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
                    // Locator for the snapshot; falls back to the transcript
                    // location when there is no asset id.
                    let snapshot_location = snapshot_asset_id
                        .as_ref()
                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
                        .unwrap_or_else(|| location_prefix.to_string());
                    CompactionEventRecord {
                        id: event
                            .get("id")
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        transcript_id: transcript_id.clone(),
                        stage_id: stage_id.map(str::to_string),
                        node_id: node_id.map(str::to_string),
                        mode: metadata
                            .and_then(|value| value.get("mode"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        strategy: metadata
                            .and_then(|value| value.get("strategy"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        archived_messages: json_usize(
                            metadata.and_then(|value| value.get("archived_messages")),
                        ),
                        estimated_tokens_before: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_before")),
                        ),
                        estimated_tokens_after: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_after")),
                        ),
                        snapshot_asset_id,
                        snapshot_location,
                        snapshot_path: persisted_path
                            .map(|path| path.to_string_lossy().into_owned()),
                        available,
                    }
                })
                .collect()
        })
        .unwrap_or_default()
}
805
806fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
807 let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
808 return Vec::new();
809 };
810 let Ok(content) = std::fs::read_to_string(sidecar_path) else {
811 return Vec::new();
812 };
813
814 content
815 .lines()
816 .filter(|line| !line.trim().is_empty())
817 .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
818 .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
819 .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
820 .collect()
821}
822
823pub fn derive_run_observability(
824 run: &RunRecord,
825 persisted_path: Option<&Path>,
826) -> RunObservabilityRecord {
827 let mut action_graph_nodes = Vec::new();
828 let mut action_graph_edges = Vec::new();
829 let mut verification_outcomes = Vec::new();
830 let mut planner_rounds = Vec::new();
831 let mut transcript_pointers = Vec::new();
832 let mut compaction_events = Vec::new();
833 let mut daemon_events = Vec::new();
834 let mut research_fact_count = 0usize;
835
836 let root_node_id = format!("run:{}", run.id);
837 let trigger_event = trigger_event_from_run(run);
838 let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
839 append_action_graph_node(
840 &mut action_graph_nodes,
841 RunActionGraphNodeRecord {
842 id: root_node_id.clone(),
843 label: run
844 .workflow_name
845 .clone()
846 .unwrap_or_else(|| run.workflow_id.clone()),
847 kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
848 status: run.status.clone(),
849 outcome: run.status.clone(),
850 trace_id: propagated_trace_id.clone(),
851 stage_id: None,
852 node_id: None,
853 worker_id: None,
854 run_id: Some(run.id.clone()),
855 run_path: run.persisted_path.clone(),
856 metadata: BTreeMap::from([(
857 "workflow_id".to_string(),
858 serde_json::json!(run.workflow_id),
859 )]),
860 },
861 );
862 let mut entry_node_id = root_node_id.clone();
863 if let Some(trigger_event) = trigger_event.as_ref() {
864 if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
865 let replay_source_node_id = format!("trigger:{replay_of_event_id}");
866 append_action_graph_node(
867 &mut action_graph_nodes,
868 RunActionGraphNodeRecord {
869 id: replay_source_node_id.clone(),
870 label: format!(
871 "{}:{} (original {})",
872 trigger_event.provider.as_str(),
873 trigger_event.kind,
874 replay_of_event_id
875 ),
876 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
877 status: "historical".to_string(),
878 outcome: "replayed_from".to_string(),
879 trace_id: Some(trigger_event.trace_id.0.clone()),
880 stage_id: None,
881 node_id: None,
882 worker_id: None,
883 run_id: Some(run.id.clone()),
884 run_path: run.persisted_path.clone(),
885 metadata: trigger_node_metadata(trigger_event),
886 },
887 );
888 action_graph_edges.push(RunActionGraphEdgeRecord {
889 from_id: replay_source_node_id,
890 to_id: format!("trigger:{}", trigger_event.id.0),
891 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
892 label: Some("replay chain".to_string()),
893 });
894 }
895 let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
896 append_action_graph_node(
897 &mut action_graph_nodes,
898 RunActionGraphNodeRecord {
899 id: trigger_node_id.clone(),
900 label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
901 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
902 status: "received".to_string(),
903 outcome: signature_status_label(&trigger_event.signature_status).to_string(),
904 trace_id: Some(trigger_event.trace_id.0.clone()),
905 stage_id: None,
906 node_id: None,
907 worker_id: None,
908 run_id: Some(run.id.clone()),
909 run_path: run.persisted_path.clone(),
910 metadata: trigger_node_metadata(trigger_event),
911 },
912 );
913 action_graph_edges.push(RunActionGraphEdgeRecord {
914 from_id: root_node_id.clone(),
915 to_id: trigger_node_id.clone(),
916 kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
917 label: Some(trigger_event.id.0.clone()),
918 });
919 entry_node_id = trigger_node_id;
920 }
921
922 let stage_node_ids = run
923 .stages
924 .iter()
925 .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
926 .collect::<BTreeMap<_, _>>();
927 let stage_by_id = run
928 .stages
929 .iter()
930 .map(|stage| (stage.id.as_str(), stage))
931 .collect::<BTreeMap<_, _>>();
932 let stage_by_node_id = run
933 .stages
934 .iter()
935 .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
936 .collect::<BTreeMap<_, _>>();
937
938 let incoming_nodes = run
939 .transitions
940 .iter()
941 .map(|transition| transition.to_node_id.clone())
942 .collect::<BTreeSet<_>>();
943
944 for stage in &run.stages {
945 let graph_node_id = stage_node_ids
946 .get(&stage.id)
947 .cloned()
948 .unwrap_or_else(|| format!("stage:{}", stage.id));
949 append_action_graph_node(
950 &mut action_graph_nodes,
951 RunActionGraphNodeRecord {
952 id: graph_node_id.clone(),
953 label: stage.node_id.clone(),
954 kind: action_graph_kind_for_stage(stage).to_string(),
955 status: stage.status.clone(),
956 outcome: stage.outcome.clone(),
957 trace_id: propagated_trace_id.clone(),
958 stage_id: Some(stage.id.clone()),
959 node_id: Some(stage.node_id.clone()),
960 worker_id: stage
961 .metadata
962 .get("worker_id")
963 .and_then(|value| value.as_str())
964 .map(str::to_string),
965 run_id: None,
966 run_path: None,
967 metadata: stage_node_metadata(stage),
968 },
969 );
970 if !incoming_nodes.contains(&stage.node_id) {
971 action_graph_edges.push(RunActionGraphEdgeRecord {
972 from_id: entry_node_id.clone(),
973 to_id: graph_node_id.clone(),
974 kind: if trigger_event.is_some() {
975 ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
976 } else {
977 ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
978 },
979 label: None,
980 });
981 }
982
983 if stage.kind == "verify" || stage.verification.is_some() {
984 let passed = json_bool(
985 stage
986 .verification
987 .as_ref()
988 .and_then(|value| value.get("pass")),
989 )
990 .or_else(|| {
991 json_bool(
992 stage
993 .verification
994 .as_ref()
995 .and_then(|value| value.get("success")),
996 )
997 })
998 .or_else(|| {
999 if stage.status == "completed" && stage.outcome == "success" {
1000 Some(true)
1001 } else if stage.status == "failed" || stage.outcome == "failed" {
1002 Some(false)
1003 } else {
1004 None
1005 }
1006 });
1007 verification_outcomes.push(RunVerificationOutcomeRecord {
1008 stage_id: stage.id.clone(),
1009 node_id: stage.node_id.clone(),
1010 status: stage.status.clone(),
1011 passed,
1012 summary: stage
1013 .verification
1014 .as_ref()
1015 .map(compact_json_value)
1016 .or_else(|| {
1017 stage
1018 .visible_text
1019 .as_ref()
1020 .filter(|value| !value.trim().is_empty())
1021 .cloned()
1022 }),
1023 });
1024 }
1025
1026 if stage.transcript.is_some() {
1027 transcript_pointers.push(RunTranscriptPointerRecord {
1028 id: format!("stage:{}:transcript", stage.id),
1029 label: format!("Stage {} transcript", stage.node_id),
1030 kind: "embedded_transcript".to_string(),
1031 location: format!("run.stages[{}].transcript", stage.node_id),
1032 path: run.persisted_path.clone(),
1033 available: true,
1034 });
1035 if let Some(transcript) = stage.transcript.as_ref() {
1036 compaction_events.extend(compaction_events_from_transcript(
1037 transcript,
1038 Some(&stage.id),
1039 Some(&stage.node_id),
1040 &format!("run.stages[{}].transcript", stage.node_id),
1041 persisted_path,
1042 ));
1043 }
1044 }
1045
1046 if let Some(payload) = stage_result_payload(stage) {
1047 let trace = payload.get("trace");
1048 let task_ledger = payload
1049 .get("task_ledger")
1050 .and_then(task_ledger_summary_from_value);
1051 let research_facts = task_ledger
1052 .as_ref()
1053 .map(|ledger| ledger.observations.clone())
1054 .unwrap_or_default();
1055 research_fact_count += research_facts.len();
1056 let tools_used = json_string_array(
1057 payload
1058 .get("tools_used")
1059 .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1060 );
1061 let successful_tools = json_string_array(payload.get("successful_tools"));
1062 let planner_round = RunPlannerRoundRecord {
1063 stage_id: stage.id.clone(),
1064 node_id: stage.node_id.clone(),
1065 stage_kind: stage.kind.clone(),
1066 status: stage.status.clone(),
1067 outcome: stage.outcome.clone(),
1068 iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1069 llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1070 tool_execution_count: json_usize(
1071 trace.and_then(|trace| trace.get("tool_executions")),
1072 ),
1073 tool_rejection_count: json_usize(
1074 trace.and_then(|trace| trace.get("tool_rejections")),
1075 ),
1076 intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1077 compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1078 native_text_tool_fallback_count: json_usize(
1079 trace.and_then(|trace| trace.get("native_text_tool_fallbacks")),
1080 ),
1081 native_text_tool_fallback_rejection_count: json_usize(
1082 trace.and_then(|trace| trace.get("native_text_tool_fallback_rejections")),
1083 ),
1084 empty_completion_retry_count: json_usize(
1085 trace.and_then(|trace| trace.get("empty_completion_retries")),
1086 ),
1087 tools_used,
1088 successful_tools,
1089 ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1090 task_ledger,
1091 research_facts,
1092 };
1093 let has_agentic_detail = planner_round.iteration_count > 0
1094 || planner_round.llm_call_count > 0
1095 || planner_round.tool_execution_count > 0
1096 || planner_round.native_text_tool_fallback_count > 0
1097 || planner_round.native_text_tool_fallback_rejection_count > 0
1098 || planner_round.empty_completion_retry_count > 0
1099 || planner_round.ledger_done_rejections > 0
1100 || planner_round.task_ledger.is_some()
1101 || !planner_round.tools_used.is_empty()
1102 || !planner_round.successful_tools.is_empty();
1103 if has_agentic_detail {
1104 planner_rounds.push(planner_round);
1105 }
1106 }
1107 }
1108
1109 for transition in &run.transitions {
1110 let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1111 continue;
1112 };
1113 let from_stage = transition
1114 .from_stage_id
1115 .as_deref()
1116 .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1117 let from_id = transition
1118 .from_stage_id
1119 .as_ref()
1120 .and_then(|stage_id| stage_node_ids.get(stage_id))
1121 .cloned()
1122 .or_else(|| {
1123 transition
1124 .from_node_id
1125 .as_ref()
1126 .and_then(|node_id| stage_by_node_id.get(node_id))
1127 .cloned()
1128 })
1129 .unwrap_or_else(|| root_node_id.clone());
1130 action_graph_edges.push(RunActionGraphEdgeRecord {
1131 from_id,
1132 to_id,
1133 kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1134 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1135 } else {
1136 ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1137 },
1138 label: transition.branch.clone(),
1139 });
1140 }
1141
1142 let worker_lineage = run
1143 .child_runs
1144 .iter()
1145 .map(|child| {
1146 let worker_node_id = format!("worker:{}", child.worker_id);
1147 append_action_graph_node(
1148 &mut action_graph_nodes,
1149 RunActionGraphNodeRecord {
1150 id: worker_node_id.clone(),
1151 label: child.worker_name.clone(),
1152 kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1153 status: child.status.clone(),
1154 outcome: child.status.clone(),
1155 trace_id: propagated_trace_id.clone(),
1156 stage_id: child.parent_stage_id.clone(),
1157 node_id: None,
1158 worker_id: Some(child.worker_id.clone()),
1159 run_id: child.run_id.clone(),
1160 run_path: child.run_path.clone(),
1161 metadata: BTreeMap::from([
1162 (
1163 "worker_name".to_string(),
1164 serde_json::json!(child.worker_name),
1165 ),
1166 ("task".to_string(), serde_json::json!(child.task)),
1167 ]),
1168 },
1169 );
1170 if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1171 if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1172 action_graph_edges.push(RunActionGraphEdgeRecord {
1173 from_id: stage_node_id.clone(),
1174 to_id: worker_node_id,
1175 kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1176 label: Some(child.worker_name.clone()),
1177 });
1178 }
1179 }
1180 RunWorkerLineageRecord {
1181 worker_id: child.worker_id.clone(),
1182 worker_name: child.worker_name.clone(),
1183 parent_stage_id: child.parent_stage_id.clone(),
1184 task: child.task.clone(),
1185 status: child.status.clone(),
1186 session_id: child.session_id.clone(),
1187 parent_session_id: child.parent_session_id.clone(),
1188 run_id: child.run_id.clone(),
1189 run_path: child.run_path.clone(),
1190 snapshot_path: child.snapshot_path.clone(),
1191 }
1192 })
1193 .collect::<Vec<_>>();
1194
1195 if run.transcript.is_some() {
1196 transcript_pointers.push(RunTranscriptPointerRecord {
1197 id: "run:transcript".to_string(),
1198 label: "Run transcript".to_string(),
1199 kind: "embedded_transcript".to_string(),
1200 location: "run.transcript".to_string(),
1201 path: run.persisted_path.clone(),
1202 available: true,
1203 });
1204 if let Some(transcript) = run.transcript.as_ref() {
1205 compaction_events.extend(compaction_events_from_transcript(
1206 transcript,
1207 None,
1208 None,
1209 "run.transcript",
1210 persisted_path,
1211 ));
1212 }
1213 }
1214
1215 if let Some(path) = persisted_path {
1216 if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1217 transcript_pointers.push(RunTranscriptPointerRecord {
1218 id: "run:llm_transcript".to_string(),
1219 label: "LLM transcript sidecar".to_string(),
1220 kind: "llm_jsonl".to_string(),
1221 location: "run sidecar".to_string(),
1222 path: Some(sidecar_path.to_string_lossy().into_owned()),
1223 available: sidecar_path.exists(),
1224 });
1225 }
1226 daemon_events.extend(daemon_events_from_sidecar(path));
1227 }
1228
1229 RunObservabilityRecord {
1230 schema_version: 4,
1231 planner_rounds,
1232 research_fact_count,
1233 action_graph_nodes,
1234 action_graph_edges,
1235 worker_lineage,
1236 verification_outcomes,
1237 transcript_pointers,
1238 compaction_events,
1239 daemon_events,
1240 }
1241}
1242
1243fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1244 run.observability = Some(derive_run_observability(run, persisted_path));
1245}
1246
1247pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1248 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1249 if run.type_name.is_empty() {
1250 run.type_name = "run_record".to_string();
1251 }
1252 if run.id.is_empty() {
1253 run.id = new_id("run");
1254 }
1255 if run.started_at.is_empty() {
1256 run.started_at = now_rfc3339();
1257 }
1258 if run.status.is_empty() {
1259 run.status = "running".to_string();
1260 }
1261 if run.root_run_id.is_none() {
1262 run.root_run_id = Some(run.id.clone());
1263 }
1264 if run.replay_fixture.is_none() {
1265 run.replay_fixture = Some(replay_fixture_from_run(&run));
1266 }
1267 if run.observability.is_none() {
1268 let persisted_path = run.persisted_path.clone();
1269 let persisted = persisted_path.as_deref().map(Path::new);
1270 refresh_run_observability(&mut run, persisted);
1271 }
1272 Ok(run)
1273}
1274
1275pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1276 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1277 if manifest.type_name.is_empty() {
1278 manifest.type_name = "eval_suite_manifest".to_string();
1279 }
1280 if manifest.id.is_empty() {
1281 manifest.id = new_id("eval_suite");
1282 }
1283 Ok(manifest)
1284}
1285
1286fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1287 let content = std::fs::read_to_string(path)
1288 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1289 serde_json::from_str(&content)
1290 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1291}
1292
/// Resolve a manifest-relative path string against an optional base directory.
///
/// Absolute paths are returned untouched; relative paths are joined onto
/// `base_dir` when one is provided, and returned as-is otherwise.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate,
    }
}
1303
1304pub fn evaluate_run_suite_manifest(
1305 manifest: &EvalSuiteManifest,
1306) -> Result<ReplayEvalSuiteReport, VmError> {
1307 let base_dir = manifest.base_dir.as_deref().map(Path::new);
1308 let mut reports = Vec::new();
1309 for case in &manifest.cases {
1310 let run_path = resolve_manifest_path(base_dir, &case.run_path);
1311 let run = load_run_record(&run_path)?;
1312 let fixture = match &case.fixture_path {
1313 Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1314 None => run
1315 .replay_fixture
1316 .clone()
1317 .unwrap_or_else(|| replay_fixture_from_run(&run)),
1318 };
1319 let eval = evaluate_run_against_fixture(&run, &fixture);
1320 let mut pass = eval.pass;
1321 let mut failures = eval.failures;
1322 let comparison = match &case.compare_to {
1323 Some(path) => {
1324 let baseline_path = resolve_manifest_path(base_dir, path);
1325 let baseline = load_run_record(&baseline_path)?;
1326 let diff = diff_run_records(&baseline, &run);
1327 if !diff.identical {
1328 pass = false;
1329 failures.push(format!(
1330 "run differs from baseline {} with {} stage changes",
1331 baseline_path.display(),
1332 diff.stage_diffs.len()
1333 ));
1334 }
1335 Some(diff)
1336 }
1337 None => None,
1338 };
1339 reports.push(ReplayEvalCaseReport {
1340 run_id: run.id.clone(),
1341 workflow_id: run.workflow_id.clone(),
1342 label: case.label.clone(),
1343 pass,
1344 failures,
1345 stage_count: eval.stage_count,
1346 source_path: Some(run_path.display().to_string()),
1347 comparison,
1348 });
1349 }
1350 let total = reports.len();
1351 let passed = reports.iter().filter(|report| report.pass).count();
1352 let failed = total.saturating_sub(passed);
1353 Ok(ReplayEvalSuiteReport {
1354 pass: failed == 0,
1355 total,
1356 passed,
1357 failed,
1358 cases: reports,
1359 })
1360}
1361
/// Edit operation emitted by [`myers_diff`]. The paired index refers to the
/// left sequence for `Equal`/`Delete` and to the right sequence for `Insert`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    Equal,
    Delete,
    Insert,
}

/// Compute a shortest edit script between `a` and `b` using Myers' greedy
/// diff algorithm ("An O(ND) Difference Algorithm and Its Variations").
///
/// Returns `(op, index)` pairs in left-to-right order, where the index points
/// into `a` for `Equal`/`Delete` and into `b` for `Insert`.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Trivial cases: an empty side makes the script pure inserts or deletes.
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // `v[k + offset]` holds the furthest x reached on diagonal k (k = x - y);
    // `offset` shifts k into non-negative indices.
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // `trace[d]` is the V array *before* processing distance d (i.e. the state
    // after d-1), which is exactly what the backward pass needs.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    // Forward pass: extend furthest-reaching paths for increasing distance d.
    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Step down (insert) from diagonal k+1 or right (delete) from
            // k-1, whichever neighbor reached further at distance d-1.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the "snake": matching elements advance for free.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backward pass: reconstruct the edit script from the recorded V arrays.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Walk the snake backwards, emitting matches.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        // Undo the single insert/delete that preceded this snake.
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Any remainder is the common-prefix snake at distance 0.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1454
1455pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1456 let before_lines: Vec<&str> = before.lines().collect();
1457 let after_lines: Vec<&str> = after.lines().collect();
1458 let ops = myers_diff(&before_lines, &after_lines);
1459
1460 let mut diff = String::new();
1461 let file = path.unwrap_or("artifact");
1462 diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1463 for &(op, idx) in &ops {
1464 match op {
1465 DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1466 DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1467 DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1468 }
1469 }
1470 diff
1471}
1472
1473pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1474 let path = path
1475 .map(PathBuf::from)
1476 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1477 let mut materialized = run.clone();
1478 if materialized.replay_fixture.is_none() {
1479 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1480 }
1481 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1482 refresh_run_observability(&mut materialized, Some(&path));
1483 if let Some(parent) = path.parent() {
1484 std::fs::create_dir_all(parent)
1485 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1486 }
1487 let json = serde_json::to_string_pretty(&materialized)
1488 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1489 let tmp_path = path.with_extension("json.tmp");
1491 std::fs::write(&tmp_path, &json)
1492 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1493 std::fs::rename(&tmp_path, &path).map_err(|e| {
1494 let _ = std::fs::write(&path, &json);
1496 VmError::Runtime(format!("failed to finalize run record: {e}"))
1497 })?;
1498 if let Some(observability) = materialized.observability.as_ref() {
1499 publish_action_graph_event(&materialized, observability, &path);
1500 }
1501 Ok(path.to_string_lossy().into_owned())
1502}
1503
1504pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1505 let content = std::fs::read_to_string(path)
1506 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1507 let mut run: RunRecord = serde_json::from_str(&content)
1508 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1509 if run.replay_fixture.is_none() {
1510 run.replay_fixture = Some(replay_fixture_from_run(&run));
1511 }
1512 run.persisted_path
1513 .get_or_insert_with(|| path.to_string_lossy().into_owned());
1514 refresh_run_observability(&mut run, Some(path));
1515 Ok(run)
1516}
1517
1518pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1519 ReplayFixture {
1520 type_name: "replay_fixture".to_string(),
1521 id: new_id("fixture"),
1522 source_run_id: run.id.clone(),
1523 workflow_id: run.workflow_id.clone(),
1524 workflow_name: run.workflow_name.clone(),
1525 created_at: now_rfc3339(),
1526 expected_status: run.status.clone(),
1527 stage_assertions: run
1528 .stages
1529 .iter()
1530 .map(|stage| ReplayStageAssertion {
1531 node_id: stage.node_id.clone(),
1532 expected_status: stage.status.clone(),
1533 expected_outcome: stage.outcome.clone(),
1534 expected_branch: stage.branch.clone(),
1535 required_artifact_kinds: stage
1536 .artifacts
1537 .iter()
1538 .map(|artifact| artifact.kind.clone())
1539 .collect(),
1540 visible_text_contains: stage
1541 .visible_text
1542 .as_ref()
1543 .filter(|text| !text.is_empty())
1544 .map(|text| text.chars().take(80).collect()),
1545 })
1546 .collect(),
1547 }
1548}
1549
1550pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1551 let mut failures = Vec::new();
1552 if run.status != fixture.expected_status {
1553 failures.push(format!(
1554 "run status mismatch: expected {}, got {}",
1555 fixture.expected_status, run.status
1556 ));
1557 }
1558 let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1559 run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1560 for assertion in &fixture.stage_assertions {
1561 let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1562 failures.push(format!("missing stage {}", assertion.node_id));
1563 continue;
1564 };
1565 if stage.status != assertion.expected_status {
1566 failures.push(format!(
1567 "stage {} status mismatch: expected {}, got {}",
1568 assertion.node_id, assertion.expected_status, stage.status
1569 ));
1570 }
1571 if stage.outcome != assertion.expected_outcome {
1572 failures.push(format!(
1573 "stage {} outcome mismatch: expected {}, got {}",
1574 assertion.node_id, assertion.expected_outcome, stage.outcome
1575 ));
1576 }
1577 if stage.branch != assertion.expected_branch {
1578 failures.push(format!(
1579 "stage {} branch mismatch: expected {:?}, got {:?}",
1580 assertion.node_id, assertion.expected_branch, stage.branch
1581 ));
1582 }
1583 for required_kind in &assertion.required_artifact_kinds {
1584 if !stage
1585 .artifacts
1586 .iter()
1587 .any(|artifact| &artifact.kind == required_kind)
1588 {
1589 failures.push(format!(
1590 "stage {} missing artifact kind {}",
1591 assertion.node_id, required_kind
1592 ));
1593 }
1594 }
1595 if let Some(snippet) = &assertion.visible_text_contains {
1596 let actual = stage.visible_text.clone().unwrap_or_default();
1597 if !actual.contains(snippet) {
1598 failures.push(format!(
1599 "stage {} visible text does not contain expected snippet {:?}",
1600 assertion.node_id, snippet
1601 ));
1602 }
1603 }
1604 }
1605
1606 ReplayEvalReport {
1607 pass: failures.is_empty(),
1608 failures,
1609 stage_count: run.stages.len(),
1610 }
1611}
1612
1613pub fn evaluate_run_suite(
1614 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1615) -> ReplayEvalSuiteReport {
1616 let mut reports = Vec::new();
1617 for (run, fixture, source_path) in cases {
1618 let report = evaluate_run_against_fixture(&run, &fixture);
1619 reports.push(ReplayEvalCaseReport {
1620 run_id: run.id.clone(),
1621 workflow_id: run.workflow_id.clone(),
1622 label: None,
1623 pass: report.pass,
1624 failures: report.failures,
1625 stage_count: report.stage_count,
1626 source_path,
1627 comparison: None,
1628 });
1629 }
1630 let total = reports.len();
1631 let passed = reports.iter().filter(|report| report.pass).count();
1632 let failed = total.saturating_sub(passed);
1633 ReplayEvalSuiteReport {
1634 pass: failed == 0,
1635 total,
1636 passed,
1637 failed,
1638 cases: reports,
1639 }
1640}
1641
1642pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1643 let mut stage_diffs = Vec::new();
1644 let mut all_node_ids = BTreeSet::new();
1645 let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1646 .stages
1647 .iter()
1648 .map(|s| (s.node_id.as_str(), s))
1649 .collect();
1650 let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1651 .stages
1652 .iter()
1653 .map(|s| (s.node_id.as_str(), s))
1654 .collect();
1655 all_node_ids.extend(left_by_id.keys().copied());
1656 all_node_ids.extend(right_by_id.keys().copied());
1657
1658 for node_id in all_node_ids {
1659 let left_stage = left_by_id.get(node_id).copied();
1660 let right_stage = right_by_id.get(node_id).copied();
1661 match (left_stage, right_stage) {
1662 (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1663 node_id: node_id.to_string(),
1664 change: "removed".to_string(),
1665 details: vec!["stage missing from right run".to_string()],
1666 }),
1667 (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1668 node_id: node_id.to_string(),
1669 change: "added".to_string(),
1670 details: vec!["stage missing from left run".to_string()],
1671 }),
1672 (Some(left_stage), Some(right_stage)) => {
1673 let mut details = Vec::new();
1674 if left_stage.status != right_stage.status {
1675 details.push(format!(
1676 "status: {} -> {}",
1677 left_stage.status, right_stage.status
1678 ));
1679 }
1680 if left_stage.outcome != right_stage.outcome {
1681 details.push(format!(
1682 "outcome: {} -> {}",
1683 left_stage.outcome, right_stage.outcome
1684 ));
1685 }
1686 if left_stage.branch != right_stage.branch {
1687 details.push(format!(
1688 "branch: {:?} -> {:?}",
1689 left_stage.branch, right_stage.branch
1690 ));
1691 }
1692 if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1693 {
1694 details.push(format!(
1695 "produced_artifacts: {} -> {}",
1696 left_stage.produced_artifact_ids.len(),
1697 right_stage.produced_artifact_ids.len()
1698 ));
1699 }
1700 if left_stage.artifacts.len() != right_stage.artifacts.len() {
1701 details.push(format!(
1702 "artifact_records: {} -> {}",
1703 left_stage.artifacts.len(),
1704 right_stage.artifacts.len()
1705 ));
1706 }
1707 if !details.is_empty() {
1708 stage_diffs.push(RunStageDiffRecord {
1709 node_id: node_id.to_string(),
1710 change: "changed".to_string(),
1711 details,
1712 });
1713 }
1714 }
1715 (None, None) => {}
1716 }
1717 }
1718
1719 let mut tool_diffs = Vec::new();
1720 let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1721 .tool_recordings
1722 .iter()
1723 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1724 .collect();
1725 let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1726 .tool_recordings
1727 .iter()
1728 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1729 .collect();
1730 let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1731 .keys()
1732 .chain(right_tools.keys())
1733 .cloned()
1734 .collect();
1735 for key in &all_tool_keys {
1736 let l = left_tools.get(key);
1737 let r = right_tools.get(key);
1738 let result_changed = match (l, r) {
1739 (Some(a), Some(b)) => a.result != b.result,
1740 _ => true,
1741 };
1742 if result_changed {
1743 tool_diffs.push(ToolCallDiffRecord {
1744 tool_name: key.0.clone(),
1745 args_hash: key.1.clone(),
1746 result_changed,
1747 left_result: l.map(|t| t.result.clone()),
1748 right_result: r.map(|t| t.result.clone()),
1749 });
1750 }
1751 }
1752
1753 let left_observability = left.observability.clone().unwrap_or_else(|| {
1754 derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
1755 });
1756 let right_observability = right.observability.clone().unwrap_or_else(|| {
1757 derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
1758 });
1759 let mut observability_diffs = Vec::new();
1760
1761 let left_workers = left_observability
1762 .worker_lineage
1763 .iter()
1764 .map(|worker| {
1765 (
1766 worker.worker_id.clone(),
1767 (
1768 worker.status.clone(),
1769 worker.run_id.clone(),
1770 worker.run_path.clone(),
1771 ),
1772 )
1773 })
1774 .collect::<BTreeMap<_, _>>();
1775 let right_workers = right_observability
1776 .worker_lineage
1777 .iter()
1778 .map(|worker| {
1779 (
1780 worker.worker_id.clone(),
1781 (
1782 worker.status.clone(),
1783 worker.run_id.clone(),
1784 worker.run_path.clone(),
1785 ),
1786 )
1787 })
1788 .collect::<BTreeMap<_, _>>();
1789 let worker_ids = left_workers
1790 .keys()
1791 .chain(right_workers.keys())
1792 .cloned()
1793 .collect::<BTreeSet<_>>();
1794 for worker_id in worker_ids {
1795 match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
1796 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1797 section: "worker_lineage".to_string(),
1798 label: worker_id,
1799 details: vec!["worker missing from right run".to_string()],
1800 }),
1801 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1802 section: "worker_lineage".to_string(),
1803 label: worker_id,
1804 details: vec!["worker missing from left run".to_string()],
1805 }),
1806 (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
1807 let mut details = Vec::new();
1808 if left_worker.0 != right_worker.0 {
1809 details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
1810 }
1811 if left_worker.1 != right_worker.1 {
1812 details.push(format!(
1813 "run_id: {:?} -> {:?}",
1814 left_worker.1, right_worker.1
1815 ));
1816 }
1817 if left_worker.2 != right_worker.2 {
1818 details.push(format!(
1819 "run_path: {:?} -> {:?}",
1820 left_worker.2, right_worker.2
1821 ));
1822 }
1823 observability_diffs.push(RunObservabilityDiffRecord {
1824 section: "worker_lineage".to_string(),
1825 label: worker_id,
1826 details,
1827 });
1828 }
1829 _ => {}
1830 }
1831 }
1832
1833 let left_rounds = left_observability
1834 .planner_rounds
1835 .iter()
1836 .map(|round| (round.stage_id.clone(), round))
1837 .collect::<BTreeMap<_, _>>();
1838 let right_rounds = right_observability
1839 .planner_rounds
1840 .iter()
1841 .map(|round| (round.stage_id.clone(), round))
1842 .collect::<BTreeMap<_, _>>();
1843 let round_ids = left_rounds
1844 .keys()
1845 .chain(right_rounds.keys())
1846 .cloned()
1847 .collect::<BTreeSet<_>>();
1848 for stage_id in round_ids {
1849 match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
1850 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1851 section: "planner_rounds".to_string(),
1852 label: stage_id,
1853 details: vec!["planner summary missing from right run".to_string()],
1854 }),
1855 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1856 section: "planner_rounds".to_string(),
1857 label: stage_id,
1858 details: vec!["planner summary missing from left run".to_string()],
1859 }),
1860 (Some(left_round), Some(right_round)) => {
1861 let mut details = Vec::new();
1862 if left_round.iteration_count != right_round.iteration_count {
1863 details.push(format!(
1864 "iterations: {} -> {}",
1865 left_round.iteration_count, right_round.iteration_count
1866 ));
1867 }
1868 if left_round.tool_execution_count != right_round.tool_execution_count {
1869 details.push(format!(
1870 "tool_executions: {} -> {}",
1871 left_round.tool_execution_count, right_round.tool_execution_count
1872 ));
1873 }
1874 if left_round.native_text_tool_fallback_count
1875 != right_round.native_text_tool_fallback_count
1876 {
1877 details.push(format!(
1878 "native_text_tool_fallbacks: {} -> {}",
1879 left_round.native_text_tool_fallback_count,
1880 right_round.native_text_tool_fallback_count
1881 ));
1882 }
1883 if left_round.native_text_tool_fallback_rejection_count
1884 != right_round.native_text_tool_fallback_rejection_count
1885 {
1886 details.push(format!(
1887 "native_text_tool_fallback_rejections: {} -> {}",
1888 left_round.native_text_tool_fallback_rejection_count,
1889 right_round.native_text_tool_fallback_rejection_count
1890 ));
1891 }
1892 if left_round.empty_completion_retry_count
1893 != right_round.empty_completion_retry_count
1894 {
1895 details.push(format!(
1896 "empty_completion_retries: {} -> {}",
1897 left_round.empty_completion_retry_count,
1898 right_round.empty_completion_retry_count
1899 ));
1900 }
1901 if left_round.research_facts != right_round.research_facts {
1902 details.push(format!(
1903 "research_facts: {:?} -> {:?}",
1904 left_round.research_facts, right_round.research_facts
1905 ));
1906 }
1907 let left_deliverables = left_round
1908 .task_ledger
1909 .as_ref()
1910 .map(|ledger| {
1911 ledger
1912 .deliverables
1913 .iter()
1914 .map(|item| format!("{}:{}", item.id, item.status))
1915 .collect::<Vec<_>>()
1916 })
1917 .unwrap_or_default();
1918 let right_deliverables = right_round
1919 .task_ledger
1920 .as_ref()
1921 .map(|ledger| {
1922 ledger
1923 .deliverables
1924 .iter()
1925 .map(|item| format!("{}:{}", item.id, item.status))
1926 .collect::<Vec<_>>()
1927 })
1928 .unwrap_or_default();
1929 if left_deliverables != right_deliverables {
1930 details.push(format!(
1931 "deliverables: {:?} -> {:?}",
1932 left_deliverables, right_deliverables
1933 ));
1934 }
1935 if left_round.successful_tools != right_round.successful_tools {
1936 details.push(format!(
1937 "successful_tools: {:?} -> {:?}",
1938 left_round.successful_tools, right_round.successful_tools
1939 ));
1940 }
1941 if !details.is_empty() {
1942 observability_diffs.push(RunObservabilityDiffRecord {
1943 section: "planner_rounds".to_string(),
1944 label: left_round.node_id.clone(),
1945 details,
1946 });
1947 }
1948 }
1949 _ => {}
1950 }
1951 }
1952
1953 let left_pointers = left_observability
1954 .transcript_pointers
1955 .iter()
1956 .map(|pointer| {
1957 (
1958 pointer.id.clone(),
1959 (
1960 pointer.available,
1961 pointer.path.clone(),
1962 pointer.location.clone(),
1963 ),
1964 )
1965 })
1966 .collect::<BTreeMap<_, _>>();
1967 let right_pointers = right_observability
1968 .transcript_pointers
1969 .iter()
1970 .map(|pointer| {
1971 (
1972 pointer.id.clone(),
1973 (
1974 pointer.available,
1975 pointer.path.clone(),
1976 pointer.location.clone(),
1977 ),
1978 )
1979 })
1980 .collect::<BTreeMap<_, _>>();
1981 let pointer_ids = left_pointers
1982 .keys()
1983 .chain(right_pointers.keys())
1984 .cloned()
1985 .collect::<BTreeSet<_>>();
1986 for pointer_id in pointer_ids {
1987 match (
1988 left_pointers.get(&pointer_id),
1989 right_pointers.get(&pointer_id),
1990 ) {
1991 (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
1992 section: "transcript_pointers".to_string(),
1993 label: pointer_id,
1994 details: vec!["pointer missing from right run".to_string()],
1995 }),
1996 (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
1997 section: "transcript_pointers".to_string(),
1998 label: pointer_id,
1999 details: vec!["pointer missing from left run".to_string()],
2000 }),
2001 (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
2002 observability_diffs.push(RunObservabilityDiffRecord {
2003 section: "transcript_pointers".to_string(),
2004 label: pointer_id,
2005 details: vec![format!(
2006 "pointer: {:?} -> {:?}",
2007 left_pointer, right_pointer
2008 )],
2009 });
2010 }
2011 _ => {}
2012 }
2013 }
2014
    // Index each run's compaction events by id, projecting the comparable fields
    // (strategy, archived message count, snapshot asset id, availability).
    let left_compactions = left_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_compactions = right_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    // Deterministic union of ids from both sides.
    let compaction_ids = left_compactions
        .keys()
        .chain(right_compactions.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for compaction_id in compaction_ids {
        match (
            left_compactions.get(&compaction_id),
            right_compactions.get(&compaction_id),
        ) {
            // Event present only on the left.
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from right run".to_string()],
            }),
            // Event present only on the right.
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from left run".to_string()],
            }),
            // Present in both but the projected tuples differ: record old -> new.
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "compaction_events".to_string(),
                    label: compaction_id,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            // Equal on both sides: no diff.
            _ => {}
        }
    }
2075
    // Daemon events have no single unique id, so they are keyed by the composite
    // (daemon_id, kind, timestamp). `event.kind` is moved by value here (not cloned),
    // so it is presumably a small Copy enum — formatted via {:?} in the label below.
    let left_daemons = left_observability
        .daemon_events
        .iter()
        .map(|event| {
            (
                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
                (
                    event.name.clone(),
                    event.persist_path.clone(),
                    event.payload_summary.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_daemons = right_observability
        .daemon_events
        .iter()
        .map(|event| {
            (
                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
                (
                    event.name.clone(),
                    event.persist_path.clone(),
                    event.payload_summary.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    // Deterministic union of composite keys from both sides.
    let daemon_keys = left_daemons
        .keys()
        .chain(right_daemons.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for daemon_key in daemon_keys {
        // Human-readable label: "<daemon_id>:<kind:?>:<timestamp>".
        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
        match (
            left_daemons.get(&daemon_key),
            right_daemons.get(&daemon_key),
        ) {
            // Event present only on the left.
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "daemon_events".to_string(),
                label,
                details: vec!["daemon event missing from right run".to_string()],
            }),
            // Event present only on the right.
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "daemon_events".to_string(),
                label,
                details: vec!["daemon event missing from left run".to_string()],
            }),
            // Present in both but (name, persist_path, payload_summary) differ.
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "daemon_events".to_string(),
                    label,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            // Equal on both sides: no diff.
            _ => {}
        }
    }
2135
    // Verification outcomes are keyed by stage_id; unlike the sections above, the
    // maps hold references to the full records so individual fields can be compared.
    let left_verification = left_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let right_verification = right_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    // Deterministic union of stage ids from both sides.
    let verification_ids = left_verification
        .keys()
        .chain(right_verification.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in verification_ids {
        match (
            left_verification.get(&stage_id),
            right_verification.get(&stage_id),
        ) {
            // Outcome present only on the left.
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from right run".to_string()],
            }),
            // Outcome present only on the right.
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from left run".to_string()],
            }),
            // Present in both: report per-field changes for `passed` and `summary`.
            (Some(left_item), Some(right_item)) if left_item != right_item => {
                let mut details = Vec::new();
                if left_item.passed != right_item.passed {
                    details.push(format!(
                        "passed: {:?} -> {:?}",
                        left_item.passed, right_item.passed
                    ));
                }
                if left_item.summary != right_item.summary {
                    details.push(format!(
                        "summary: {:?} -> {:?}",
                        left_item.summary, right_item.summary
                    ));
                }
                // NOTE(review): this branch labels the diff with `node_id`, while the
                // two missing-side branches above use `stage_id` (the map key). If the
                // two identifiers can differ, labels for the same stage will be
                // inconsistent across branches — confirm whether `stage_id` was meant.
                // Also: if the records differ only in fields other than passed/summary,
                // `details` is pushed empty — verify that is intended.
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "verification".to_string(),
                    label: left_item.node_id.clone(),
                    details,
                });
            }
            // Equal on both sides: no diff.
            _ => {}
        }
    }
2189
    // Action graphs are compared by shape only (node count, edge count) — the
    // contents of individual nodes/edges are not diffed here.
    let left_graph = (
        left_observability.action_graph_nodes.len(),
        left_observability.action_graph_edges.len(),
    );
    let right_graph = (
        right_observability.action_graph_nodes.len(),
        right_observability.action_graph_edges.len(),
    );
    if left_graph != right_graph {
        observability_diffs.push(RunObservabilityDiffRecord {
            section: "action_graph".to_string(),
            label: "shape".to_string(),
            details: vec![format!(
                "nodes/edges: {}/{} -> {}/{}",
                left_graph.0, left_graph.1, right_graph.0, right_graph.1
            )],
        });
    }

    let status_changed = left.status != right.status;
    // `identical` is a heuristic: statuses match, no stage/tool/observability diffs,
    // and the transition/artifact/checkpoint collections have equal *counts*.
    // Equal counts do not prove equal contents — content drift in those three
    // collections is not detected here.
    let identical = !status_changed
        && stage_diffs.is_empty()
        && tool_diffs.is_empty()
        && observability_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    // Assemble the final report. The *_delta fields are right-minus-left counts;
    // the `as isize` casts are lossless for any realistic collection size.
    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        tool_diffs,
        observability_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}