1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9 default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, sync_run_handoffs,
10 ArtifactRecord, CapabilityPolicy, HandoffArtifact,
11};
12use crate::event_log::{
13 active_event_log, AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic,
14};
15use crate::llm::vm_value_to_json;
16use crate::triggers::{SignatureStatus, TriggerEvent};
17use crate::value::{VmError, VmValue};
18
// Node kinds for the derived action graph (`RunActionGraphNodeRecord.kind`).
pub const ACTION_GRAPH_NODE_KIND_RUN: &str = "run";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER: &str = "trigger";
pub const ACTION_GRAPH_NODE_KIND_PREDICATE: &str = "predicate";
pub const ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE: &str = "trigger_predicate";
pub const ACTION_GRAPH_NODE_KIND_STAGE: &str = "stage";
pub const ACTION_GRAPH_NODE_KIND_WORKER: &str = "worker";
pub const ACTION_GRAPH_NODE_KIND_DISPATCH: &str = "dispatch";
pub const ACTION_GRAPH_NODE_KIND_A2A_HOP: &str = "a2a_hop";
pub const ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE: &str = "worker_enqueue";
pub const ACTION_GRAPH_NODE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_NODE_KIND_DLQ: &str = "dlq";

// Edge kinds for the derived action graph (`RunActionGraphEdgeRecord.kind`).
pub const ACTION_GRAPH_EDGE_KIND_ENTRY: &str = "entry";
pub const ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH: &str = "trigger_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH: &str = "a2a_dispatch";
pub const ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE: &str = "predicate_gate";
pub const ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN: &str = "replay_chain";
pub const ACTION_GRAPH_EDGE_KIND_TRANSITION: &str = "transition";
pub const ACTION_GRAPH_EDGE_KIND_DELEGATES: &str = "delegates";
pub const ACTION_GRAPH_EDGE_KIND_RETRY: &str = "retry";
pub const ACTION_GRAPH_EDGE_KIND_DLQ_MOVE: &str = "dlq_move";
40
/// Aggregated LLM usage for a run or stage.
///
/// Not `Eq` because `total_cost` is an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    /// Model identifiers observed across the aggregated calls.
    pub models: Vec<String>,
}
51
/// One executed stage of a run: status/outcome, timing, artifacts consumed
/// and produced, per-attempt history, and optional transcript/usage capture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    /// Workflow graph node this stage executed for.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch chosen by this stage, when it gated a branching decision.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Every attempt made for this stage, including retries.
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
74
/// A single attempt (initial try or retry) at executing a stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    /// 0-based (or 1-based — set by the producer) attempt index.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
87
/// An edge traversal in the run's execution history: which node was entered,
/// from where, and which artifacts moved across the transition.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    /// `None` for the run's entry transition.
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
100
/// A persisted point-in-time snapshot of scheduler state, used to resume a run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    /// Nodes that were ready to execute when the checkpoint was taken.
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Why the checkpoint was written (producer-defined label).
    pub reason: String,
}
111
/// Recorded expectations captured from a source run, used to evaluate replays
/// of the same workflow against it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Type tag emitted into the serialized document (serde name `_type`).
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Run this fixture was captured from.
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub eval_kind: Option<String>,
    pub clarifying_question: Option<ClarifyingQuestionEvalSpec>,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
127
/// Criteria for judging clarifying questions produced during a replay eval.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ClarifyingQuestionEvalSpec {
    pub expected_question: Option<String>,
    /// Alternative question texts that also count as a match.
    pub accepted_questions: Vec<String>,
    pub required_terms: Vec<String>,
    pub forbidden_terms: Vec<String>,
    /// Clamped to at least 1 by `clarifying_min_questions`.
    pub min_questions: usize,
    /// Defaults to 1 (and is clamped to at least 1) by `clarifying_max_questions`.
    pub max_questions: Option<usize>,
}
138
/// Per-stage expectation inside a `ReplayFixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    pub visible_text_contains: Option<String>,
}
149
/// Outcome of evaluating one replayed run against its fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    /// Human-readable descriptions of every failed expectation.
    pub failures: Vec<String>,
    pub stage_count: usize,
}
157
/// Per-case result within a replay eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    /// Populated when the case was also diffed against a comparison run.
    pub comparison: Option<RunDiffReport>,
}
170
/// Aggregate result of running a replay eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    /// True only when every case passed.
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
180
/// Condensed view of a single task-ledger deliverable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDeliverableSummaryRecord {
    pub id: String,
    pub text: String,
    /// Deliverable state; "open" and "blocked" count toward `blocking_count`.
    pub status: String,
    pub note: Option<String>,
}
189
/// Condensed task-ledger snapshot extracted from a stage's result payload.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTaskLedgerSummaryRecord {
    pub root_task: String,
    pub rationale: String,
    pub deliverables: Vec<RunDeliverableSummaryRecord>,
    pub observations: Vec<String>,
    /// Count of deliverables whose status is "open" or "blocked".
    pub blocking_count: usize,
}
199
/// Per-stage planner telemetry: iteration/tool/LLM counters and the
/// ledger/research state captured for that round.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunPlannerRoundRecord {
    pub stage_id: String,
    pub node_id: String,
    pub stage_kind: String,
    pub status: String,
    pub outcome: String,
    pub iteration_count: usize,
    pub llm_call_count: usize,
    pub tool_execution_count: usize,
    pub tool_rejection_count: usize,
    pub intervention_count: usize,
    pub compaction_count: usize,
    pub native_text_tool_fallback_count: usize,
    pub native_text_tool_fallback_rejection_count: usize,
    pub empty_completion_retry_count: usize,
    pub tools_used: Vec<String>,
    pub successful_tools: Vec<String>,
    pub ledger_done_rejections: usize,
    pub task_ledger: Option<RunTaskLedgerSummaryRecord>,
    pub research_facts: Vec<String>,
}
223
/// Lineage entry linking a delegated worker back to its parent stage/session.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunWorkerLineageRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    /// Child run identifier/path, when the worker produced a persisted run.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
}
238
/// Node of the derived action graph. `kind` is one of the
/// `ACTION_GRAPH_NODE_KIND_*` constants.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphNodeRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub trace_id: Option<String>,
    /// Back-references into the run record, when this node maps to one.
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub worker_id: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
255
/// Directed edge of the derived action graph. `kind` is one of the
/// `ACTION_GRAPH_EDGE_KIND_*` constants.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunActionGraphEdgeRecord {
    pub from_id: String,
    pub to_id: String,
    pub kind: String,
    pub label: Option<String>,
}
264
/// Verification verdict recorded for a stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunVerificationOutcomeRecord {
    pub stage_id: String,
    pub node_id: String,
    pub status: String,
    /// `None` when verification did not produce a pass/fail verdict.
    pub passed: Option<bool>,
    pub summary: Option<String>,
}
274
/// Pointer to a transcript stored inline or in a sidecar location.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTranscriptPointerRecord {
    pub id: String,
    pub label: String,
    pub kind: String,
    /// Logical location descriptor (e.g. a JSON-path-like string).
    pub location: String,
    pub path: Option<String>,
    /// Whether the referenced transcript data is actually retrievable.
    pub available: bool,
}
285
/// A transcript-compaction event: how much was archived and where the
/// pre-compaction snapshot (if any) lives.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionEventRecord {
    pub id: String,
    pub transcript_id: Option<String>,
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub mode: String,
    pub strategy: String,
    pub archived_messages: usize,
    pub estimated_tokens_before: usize,
    pub estimated_tokens_after: usize,
    /// Asset id of the archived snapshot, when one was kept.
    pub snapshot_asset_id: Option<String>,
    /// Logical location of the snapshot (e.g. "<prefix>.assets[<id>]").
    pub snapshot_location: String,
    pub snapshot_path: Option<String>,
    /// True when `snapshot_asset_id` resolves to an asset present in the transcript.
    pub available: bool,
}
303
/// Lifecycle event kinds for a daemon, serialized in snake_case
/// ("spawned", "triggered", ...).
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum DaemonEventKindRecord {
    #[default]
    Spawned,
    Triggered,
    Snapshotted,
    Resumed,
    Stopped,
}
314
/// One daemon lifecycle event, as read from the LLM-transcript sidecar
/// (see `daemon_events_from_sidecar`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonEventRecord {
    pub daemon_id: String,
    pub name: String,
    pub kind: DaemonEventKindRecord,
    pub timestamp: String,
    pub persist_path: String,
    pub payload_summary: Option<String>,
}
325
/// Derived observability bundle for a run (built by `derive_run_observability`):
/// action graph, planner telemetry, lineage, verification, transcripts,
/// compaction and daemon events.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityRecord {
    /// Version of this derived schema, for forward-compatible readers.
    pub schema_version: usize,
    pub planner_rounds: Vec<RunPlannerRoundRecord>,
    pub research_fact_count: usize,
    pub action_graph_nodes: Vec<RunActionGraphNodeRecord>,
    pub action_graph_edges: Vec<RunActionGraphEdgeRecord>,
    pub worker_lineage: Vec<RunWorkerLineageRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    pub transcript_pointers: Vec<RunTranscriptPointerRecord>,
    pub compaction_events: Vec<CompactionEventRecord>,
    pub daemon_events: Vec<DaemonEventRecord>,
}
340
/// Stage-level difference between two runs being compared.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Kind of change (producer-defined label, e.g. added/removed/changed).
    pub change: String,
    pub details: Vec<String>,
}
348
/// Difference in a recorded tool call (matched by name + args hash) between runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    /// Hash of the call arguments (see `tool_fixture_hash`).
    pub args_hash: String,
    pub result_changed: bool,
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
358
/// Difference found in a section of the observability bundle between runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunObservabilityDiffRecord {
    pub section: String,
    pub label: String,
    pub details: Vec<String>,
}
366
/// Full comparison report between a "left" and "right" run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    /// True when no differences were found.
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    pub observability_diffs: Vec<RunObservabilityDiffRecord>,
    /// right-minus-left count deltas (signed).
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
383
/// On-disk manifest listing the cases of an eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Type tag emitted into the serialized document (serde name `_type`).
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory case paths are resolved relative to, when set.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
394
/// One case of an eval suite: the run to evaluate plus optional fixture
/// and comparison targets.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    /// Path of a second run to diff against, when set.
    pub compare_to: Option<String>,
}
403
/// A human-in-the-loop question asked during a run
/// (merged from the event log by `merge_hitl_questions_from_active_log`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunHitlQuestionRecord {
    /// Unique request id; used as the dedupe key when merging.
    pub request_id: String,
    pub prompt: String,
    pub agent: String,
    pub trace_id: Option<String>,
    pub asked_at: String,
}
413
/// The persisted record of a workflow run: identity, status, full execution
/// history (stages, transitions, checkpoints), artifacts, child runs, and
/// optional transcript/usage/observability captures.
///
/// Not `Eq` because nested records contain `f64` (`LlmUsageRecord`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Type tag emitted into the serialized document (serde name `_type`).
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Set when this run was spawned by another run.
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub handoffs: Vec<HandoffArtifact>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub replay_fixture: Option<ReplayFixture>,
    pub observability: Option<RunObservabilityRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub tool_recordings: Vec<ToolCallRecord>,
    pub hitl_questions: Vec<RunHitlQuestionRecord>,
    /// Free-form metadata; known keys include "trigger_event", "trace_id",
    /// and "replay_of_event_id" (read by the helpers in this module).
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}
448
/// A recorded tool invocation, keyed for replay by name + args hash.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    /// Hash of the call arguments (see `tool_fixture_hash`).
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    /// Planner iteration during which the call happened.
    pub iteration: usize,
    pub timestamp: String,
}
461
462pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
464 use std::hash::{Hash, Hasher};
465 let mut hasher = std::collections::hash_map::DefaultHasher::new();
466 tool_name.hash(&mut hasher);
467 let args_str = serde_json::to_string(args).unwrap_or_default();
468 args_str.hash(&mut hasher);
469 format!("{:016x}", hasher.finish())
470}
471
/// A timing span captured during a run, forming a tree via `parent_id`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    /// `None` for root spans.
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    /// Start offset in milliseconds (relative origin set by the producer).
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
483
/// A child run delegated to a worker: its request, provenance, policy,
/// lifecycle status, and pointers to the resulting run/snapshot.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub request: Option<serde_json::Value>,
    pub provenance: Option<serde_json::Value>,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Identifier/path of the child's own persisted run, when available.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
505
/// Execution environment a run (or child run) was launched with:
/// working directory, environment, and optional git worktree details.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    /// Cleanup policy label for the worktree, when one applies.
    pub cleanup: Option<String>,
}
519
520fn compact_json_value(value: &serde_json::Value) -> String {
521 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
522}
523
/// Normalizes question text for comparison: lowercases ASCII alphanumerics,
/// replaces every other non-whitespace character with a space, then
/// collapses all whitespace runs to single spaces.
fn normalize_question_text(text: &str) -> String {
    let mut cleaned = String::with_capacity(text.len());
    for ch in text.chars() {
        if ch.is_ascii_alphanumeric() || ch.is_whitespace() {
            cleaned.push(ch.to_ascii_lowercase());
        } else {
            // Punctuation and other symbols become word separators.
            cleaned.push(' ');
        }
    }
    let words: Vec<&str> = cleaned.split_whitespace().collect();
    words.join(" ")
}
538
539fn clarifying_min_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
540 spec.min_questions.max(1)
541}
542
543fn clarifying_max_questions(spec: &ClarifyingQuestionEvalSpec) -> usize {
544 spec.max_questions.unwrap_or(1).max(1)
545}
546
/// Drains all records from `topic`, paging through the log 256 events at a
/// time and blocking on each async read.
///
/// Paging cursor: `from` starts at `None` (beginning of topic) and advances
/// to the last event id of each batch; the loop ends on the first empty
/// batch. Read errors are treated as an empty batch (`unwrap_or_default`),
/// silently terminating the scan.
/// NOTE(review): assumes `read_range` treats `from` as exclusive — if it
/// were inclusive this would re-read (and duplicate) the cursor event;
/// confirm against the `EventLog` contract.
fn read_topic_records(
    log: &AnyEventLog,
    topic: &Topic,
) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
    let mut from = None;
    let mut records = Vec::new();
    loop {
        // Synchronous caller, async log API: block on each page read.
        let batch =
            futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
        if batch.is_empty() {
            break;
        }
        from = batch.last().map(|(event_id, _)| *event_id);
        records.extend(batch);
    }
    records
}
564
/// Merges human-in-the-loop questions from the active event log into
/// `run.hitl_questions`, deduplicating by `request_id` (log entries win over
/// existing records with the same id) and sorting by (asked_at, request_id).
///
/// No-op when there is no active event log. Only events of kind
/// "hitl.question_asked" that belong to this run — matched via the `run_id`
/// header or a `run_id` field in the payload — are considered, and entries
/// missing a request id or prompt are skipped.
fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
    let Some(log) = active_event_log() else {
        return;
    };
    let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
        .expect("static hitl.questions topic should always be valid");
    // Seed the map with the questions already on the run; BTreeMap keyed by
    // request_id gives us dedupe for free.
    let mut merged = run
        .hitl_questions
        .iter()
        .cloned()
        .map(|question| (question.request_id.clone(), question))
        .collect::<BTreeMap<_, _>>();

    for (_, event) in read_topic_records(log.as_ref(), &topic) {
        if event.kind != "hitl.question_asked" {
            continue;
        }
        let payload = &event.payload;
        // The run id may arrive either as an event header or inside the payload.
        let matches_run = event
            .headers
            .get("run_id")
            .is_some_and(|value| value == &run.id)
            || payload
                .get("run_id")
                .and_then(|value| value.as_str())
                .is_some_and(|value| value == run.id);
        if !matches_run {
            continue;
        }
        // Prefer the payload's request_id, falling back to the header.
        let request_id = payload
            .get("request_id")
            .and_then(|value| value.as_str())
            .or_else(|| event.headers.get("request_id").map(String::as_str))
            .unwrap_or_default();
        // The question prompt is nested under payload.payload.prompt.
        let prompt = payload
            .get("payload")
            .and_then(|value| value.get("prompt"))
            .and_then(|value| value.as_str())
            .unwrap_or_default();
        if request_id.is_empty() || prompt.is_empty() {
            continue;
        }
        merged.insert(
            request_id.to_string(),
            RunHitlQuestionRecord {
                request_id: request_id.to_string(),
                prompt: prompt.to_string(),
                agent: payload
                    .get("agent")
                    .and_then(|value| value.as_str())
                    .unwrap_or_default()
                    .to_string(),
                trace_id: payload
                    .get("trace_id")
                    .and_then(|value| value.as_str())
                    .map(str::to_string),
                asked_at: payload
                    .get("requested_at")
                    .and_then(|value| value.as_str())
                    .unwrap_or_default()
                    .to_string(),
            },
        );
    }

    run.hitl_questions = merged.into_values().collect();
    // Stable chronological order, tie-broken by request id.
    run.hitl_questions.sort_by(|left, right| {
        (left.asked_at.as_str(), left.request_id.as_str())
            .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
    });
}
636
/// Maps a trigger signature status to its stable string label, as stored in
/// action-graph metadata and node outcomes.
fn signature_status_label(status: &SignatureStatus) -> &'static str {
    match status {
        SignatureStatus::Verified => "verified",
        SignatureStatus::Unsigned => "unsigned",
        // The failure detail is intentionally dropped; only the label is kept.
        SignatureStatus::Failed { .. } => "failed",
    }
}
644
645fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
646 run.metadata
647 .get("trigger_event")
648 .cloned()
649 .and_then(|value| serde_json::from_value(value).ok())
650}
651
652fn run_trace_id(run: &RunRecord, trigger_event: Option<&TriggerEvent>) -> Option<String> {
653 trigger_event
654 .map(|event| event.trace_id.0.clone())
655 .or_else(|| {
656 run.metadata
657 .get("trace_id")
658 .and_then(|value| value.as_str())
659 .map(str::to_string)
660 })
661}
662
663fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
664 run.metadata
665 .get("replay_of_event_id")
666 .and_then(|value| value.as_str())
667 .map(str::to_string)
668}
669
670fn action_graph_kind_for_stage(stage: &RunStageRecord) -> &'static str {
671 if stage.kind == "condition" {
672 ACTION_GRAPH_NODE_KIND_PREDICATE
673 } else {
674 ACTION_GRAPH_NODE_KIND_STAGE
675 }
676}
677
678fn trigger_node_metadata(trigger_event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
679 let mut metadata = BTreeMap::new();
680 metadata.insert(
681 "provider".to_string(),
682 serde_json::json!(trigger_event.provider.as_str()),
683 );
684 metadata.insert(
685 "event_kind".to_string(),
686 serde_json::json!(trigger_event.kind),
687 );
688 metadata.insert(
689 "dedupe_key".to_string(),
690 serde_json::json!(trigger_event.dedupe_key),
691 );
692 metadata.insert(
693 "signature_status".to_string(),
694 serde_json::json!(signature_status_label(&trigger_event.signature_status)),
695 );
696 metadata
697}
698
699fn stage_node_metadata(stage: &RunStageRecord) -> BTreeMap<String, serde_json::Value> {
700 let mut metadata = BTreeMap::new();
701 metadata.insert("stage_kind".to_string(), serde_json::json!(stage.kind));
702 if let Some(branch) = stage.branch.as_ref() {
703 metadata.insert("branch".to_string(), serde_json::json!(branch));
704 }
705 if let Some(worker_id) = stage
706 .metadata
707 .get("worker_id")
708 .and_then(|value| value.as_str())
709 {
710 metadata.insert("worker_id".to_string(), serde_json::json!(worker_id));
711 }
712 metadata
713}
714
/// Appends a node record to the action-graph node list.
///
/// Currently a plain push; kept as a named seam so node insertion can grow
/// invariants (e.g. dedupe) without touching every call site.
fn append_action_graph_node(
    nodes: &mut Vec<RunActionGraphNodeRecord>,
    record: RunActionGraphNodeRecord,
) {
    nodes.push(record);
}
721
/// Appends an "action_graph_update" event with the given headers and payload
/// to the "observability.action_graph" topic of the active event log.
///
/// Silently succeeds (Ok) when no event log is active, so callers do not
/// need to special-case disabled observability.
pub async fn append_action_graph_update(
    headers: BTreeMap<String, String>,
    payload: serde_json::Value,
) -> Result<(), crate::event_log::LogError> {
    let Some(log) = active_event_log() else {
        return Ok(());
    };
    let topic = Topic::new("observability.action_graph")
        .expect("static observability.action_graph topic should always be valid");
    let record = EventLogRecord::new("action_graph_update", payload).with_headers(headers);
    // The appended event id is not needed by callers; discard it.
    log.append(&topic, record).await.map(|_| ())
}
734
/// Publishes the run's current observability snapshot as an action-graph
/// event, tagging it with run/workflow ids and (when resolvable) a trace id.
///
/// Best-effort: append errors are ignored. When called from within a tokio
/// runtime the append is spawned as a detached task (so a racing shutdown
/// may drop it); outside a runtime it blocks until the append completes.
fn publish_action_graph_event(
    run: &RunRecord,
    observability: &RunObservabilityRecord,
    path: &Path,
) {
    let trigger_event = trigger_event_from_run(run);
    let mut headers = BTreeMap::new();
    headers.insert("run_id".to_string(), run.id.clone());
    headers.insert("workflow_id".to_string(), run.workflow_id.clone());
    if let Some(trace_id) = run_trace_id(run, trigger_event.as_ref()) {
        headers.insert("trace_id".to_string(), trace_id);
    }
    let payload = serde_json::json!({
        "run_id": run.id,
        "workflow_id": run.workflow_id,
        "persisted_path": path.to_string_lossy(),
        "status": run.status,
        "observability": observability,
    });
    // Choose async vs. blocking delivery based on whether we are already
    // inside a tokio runtime (block_on would panic there).
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            let _ = append_action_graph_update(headers, payload).await;
        });
    } else {
        let _ = futures::executor::block_on(append_action_graph_update(headers, payload));
    }
}
762
/// Derives the LLM-transcript sidecar path for a run file:
/// `<dir>/<stem>-llm/llm_transcript.jsonl` next to `run_path`.
///
/// Returns `None` when the run path has no file stem or the stem is not
/// valid UTF-8.
fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
    let stem = run_path.file_stem()?.to_str()?;
    let directory = match run_path.parent() {
        Some(parent) => parent,
        None => Path::new("."),
    };
    let sidecar_dir = format!("{stem}-llm");
    Some(directory.join(sidecar_dir).join("llm_transcript.jsonl"))
}
768
769fn json_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
770 value
771 .and_then(|value| value.as_array())
772 .map(|items| {
773 items
774 .iter()
775 .filter_map(|item| item.as_str().map(str::to_string))
776 .collect::<Vec<_>>()
777 })
778 .unwrap_or_default()
779}
780
781fn json_usize(value: Option<&serde_json::Value>) -> usize {
782 value.and_then(|value| value.as_u64()).unwrap_or_default() as usize
783}
784
785fn json_bool(value: Option<&serde_json::Value>) -> Option<bool> {
786 value.and_then(|value| value.as_bool())
787}
788
789fn stage_result_payload(stage: &RunStageRecord) -> Option<&serde_json::Value> {
790 stage
791 .artifacts
792 .iter()
793 .find_map(|artifact| artifact.data.as_ref())
794}
795
796fn task_ledger_summary_from_value(value: &serde_json::Value) -> Option<RunTaskLedgerSummaryRecord> {
797 let deliverables = value
798 .get("deliverables")
799 .and_then(|raw| raw.as_array())
800 .map(|items| {
801 items
802 .iter()
803 .map(|item| RunDeliverableSummaryRecord {
804 id: item
805 .get("id")
806 .and_then(|value| value.as_str())
807 .unwrap_or_default()
808 .to_string(),
809 text: item
810 .get("text")
811 .and_then(|value| value.as_str())
812 .unwrap_or_default()
813 .to_string(),
814 status: item
815 .get("status")
816 .and_then(|value| value.as_str())
817 .unwrap_or_default()
818 .to_string(),
819 note: item
820 .get("note")
821 .and_then(|value| value.as_str())
822 .map(str::to_string),
823 })
824 .collect::<Vec<_>>()
825 })
826 .unwrap_or_default();
827 let observations = json_string_array(value.get("observations"));
828 let root_task = value
829 .get("root_task")
830 .and_then(|value| value.as_str())
831 .unwrap_or_default()
832 .to_string();
833 let rationale = value
834 .get("rationale")
835 .and_then(|value| value.as_str())
836 .unwrap_or_default()
837 .to_string();
838 if root_task.is_empty()
839 && rationale.is_empty()
840 && deliverables.is_empty()
841 && observations.is_empty()
842 {
843 return None;
844 }
845 let blocking_count = deliverables
846 .iter()
847 .filter(|deliverable| matches!(deliverable.status.as_str(), "open" | "blocked"))
848 .count();
849 Some(RunTaskLedgerSummaryRecord {
850 root_task,
851 rationale,
852 deliverables,
853 observations,
854 blocking_count,
855 })
856}
857
/// Extracts compaction events from a transcript JSON document.
///
/// Scans `transcript.events` for entries with kind "compaction" and builds a
/// `CompactionEventRecord` per entry from its `metadata` object. A snapshot
/// is marked `available` only when its `snapshot_asset_id` appears among the
/// ids in `transcript.assets`. `location_prefix` seeds the logical snapshot
/// location string; `stage_id`/`node_id` attribute the events to a stage;
/// `persisted_path` (when given) is recorded as the snapshot path for every
/// event. Returns empty when the transcript has no events array.
fn compaction_events_from_transcript(
    transcript: &serde_json::Value,
    stage_id: Option<&str>,
    node_id: Option<&str>,
    location_prefix: &str,
    persisted_path: Option<&Path>,
) -> Vec<CompactionEventRecord> {
    let transcript_id = transcript
        .get("id")
        .and_then(|value| value.as_str())
        .map(str::to_string);
    // Collect the ids of assets actually present, to validate snapshot refs.
    let asset_ids = transcript
        .get("assets")
        .and_then(|value| value.as_array())
        .map(|assets| {
            assets
                .iter()
                .filter_map(|asset| {
                    asset
                        .get("id")
                        .and_then(|value| value.as_str())
                        .map(str::to_string)
                })
                .collect::<BTreeSet<_>>()
        })
        .unwrap_or_default();
    transcript
        .get("events")
        .and_then(|value| value.as_array())
        .map(|events| {
            events
                .iter()
                .filter(|event| {
                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
                })
                .map(|event| {
                    let metadata = event.get("metadata");
                    let snapshot_asset_id = metadata
                        .and_then(|value| value.get("snapshot_asset_id"))
                        .and_then(|value| value.as_str())
                        .map(str::to_string);
                    // Snapshot is only retrievable if the referenced asset exists.
                    let available = snapshot_asset_id
                        .as_ref()
                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
                    // With no asset id, fall back to the bare prefix as the location.
                    let snapshot_location = snapshot_asset_id
                        .as_ref()
                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
                        .unwrap_or_else(|| location_prefix.to_string());
                    CompactionEventRecord {
                        id: event
                            .get("id")
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        transcript_id: transcript_id.clone(),
                        stage_id: stage_id.map(str::to_string),
                        node_id: node_id.map(str::to_string),
                        mode: metadata
                            .and_then(|value| value.get("mode"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        strategy: metadata
                            .and_then(|value| value.get("strategy"))
                            .and_then(|value| value.as_str())
                            .unwrap_or_default()
                            .to_string(),
                        archived_messages: json_usize(
                            metadata.and_then(|value| value.get("archived_messages")),
                        ),
                        estimated_tokens_before: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_before")),
                        ),
                        estimated_tokens_after: json_usize(
                            metadata.and_then(|value| value.get("estimated_tokens_after")),
                        ),
                        snapshot_asset_id,
                        snapshot_location,
                        snapshot_path: persisted_path
                            .map(|path| path.to_string_lossy().into_owned()),
                        available,
                    }
                })
                .collect()
        })
        .unwrap_or_default()
}
945
946fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
947 let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
948 return Vec::new();
949 };
950 let Ok(content) = std::fs::read_to_string(sidecar_path) else {
951 return Vec::new();
952 };
953
954 content
955 .lines()
956 .filter(|line| !line.trim().is_empty())
957 .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
958 .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
959 .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
960 .collect()
961}
962
963pub fn derive_run_observability(
964 run: &RunRecord,
965 persisted_path: Option<&Path>,
966) -> RunObservabilityRecord {
967 let mut action_graph_nodes = Vec::new();
968 let mut action_graph_edges = Vec::new();
969 let mut verification_outcomes = Vec::new();
970 let mut planner_rounds = Vec::new();
971 let mut transcript_pointers = Vec::new();
972 let mut compaction_events = Vec::new();
973 let mut daemon_events = Vec::new();
974 let mut research_fact_count = 0usize;
975
976 let root_node_id = format!("run:{}", run.id);
977 let trigger_event = trigger_event_from_run(run);
978 let propagated_trace_id = run_trace_id(run, trigger_event.as_ref());
979 append_action_graph_node(
980 &mut action_graph_nodes,
981 RunActionGraphNodeRecord {
982 id: root_node_id.clone(),
983 label: run
984 .workflow_name
985 .clone()
986 .unwrap_or_else(|| run.workflow_id.clone()),
987 kind: ACTION_GRAPH_NODE_KIND_RUN.to_string(),
988 status: run.status.clone(),
989 outcome: run.status.clone(),
990 trace_id: propagated_trace_id.clone(),
991 stage_id: None,
992 node_id: None,
993 worker_id: None,
994 run_id: Some(run.id.clone()),
995 run_path: run.persisted_path.clone(),
996 metadata: BTreeMap::from([(
997 "workflow_id".to_string(),
998 serde_json::json!(run.workflow_id),
999 )]),
1000 },
1001 );
1002 let mut entry_node_id = root_node_id.clone();
1003 if let Some(trigger_event) = trigger_event.as_ref() {
1004 if let Some(replay_of_event_id) = replay_of_event_id_from_run(run) {
1005 let replay_source_node_id = format!("trigger:{replay_of_event_id}");
1006 append_action_graph_node(
1007 &mut action_graph_nodes,
1008 RunActionGraphNodeRecord {
1009 id: replay_source_node_id.clone(),
1010 label: format!(
1011 "{}:{} (original {})",
1012 trigger_event.provider.as_str(),
1013 trigger_event.kind,
1014 replay_of_event_id
1015 ),
1016 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1017 status: "historical".to_string(),
1018 outcome: "replayed_from".to_string(),
1019 trace_id: Some(trigger_event.trace_id.0.clone()),
1020 stage_id: None,
1021 node_id: None,
1022 worker_id: None,
1023 run_id: Some(run.id.clone()),
1024 run_path: run.persisted_path.clone(),
1025 metadata: trigger_node_metadata(trigger_event),
1026 },
1027 );
1028 action_graph_edges.push(RunActionGraphEdgeRecord {
1029 from_id: replay_source_node_id,
1030 to_id: format!("trigger:{}", trigger_event.id.0),
1031 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1032 label: Some("replay chain".to_string()),
1033 });
1034 }
1035 let trigger_node_id = format!("trigger:{}", trigger_event.id.0);
1036 append_action_graph_node(
1037 &mut action_graph_nodes,
1038 RunActionGraphNodeRecord {
1039 id: trigger_node_id.clone(),
1040 label: format!("{}:{}", trigger_event.provider.as_str(), trigger_event.kind),
1041 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1042 status: "received".to_string(),
1043 outcome: signature_status_label(&trigger_event.signature_status).to_string(),
1044 trace_id: Some(trigger_event.trace_id.0.clone()),
1045 stage_id: None,
1046 node_id: None,
1047 worker_id: None,
1048 run_id: Some(run.id.clone()),
1049 run_path: run.persisted_path.clone(),
1050 metadata: trigger_node_metadata(trigger_event),
1051 },
1052 );
1053 action_graph_edges.push(RunActionGraphEdgeRecord {
1054 from_id: root_node_id.clone(),
1055 to_id: trigger_node_id.clone(),
1056 kind: ACTION_GRAPH_EDGE_KIND_ENTRY.to_string(),
1057 label: Some(trigger_event.id.0.clone()),
1058 });
1059 entry_node_id = trigger_node_id;
1060 }
1061
1062 let stage_node_ids = run
1063 .stages
1064 .iter()
1065 .map(|stage| (stage.id.clone(), format!("stage:{}", stage.id)))
1066 .collect::<BTreeMap<_, _>>();
1067 let stage_by_id = run
1068 .stages
1069 .iter()
1070 .map(|stage| (stage.id.as_str(), stage))
1071 .collect::<BTreeMap<_, _>>();
1072 let stage_by_node_id = run
1073 .stages
1074 .iter()
1075 .map(|stage| (stage.node_id.clone(), format!("stage:{}", stage.id)))
1076 .collect::<BTreeMap<_, _>>();
1077
1078 let incoming_nodes = run
1079 .transitions
1080 .iter()
1081 .map(|transition| transition.to_node_id.clone())
1082 .collect::<BTreeSet<_>>();
1083
1084 for stage in &run.stages {
1085 let graph_node_id = stage_node_ids
1086 .get(&stage.id)
1087 .cloned()
1088 .unwrap_or_else(|| format!("stage:{}", stage.id));
1089 append_action_graph_node(
1090 &mut action_graph_nodes,
1091 RunActionGraphNodeRecord {
1092 id: graph_node_id.clone(),
1093 label: stage.node_id.clone(),
1094 kind: action_graph_kind_for_stage(stage).to_string(),
1095 status: stage.status.clone(),
1096 outcome: stage.outcome.clone(),
1097 trace_id: propagated_trace_id.clone(),
1098 stage_id: Some(stage.id.clone()),
1099 node_id: Some(stage.node_id.clone()),
1100 worker_id: stage
1101 .metadata
1102 .get("worker_id")
1103 .and_then(|value| value.as_str())
1104 .map(str::to_string),
1105 run_id: None,
1106 run_path: None,
1107 metadata: stage_node_metadata(stage),
1108 },
1109 );
1110 if !incoming_nodes.contains(&stage.node_id) {
1111 action_graph_edges.push(RunActionGraphEdgeRecord {
1112 from_id: entry_node_id.clone(),
1113 to_id: graph_node_id.clone(),
1114 kind: if trigger_event.is_some() {
1115 ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string()
1116 } else {
1117 ACTION_GRAPH_EDGE_KIND_ENTRY.to_string()
1118 },
1119 label: None,
1120 });
1121 }
1122
1123 if stage.kind == "verify" || stage.verification.is_some() {
1124 let passed = json_bool(
1125 stage
1126 .verification
1127 .as_ref()
1128 .and_then(|value| value.get("pass")),
1129 )
1130 .or_else(|| {
1131 json_bool(
1132 stage
1133 .verification
1134 .as_ref()
1135 .and_then(|value| value.get("success")),
1136 )
1137 })
1138 .or_else(|| {
1139 if stage.status == "completed" && stage.outcome == "success" {
1140 Some(true)
1141 } else if stage.status == "failed" || stage.outcome == "failed" {
1142 Some(false)
1143 } else {
1144 None
1145 }
1146 });
1147 verification_outcomes.push(RunVerificationOutcomeRecord {
1148 stage_id: stage.id.clone(),
1149 node_id: stage.node_id.clone(),
1150 status: stage.status.clone(),
1151 passed,
1152 summary: stage
1153 .verification
1154 .as_ref()
1155 .map(compact_json_value)
1156 .or_else(|| {
1157 stage
1158 .visible_text
1159 .as_ref()
1160 .filter(|value| !value.trim().is_empty())
1161 .cloned()
1162 }),
1163 });
1164 }
1165
1166 if stage.transcript.is_some() {
1167 transcript_pointers.push(RunTranscriptPointerRecord {
1168 id: format!("stage:{}:transcript", stage.id),
1169 label: format!("Stage {} transcript", stage.node_id),
1170 kind: "embedded_transcript".to_string(),
1171 location: format!("run.stages[{}].transcript", stage.node_id),
1172 path: run.persisted_path.clone(),
1173 available: true,
1174 });
1175 if let Some(transcript) = stage.transcript.as_ref() {
1176 compaction_events.extend(compaction_events_from_transcript(
1177 transcript,
1178 Some(&stage.id),
1179 Some(&stage.node_id),
1180 &format!("run.stages[{}].transcript", stage.node_id),
1181 persisted_path,
1182 ));
1183 }
1184 }
1185
1186 if let Some(payload) = stage_result_payload(stage) {
1187 let trace = payload.get("trace");
1188 let task_ledger = payload
1189 .get("task_ledger")
1190 .and_then(task_ledger_summary_from_value);
1191 let research_facts = task_ledger
1192 .as_ref()
1193 .map(|ledger| ledger.observations.clone())
1194 .unwrap_or_default();
1195 research_fact_count += research_facts.len();
1196 let tools_payload = payload.get("tools");
1197 let tools_used = json_string_array(
1198 tools_payload
1199 .and_then(|tools| tools.get("calls"))
1200 .or_else(|| trace.and_then(|trace| trace.get("tools_used"))),
1201 );
1202 let successful_tools =
1203 json_string_array(tools_payload.and_then(|tools| tools.get("successful")));
1204 let planner_round = RunPlannerRoundRecord {
1205 stage_id: stage.id.clone(),
1206 node_id: stage.node_id.clone(),
1207 stage_kind: stage.kind.clone(),
1208 status: stage.status.clone(),
1209 outcome: stage.outcome.clone(),
1210 iteration_count: json_usize(trace.and_then(|trace| trace.get("iterations"))),
1211 llm_call_count: json_usize(trace.and_then(|trace| trace.get("llm_calls"))),
1212 tool_execution_count: json_usize(
1213 trace.and_then(|trace| trace.get("tool_executions")),
1214 ),
1215 tool_rejection_count: json_usize(
1216 trace.and_then(|trace| trace.get("tool_rejections")),
1217 ),
1218 intervention_count: json_usize(trace.and_then(|trace| trace.get("interventions"))),
1219 compaction_count: json_usize(trace.and_then(|trace| trace.get("compactions"))),
1220 native_text_tool_fallback_count: json_usize(
1221 trace.and_then(|trace| trace.get("native_text_tool_fallbacks")),
1222 ),
1223 native_text_tool_fallback_rejection_count: json_usize(
1224 trace.and_then(|trace| trace.get("native_text_tool_fallback_rejections")),
1225 ),
1226 empty_completion_retry_count: json_usize(
1227 trace.and_then(|trace| trace.get("empty_completion_retries")),
1228 ),
1229 tools_used,
1230 successful_tools,
1231 ledger_done_rejections: json_usize(payload.get("ledger_done_rejections")),
1232 task_ledger,
1233 research_facts,
1234 };
1235 let has_agentic_detail = planner_round.iteration_count > 0
1236 || planner_round.llm_call_count > 0
1237 || planner_round.tool_execution_count > 0
1238 || planner_round.native_text_tool_fallback_count > 0
1239 || planner_round.native_text_tool_fallback_rejection_count > 0
1240 || planner_round.empty_completion_retry_count > 0
1241 || planner_round.ledger_done_rejections > 0
1242 || planner_round.task_ledger.is_some()
1243 || !planner_round.tools_used.is_empty()
1244 || !planner_round.successful_tools.is_empty();
1245 if has_agentic_detail {
1246 planner_rounds.push(planner_round);
1247 }
1248 }
1249 }
1250
1251 for transition in &run.transitions {
1252 let Some(to_id) = stage_by_node_id.get(&transition.to_node_id).cloned() else {
1253 continue;
1254 };
1255 let from_stage = transition
1256 .from_stage_id
1257 .as_deref()
1258 .and_then(|stage_id| stage_by_id.get(stage_id).copied());
1259 let from_id = transition
1260 .from_stage_id
1261 .as_ref()
1262 .and_then(|stage_id| stage_node_ids.get(stage_id))
1263 .cloned()
1264 .or_else(|| {
1265 transition
1266 .from_node_id
1267 .as_ref()
1268 .and_then(|node_id| stage_by_node_id.get(node_id))
1269 .cloned()
1270 })
1271 .unwrap_or_else(|| root_node_id.clone());
1272 action_graph_edges.push(RunActionGraphEdgeRecord {
1273 from_id,
1274 to_id,
1275 kind: if from_stage.is_some_and(|stage| stage.kind == "condition") {
1276 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE.to_string()
1277 } else {
1278 ACTION_GRAPH_EDGE_KIND_TRANSITION.to_string()
1279 },
1280 label: transition.branch.clone(),
1281 });
1282 }
1283
1284 let worker_lineage = run
1285 .child_runs
1286 .iter()
1287 .map(|child| {
1288 let worker_node_id = format!("worker:{}", child.worker_id);
1289 append_action_graph_node(
1290 &mut action_graph_nodes,
1291 RunActionGraphNodeRecord {
1292 id: worker_node_id.clone(),
1293 label: child.worker_name.clone(),
1294 kind: ACTION_GRAPH_NODE_KIND_WORKER.to_string(),
1295 status: child.status.clone(),
1296 outcome: child.status.clone(),
1297 trace_id: propagated_trace_id.clone(),
1298 stage_id: child.parent_stage_id.clone(),
1299 node_id: None,
1300 worker_id: Some(child.worker_id.clone()),
1301 run_id: child.run_id.clone(),
1302 run_path: child.run_path.clone(),
1303 metadata: BTreeMap::from([
1304 (
1305 "worker_name".to_string(),
1306 serde_json::json!(child.worker_name),
1307 ),
1308 ("task".to_string(), serde_json::json!(child.task)),
1309 ]),
1310 },
1311 );
1312 if let Some(parent_stage_id) = child.parent_stage_id.as_ref() {
1313 if let Some(stage_node_id) = stage_node_ids.get(parent_stage_id) {
1314 action_graph_edges.push(RunActionGraphEdgeRecord {
1315 from_id: stage_node_id.clone(),
1316 to_id: worker_node_id,
1317 kind: ACTION_GRAPH_EDGE_KIND_DELEGATES.to_string(),
1318 label: Some(child.worker_name.clone()),
1319 });
1320 }
1321 }
1322 RunWorkerLineageRecord {
1323 worker_id: child.worker_id.clone(),
1324 worker_name: child.worker_name.clone(),
1325 parent_stage_id: child.parent_stage_id.clone(),
1326 task: child.task.clone(),
1327 status: child.status.clone(),
1328 session_id: child.session_id.clone(),
1329 parent_session_id: child.parent_session_id.clone(),
1330 run_id: child.run_id.clone(),
1331 run_path: child.run_path.clone(),
1332 snapshot_path: child.snapshot_path.clone(),
1333 }
1334 })
1335 .collect::<Vec<_>>();
1336
1337 if run.transcript.is_some() {
1338 transcript_pointers.push(RunTranscriptPointerRecord {
1339 id: "run:transcript".to_string(),
1340 label: "Run transcript".to_string(),
1341 kind: "embedded_transcript".to_string(),
1342 location: "run.transcript".to_string(),
1343 path: run.persisted_path.clone(),
1344 available: true,
1345 });
1346 if let Some(transcript) = run.transcript.as_ref() {
1347 compaction_events.extend(compaction_events_from_transcript(
1348 transcript,
1349 None,
1350 None,
1351 "run.transcript",
1352 persisted_path,
1353 ));
1354 }
1355 }
1356
1357 if let Some(path) = persisted_path {
1358 if let Some(sidecar_path) = llm_transcript_sidecar_path(path) {
1359 transcript_pointers.push(RunTranscriptPointerRecord {
1360 id: "run:llm_transcript".to_string(),
1361 label: "LLM transcript sidecar".to_string(),
1362 kind: "llm_jsonl".to_string(),
1363 location: "run sidecar".to_string(),
1364 path: Some(sidecar_path.to_string_lossy().into_owned()),
1365 available: sidecar_path.exists(),
1366 });
1367 }
1368 daemon_events.extend(daemon_events_from_sidecar(path));
1369 }
1370
1371 RunObservabilityRecord {
1372 schema_version: 4,
1373 planner_rounds,
1374 research_fact_count,
1375 action_graph_nodes,
1376 action_graph_edges,
1377 worker_lineage,
1378 verification_outcomes,
1379 transcript_pointers,
1380 compaction_events,
1381 daemon_events,
1382 }
1383}
1384
1385fn refresh_run_observability(run: &mut RunRecord, persisted_path: Option<&Path>) {
1386 run.observability = Some(derive_run_observability(run, persisted_path));
1387}
1388
1389pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1390 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
1391 if run.type_name.is_empty() {
1392 run.type_name = "run_record".to_string();
1393 }
1394 if run.id.is_empty() {
1395 run.id = new_id("run");
1396 }
1397 if run.started_at.is_empty() {
1398 run.started_at = now_rfc3339();
1399 }
1400 if run.status.is_empty() {
1401 run.status = "running".to_string();
1402 }
1403 if run.root_run_id.is_none() {
1404 run.root_run_id = Some(run.id.clone());
1405 }
1406 if run.replay_fixture.is_none() {
1407 run.replay_fixture = Some(replay_fixture_from_run(&run));
1408 }
1409 merge_hitl_questions_from_active_log(&mut run);
1410 sync_run_handoffs(&mut run);
1411 if run.observability.is_none() {
1412 let persisted_path = run.persisted_path.clone();
1413 let persisted = persisted_path.as_deref().map(Path::new);
1414 refresh_run_observability(&mut run, persisted);
1415 }
1416 Ok(run)
1417}
1418
1419pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1420 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1421 if manifest.type_name.is_empty() {
1422 manifest.type_name = "eval_suite_manifest".to_string();
1423 }
1424 if manifest.id.is_empty() {
1425 manifest.id = new_id("eval_suite");
1426 }
1427 Ok(manifest)
1428}
1429
1430fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1431 let content = std::fs::read_to_string(path)
1432 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1433 serde_json::from_str(&content)
1434 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1435}
1436
/// Resolve a manifest-relative path string against an optional base directory.
///
/// Absolute paths are returned untouched; relative paths are joined onto
/// `base_dir` when one is provided, and otherwise returned as-is.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if !candidate.is_absolute() => base.join(candidate),
        _ => candidate,
    }
}
1447
1448pub fn evaluate_run_suite_manifest(
1449 manifest: &EvalSuiteManifest,
1450) -> Result<ReplayEvalSuiteReport, VmError> {
1451 let base_dir = manifest.base_dir.as_deref().map(Path::new);
1452 let mut reports = Vec::new();
1453 for case in &manifest.cases {
1454 let run_path = resolve_manifest_path(base_dir, &case.run_path);
1455 let run = load_run_record(&run_path)?;
1456 let fixture = match &case.fixture_path {
1457 Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1458 None => run
1459 .replay_fixture
1460 .clone()
1461 .unwrap_or_else(|| replay_fixture_from_run(&run)),
1462 };
1463 let eval = evaluate_run_against_fixture(&run, &fixture);
1464 let mut pass = eval.pass;
1465 let mut failures = eval.failures;
1466 let comparison = match &case.compare_to {
1467 Some(path) => {
1468 let baseline_path = resolve_manifest_path(base_dir, path);
1469 let baseline = load_run_record(&baseline_path)?;
1470 let diff = diff_run_records(&baseline, &run);
1471 if !diff.identical {
1472 pass = false;
1473 failures.push(format!(
1474 "run differs from baseline {} with {} stage changes",
1475 baseline_path.display(),
1476 diff.stage_diffs.len()
1477 ));
1478 }
1479 Some(diff)
1480 }
1481 None => None,
1482 };
1483 reports.push(ReplayEvalCaseReport {
1484 run_id: run.id.clone(),
1485 workflow_id: run.workflow_id.clone(),
1486 label: case.label.clone(),
1487 pass,
1488 failures,
1489 stage_count: eval.stage_count,
1490 source_path: Some(run_path.display().to_string()),
1491 comparison,
1492 });
1493 }
1494 let total = reports.len();
1495 let passed = reports.iter().filter(|report| report.pass).count();
1496 let failed = total.saturating_sub(passed);
1497 Ok(ReplayEvalSuiteReport {
1498 pass: failed == 0,
1499 total,
1500 passed,
1501 failed,
1502 cases: reports,
1503 })
1504}
1505
/// Edit operation produced by [`myers_diff`].
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// Line present in both sequences; the paired index refers to `a`.
    Equal,
    /// Line only in `a`; the paired index refers to `a`.
    Delete,
    /// Line only in `b`; the paired index refers to `b`.
    Insert,
}

/// Compute a shortest edit script between `a` and `b` with Myers' greedy
/// O((N+M)·D) diff algorithm.
///
/// Returns `(op, index)` pairs in forward order, where the index points into
/// `a` for `Equal`/`Delete` and into `b` for `Insert`. Empty inputs short-
/// circuit to all-insert / all-delete scripts.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    // v[k + offset] holds the furthest-reaching x on diagonal k.
    let mut v = vec![0isize; v_size];
    let mut trace: Vec<Vec<isize>> = Vec::new();

    // Forward pass: for each edit distance d, extend the furthest-reaching
    // path on every diagonal k, snapshotting `v` so the path can be rebuilt.
    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Step down (insert) from k+1 or right (delete) from k-1,
            // whichever reaches further.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the "snake" of matching lines along this diagonal.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backward pass: walk the snapshots from the endpoint to the origin,
    // emitting operations in reverse.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Unwind the snake of equal lines, then the single edit step.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Any remaining prefix is a snake along the initial k = 0 diagonal.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
1598
1599pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
1600 let before_lines: Vec<&str> = before.lines().collect();
1601 let after_lines: Vec<&str> = after.lines().collect();
1602 let ops = myers_diff(&before_lines, &after_lines);
1603
1604 let mut diff = String::new();
1605 let file = path.unwrap_or("artifact");
1606 diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
1607 for &(op, idx) in &ops {
1608 match op {
1609 DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
1610 DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
1611 DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
1612 }
1613 }
1614 diff
1615}
1616
/// Persist a run record to disk as pretty-printed JSON, defaulting the target
/// to `<run_dir>/<run_id>.json` when no path is given.
///
/// Before writing, a copy of the record is materialized: HITL questions are
/// merged from the active event log, a replay fixture is backfilled if
/// missing, the persisted path is recorded on the copy, handoffs are synced,
/// and the observability summary is refreshed against the target path. The
/// write goes through a `.json.tmp` sibling followed by a rename so readers
/// never observe a half-written file. Returns the path written, or a
/// `VmError::Runtime` describing the step that failed.
pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
    let path = path
        .map(PathBuf::from)
        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
    // Work on a clone so the caller's record is left untouched.
    let mut materialized = run.clone();
    merge_hitl_questions_from_active_log(&mut materialized);
    if materialized.replay_fixture.is_none() {
        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
    }
    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
    sync_run_handoffs(&mut materialized);
    refresh_run_observability(&mut materialized, Some(&path));
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
    }
    let json = serde_json::to_string_pretty(&materialized)
        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
    // Temp-file-then-rename keeps the visible record whole at all times.
    let tmp_path = path.with_extension("json.tmp");
    std::fs::write(&tmp_path, &json)
        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
    std::fs::rename(&tmp_path, &path).map_err(|e| {
        // Best-effort direct write on rename failure; the error is still surfaced.
        let _ = std::fs::write(&path, &json);
        VmError::Runtime(format!("failed to finalize run record: {e}"))
    })?;
    if let Some(observability) = materialized.observability.as_ref() {
        publish_action_graph_event(&materialized, observability, &path);
    }
    Ok(path.to_string_lossy().into_owned())
}
1649
1650pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1651 let content = std::fs::read_to_string(path)
1652 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1653 let mut run: RunRecord = serde_json::from_str(&content)
1654 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
1655 if run.replay_fixture.is_none() {
1656 run.replay_fixture = Some(replay_fixture_from_run(&run));
1657 }
1658 run.persisted_path
1659 .get_or_insert_with(|| path.to_string_lossy().into_owned());
1660 sync_run_handoffs(&mut run);
1661 refresh_run_observability(&mut run, Some(path));
1662 Ok(run)
1663}
1664
1665pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1666 ReplayFixture {
1667 type_name: "replay_fixture".to_string(),
1668 id: new_id("fixture"),
1669 source_run_id: run.id.clone(),
1670 workflow_id: run.workflow_id.clone(),
1671 workflow_name: run.workflow_name.clone(),
1672 created_at: now_rfc3339(),
1673 eval_kind: Some("replay".to_string()),
1674 clarifying_question: None,
1675 expected_status: run.status.clone(),
1676 stage_assertions: run
1677 .stages
1678 .iter()
1679 .map(|stage| ReplayStageAssertion {
1680 node_id: stage.node_id.clone(),
1681 expected_status: stage.status.clone(),
1682 expected_outcome: stage.outcome.clone(),
1683 expected_branch: stage.branch.clone(),
1684 required_artifact_kinds: stage
1685 .artifacts
1686 .iter()
1687 .map(|artifact| artifact.kind.clone())
1688 .collect(),
1689 visible_text_contains: stage
1690 .visible_text
1691 .as_ref()
1692 .filter(|text| !text.is_empty())
1693 .map(|text| text.chars().take(80).collect()),
1694 })
1695 .collect(),
1696 }
1697}
1698
/// Check a run against a replay fixture and collect human-readable failure
/// messages.
///
/// Fixtures whose `eval_kind` is `"clarifying_question"` are delegated to the
/// dedicated clarifying-question evaluator. Otherwise the run status and each
/// stage assertion (status, outcome, branch, required artifact kinds, and an
/// optional visible-text snippet) are compared; every mismatch appends one
/// failure message. The report passes only when no failures were recorded.
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    if fixture.eval_kind.as_deref() == Some("clarifying_question") {
        return evaluate_clarifying_question(run, fixture);
    }
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    // Index stages by node id so each assertion can look up its stage directly.
    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
    for assertion in &fixture.stage_assertions {
        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
            // A missing stage is a single failure; its field checks are skipped.
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        // Every required artifact kind must appear among the stage's artifacts.
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        // Substring check against the stage's visible text (empty when absent).
        if let Some(snippet) = &assertion.visible_text_contains {
            let actual = stage.visible_text.clone().unwrap_or_default();
            if !actual.contains(snippet) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
1764
/// Evaluate a run against a clarifying-question fixture.
///
/// Checks the run status, that the number of HITL questions falls within the
/// fixture's min/max bounds, and — when the fixture constrains question text
/// at all (expected question, accepted list, required terms, or forbidden
/// terms) — that at least one asked question satisfies every constraint after
/// text normalization. Text constraints are only enforced when at least one
/// question was asked.
fn evaluate_clarifying_question(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    let spec = fixture.clarifying_question.clone().unwrap_or_default();
    let min_questions = clarifying_min_questions(&spec);
    let max_questions = clarifying_max_questions(&spec);
    let questions = &run.hitl_questions;

    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    // Cardinality bounds on how many clarifying questions were asked.
    if questions.len() < min_questions {
        failures.push(format!(
            "expected at least {min_questions} clarifying question(s), got {}",
            questions.len()
        ));
    }
    if questions.len() > max_questions {
        failures.push(format!(
            "expected at most {max_questions} clarifying question(s), got {}",
            questions.len()
        ));
    }

    // Normalize all fixture text constraints once, up front.
    let normalized_expected = spec
        .expected_question
        .as_deref()
        .map(normalize_question_text);
    let normalized_accepted = spec
        .accepted_questions
        .iter()
        .map(|question| normalize_question_text(question))
        .collect::<Vec<_>>();
    let required_terms = spec
        .required_terms
        .iter()
        .map(|term| normalize_question_text(term))
        .collect::<Vec<_>>();
    let forbidden_terms = spec
        .forbidden_terms
        .iter()
        .map(|term| normalize_question_text(term))
        .collect::<Vec<_>>();

    // A question "matches" when it equals the expected text (if any), is in
    // the accepted list (if non-empty), contains every required term, and
    // contains no forbidden term.
    let matched = questions.iter().any(|question| {
        let normalized = normalize_question_text(&question.prompt);
        let matches_expected = normalized_expected
            .as_ref()
            .is_none_or(|expected| &normalized == expected)
            && (normalized_accepted.is_empty()
                || normalized_accepted
                    .iter()
                    .any(|candidate| candidate == &normalized));
        let has_required_terms = required_terms
            .iter()
            .all(|term| normalized.contains(term.as_str()));
        let avoids_forbidden_terms = forbidden_terms
            .iter()
            .all(|term| !normalized.contains(term.as_str()));
        matches_expected && has_required_terms && avoids_forbidden_terms
    });

    // Only fail on text constraints when questions exist AND the fixture
    // actually constrains question text.
    if !questions.is_empty()
        && (!normalized_accepted.is_empty()
            || normalized_expected.is_some()
            || !required_terms.is_empty()
            || !forbidden_terms.is_empty())
        && !matched
    {
        failures.push(format!(
            "no clarifying question matched fixture; actual questions: {}",
            questions
                .iter()
                .map(|question| format!("{:?}", question.prompt))
                .collect::<Vec<_>>()
                .join(", ")
        ));
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
1852
1853pub fn evaluate_run_suite(
1854 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1855) -> ReplayEvalSuiteReport {
1856 let mut reports = Vec::new();
1857 for (run, fixture, source_path) in cases {
1858 let report = evaluate_run_against_fixture(&run, &fixture);
1859 reports.push(ReplayEvalCaseReport {
1860 run_id: run.id.clone(),
1861 workflow_id: run.workflow_id.clone(),
1862 label: None,
1863 pass: report.pass,
1864 failures: report.failures,
1865 stage_count: report.stage_count,
1866 source_path,
1867 comparison: None,
1868 });
1869 }
1870 let total = reports.len();
1871 let passed = reports.iter().filter(|report| report.pass).count();
1872 let failed = total.saturating_sub(passed);
1873 ReplayEvalSuiteReport {
1874 pass: failed == 0,
1875 total,
1876 passed,
1877 failed,
1878 cases: reports,
1879 }
1880}
1881
1882pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
1883 let mut stage_diffs = Vec::new();
1884 let mut all_node_ids = BTreeSet::new();
1885 let left_by_id: BTreeMap<&str, &RunStageRecord> = left
1886 .stages
1887 .iter()
1888 .map(|s| (s.node_id.as_str(), s))
1889 .collect();
1890 let right_by_id: BTreeMap<&str, &RunStageRecord> = right
1891 .stages
1892 .iter()
1893 .map(|s| (s.node_id.as_str(), s))
1894 .collect();
1895 all_node_ids.extend(left_by_id.keys().copied());
1896 all_node_ids.extend(right_by_id.keys().copied());
1897
1898 for node_id in all_node_ids {
1899 let left_stage = left_by_id.get(node_id).copied();
1900 let right_stage = right_by_id.get(node_id).copied();
1901 match (left_stage, right_stage) {
1902 (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
1903 node_id: node_id.to_string(),
1904 change: "removed".to_string(),
1905 details: vec!["stage missing from right run".to_string()],
1906 }),
1907 (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
1908 node_id: node_id.to_string(),
1909 change: "added".to_string(),
1910 details: vec!["stage missing from left run".to_string()],
1911 }),
1912 (Some(left_stage), Some(right_stage)) => {
1913 let mut details = Vec::new();
1914 if left_stage.status != right_stage.status {
1915 details.push(format!(
1916 "status: {} -> {}",
1917 left_stage.status, right_stage.status
1918 ));
1919 }
1920 if left_stage.outcome != right_stage.outcome {
1921 details.push(format!(
1922 "outcome: {} -> {}",
1923 left_stage.outcome, right_stage.outcome
1924 ));
1925 }
1926 if left_stage.branch != right_stage.branch {
1927 details.push(format!(
1928 "branch: {:?} -> {:?}",
1929 left_stage.branch, right_stage.branch
1930 ));
1931 }
1932 if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
1933 {
1934 details.push(format!(
1935 "produced_artifacts: {} -> {}",
1936 left_stage.produced_artifact_ids.len(),
1937 right_stage.produced_artifact_ids.len()
1938 ));
1939 }
1940 if left_stage.artifacts.len() != right_stage.artifacts.len() {
1941 details.push(format!(
1942 "artifact_records: {} -> {}",
1943 left_stage.artifacts.len(),
1944 right_stage.artifacts.len()
1945 ));
1946 }
1947 if !details.is_empty() {
1948 stage_diffs.push(RunStageDiffRecord {
1949 node_id: node_id.to_string(),
1950 change: "changed".to_string(),
1951 details,
1952 });
1953 }
1954 }
1955 (None, None) => {}
1956 }
1957 }
1958
1959 let mut tool_diffs = Vec::new();
1960 let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
1961 .tool_recordings
1962 .iter()
1963 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1964 .collect();
1965 let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
1966 .tool_recordings
1967 .iter()
1968 .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
1969 .collect();
1970 let all_tool_keys: std::collections::BTreeSet<_> = left_tools
1971 .keys()
1972 .chain(right_tools.keys())
1973 .cloned()
1974 .collect();
1975 for key in &all_tool_keys {
1976 let l = left_tools.get(key);
1977 let r = right_tools.get(key);
1978 let result_changed = match (l, r) {
1979 (Some(a), Some(b)) => a.result != b.result,
1980 _ => true,
1981 };
1982 if result_changed {
1983 tool_diffs.push(ToolCallDiffRecord {
1984 tool_name: key.0.clone(),
1985 args_hash: key.1.clone(),
1986 result_changed,
1987 left_result: l.map(|t| t.result.clone()),
1988 right_result: r.map(|t| t.result.clone()),
1989 });
1990 }
1991 }
1992
    // Use the persisted observability snapshot when one exists; otherwise
    // derive it on the fly from the run record and its on-disk path.
    let left_observability = left.observability.clone().unwrap_or_else(|| {
        derive_run_observability(left, left.persisted_path.as_deref().map(Path::new))
    });
    let right_observability = right.observability.clone().unwrap_or_else(|| {
        derive_run_observability(right, right.persisted_path.as_deref().map(Path::new))
    });
    let mut observability_diffs = Vec::new();

    // --- worker lineage ---
    // Key each worker by id; the compared value is the (status, run_id,
    // run_path) triple.
    let left_workers = left_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_workers = right_observability
        .worker_lineage
        .iter()
        .map(|worker| {
            (
                worker.worker_id.clone(),
                (
                    worker.status.clone(),
                    worker.run_id.clone(),
                    worker.run_path.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    // Sorted union of worker ids from both runs.
    let worker_ids = left_workers
        .keys()
        .chain(right_workers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for worker_id in worker_ids {
        match (left_workers.get(&worker_id), right_workers.get(&worker_id)) {
            // Present on only one side: report it as missing from the other.
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "worker_lineage".to_string(),
                label: worker_id,
                details: vec!["worker missing from left run".to_string()],
            }),
            // Present on both sides but unequal: one detail line per
            // differing tuple field (status, run_id, run_path).
            (Some(left_worker), Some(right_worker)) if left_worker != right_worker => {
                let mut details = Vec::new();
                if left_worker.0 != right_worker.0 {
                    details.push(format!("status: {} -> {}", left_worker.0, right_worker.0));
                }
                if left_worker.1 != right_worker.1 {
                    details.push(format!(
                        "run_id: {:?} -> {:?}",
                        left_worker.1, right_worker.1
                    ));
                }
                if left_worker.2 != right_worker.2 {
                    details.push(format!(
                        "run_path: {:?} -> {:?}",
                        left_worker.2, right_worker.2
                    ));
                }
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "worker_lineage".to_string(),
                    label: worker_id,
                    details,
                });
            }
            // Entries equal on both sides produce no diff.
            _ => {}
        }
    }
2072
    // --- planner rounds ---
    // Key the per-stage planner summaries by stage id on each side.
    let left_rounds = left_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let right_rounds = right_observability
        .planner_rounds
        .iter()
        .map(|round| (round.stage_id.clone(), round))
        .collect::<BTreeMap<_, _>>();
    let round_ids = left_rounds
        .keys()
        .chain(right_rounds.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in round_ids {
        match (left_rounds.get(&stage_id), right_rounds.get(&stage_id)) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "planner_rounds".to_string(),
                label: stage_id,
                details: vec!["planner summary missing from left run".to_string()],
            }),
            // Unlike the other sections this arm has no inequality guard:
            // it runs even for equal rounds, but only pushes a record when
            // at least one field-level detail was collected.
            (Some(left_round), Some(right_round)) => {
                let mut details = Vec::new();
                if left_round.iteration_count != right_round.iteration_count {
                    details.push(format!(
                        "iterations: {} -> {}",
                        left_round.iteration_count, right_round.iteration_count
                    ));
                }
                if left_round.tool_execution_count != right_round.tool_execution_count {
                    details.push(format!(
                        "tool_executions: {} -> {}",
                        left_round.tool_execution_count, right_round.tool_execution_count
                    ));
                }
                if left_round.native_text_tool_fallback_count
                    != right_round.native_text_tool_fallback_count
                {
                    details.push(format!(
                        "native_text_tool_fallbacks: {} -> {}",
                        left_round.native_text_tool_fallback_count,
                        right_round.native_text_tool_fallback_count
                    ));
                }
                if left_round.native_text_tool_fallback_rejection_count
                    != right_round.native_text_tool_fallback_rejection_count
                {
                    details.push(format!(
                        "native_text_tool_fallback_rejections: {} -> {}",
                        left_round.native_text_tool_fallback_rejection_count,
                        right_round.native_text_tool_fallback_rejection_count
                    ));
                }
                if left_round.empty_completion_retry_count
                    != right_round.empty_completion_retry_count
                {
                    details.push(format!(
                        "empty_completion_retries: {} -> {}",
                        left_round.empty_completion_retry_count,
                        right_round.empty_completion_retry_count
                    ));
                }
                if left_round.research_facts != right_round.research_facts {
                    details.push(format!(
                        "research_facts: {:?} -> {:?}",
                        left_round.research_facts, right_round.research_facts
                    ));
                }
                // Flatten the optional task ledger into "id:status" strings so
                // deliverable lists can be compared (and printed) directly.
                let left_deliverables = left_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                let right_deliverables = right_round
                    .task_ledger
                    .as_ref()
                    .map(|ledger| {
                        ledger
                            .deliverables
                            .iter()
                            .map(|item| format!("{}:{}", item.id, item.status))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                if left_deliverables != right_deliverables {
                    details.push(format!(
                        "deliverables: {:?} -> {:?}",
                        left_deliverables, right_deliverables
                    ));
                }
                if left_round.successful_tools != right_round.successful_tools {
                    details.push(format!(
                        "successful_tools: {:?} -> {:?}",
                        left_round.successful_tools, right_round.successful_tools
                    ));
                }
                if !details.is_empty() {
                    // NOTE(review): other sections label diffs by the map key
                    // (here that would be `stage_id`); this one uses the left
                    // round's `node_id` instead — confirm this is intentional.
                    observability_diffs.push(RunObservabilityDiffRecord {
                        section: "planner_rounds".to_string(),
                        label: left_round.node_id.clone(),
                        details,
                    });
                }
            }
            _ => {}
        }
    }
2192
    // --- transcript pointers ---
    // Key each pointer by id; compare the (available, path, location) triple.
    let left_pointers = left_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_pointers = right_observability
        .transcript_pointers
        .iter()
        .map(|pointer| {
            (
                pointer.id.clone(),
                (
                    pointer.available,
                    pointer.path.clone(),
                    pointer.location.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let pointer_ids = left_pointers
        .keys()
        .chain(right_pointers.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for pointer_id in pointer_ids {
        match (
            left_pointers.get(&pointer_id),
            right_pointers.get(&pointer_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "transcript_pointers".to_string(),
                label: pointer_id,
                details: vec!["pointer missing from left run".to_string()],
            }),
            // Differing pointer tuples are reported as a single whole-tuple
            // detail rather than field by field.
            (Some(left_pointer), Some(right_pointer)) if left_pointer != right_pointer => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "transcript_pointers".to_string(),
                    label: pointer_id,
                    details: vec![format!(
                        "pointer: {:?} -> {:?}",
                        left_pointer, right_pointer
                    )],
                });
            }
            _ => {}
        }
    }
2254
    // --- compaction events ---
    // Key each compaction event by id; compare the (strategy,
    // archived_messages, snapshot_asset_id, available) tuple.
    let left_compactions = left_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_compactions = right_observability
        .compaction_events
        .iter()
        .map(|event| {
            (
                event.id.clone(),
                (
                    event.strategy.clone(),
                    event.archived_messages,
                    event.snapshot_asset_id.clone(),
                    event.available,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let compaction_ids = left_compactions
        .keys()
        .chain(right_compactions.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for compaction_id in compaction_ids {
        match (
            left_compactions.get(&compaction_id),
            right_compactions.get(&compaction_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "compaction_events".to_string(),
                label: compaction_id,
                details: vec!["compaction event missing from left run".to_string()],
            }),
            // Differing tuples are reported as one whole-tuple detail line.
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "compaction_events".to_string(),
                    label: compaction_id,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            _ => {}
        }
    }
2315
    // --- daemon events ---
    // Daemon events have no single id, so the key is the composite
    // (daemon_id, kind, timestamp); the compared value is
    // (name, persist_path, payload_summary).
    let left_daemons = left_observability
        .daemon_events
        .iter()
        .map(|event| {
            (
                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
                (
                    event.name.clone(),
                    event.persist_path.clone(),
                    event.payload_summary.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let right_daemons = right_observability
        .daemon_events
        .iter()
        .map(|event| {
            (
                (event.daemon_id.clone(), event.kind, event.timestamp.clone()),
                (
                    event.name.clone(),
                    event.persist_path.clone(),
                    event.payload_summary.clone(),
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let daemon_keys = left_daemons
        .keys()
        .chain(right_daemons.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for daemon_key in daemon_keys {
        // Human-readable label built from the composite key; `kind` is
        // rendered via Debug.
        let label = format!("{}:{:?}:{}", daemon_key.0, daemon_key.1, daemon_key.2);
        match (
            left_daemons.get(&daemon_key),
            right_daemons.get(&daemon_key),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "daemon_events".to_string(),
                label,
                details: vec!["daemon event missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "daemon_events".to_string(),
                label,
                details: vec!["daemon event missing from left run".to_string()],
            }),
            (Some(left_event), Some(right_event)) if left_event != right_event => {
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "daemon_events".to_string(),
                    label,
                    details: vec![format!("event: {:?} -> {:?}", left_event, right_event)],
                });
            }
            _ => {}
        }
    }
2375
    // --- verification outcomes ---
    // Key the verification records by stage id on each side.
    let left_verification = left_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let right_verification = right_observability
        .verification_outcomes
        .iter()
        .map(|item| (item.stage_id.clone(), item))
        .collect::<BTreeMap<_, _>>();
    let verification_ids = left_verification
        .keys()
        .chain(right_verification.keys())
        .cloned()
        .collect::<BTreeSet<_>>();
    for stage_id in verification_ids {
        match (
            left_verification.get(&stage_id),
            right_verification.get(&stage_id),
        ) {
            (Some(_), None) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from right run".to_string()],
            }),
            (None, Some(_)) => observability_diffs.push(RunObservabilityDiffRecord {
                section: "verification".to_string(),
                label: stage_id,
                details: vec!["verification missing from left run".to_string()],
            }),
            // Both sides present but unequal: report field-level changes for
            // `passed` and `summary`.
            (Some(left_item), Some(right_item)) if left_item != right_item => {
                let mut details = Vec::new();
                if left_item.passed != right_item.passed {
                    details.push(format!(
                        "passed: {:?} -> {:?}",
                        left_item.passed, right_item.passed
                    ));
                }
                if left_item.summary != right_item.summary {
                    details.push(format!(
                        "summary: {:?} -> {:?}",
                        left_item.summary, right_item.summary
                    ));
                }
                // NOTE(review): the record is pushed even when `details` is
                // empty (items may differ in fields not compared above), and
                // the label uses `node_id` rather than the `stage_id` key as
                // other sections do — confirm both are intentional.
                observability_diffs.push(RunObservabilityDiffRecord {
                    section: "verification".to_string(),
                    label: left_item.node_id.clone(),
                    details,
                });
            }
            _ => {}
        }
    }
2429
    // --- action graph ---
    // Only the overall graph shape (node count, edge count) is compared, not
    // the graph contents.
    let left_graph = (
        left_observability.action_graph_nodes.len(),
        left_observability.action_graph_edges.len(),
    );
    let right_graph = (
        right_observability.action_graph_nodes.len(),
        right_observability.action_graph_edges.len(),
    );
    if left_graph != right_graph {
        observability_diffs.push(RunObservabilityDiffRecord {
            section: "action_graph".to_string(),
            label: "shape".to_string(),
            details: vec![format!(
                "nodes/edges: {}/{} -> {}/{}",
                left_graph.0, left_graph.1, right_graph.0, right_graph.1
            )],
        });
    }

    // Runs are "identical" only when no section produced a diff and the bulk
    // collections (transitions, artifacts, checkpoints) have matching lengths
    // — lengths only; element contents are not compared here.
    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && tool_diffs.is_empty()
        && observability_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    // Deltas are right-minus-left, signed via isize so shrinkage is negative.
    // NOTE(review): `usize as isize` wraps for lengths above isize::MAX — not
    // a realistic concern for these collections.
    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        tool_diffs,
        observability_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}