1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, sync_run_handoffs,
10 ArtifactRecord, CapabilityPolicy, HandoffArtifact,
11};
12use crate::event_log::{
13 active_event_log, AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic,
14};
15use crate::llm::vm_value_to_json;
16use crate::triggers::{SignatureStatus, TriggerEvent};
17use crate::value::{VmError, VmValue};
18
// Node kinds stored in `RunActionGraphNodeRecord.kind`. Kept as plain strings
// (not an enum) so persisted graphs stay forward-compatible across versions.
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE: &str = "worker_enqueue";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";

// Edge kinds stored in `RunActionGraphEdgeRecord.kind`.
pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
40
/// Aggregated LLM accounting (tokens, wall time, cost) for a run or stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    /// Model identifiers observed across the aggregated calls.
    pub models: Vec<String>,
}

/// One executed stage of a run: status/outcome, transcript, artifacts, and
/// per-attempt history.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    /// Workflow node this stage executed (see `RunTransitionRecord.to_node_id`).
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch taken, when the stage's outcome selected one.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    /// Raw transcript JSON; schema owned by the LLM layer — kept opaque here.
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Every attempt, including failed ones that preceded the final outcome.
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// A single attempt at executing a stage (retries append more of these).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}

/// An edge in the run's execution history: which node was entered, from where,
/// and which artifacts flowed across the transition.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    /// `None` for the run's entry transition.
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// Snapshot of scheduler state persisted so a run can be resumed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Why the checkpoint was taken (free-form label).
    pub reason: String,
}
111
/// Expectations captured from a source run, used to evaluate replays of it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Type tag emitted in persisted JSON for self-describing files.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub eval_kind: Option<String>,
    /// Extra checks applied when the eval targets clarifying-question behavior.
    pub clarifying_question: Option<ClarifyingQuestionEvalSpec>,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Criteria for judging clarifying questions produced during a replay.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ClarifyingQuestionEvalSpec {
    pub expected_question: Option<String>,
    pub accepted_questions: Vec<String>,
    pub required_terms: Vec<String>,
    pub forbidden_terms: Vec<String>,
    /// Treated as at least 1 by `clarifying_min_questions`.
    pub min_questions: usize,
    /// Treated as at least 1 (default 1) by `clarifying_max_questions`.
    pub max_questions: Option<usize>,
}

/// Per-stage expectation inside a `ReplayFixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    pub visible_text_contains: Option<String>,
}

/// Outcome of evaluating one replay against its fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}
157
/// One case's result inside a replay-eval suite run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    /// Diff against a baseline run, when the case requested a comparison.
    pub comparison: Option<RunDiffReport>,
}

/// Roll-up of all cases in a replay-eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}

/// Compact view of a single task-ledger deliverable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    /// Free-form status string; "open"/"blocked" count as blocking work.
    pub status: String,
    pub note: Option<String>,
}

/// Summary of a stage's task ledger (see `task_ledger_summary_from_value`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    /// Count of deliverables whose status is "open" or "blocked".
    pub blocking_count: usize,
}
199
/// Per-stage planner telemetry: iteration/tool/LLM counters plus ledger state.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub native_text_tool_fallback_count: usize,
    pub native_text_tool_fallback_rejection_count: usize,
    pub empty_completion_retry_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    pub research_facts: Vec<String>,
}

/// Lineage of a delegated worker: who spawned it and where its run persisted.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}

/// A node in the derived action graph (see the `ACTION_GRAPH_NODE_KIND_*`
/// constants for the `kind` vocabulary).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    /// Namespaced id, e.g. "run:<id>", "trigger:<id>", "stage:<id>".
    pub id: String,
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub trace_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
255
/// A directed edge in the derived action graph (see `ACTION_GRAPH_EDGE_KIND_*`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}

/// Result of a stage's verification step, flattened for reporting.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    /// `None` when the verification produced no boolean verdict.
    pub passed: Option<bool>,
    pub summary: Option<String>,
}

/// Pointer to where a transcript can be found (inline or on disk).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    /// Logical location expression (e.g. a JSON path within the run record).
    pub location: String,
    pub path: Option<String>,
    /// Whether the referenced transcript data is actually present.
    pub available: bool,
}

/// One context-compaction event extracted from a transcript
/// (see `compaction_events_from_transcript`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    /// Asset id of the archived-messages snapshot, if one was written.
    pub snapshot_asset_id: Option<String>,
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    /// True when the snapshot asset is present in the transcript's asset list.
    pub available: bool,
}

/// Lifecycle kinds for daemon events; serialized as snake_case strings.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum DaemonEventKindRecord {
    #[default]
    Spawned,
    Triggered,
    Snapshotted,
    Resumed,
    Stopped,
}

/// A daemon lifecycle event, parsed from the LLM-transcript sidecar file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonEventRecord {
    pub daemon_id: String,
    pub name: String,
    pub kind: DaemonEventKindRecord,
    pub timestamp: String,
    pub persist_path: String,
    pub payload_summary: Option<String>,
}
325
/// Everything derived for observability of a run: action graph, planner
/// telemetry, lineage, verification, transcripts, compactions, daemon events.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    /// Bump when the derived shape changes incompatibly.
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
    pub daemon_events: Vec<DaemonEventRecord>,
}

/// Stage-level difference between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Change category label (free-form string).
    pub change: String,
    pub details: Vec<String>,
}

/// Difference in a recorded tool call between two runs, keyed by args hash.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}

/// Difference in a derived-observability section between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}

/// Full comparison of two runs ("left" vs "right").
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    /// right-minus-left deltas (negative when the right run has fewer).
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
383
/// On-disk manifest listing the cases of an eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Type tag emitted in persisted JSON for self-describing files.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory that relative case paths are resolved against, if set.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One entry of an eval suite manifest.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    /// Path of a baseline run to diff against, when set.
    pub compare_to: Option<String>,
}

/// A human-in-the-loop question asked during a run
/// (merged from the event log by `merge_hitl_questions_from_active_log`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunHitlQuestionRecord {
    /// Unique id of the question request; used as the dedup key when merging.
    pub request_id: String,
    pub prompt: String,
    pub agent: String,
    pub trace_id: Option<String>,
    pub asked_at: String,
}
413
/// The persisted record of a workflow run: identity, status, stage history,
/// artifacts, child runs, and derived observability.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Type tag emitted in persisted JSON for self-describing files.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Set when this run was delegated by another run.
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub handoffs: Vec<HandoffArtifact>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    pub hitl_questions: Vec<RunHitlQuestionRecord>,
    /// Free-form metadata; known keys read elsewhere in this module include
    /// "trigger_event", "trace_id", and "replay_of_event_id".
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}

/// A recorded tool invocation, keyed by `args_hash` (see `tool_fixture_hash`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
461
462pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
464 use std::hash::{Hash, Hasher};
465 let mut hasher = std::collections::hash_map::DefaultHasher::new();
466 tool_name.hash(&mut hasher);
467 let args_str = serde_json::to_string(args).unwrap_or_default();
468 args_str.hash(&mut hasher);
469 format!("{:016x}", hasher.finish())
470}
471
/// A timing span captured during a run, forming a tree via `parent_id`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    /// `None` for root spans.
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    /// Start offset in milliseconds (origin not defined here — see producer).
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// A child run delegated to a worker, with enough context to locate it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}

/// Where and how a run executed: working dir, environment, and (for
/// repo-backed runs) worktree/branch details.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    /// Cleanup policy label — semantics owned by the execution adapter.
    pub cleanup: Option<String>,
}
519
520fn compact_json_value(value: &serde_json::Value) -> String {
521 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
522}
523
/// Canonicalizes question text for comparison: ASCII alphanumerics are
/// lowercased, every other non-whitespace character becomes a space, and runs
/// of whitespace collapse to single spaces (with edges trimmed).
fn normalize_question_text(text: &str) -> String {
    let mut cleaned = String::with_capacity(text.len());
    for ch in text.chars() {
        if ch.is_ascii_alphanumeric() {
            cleaned.push(ch.to_ascii_lowercase());
        } else if ch.is_whitespace() {
            cleaned.push(ch);
        } else {
            // Punctuation (and non-ASCII letters) become separators.
            cleaned.push(' ');
        }
    }
    cleaned.split_whitespace().collect::<Vec<_>>().join(" ")
}
538
539fn clarifying_min_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
540 spec.min_questions.max(1)
541}
542
543fn clarifying_max_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
544 spec.max_questions.unwrap_or(1).max(1)
545}
546
/// Reads every record on `topic`, paging through the log 256 events at a time.
///
/// Blocks the current thread on each async `read_range` call, so this must not
/// run inside an async context.
///
/// NOTE(review): termination assumes `read_range(topic, Some(id), _)` returns
/// events strictly *after* `id`; if the bound were inclusive this loop would
/// never finish — confirm against the `EventLog` contract.
fn read_topic_records(
    log: &AnyEventLog,
    topic: &Topic,
) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
    // `None` starts the scan at the beginning of the topic.
    let mut from = None;
    let mut records = Vec::new();
    loop {
        // Read errors are swallowed: an unreadable page ends the scan with
        // whatever has been collected so far.
        let batch =
            futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
        if batch.is_empty() {
            break;
        }
        // Resume the next page from the last event id seen.
        from = batch.last().map(|(event_id, _)| *event_id);
        records.extend(batch);
    }
    records
}
564
/// Merges "hitl.question_asked" events from the active event log into
/// `run.hitl_questions`, deduplicating by request id.
///
/// No-op when no event log is active. Log events overwrite any existing record
/// with the same request id. The result is sorted by (asked_at, request_id).
fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
    let Some(log) = active_event_log() else {
        return;
    };
    let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
        .expect("static hitl.questions topic should always be valid");
    // Seed the dedup map with questions already on the run record.
    let mut merged = run
        .hitl_questions
        .iter()
        .cloned()
        .map(|question| (question.request_id.clone(), question))
        .collect::<BTreeMap<_, _>>();

    for (_, event) in read_topic_records(log.as_ref(), &topic) {
        if event.kind != "hitl.question_asked" {
            continue;
        }
        let payload = &event.payload;
        // The run id may arrive either as an event header or inside the payload.
        let matches_run = event
            .headers
            .get("run_id")
            .is_some_and(|value| value == &run.id)
            || payload
                .get("run_id")
                .and_then(|value| value.as_str())
                .is_some_and(|value| value == run.id);
        if !matches_run {
            continue;
        }
        // Prefer the payload's request id, falling back to the header.
        let request_id = payload
            .get("request_id")
            .and_then(|value| value.as_str())
            .or_else(|| event.headers.get("request_id").map(String::as_str))
            .unwrap_or_default();
        let prompt = payload
            .get("payload")
            .and_then(|value| value.get("prompt"))
            .and_then(|value| value.as_str())
            .unwrap_or_default();
        // Skip malformed events: both the id and the prompt are required.
        if request_id.is_empty() || prompt.is_empty() {
            continue;
        }
        merged.insert(
            request_id.to_string(),
            RunHitlQuestionRecord {
                request_id: request_id.to_string(),
                prompt: prompt.to_string(),
                agent: payload
                    .get("agent")
                    .and_then(|value| value.as_str())
                    .unwrap_or_default()
                    .to_string(),
                trace_id: payload
                    .get("trace_id")
                    .and_then(|value| value.as_str())
                    .map(str::to_string),
                asked_at: payload
                    .get("requested_at")
                    .and_then(|value| value.as_str())
                    .unwrap_or_default()
                    .to_string(),
            },
        );
    }

    run.hitl_questions = merged.into_values().collect();
    // Deterministic ordering: chronological, tie-broken by request id.
    run.hitl_questions.sort_by(|left, right| {
        (left.asked_at.as_str(), left.request_id.as_str())
            .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
    });
}
636
637fn signature_status_label(status: &SignatureStatus) -> &'static str {
638 match status {
639 SignatureStatus::Verified => "verified",
640 SignatureStatus::Unsigned => "unsigned",
641 SignatureStatus::Failed { .. } => "failed",
642 }
643}
644
645fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
646 run.metadata
647 .get("trigger_event")
648 .cloned()
649 .and_then(|value| serde_json::from_value(value).ok())
650}
651
652fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
653 trigger_event
654 .map(|event| event.trace_id.0.clone())
655 .or_else(|| {
656 run.metadata
657 .get("trace_id")
658 .and_then(|value| value.as_str())
659 .map(str::to_string)
660 })
661}
662
663fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
664 run.metadata
665 .get("replay_of_event_id")
666 .and_then(|value| value.as_str())
667 .map(str::to_string)
668}
669
670fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
671 if stage.kind == "condition" {
672 ACTION_GRAPH_NODE_KIND_PREDICATE
673 } else {
674 ACTION_GRAPH_NODE_KIND_STAGE
675 }
676}
677
678fn trigger_node_metadata(trigger_event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
679 let mut metadata = BTreeMap::new();
680 metadata.insert(
681 "provider".to_string(),
682 serde_json::json!(trigger_event.provider.as_str()),
683 );
684 metadata.insert(
685 "event_kind".to_string(),
686 serde_json::json!(trigger_event.kind),
687 );
688 metadata.insert(
689 "dedupe_key".to_string(),
690 serde_json::json!(trigger_event.dedupe_key),
691 );
692 metadata.insert(
693 "signature_status".to_string(),
694 serde_json::json!(signature_status_label(&trigger_event.signature_status)),
695 );
696 metadata
697}
698
699fn stage_node_metadata(stage: &RunStageRecord) -> BTreeMap<String, serde_json::Value> {
700 let mut metadata = BTreeMap::new();
701 metadata.insert("stage_kind".to_string(), serde_json::json!(stage.kind));
702 if let Some(branch) = stage.branch.as_ref() {
703 metadata.insert("branch".to_string(), serde_json::json!(branch));
704 }
705 if let Some(worker_id) = stage
706 .metadata
707 .get("worker_id")
708 .and_then(|value| value.as_str())
709 {
710 metadata.insert("worker_id".to_string(), serde_json::json!(worker_id));
711 }
712 metadata
713}
714
/// Appends `record` to the in-memory action-graph node list.
///
/// Thin seam over `Vec::push`, kept as a named function so node insertion has
/// a single call site within `derive_run_observability`.
fn append_action_graph_node(
    nodes: &mut Vec<RunActionGraphNodeRecord>,
    record: RunActionGraphNodeRecord,
) {
    nodes.push(record);
}
721
/// Appends an "action_graph_update" event to the "observability.action_graph"
/// topic of the active event log.
///
/// Returns `Ok(())` without doing anything when no event log is active, so
/// callers can invoke it unconditionally.
pub async fn append_action_graph_update(
    headers: BTreeMap<String, String>,
    payload: serde_json::Value,
) -> Result<(), crate::event_log::LogError> {
    let Some(log) = active_event_log() else {
        return Ok(());
    };
    let topic = Topic::new("observability.action_graph")
        .expect("static observability.action_graph topic should always be valid");
    let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
    // The appended event id is not needed by callers; discard it.
    log.append(&topic, record).await.map(|_| ())
}
734
/// Best-effort publication of a run's action graph to the event log.
///
/// Headers carry run/workflow ids plus the trace id when one can be resolved;
/// the payload embeds the full observability record and the persisted path.
/// Inside a tokio runtime the append is spawned and detached; otherwise it is
/// executed synchronously. Errors are intentionally dropped in both paths —
/// observability publishing must never fail the run itself.
fn publish_action_graph_event(
    run: &RunRecord,
    observability: &RunObservabilityRecord,
    path: &Path,
) {
    let trigger_event = trigger_event_from_run(run);
    let mut headers = BTreeMap::new();
    headers.insert("run_id".to_string(), run.id.clone());
    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
        headers.insert("trace_id".to_string(), trace_id);
    }
    let payload = serde_json::json!({
        "run_id": run.id,
        "workflow_id": run.workflow_id,
        "persisted_path": path.to_string_lossy(),
        "status": run.status,
        "observability": observability,
    });
    // Prefer the ambient async runtime; fall back to blocking when called from
    // synchronous code with no runtime on this thread.
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            let _ = append_action_graph_update(headers, payload).await;
        });
    } else {
        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
    }
}
762
/// Location of the LLM transcript sidecar for a run file:
/// `<parent>/<stem>-llm/llm_transcript.jsonl`.
///
/// Returns `None` when the run path has no file stem or the stem is not UTF-8.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem().and_then(|stem| stem.to_str())?;
    let base = match run_path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    Some(base.join(format!("{stem}-llm")).join("llm_transcript.jsonl"))
}
768
769fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
770 value
771 .and_then(|value| value.as_array())
772 .map(|items| {
773 items
774 .iter()
775 .filter_map(|item| item.as_str().map(str::to_string))
776 .collect::<Vec<_>>()
777 })
778 .unwrap_or_default()
779}
780
781fn json_usize(value: Option<&serde_json::Value>) -> usize {
782 value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
783}
784
785fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
786 value.and_then(|value| value.as_bool())
787}
788
789fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
790 stage
791 .artifacts
792 .iter()
793 .find_map(|artifact| artifact.data.as_ref())
794}
795
796fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
797 let deliverables = value
798 .get("deliverables")
799 .and_then(|raw| raw.as_array())
800 .map(|items| {
801 items
802 .iter()
803 .map(|item| RunDeliverableSummaryRecord {
804 id: item
805 .get("id")
806 .and_then(|value| value.as_str())
807 .unwrap_or_default()
808 .to_string(),
809 text: item
810 .get("text")
811 .and_then(|value| value.as_str())
812 .unwrap_or_default()
813 .to_string(),
814 status: item
815 .get("status")
816 .and_then(|value| value.as_str())
817 .unwrap_or_default()
818 .to_string(),
819 note: item
820 .get("note")
821 .and_then(|value| value.as_str())
822 .map(str::to_string),
823 })
824 .collect::<Vec<_>>()
825 })
826 .unwrap_or_default();
827 let observations = json_string_array(value.get("observations"));
828 let root_task = value
829 .get("root_task")
830 .and_then(|value| value.as_str())
831 .unwrap_or_default()
832 .to_string();
833 let rationale = value
834 .get("rationale")
835 .and_then(|value| value.as_str())
836 .unwrap_or_default()
837 .to_string();
838 if root_task.is_empty()
839 && rationale.is_empty()
840 && deliverables.is_empty()
841 && observations.is_empty()
842 {
843 return None;
844 }
845 let blocking_count = deliverables
846 .iter()
847 .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
848 .count();
849 Some(RunTaskLedgerSummaryRecord {
850 root_task,
851 rationale,
852 deliverables,
853 observations,
854 blocking_count,
855 })
856}
857
/// Extracts compaction events from a transcript JSON object.
///
/// Scans `transcript.events` for entries with kind "compaction" and builds a
/// `CompactionEventRecord` from each event's `metadata`. A snapshot is marked
/// `available` only when its `snapshot_asset_id` appears in the transcript's
/// `assets` list. `location_prefix` seeds the human-readable
/// `snapshot_location` expression; `persisted_path` is recorded verbatim as
/// `snapshot_path`. Returns an empty vector when `events` is absent or not an
/// array.
fn compaction_events_from_transcript(
    transcript: &serde_json::Value,
    stage_id: Option<&str>,
    node_id: Option<&str>,
    location_prefix: &str,
    persisted_path: Option<&Path>,
) -> Vec<CompactionEventRecord> {
    let transcript_id = transcript
        .get("id")
        .and_then(|value| value.as_str())
        .map(str::to_string);
    // Set of asset ids present in the transcript, used for availability checks.
    let asset_ids = transcript
        .get("assets")
        .and_then(|value| value.as_array())
        .map(|assets| {
            assets
                .iter()
                .filter_map(|asset| {
                    asset
                        .get("id")
                        .and_then(|value| value.as_str())
                        .map(str::to_string)
                })
                .collect::<BTreeSet<_>>()
        })
        .unwrap_or_default();
    transcript
        .get("events")
        .and_then(|value| value.as_array())
        .map(|events| {
            events
                .iter()
                .filter(|event| {
                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
                })
                .map(|event| {
                    let metadata = event.get("metadata");
                    let snapshot_asset_id = metadata
                        .and_then(|value| value.get("snapshot_asset_id"))
                        .and_then(|value| value.as_str())
                        .map(str::to_string);
                    // Available only if the referenced asset actually exists.
                    let available = snapshot_asset_id
                        .as_ref()
                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
                    // With an asset id: "<prefix>.assets[<id>]"; otherwise just
                    // the prefix itself.
                    let snapshot_location = snapshot_asset_id
                        .as_ref()
                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
                        .unwrap_or_else(|| location_prefix.to_string());
                    CompactionEventRecord {
                        id: event
                            .get("id")
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        transcript_id: transcript_id.clone(),
                        stage_id: stage_id.map(str::to_string),
                        node_id: node_id.map(str::to_string),
                        mode: metadata
                            .and_then(|value| value.get("mode"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        strategy: metadata
                            .and_then(|value| value.get("strategy"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        archived_messages: json_usize(
                            metadata.and_then(|value| value.get("archived_messages")),
                        ),
                        estimated_tokens_before: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_before")),
                        ),
                        estimated_tokens_after: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_after")),
                        ),
                        snapshot_asset_id,
                        snapshot_location,
                        snapshot_path: persisted_path
                            .map(|path| path.to_string_lossy().into_owned()),
                        available,
                    }
                })
                .collect()
        })
        .unwrap_or_default()
}
945
946fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
947 let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
948 return Vec::new();
949 };
950 let Ok(content) = std::fs::read_to_string(sidecar_path) else {
951 return Vec::new();
952 };
953
954 content
955 .lines()
956 .filter(|line| !line.trim().is_empty())
957 .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
958 .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
959 .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
960 .collect()
961}
962
963pub fn derive_run_observability(
964 run: &RunRecord,
965 persisted_path: Option<&Path>,
966) -> RunObservabilityRecord {
967 let mut action_graph_nodes = Vec::new();
968 let mut action_graph_edges = Vec::new();
969 let mut verification_outcomes = Vec::new();
970 let mut planner_rounds = Vec::new();
971 let mut transcript_pointers = Vec::new();
972 let mut compaction_events = Vec::new();
973 let mut daemon_events = Vec::new();
974 let mut research_fact_count = 0usize;
975
976 let root_node_id = format!("run:{}", run.id);
977 let trigger_event = trigger_event_from_run(run);
978 let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
979 append_action_graph_node(
980 &mut action_graph_nodes,
981 RunActionGraphNodeRecord {
982 id: root_node_id.clone(),
983 label: run
984 .workflow_name
985 .clone()
986 .unwrap_or_else(|| run.workflow_id.clone()),
987 kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
988 status: run.status.clone(),
989 outcome: run.status.clone(),
990 trace_id: propagated_trace_id.clone(),
991 stage_id: None,
992 node_id: None,
993 worker_id: None,
994 run_id: Some(run.id.clone()),
995 run_path: run.persisted_path.clone(),
996 metadata: BTreeMap::from([(
997 "workflow_id".to_string(),
998 serde_json::json!(run.workflow_id),
999 )]),
1000 },
1001 );
1002 let mut entry_node_id = root_node_id.clone();
1003 if let Some(trigger_event) = trigger_event.as_ref() {
1004 if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
1005 let replay_source_node_id = format!("trigger:{replay_of_event_id}");
1006 append_action_graph_node(
1007 &mut action_graph_nodes,
1008 RunActionGraphNodeRecord {
1009 id: replay_source_node_id.clone(),
1010 label: format!(
1011 "{}:{} (original {})",
1012 trigger_event.provider.as_str(),
1013 trigger_event.kind,
1014 replay_of_event_id
1015 ),
1016 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1017 status: "historical".to_string(),
1018 outcome: "replayed_from".to_string(),
1019 trace_id: Some(trigger_event.trace_id.0.clone()),
1020 stage_id: None,
1021 node_id: None,
1022 worker_id: None,
1023 run_id: Some(run.id.clone()),
1024 run_path: run.persisted_path.clone(),
1025 metadata: trigger_node_metadata(trigger_event),
1026 },
1027 );
1028 action_graph_edges.push(RunActionGraphEdgeRecord {
1029 from_id: replay_source_node_id,
1030 to_id: format!("trigger:{}", trigger_event.id.0),
1031 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1032 label: Some("replay chain".to_string()),
1033 });
1034 }
1035 let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
1036 append_action_graph_node(
1037 &mut action_graph_nodes,
1038 RunActionGraphNodeRecord {
1039 id: trigger_node_id.clone(),
1040 label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
1041 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1042 status: "received".to_string(),
1043 outcome: signature_status_label(&trigger_event.signature_status).to_string(),
1044 trace_id: Some(trigger_event.trace_id.0.clone()),
1045 stage_id: None,
1046 node_id: None,
1047 worker_id: None,
1048 run_id: Some(run.id.clone()),
1049 run_path: run.persisted_path.clone(),
1050 metadata: trigger_node_metadata(trigger_event),
1051 },
1052 );
1053 action_graph_edges.push(RunActionGraphEdgeRecord {
1054 from_id: root_node_id.clone(),
1055 to_id: trigger_node_id.clone(),
1056 kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
1057 label: Some(trigger_event.id.0.clone()),
1058 });
1059 entry_node_id = trigger_node_id;
1060 }
1061
1062 let stage_node_ids = run
1063 .stages
1064 .iter()
1065 .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
1066 .collect::<BTreeMap<_, _>>();
1067 let stage_by_id = run
1068 .stages
1069 .iter()
1070 .map(|stage| (stage.id.as_str(), stage))
1071 .collect::<BTreeMap<_, _>>();
1072 let stage_by_node_id = run
1073 .stages
1074 .iter()
1075 .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
1076 .collect::<BTreeMap<_, _>>();
1077
1078 let incoming_nodes = run
1079 .transitions
1080 .iter()
1081 .map(|transition| transition.to_node_id.clone())
1082 .collect::<BTreeSet<_>>();
1083
1084 for stage in &run.stages {
1085 let graph_node_id = stage_node_ids
1086 .get(&stage.id)
1087 .cloned()
1088 .unwrap_or_else(|| format!("stage:{}", stage.id));
1089 append_action_graph_node(
1090 &mut action_graph_nodes,
1091 RunActionGraphNodeRecord {
1092 id: graph_node_id.clone(),
1093 label: stage.node_id.clone(),
1094 kind: action_graph_kind_for_stage(stage).to_string(),
1095 status: stage.status.clone(),
1096 outcome: stage.outcome.clone(),
1097 trace_id: propagated_trace_id.clone(),
1098 stage_id: Some(stage.id.clone()),
1099 node_id: Some(stage.node_id.clone()),
1100 worker_id: stage
1101 .metadata
1102 .get("worker_id")
1103 .and_then(|value| value.as_str())
1104 .map(str::to_string),
1105 run_id: None,
1106 run_path: None,
1107 metadata: stage_node_metadata(stage),
1108 },
1109 );
1110 if !incoming_nodes.contains(&stage.node_id) {
1111 action_graph_edges.push(RunActionGraphEdgeRecord {
1112 from_id: entry_node_id.clone(),
1113 to_id: graph_node_id.clone(),
1114 kind: if trigger_event.is_some() {
1115 ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
1116 } else {
1117 ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
1118 },
1119 label: None,
1120 });
1121 }
1122
1123 if stage.kind == "verify" || stage.verification.is_some() {
1124 let passed = json_bool(
1125 stage
1126 .verification
1127 .as_ref()
1128 .and_then(|value| value.get("pass")),
1129 )
1130 .or_else(|| {
1131 json_bool(
1132 stage
1133 .verification
1134 .as_ref()
1135 .and_then(|value| value.get("success")),
1136 )
1137 })
1138 .or_else(|| {
1139 if stage.status == "completed" && stage.outcome == "success" {
1140 Some(true)
1141 } else if stage.status == "failed" || stage.outcome == "failed" {
1142 Some(false)
1143 } else {
1144 None
1145 }
1146 });
1147 verification_outcomes.push(RunVerificationOutcomeRecord {
1148 stage_id: stage.id.clone(),
1149 node_id: stage.node_id.clone(),
1150 status: stage.status.clone(),
1151 passed,
1152 summary: stage
1153 .verification
1154 .as_ref()
1155 .map(compact_json_value)
1156 .or_else(|| {
1157 stage
1158 .visible_text
1159 .as_ref()
1160 .filter(|value| !value.trim().is_empty())
1161 .cloned()
1162 }),
1163 });
1164 }
1165
1166 if stage.transcript.is_some() {
1167 transcript_pointers.push(RunTranscriptPointerRecord {
1168 id: format!("stage:{}:transcript", stage.id),
1169 label: format!("Stage {} transcript", stage.node_id),
1170 kind: "embedded_transcript".to_string(),
1171 location: format!("run.stages[{}].transcript", stage.node_id),
1172 path: run.persisted_path.clone(),
1173 available: true,
1174 });
1175 if let Some(transcript) = stage.transcript.as_ref() {
1176 compaction_events.extend(compaction_events_from_transcript(
1177 transcript,
1178 Some(&stage.id),
1179 Some(&stage.node_id),
1180 &format!("run.stages[{}].transcript", stage.node_id),
1181 persisted_path,
1182 ));
1183 }
1184 }
1185
1186 if let Some(payload) = stage_result_payload(stage) {
1187 let trace = payload.get("trace");
1188 let task_ledger = payload
1189 .get("task_ledger")
1190 .and_then(task_ledger_summary_from_value);
1191 let research_facts = task_ledger
1192 .as_ref()
1193 .map(|ledger| ledger.observations.clone())
1194 .unwrap_or_default();
1195 research_fact_count += research_facts.len();
1196 let tools_used = json_string_array(
1197 payload
1198 .get("tools_used")
1199 .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1200 );
1201 let successful_tools = json_string_array(payload.get("successful_tools"));
1202 let planner_round = RunPlannerRoundRecord {
1203 stage_id: stage.id.clone(),
1204 node_id: stage.node_id.clone(),
1205 stage_kind: stage.kind.clone(),
1206 status: stage.status.clone(),
1207 outcome: stage.outcome.clone(),
1208 iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1209 llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1210 tool_execution_count: json_usize(
1211 trace.and_then(|trace| trace.get("tool_executions")),
1212 ),
1213 tool_rejection_count: json_usize(
1214 trace.and_then(|trace| trace.get("tool_rejections")),
1215 ),
1216 intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1217 compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1218 native_text_tool_fallback_count: json_usize(
1219 trace.and_then(|trace| trace.get("native_text_tool_fallbacks")),
1220 ),
1221 native_text_tool_fallback_rejection_count: json_usize(
1222 trace.and_then(|trace| trace.get("native_text_tool_fallback_rejections")),
1223 ),
1224 empty_completion_retry_count: json_usize(
1225 trace.and_then(|trace| trace.get("empty_completion_retries")),
1226 ),
1227 tools_used,
1228 successful_tools,
1229 ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1230 task_ledger,
1231 research_facts,
1232 };
1233 let has_agentic_detail = planner_round.iteration_count > 0
1234 || planner_round.llm_call_count > 0
1235 || planner_round.tool_execution_count > 0
1236 || planner_round.native_text_tool_fallback_count > 0
1237 || planner_round.native_text_tool_fallback_rejection_count > 0
1238 || planner_round.empty_completion_retry_count > 0
1239 || planner_round.ledger_done_rejections > 0
1240 || planner_round.task_ledger.is_some()
1241 || !planner_round.tools_used.is_empty()
1242 || !planner_round.successful_tools.is_empty();
1243 if has_agentic_detail {
1244 planner_rounds.push(planner_round);
1245 }
1246 }
1247 }
1248
1249 for transition in &run.transitions {
1250 let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1251 continue;
1252 };
1253 let from_stage = transition
1254 .from_stage_id
1255 .as_deref()
1256 .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1257 let from_id = transition
1258 .from_stage_id
1259 .as_ref()
1260 .and_then(|stage_id| stage_node_ids.get(stage_id))
1261 .cloned()
1262 .or_else(|| {
1263 transition
1264 .from_node_id
1265 .as_ref()
1266 .and_then(|node_id| stage_by_node_id.get(node_id))
1267 .cloned()
1268 })
1269 .unwrap_or_else(|| root_node_id.clone());
1270 action_graph_edges.push(RunActionGraphEdgeRecord {
1271 from_id,
1272 to_id,
1273 kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1274 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1275 } else {
1276 ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1277 },
1278 label: transition.branch.clone(),
1279 });
1280 }
1281
1282 let worker_lineage = run
1283 .child_runs
1284 .iter()
1285 .map(|child| {
1286 let worker_node_id = format!("worker:{}", child.worker_id);
1287 append_action_graph_node(
1288 &mut action_graph_nodes,
1289 RunActionGraphNodeRecord {
1290 id: worker_node_id.clone(),
1291 label: child.worker_name.clone(),
1292 kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1293 status: child.status.clone(),
1294 outcome: child.status.clone(),
1295 trace_id: propagated_trace_id.clone(),
1296 stage_id: child.parent_stage_id.clone(),
1297 node_id: None,
1298 worker_id: Some(child.worker_id.clone()),
1299 run_id: child.run_id.clone(),
1300 run_path: child.run_path.clone(),
1301 metadata: BTreeMap::from([
1302 (
1303 "worker_name".to_string(),
1304 serde_json::json!(child.worker_name),
1305 ),
1306 ("task".to_string(), serde_json::json!(child.task)),
1307 ]),
1308 },
1309 );
1310 if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1311 if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1312 action_graph_edges.push(RunActionGraphEdgeRecord {
1313 from_id: stage_node_id.clone(),
1314 to_id: worker_node_id,
1315 kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1316 label: Some(child.worker_name.clone()),
1317 });
1318 }
1319 }
1320 RunWorkerLineageRecord {
1321 worker_id: child.worker_id.clone(),
1322 worker_name: child.worker_name.clone(),
1323 parent_stage_id: child.parent_stage_id.clone(),
1324 task: child.task.clone(),
1325 status: child.status.clone(),
1326 session_id: child.session_id.clone(),
1327 parent_session_id: child.parent_session_id.clone(),
1328 run_id: child.run_id.clone(),
1329 run_path: child.run_path.clone(),
1330 snapshot_path: child.snapshot_path.clone(),
1331 }
1332 })
1333 .collect::<Vec<_>>();
1334
1335 if run.transcript.is_some() {
1336 transcript_pointers.push(RunTranscriptPointerRecord {
1337 id: "run:transcript".to_string(),
1338 label: "Run transcript".to_string(),
1339 kind: "embedded_transcript".to_string(),
1340 location: "run.transcript".to_string(),
1341 path: run.persisted_path.clone(),
1342 available: true,
1343 });
1344 if let Some(transcript) = run.transcript.as_ref() {
1345 compaction_events.extend(compaction_events_from_transcript(
1346 transcript,
1347 None,
1348 None,
1349 "run.transcript",
1350 persisted_path,
1351 ));
1352 }
1353 }
1354
1355 if let Some(path) = persisted_path {
1356 if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1357 transcript_pointers.push(RunTranscriptPointerRecord {
1358 id: "run:llm_transcript".to_string(),
1359 label: "LLM transcript sidecar".to_string(),
1360 kind: "llm_jsonl".to_string(),
1361 location: "run sidecar".to_string(),
1362 path: Some(sidecar_path.to_string_lossy().into_owned()),
1363 available: sidecar_path.exists(),
1364 });
1365 }
1366 daemon_events.extend(daemon_events_from_sidecar(path));
1367 }
1368
1369 RunObservabilityRecord {
1370 schema_version: 4,
1371 planner_rounds,
1372 research_fact_count,
1373 action_graph_nodes,
1374 action_graph_edges,
1375 worker_lineage,
1376 verification_outcomes,
1377 transcript_pointers,
1378 compaction_events,
1379 daemon_events,
1380 }
1381}
1382
1383fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1384 run.observability = Some(derive_run_observability(run, persisted_path));
1385}
1386
1387pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1388 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1389 if run.type_name.is_empty() {
1390 run.type_name = "run_record".to_string();
1391 }
1392 if run.id.is_empty() {
1393 run.id = new_id("run");
1394 }
1395 if run.started_at.is_empty() {
1396 run.started_at = now_rfc3339();
1397 }
1398 if run.status.is_empty() {
1399 run.status = "running".to_string();
1400 }
1401 if run.root_run_id.is_none() {
1402 run.root_run_id = Some(run.id.clone());
1403 }
1404 if run.replay_fixture.is_none() {
1405 run.replay_fixture = Some(replay_fixture_from_run(&run));
1406 }
1407 merge_hitl_questions_from_active_log(&mut run);
1408 sync_run_handoffs(&mut run);
1409 if run.observability.is_none() {
1410 let persisted_path = run.persisted_path.clone();
1411 let persisted = persisted_path.as_deref().map(Path::new);
1412 refresh_run_observability(&mut run, persisted);
1413 }
1414 Ok(run)
1415}
1416
1417pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1418 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1419 if manifest.type_name.is_empty() {
1420 manifest.type_name = "eval_suite_manifest".to_string();
1421 }
1422 if manifest.id.is_empty() {
1423 manifest.id = new_id("eval_suite");
1424 }
1425 Ok(manifest)
1426}
1427
1428fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1429 let content = std::fs::read_to_string(path)
1430 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1431 serde_json::from_str(&content)
1432 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1433}
1434
/// Resolve a manifest-relative path string against an optional base
/// directory. Absolute paths — and any path when no base is given — are
/// returned unchanged.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate,
    }
}
1445
1446pub fn evaluate_run_suite_manifest(
1447 manifest: &EvalSuiteManifest,
1448) -> Result<ReplayEvalSuiteReport, VmError> {
1449 let base_dir = manifest.base_dir.as_deref().map(Path::new);
1450 let mut reports = Vec::new();
1451 for case in &manifest.cases {
1452 let run_path = resolve_manifest_path(base_dir, &case.run_path);
1453 let run = load_run_record(&run_path)?;
1454 let fixture = match &case.fixture_path {
1455 Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1456 None => run
1457 .replay_fixture
1458 .clone()
1459 .unwrap_or_else(|| replay_fixture_from_run(&run)),
1460 };
1461 let eval = evaluate_run_against_fixture(&run, &fixture);
1462 let mut pass = eval.pass;
1463 let mut failures = eval.failures;
1464 let comparison = match &case.compare_to {
1465 Some(path) => {
1466 let baseline_path = resolve_manifest_path(base_dir, path);
1467 let baseline = load_run_record(&baseline_path)?;
1468 let diff = diff_run_records(&baseline, &run);
1469 if !diff.identical {
1470 pass = false;
1471 failures.push(format!(
1472 "run differs from baseline {} with {} stage changes",
1473 baseline_path.display(),
1474 diff.stage_diffs.len()
1475 ));
1476 }
1477 Some(diff)
1478 }
1479 None => None,
1480 };
1481 reports.push(ReplayEvalCaseReport {
1482 run_id: run.id.clone(),
1483 workflow_id: run.workflow_id.clone(),
1484 label: case.label.clone(),
1485 pass,
1486 failures,
1487 stage_count: eval.stage_count,
1488 source_path: Some(run_path.display().to_string()),
1489 comparison,
1490 });
1491 }
1492 let total = reports.len();
1493 let passed = reports.iter().filter(|report| report.pass).count();
1494 let failed = total.saturating_sub(passed);
1495 Ok(ReplayEvalSuiteReport {
1496 pass: failed == 0,
1497 total,
1498 passed,
1499 failed,
1500 cases: reports,
1501 })
1502}
1503
/// Edit operation emitted by [`myers_diff`]: keep an element, remove one
/// from the left sequence, or add one from the right sequence.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    Equal,
    Delete,
    Insert,
}

/// Compute a shortest edit script between `a` and `b` with Myers' greedy
/// O((N+M)·D) diff algorithm.
///
/// Returns `(op, index)` pairs in forward order: for `Equal` and `Delete`
/// the index refers into `a`; for `Insert` it refers into `b`.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    if n == 0 && m == 0 {
        return Vec::new();
    }
    // Degenerate cases: one empty side means pure inserts or pure deletes.
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // `v[k + offset]` holds the furthest x reached on diagonal k = x - y.
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // Snapshot of `v` taken before each round d; used to backtrack the path.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Step down (toward insert) or right (toward delete) from the
            // better neighboring diagonal of the previous round.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the diagonal ("snake") while elements match.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backtrack from (n, m) through the recorded rounds to recover the ops
    // in reverse, then flip them into forward order.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Replay this round's diagonal run.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Anything left lies on the initial d = 0 diagonal: all equal.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1596
1597pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1598 let before_lines: Vec<&str> = before.lines().collect();
1599 let after_lines: Vec<&str> = after.lines().collect();
1600 let ops = myers_diff(&before_lines, &after_lines);
1601
1602 let mut diff = String::new();
1603 let file = path.unwrap_or("artifact");
1604 diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1605 for &(op, idx) in &ops {
1606 match op {
1607 DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1608 DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1609 DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1610 }
1611 }
1612 diff
1613}
1614
1615pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1616 let path = path
1617 .map(PathBuf::from)
1618 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1619 let mut materialized = run.clone();
1620 merge_hitl_questions_from_active_log(&mut materialized);
1621 if materialized.replay_fixture.is_none() {
1622 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
1623 }
1624 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
1625 sync_run_handoffs(&mut materialized);
1626 refresh_run_observability(&mut materialized, Some(&path));
1627 if let Some(parent) = path.parent() {
1628 std::fs::create_dir_all(parent)
1629 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1630 }
1631 let json = serde_json::to_string_pretty(&materialized)
1632 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1633 let tmp_path = path.with_extension("json.tmp");
1635 std::fs::write(&tmp_path, &json)
1636 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1637 std::fs::rename(&tmp_path, &path).map_err(|e| {
1638 let _ = std::fs::write(&path, &json);
1640 VmError::Runtime(format!("failed to finalize run record: {e}"))
1641 })?;
1642 if let Some(observability) = materialized.observability.as_ref() {
1643 publish_action_graph_event(&materialized, observability, &path);
1644 }
1645 Ok(path.to_string_lossy().into_owned())
1646}
1647
1648pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1649 let content = std::fs::read_to_string(path)
1650 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1651 let mut run: RunRecord = serde_json::from_str(&content)
1652 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1653 if run.replay_fixture.is_none() {
1654 run.replay_fixture = Some(replay_fixture_from_run(&run));
1655 }
1656 run.persisted_path
1657 .get_or_insert_with(|| path.to_string_lossy().into_owned());
1658 sync_run_handoffs(&mut run);
1659 refresh_run_observability(&mut run, Some(path));
1660 Ok(run)
1661}
1662
1663pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1664 ReplayFixture {
1665 type_name: "replay_fixture".to_string(),
1666 id: new_id("fixture"),
1667 source_run_id: run.id.clone(),
1668 workflow_id: run.workflow_id.clone(),
1669 workflow_name: run.workflow_name.clone(),
1670 created_at: now_rfc3339(),
1671 eval_kind: Some("replay".to_string()),
1672 clarifying_question: None,
1673 expected_status: run.status.clone(),
1674 stage_assertions: run
1675 .stages
1676 .iter()
1677 .map(|stage| ReplayStageAssertion {
1678 node_id: stage.node_id.clone(),
1679 expected_status: stage.status.clone(),
1680 expected_outcome: stage.outcome.clone(),
1681 expected_branch: stage.branch.clone(),
1682 required_artifact_kinds: stage
1683 .artifacts
1684 .iter()
1685 .map(|artifact| artifact.kind.clone())
1686 .collect(),
1687 visible_text_contains: stage
1688 .visible_text
1689 .as_ref()
1690 .filter(|text| !text.is_empty())
1691 .map(|text| text.chars().take(80).collect()),
1692 })
1693 .collect(),
1694 }
1695}
1696
1697pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1698 if fixture.eval_kind.as_deref() == Some("clarifying_question") {
1699 return evaluate_clarifying_question(run, fixture);
1700 }
1701 let mut failures = Vec::new();
1702 if run.status != fixture.expected_status {
1703 failures.push(format!(
1704 "run status mismatch: expected {}, got {}",
1705 fixture.expected_status, run.status
1706 ));
1707 }
1708 let stages_by_id: BTreeMap<&str, &RunStageRecord> =
1709 run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
1710 for assertion in &fixture.stage_assertions {
1711 let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
1712 failures.push(format!("missing stage {}", assertion.node_id));
1713 continue;
1714 };
1715 if stage.status != assertion.expected_status {
1716 failures.push(format!(
1717 "stage {} status mismatch: expected {}, got {}",
1718 assertion.node_id, assertion.expected_status, stage.status
1719 ));
1720 }
1721 if stage.outcome != assertion.expected_outcome {
1722 failures.push(format!(
1723 "stage {} outcome mismatch: expected {}, got {}",
1724 assertion.node_id, assertion.expected_outcome, stage.outcome
1725 ));
1726 }
1727 if stage.branch != assertion.expected_branch {
1728 failures.push(format!(
1729 "stage {} branch mismatch: expected {:?}, got {:?}",
1730 assertion.node_id, assertion.expected_branch, stage.branch
1731 ));
1732 }
1733 for required_kind in &assertion.required_artifact_kinds {
1734 if !stage
1735 .artifacts
1736 .iter()
1737 .any(|artifact| &artifact.kind == required_kind)
1738 {
1739 failures.push(format!(
1740 "stage {} missing artifact kind {}",
1741 assertion.node_id, required_kind
1742 ));
1743 }
1744 }
1745 if let Some(snippet) = &assertion.visible_text_contains {
1746 let actual = stage.visible_text.clone().unwrap_or_default();
1747 if !actual.contains(snippet) {
1748 failures.push(format!(
1749 "stage {} visible text does not contain expected snippet {:?}",
1750 assertion.node_id, snippet
1751 ));
1752 }
1753 }
1754 }
1755
1756 ReplayEvalReport {
1757 pass: failures.is_empty(),
1758 failures,
1759 stage_count: run.stages.len(),
1760 }
1761}
1762
/// Evaluate a clarifying-question fixture against a run's HITL questions.
///
/// Checks the run status, the allowed question-count window
/// (`clarifying_min_questions`..=`clarifying_max_questions`), and — only
/// when the spec actually constrains content — whether at least one asked
/// question satisfies every content rule. All text comparisons go through
/// `normalize_question_text`.
fn evaluate_clarifying_question(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    // A fixture without a clarifying-question spec falls back to defaults.
    let spec = fixture.clarifying_question.clone().unwrap_or_default();
    let min_questions = clarifying_min_questions(&spec);
    let max_questions = clarifying_max_questions(&spec);
    let questions = &run.hitl_questions;

    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    // Question count must fall within [min_questions, max_questions].
    if questions.len() < min_questions {
        failures.push(format!(
            "expected at least {min_questions} clarifying question(s), got {}",
            questions.len()
        ));
    }
    if questions.len() > max_questions {
        failures.push(format!(
            "expected at most {max_questions} clarifying question(s), got {}",
            questions.len()
        ));
    }

    // Pre-normalize every content constraint once, outside the per-question loop.
    let normalized_expected = spec
        .expected_question
        .as_deref()
        .map(normalize_question_text);
    let normalized_accepted = spec
        .accepted_questions
        .iter()
        .map(|question| normalize_question_text(question))
        .collect::<Vec<_>>();
    let required_terms = spec
        .required_terms
        .iter()
        .map(|term| normalize_question_text(term))
        .collect::<Vec<_>>();
    let forbidden_terms = spec
        .forbidden_terms
        .iter()
        .map(|term| normalize_question_text(term))
        .collect::<Vec<_>>();

    // A question matches when it satisfies ALL content rules at once.
    // Empty/absent rules are vacuously satisfied (`is_none_or`, empty lists).
    let matched = questions.iter().any(|question| {
        let normalized = normalize_question_text(&question.prompt);
        let matches_expected = normalized_expected
            .as_ref()
            .is_none_or(|expected| &normalized == expected)
            && (normalized_accepted.is_empty()
                || normalized_accepted
                    .iter()
                    .any(|candidate| candidate == &normalized));
        let has_required_terms = required_terms
            .iter()
            .all(|term| normalized.contains(term.as_str()));
        let avoids_forbidden_terms = forbidden_terms
            .iter()
            .all(|term| !normalized.contains(term.as_str()));
        matches_expected && has_required_terms && avoids_forbidden_terms
    });

    // Report a content mismatch only when questions were asked AND the spec
    // actually constrains content; an unconstrained spec never fails here.
    if !questions.is_empty()
        && (!normalized_accepted.is_empty()
            || normalized_expected.is_some()
            || !required_terms.is_empty()
            || !forbidden_terms.is_empty())
        && !matched
    {
        failures.push(format!(
            "no clarifying question matched fixture; actual questions: {}",
            questions
                .iter()
                .map(|question| format!("{:?}", question.prompt))
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
1850
1851pub fn evaluate_run_suite(
1852 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1853) -> ReplayEvalSuiteReport {
1854 let mut reports = Vec::new();
1855 for (run, fixture, source_path) in cases {
1856 let report = evaluate_run_against_fixture(&run, &fixture);
1857 reports.push(ReplayEvalCaseReport {
1858 run_id: run.id.clone(),
1859 workflow_id: run.workflow_id.clone(),
1860 label: None,
1861 pass: report.pass,
1862 failures: report.failures,
1863 stage_count: report.stage_count,
1864 source_path,
1865 comparison: None,
1866 });
1867 }
1868 let total = reports.len();
1869 let passed = reports.iter().filter(|report| report.pass).count();
1870 let failed = total.saturating_sub(passed);
1871 ReplayEvalSuiteReport {
1872 pass: failed == 0,
1873 total,
1874 passed,
1875 failed,
1876 cases: reports,
1877 }
1878}
1879
1880pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1881 let mut stage_diffs = Vec::new();
1882 let mut all_node_ids = BTreeSet::new();
1883 let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1884 .stages
1885 .iter()
1886 .map(|s| (s.node_id.as_str(), s))
1887 .collect();
1888 let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1889 .stages
1890 .iter()
1891 .map(|s| (s.node_id.as_str(), s))
1892 .collect();
1893 all_node_ids.extend(left_by_id.keys().copied());
1894 all_node_ids.extend(right_by_id.keys().copied());
1895
1896 for node_id in all_node_ids {
1897 let left_stage = left_by_id.get(node_id).copied();
1898 let right_stage = right_by_id.get(node_id).copied();
1899 match (left_stage, right_stage) {
1900 (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1901 node_id: node_id.to_string(),
1902 change: "removed".to_string(),
1903 details: vec!["stage missing from right run".to_string()],
1904 }),
1905 (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1906 node_id: node_id.to_string(),
1907 change: "added".to_string(),
1908 details: vec!["stage missing from left run".to_string()],
1909 }),
1910 (Some(left_stage), Some(right_stage)) => {
1911 let mut details = Vec::new();
1912 if left_stage.status != right_stage.status {
1913 details.push(format!(
1914 "status: {} -> {}",
1915 left_stage.status, right_stage.status
1916 ));
1917 }
1918 if left_stage.outcome != right_stage.outcome {
1919 details.push(format!(
1920 "outcome: {} -> {}",
1921 left_stage.outcome, right_stage.outcome
1922 ));
1923 }
1924 if left_stage.branch != right_stage.branch {
1925 details.push(format!(
1926 "branch: {:?} -> {:?}",
1927 left_stage.branch, right_stage.branch
1928 ));
1929 }
1930 if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1931 {
1932 details.push(format!(
1933 "produced_artifacts: {} -> {}",
1934 left_stage.produced_artifact_ids.len(),
1935 right_stage.produced_artifact_ids.len()
1936 ));
1937 }
1938 if left_stage.artifacts.len() != right_stage.artifacts.len() {
1939 details.push(format!(
1940 "artifact_records: {} -> {}",
1941 left_stage.artifacts.len(),
1942 right_stage.artifacts.len()
1943 ));
1944 }
1945 if !details.is_empty() {
1946 stage_diffs.push(RunStageDiffRecord {
1947 node_id: node_id.to_string(),
1948 change: "changed".to_string(),
1949 details,
1950 });
1951 }
1952 }
1953 (None, None) => {}
1954 }
1955 }
1956
1957 let mut tool_diffs = Vec::new();
1958 let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1959 .tool_recordings
1960 .iter()
1961 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1962 .collect();
1963 let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1964 .tool_recordings
1965 .iter()
1966 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1967 .collect();
1968 let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1969 .keys()
1970 .chain(right_tools.keys())
1971 .cloned()
1972 .collect();
1973 for key in &all_tool_keys {
1974 let l = left_tools.get(key);
1975 let r = right_tools.get(key);
1976 let result_changed = match (l, r) {
1977 (Some(a), Some(b)) => a.result != b.result,
1978 _ => true,
1979 };
1980 if result_changed {
1981 tool_diffs.push(ToolCallDiffRecord {
1982 tool_name: key.0.clone(),
1983 args_hash: key.1.clone(),
1984 result_changed,
1985 left_result: l.map(|t| t.result.clone()),
1986 right_result: r.map(|t| t.result.clone()),
1987 });
1988 }
1989 }
1990
    // Use the stored observability snapshot when the run has one; otherwise
    // derive it on the fly from the run record and its persisted path (if any).
    let left_observability = left.observability.clone().unwrap_or_else(|| {
        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
    });
    let right_observability = right.observability.clone().unwrap_or_else(|| {
        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
    });
    // Accumulates per-section diffs (worker_lineage, planner_rounds, …) below.
    let mut observability_diffs = Vec::new();

    // Index worker lineage by worker_id; the tuple holds the fields compared:
    // (status, run_id, run_path). BTreeMap keeps iteration order deterministic.
    let left_workers = left_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_workers = right_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    // Union of ids so workers present on only one side are reported too.
    let worker_ids = left_workers
        .keys()
        .chain(right_workers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for worker_id in worker_ids {
        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from left run".to_string()],
            }),
            // Present on both sides but unequal: report a detail line per
            // changed field (tuple fields: 0 = status, 1 = run_id, 2 = run_path).
            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
                let mut details = Vec::new();
                if left_worker.0 != right_worker.0 {
                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
                }
                if left_worker.1 != right_worker.1 {
                    details.push(format!(
                        "run_id: {:?} -> {:?}",
                        left_worker.1, right_worker.1
                    ));
                }
                if left_worker.2 != right_worker.2 {
                    details.push(format!(
                        "run_path: {:?} -> {:?}",
                        left_worker.2, right_worker.2
                    ));
                }
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "worker_lineage".to_string(),
                    label: worker_id,
                    details,
                });
            }
            // Present on both sides and equal: nothing to report.
            _ => {}
        }
    }
2070
    // Index planner round summaries by stage_id on both sides.
    let left_rounds = left_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let right_rounds = right_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let round_ids = left_rounds
        .keys()
        .chain(right_rounds.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in round_ids {
        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from left run".to_string()],
            }),
            (Some(left_round), Some(right_round)) => {
                // Compare each counter/summary field and collect one detail
                // line per change; a record is only pushed if any differ.
                let mut details = Vec::new();
                if left_round.iteration_count != right_round.iteration_count {
                    details.push(format!(
                        "iterations: {} -> {}",
                        left_round.iteration_count, right_round.iteration_count
                    ));
                }
                if left_round.tool_execution_count != right_round.tool_execution_count {
                    details.push(format!(
                        "tool_executions: {} -> {}",
                        left_round.tool_execution_count, right_round.tool_execution_count
                    ));
                }
                if left_round.native_text_tool_fallback_count
                    != right_round.native_text_tool_fallback_count
                {
                    details.push(format!(
                        "native_text_tool_fallbacks: {} -> {}",
                        left_round.native_text_tool_fallback_count,
                        right_round.native_text_tool_fallback_count
                    ));
                }
                if left_round.native_text_tool_fallback_rejection_count
                    != right_round.native_text_tool_fallback_rejection_count
                {
                    details.push(format!(
                        "native_text_tool_fallback_rejections: {} -> {}",
                        left_round.native_text_tool_fallback_rejection_count,
                        right_round.native_text_tool_fallback_rejection_count
                    ));
                }
                if left_round.empty_completion_retry_count
                    != right_round.empty_completion_retry_count
                {
                    details.push(format!(
                        "empty_completion_retries: {} -> {}",
                        left_round.empty_completion_retry_count,
                        right_round.empty_completion_retry_count
                    ));
                }
                if left_round.research_facts != right_round.research_facts {
                    details.push(format!(
                        "research_facts: {:?} -> {:?}",
                        left_round.research_facts, right_round.research_facts
                    ));
                }
                // Deliverables are compared by their "id:status" projection, so
                // other per-item fields do not trigger a diff here.
                let left_deliverables = left_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                let right_deliverables = right_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                if left_deliverables != right_deliverables {
                    details.push(format!(
                        "deliverables: {:?} -> {:?}",
                        left_deliverables, right_deliverables
                    ));
                }
                if left_round.successful_tools != right_round.successful_tools {
                    details.push(format!(
                        "successful_tools: {:?} -> {:?}",
                        left_round.successful_tools, right_round.successful_tools
                    ));
                }
                if !details.is_empty() {
                    observability_diffs.push(RunObservabilityDiffRecord {
                        section: "planner_rounds".to_string(),
                        // NOTE(review): the missing-side cases above label by
                        // stage_id (the map key) but this changed-case labels by
                        // node_id — confirm the two are meant to differ.
                        label: left_round.node_id.clone(),
                        details,
                    });
                }
            }
            _ => {}
        }
    }
2190
    // Index transcript pointers by id; compared fields are the tuple
    // (available, path, location). Whole-tuple comparison below.
    let left_pointers = left_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_pointers = right_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let pointer_ids = left_pointers
        .keys()
        .chain(right_pointers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for pointer_id in pointer_ids {
        match (
            left_pointers.get(&pointer_id),
            right_pointers.get(&pointer_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from left run".to_string()],
            }),
            // Unlike worker_lineage, no per-field breakdown: the whole tuple is
            // Debug-printed in a single detail line.
            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "transcript_pointers".to_string(),
                    label: pointer_id,
                    details: vec![format!(
                        "pointer: {:?} -> {:?}",
                        left_pointer, right_pointer
                    )],
                });
            }
            _ => {}
        }
    }
2252
    // Index compaction events by id; compared fields are the tuple
    // (strategy, archived_messages, snapshot_asset_id, available).
    let left_compactions = left_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_compactions = right_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let compaction_ids = left_compactions
        .keys()
        .chain(right_compactions.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for compaction_id in compaction_ids {
        match (
            left_compactions.get(&compaction_id),
            right_compactions.get(&compaction_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from left run".to_string()],
            }),
            // Changed on both sides: single Debug-printed detail line.
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "compaction_events".to_string(),
                    label: compaction_id,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            _ => {}
        }
    }
2313
    // Daemon events have no single id: key on the composite
    // (daemon_id, kind, timestamp); compared fields are
    // (name, persist_path, payload_summary).
    let left_daemons = left_observability
        .daemon_events
        .iter()
        .map(|event| {
            (
                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
                (
                    event.name.clone(),
                    event.persist_path.clone(),
                    event.payload_summary.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_daemons = right_observability
        .daemon_events
        .iter()
        .map(|event| {
            (
                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
                (
                    event.name.clone(),
                    event.persist_path.clone(),
                    event.payload_summary.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let daemon_keys = left_daemons
        .keys()
        .chain(right_daemons.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for daemon_key in daemon_keys {
        // Human-readable label built from the composite key; kind is
        // Debug-formatted ({:?}) since it is not a string.
        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
        match (
            left_daemons.get(&daemon_key),
            right_daemons.get(&daemon_key),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "daemon_events".to_string(),
                label,
                details: vec!["daemon event missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "daemon_events".to_string(),
                label,
                details: vec!["daemon event missing from left run".to_string()],
            }),
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "daemon_events".to_string(),
                    label,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            _ => {}
        }
    }
2373
    // Index verification outcomes by stage_id; only `passed` and `summary`
    // are compared below.
    let left_verification = left_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let right_verification = right_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let verification_ids = left_verification
        .keys()
        .chain(right_verification.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in verification_ids {
        match (
            left_verification.get(&stage_id),
            right_verification.get(&stage_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from left run".to_string()],
            }),
            (Some(left_item), Some(right_item)) if left_item != right_item => {
                let mut details = Vec::new();
                if left_item.passed != right_item.passed {
                    details.push(format!(
                        "passed: {:?} -> {:?}",
                        left_item.passed, right_item.passed
                    ));
                }
                if left_item.summary != right_item.summary {
                    details.push(format!(
                        "summary: {:?} -> {:?}",
                        left_item.summary, right_item.summary
                    ));
                }
                // NOTE(review): this record is pushed even when `details` is
                // empty (items unequal only in fields not compared above), and
                // it is labeled by node_id while the missing-side cases use
                // stage_id — confirm both are intentional.
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "verification".to_string(),
                    label: left_item.node_id.clone(),
                    details,
                });
            }
            _ => {}
        }
    }
2427
    // Coarse action-graph comparison: only the (node count, edge count) shape
    // is checked, not graph contents.
    let left_graph = (
        left_observability.action_graph_nodes.len(),
        left_observability.action_graph_edges.len(),
    );
    let right_graph = (
        right_observability.action_graph_nodes.len(),
        right_observability.action_graph_edges.len(),
    );
    if left_graph != right_graph {
        observability_diffs.push(RunObservabilityDiffRecord {
            section: "action_graph".to_string(),
            label: "shape".to_string(),
            details: vec![format!(
                "nodes/edges: {}/{} -> {}/{}",
                left_graph.0, left_graph.1, right_graph.0, right_graph.1
            )],
        });
    }

    let status_changed = left.status != right.status;
    // "Identical" means: same status, no section diffs collected above, and
    // matching transition/artifact/checkpoint COUNTS — those three collections
    // are compared by length only, not deep-compared.
    let identical = !status_changed
        && stage_diffs.is_empty()
        && tool_diffs.is_empty()
        && observability_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    // Deltas are right-minus-left, so a positive value means the right run
    // has more entries.
    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        tool_diffs,
        observability_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}