Skip to main content

harn_vm/orchestration/
replay_oracle.rs

1use serde::{Deserialize, Serialize};
2use serde_json::{json, Value as JsonValue};
3use std::collections::BTreeSet;
4use std::fmt;
5
6pub const REPLAY_TRACE_SCHEMA_VERSION: &str = "harn.orchestration.replay_trace.v1";
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
9#[serde(default)]
10pub struct ReplayOracleTrace {
11    pub schema_version: String,
12    pub name: String,
13    pub description: Option<String>,
14    pub expect: ReplayExpectation,
15    pub protocol_fixture_refs: Vec<String>,
16    pub allowlist: Vec<ReplayAllowlistRule>,
17    pub first_run: ReplayTraceRun,
18    pub second_run: ReplayTraceRun,
19}
20
21impl Default for ReplayOracleTrace {
22    fn default() -> Self {
23        Self {
24            schema_version: REPLAY_TRACE_SCHEMA_VERSION.to_string(),
25            name: String::new(),
26            description: None,
27            expect: ReplayExpectation::Match,
28            protocol_fixture_refs: Vec::new(),
29            allowlist: Vec::new(),
30            first_run: ReplayTraceRun::default(),
31            second_run: ReplayTraceRun::default(),
32        }
33    }
34}
35
36#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum ReplayExpectation {
39    #[default]
40    Match,
41    Drift,
42}
43
44#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
45#[serde(default)]
46pub struct ReplayAllowlistRule {
47    /// JSON-pointer-like path. `*` matches every array element or object value.
48    pub path: String,
49    pub reason: String,
50    pub replacement: Option<JsonValue>,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
54#[serde(default)]
55pub struct ReplayTraceRun {
56    pub run_id: String,
57    pub event_log_entries: Vec<JsonValue>,
58    pub trigger_firings: Vec<JsonValue>,
59    pub llm_interactions: Vec<JsonValue>,
60    pub protocol_interactions: Vec<JsonValue>,
61    pub approval_interactions: Vec<JsonValue>,
62    pub effect_receipts: Vec<JsonValue>,
63    pub persona_runtime_states: Vec<JsonValue>,
64    pub agent_transcript_deltas: Vec<JsonValue>,
65    pub final_artifacts: Vec<JsonValue>,
66    pub policy_decisions: Vec<JsonValue>,
67}
68
69impl ReplayTraceRun {
70    pub fn counts(&self) -> ReplayTraceRunCounts {
71        ReplayTraceRunCounts {
72            event_log_entries: self.event_log_entries.len(),
73            trigger_firings: self.trigger_firings.len(),
74            llm_interactions: self.llm_interactions.len(),
75            protocol_interactions: self.protocol_interactions.len(),
76            approval_interactions: self.approval_interactions.len(),
77            effect_receipts: self.effect_receipts.len(),
78            persona_runtime_states: self.persona_runtime_states.len(),
79            agent_transcript_deltas: self.agent_transcript_deltas.len(),
80            final_artifacts: self.final_artifacts.len(),
81            policy_decisions: self.policy_decisions.len(),
82        }
83    }
84}
85
86#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
87#[serde(default)]
88pub struct ReplayTraceRunCounts {
89    pub event_log_entries: usize,
90    pub trigger_firings: usize,
91    pub llm_interactions: usize,
92    pub protocol_interactions: usize,
93    pub approval_interactions: usize,
94    pub effect_receipts: usize,
95    pub persona_runtime_states: usize,
96    pub agent_transcript_deltas: usize,
97    pub final_artifacts: usize,
98    pub policy_decisions: usize,
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
102#[serde(default)]
103pub struct ReplayOracleReport {
104    pub name: String,
105    pub expectation: ReplayExpectation,
106    pub passed: bool,
107    pub first_run_counts: ReplayTraceRunCounts,
108    pub second_run_counts: ReplayTraceRunCounts,
109    pub protocol_fixture_refs: Vec<String>,
110    pub divergence: Option<ReplayDivergence>,
111}
112
113impl Default for ReplayOracleReport {
114    fn default() -> Self {
115        Self {
116            name: String::new(),
117            expectation: ReplayExpectation::Match,
118            passed: false,
119            first_run_counts: ReplayTraceRunCounts::default(),
120            second_run_counts: ReplayTraceRunCounts::default(),
121            protocol_fixture_refs: Vec::new(),
122            divergence: None,
123        }
124    }
125}
126
127#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
128pub struct ReplayDivergence {
129    pub path: String,
130    pub left: JsonValue,
131    pub right: JsonValue,
132    pub message: String,
133}
134
/// Errors raised while validating or canonicalizing a replay trace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplayOracleError {
    /// The trace header or runs are malformed (schema, name, run ids, empty runs).
    InvalidTrace(String),
    /// An allowlist rule has a malformed path or a blank reason.
    InvalidAllowlistPath(String),
    /// An allowlist path matched nothing in a run.
    AllowlistPathMissing(String),
    /// A run could not be converted to JSON.
    Serialization(String),
}

impl fmt::Display for ReplayOracleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant carries a ready-made message; render it through the
        // string's own `Display` so width/alignment flags are honoured.
        let message = match self {
            Self::InvalidTrace(message) => message,
            Self::InvalidAllowlistPath(message) => message,
            Self::AllowlistPathMissing(message) => message,
            Self::Serialization(message) => message,
        };
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ReplayOracleError {}
155
156pub fn run_replay_oracle_trace(
157    trace: &ReplayOracleTrace,
158) -> Result<ReplayOracleReport, ReplayOracleError> {
159    validate_trace(trace)?;
160    let first_run_counts = trace.first_run.counts();
161    let second_run_counts = trace.second_run.counts();
162    let first = canonicalize_run(&trace.first_run, &trace.allowlist)?;
163    let second = canonicalize_run(&trace.second_run, &trace.allowlist)?;
164    let divergence = first_divergence(&first, &second);
165    let passed = match (trace.expect, divergence.is_some()) {
166        (ReplayExpectation::Match, false) => true,
167        (ReplayExpectation::Match, true) => false,
168        (ReplayExpectation::Drift, true) => true,
169        (ReplayExpectation::Drift, false) => false,
170    };
171
172    Ok(ReplayOracleReport {
173        name: trace.name.clone(),
174        expectation: trace.expect,
175        passed,
176        first_run_counts,
177        second_run_counts,
178        protocol_fixture_refs: trace.protocol_fixture_refs.clone(),
179        divergence,
180    })
181}
182
183pub fn canonicalize_run(
184    run: &ReplayTraceRun,
185    allowlist: &[ReplayAllowlistRule],
186) -> Result<JsonValue, ReplayOracleError> {
187    let mut value = serde_json::to_value(run)
188        .map_err(|error| ReplayOracleError::Serialization(error.to_string()))?;
189    for rule in allowlist {
190        apply_allowlist_rule(&mut value, rule)?;
191    }
192    Ok(value)
193}
194
195pub fn first_divergence(left: &JsonValue, right: &JsonValue) -> Option<ReplayDivergence> {
196    first_divergence_at(left, right, String::new())
197}
198
199fn validate_trace(trace: &ReplayOracleTrace) -> Result<(), ReplayOracleError> {
200    if trace.schema_version != REPLAY_TRACE_SCHEMA_VERSION {
201        return Err(ReplayOracleError::InvalidTrace(format!(
202            "unsupported replay trace schema_version {:?}; expected {REPLAY_TRACE_SCHEMA_VERSION}",
203            trace.schema_version
204        )));
205    }
206    if trace.name.trim().is_empty() {
207        return Err(ReplayOracleError::InvalidTrace(
208            "replay trace name cannot be empty".to_string(),
209        ));
210    }
211    if trace.first_run.run_id.trim().is_empty() || trace.second_run.run_id.trim().is_empty() {
212        return Err(ReplayOracleError::InvalidTrace(format!(
213            "{} must include run ids for both replay executions",
214            trace.name
215        )));
216    }
217    if trace_material_count(&trace.first_run) == 0 || trace_material_count(&trace.second_run) == 0 {
218        return Err(ReplayOracleError::InvalidTrace(format!(
219            "{} must include replay trace material for both executions",
220            trace.name
221        )));
222    }
223    for rule in &trace.allowlist {
224        if rule.path.trim().is_empty() {
225            return Err(ReplayOracleError::InvalidAllowlistPath(
226                "allowlist path cannot be empty".to_string(),
227            ));
228        }
229        if rule.reason.trim().is_empty() {
230            return Err(ReplayOracleError::InvalidAllowlistPath(format!(
231                "allowlist path {} must explain why the field is nondeterministic",
232                rule.path
233            )));
234        }
235        parse_pointer_path(&rule.path)?;
236    }
237    Ok(())
238}
239
240fn trace_material_count(run: &ReplayTraceRun) -> usize {
241    let counts = run.counts();
242    counts.event_log_entries
243        + counts.trigger_firings
244        + counts.llm_interactions
245        + counts.protocol_interactions
246        + counts.approval_interactions
247        + counts.effect_receipts
248        + counts.persona_runtime_states
249        + counts.agent_transcript_deltas
250        + counts.final_artifacts
251        + counts.policy_decisions
252}
253
254fn apply_allowlist_rule(
255    value: &mut JsonValue,
256    rule: &ReplayAllowlistRule,
257) -> Result<(), ReplayOracleError> {
258    let segments = parse_pointer_path(&rule.path)?;
259    let replacement = rule.replacement.clone().unwrap_or_else(|| {
260        json!({
261            "$harn_replay_allowlisted": rule.path,
262        })
263    });
264    let replaced = replace_matching_paths(value, &segments, &replacement);
265    if replaced == 0 {
266        return Err(ReplayOracleError::AllowlistPathMissing(format!(
267            "allowlist path {} did not match any replay field",
268            rule.path
269        )));
270    }
271    Ok(())
272}
273
274fn parse_pointer_path(path: &str) -> Result<Vec<String>, ReplayOracleError> {
275    if path == "/" {
276        return Err(ReplayOracleError::InvalidAllowlistPath(
277            "allowlist path cannot replace the whole run".to_string(),
278        ));
279    }
280    if !path.starts_with('/') {
281        return Err(ReplayOracleError::InvalidAllowlistPath(format!(
282            "allowlist path {path:?} must start with '/'"
283        )));
284    }
285    path.split('/')
286        .skip(1)
287        .map(|segment| {
288            if segment.is_empty() {
289                return Err(ReplayOracleError::InvalidAllowlistPath(format!(
290                    "allowlist path {path:?} contains an empty segment"
291                )));
292            }
293            Ok(segment.replace("~1", "/").replace("~0", "~"))
294        })
295        .collect()
296}
297
298fn replace_matching_paths(
299    value: &mut JsonValue,
300    segments: &[String],
301    replacement: &JsonValue,
302) -> usize {
303    if segments.is_empty() {
304        *value = replacement.clone();
305        return 1;
306    }
307
308    let head = segments[0].as_str();
309    let tail = &segments[1..];
310    if head == "*" {
311        return match value {
312            JsonValue::Array(items) => items
313                .iter_mut()
314                .map(|item| replace_matching_paths(item, tail, replacement))
315                .sum(),
316            JsonValue::Object(map) => map
317                .values_mut()
318                .map(|item| replace_matching_paths(item, tail, replacement))
319                .sum(),
320            _ => 0,
321        };
322    }
323
324    match value {
325        JsonValue::Object(map) => map
326            .get_mut(head)
327            .map(|child| replace_matching_paths(child, tail, replacement))
328            .unwrap_or(0),
329        JsonValue::Array(items) => head
330            .parse::<usize>()
331            .ok()
332            .and_then(|index| items.get_mut(index))
333            .map(|child| replace_matching_paths(child, tail, replacement))
334            .unwrap_or(0),
335        _ => 0,
336    }
337}
338
339fn first_divergence_at(
340    left: &JsonValue,
341    right: &JsonValue,
342    path: String,
343) -> Option<ReplayDivergence> {
344    if left == right {
345        return None;
346    }
347    match (left, right) {
348        (JsonValue::Object(left_map), JsonValue::Object(right_map)) => {
349            let keys = left_map
350                .keys()
351                .chain(right_map.keys())
352                .cloned()
353                .collect::<BTreeSet<_>>();
354            for key in keys {
355                let next_path = pointer_child(&path, &key);
356                match (left_map.get(&key), right_map.get(&key)) {
357                    (Some(left_child), Some(right_child)) => {
358                        if let Some(divergence) =
359                            first_divergence_at(left_child, right_child, next_path)
360                        {
361                            return Some(divergence);
362                        }
363                    }
364                    (Some(left_child), None) => {
365                        return Some(divergence(
366                            next_path,
367                            left_child.clone(),
368                            JsonValue::Null,
369                            "right run is missing this field",
370                        ));
371                    }
372                    (None, Some(right_child)) => {
373                        return Some(divergence(
374                            next_path,
375                            JsonValue::Null,
376                            right_child.clone(),
377                            "left run is missing this field",
378                        ));
379                    }
380                    (None, None) => {}
381                }
382            }
383            Some(divergence(
384                display_path(&path),
385                left.clone(),
386                right.clone(),
387                "objects differ",
388            ))
389        }
390        (JsonValue::Array(left_items), JsonValue::Array(right_items)) => {
391            for index in 0..left_items.len().max(right_items.len()) {
392                let next_path = pointer_child(&path, &index.to_string());
393                match (left_items.get(index), right_items.get(index)) {
394                    (Some(left_child), Some(right_child)) => {
395                        if let Some(divergence) =
396                            first_divergence_at(left_child, right_child, next_path)
397                        {
398                            return Some(divergence);
399                        }
400                    }
401                    (Some(left_child), None) => {
402                        return Some(divergence(
403                            next_path,
404                            left_child.clone(),
405                            JsonValue::Null,
406                            "right run is missing this array element",
407                        ));
408                    }
409                    (None, Some(right_child)) => {
410                        return Some(divergence(
411                            next_path,
412                            JsonValue::Null,
413                            right_child.clone(),
414                            "left run is missing this array element",
415                        ));
416                    }
417                    (None, None) => {}
418                }
419            }
420            Some(divergence(
421                display_path(&path),
422                left.clone(),
423                right.clone(),
424                "arrays differ",
425            ))
426        }
427        _ => Some(divergence(
428            display_path(&path),
429            left.clone(),
430            right.clone(),
431            "values differ",
432        )),
433    }
434}
435
/// Appends `child` as a new segment of the JSON pointer `parent`.
///
/// `~` is escaped as `~0` and `/` as `~1`, so the segment decodes back to
/// `child` under RFC 6901 unescaping. An empty `parent` denotes the root.
fn pointer_child(parent: &str, child: &str) -> String {
    let mut pointer = String::with_capacity(parent.len() + child.len() + 1);
    pointer.push_str(parent);
    pointer.push('/');
    for ch in child.chars() {
        match ch {
            '~' => pointer.push_str("~0"),
            '/' => pointer.push_str("~1"),
            other => pointer.push(other),
        }
    }
    pointer
}
444
/// Renders an internal pointer for reports, showing the root (empty) pointer as `/`.
fn display_path(path: &str) -> String {
    match path {
        "" => String::from("/"),
        non_root => non_root.to_owned(),
    }
}
452
453fn divergence(
454    path: String,
455    left: JsonValue,
456    right: JsonValue,
457    message: impl Into<String>,
458) -> ReplayDivergence {
459    ReplayDivergence {
460        path,
461        left,
462        right,
463        message: message.into(),
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    // Two runs of the same outbox dispatch that differ only in allowlisted
    // fields (run id, event id, timestamp).
    fn base_trace() -> ReplayOracleTrace {
        ReplayOracleTrace {
            name: "fixture".to_string(),
            allowlist: vec![
                ReplayAllowlistRule {
                    path: "/run_id".to_string(),
                    reason: "run ids are allocated per execution".to_string(),
                    replacement: Some(JsonValue::String("<run-id>".to_string())),
                },
                ReplayAllowlistRule {
                    path: "/event_log_entries/*/event_id".to_string(),
                    reason: "event log offsets are backend-local".to_string(),
                    replacement: Some(JsonValue::String("<event-id>".to_string())),
                },
                ReplayAllowlistRule {
                    path: "/event_log_entries/*/occurred_at_ms".to_string(),
                    reason: "append timestamps are wall-clock observations".to_string(),
                    replacement: Some(JsonValue::String("<timestamp-ms>".to_string())),
                },
            ],
            first_run: ReplayTraceRun {
                run_id: "run-a".to_string(),
                event_log_entries: vec![json!({
                    "event_id": 10,
                    "topic": "trigger.outbox",
                    "kind": "dispatch_succeeded",
                    "occurred_at_ms": 1000,
                    "payload": {"binding_id": "demo", "status": "dispatched"}
                })],
                ..ReplayTraceRun::default()
            },
            second_run: ReplayTraceRun {
                run_id: "run-b".to_string(),
                event_log_entries: vec![json!({
                    "event_id": 42,
                    "topic": "trigger.outbox",
                    "kind": "dispatch_succeeded",
                    "occurred_at_ms": 2000,
                    "payload": {"binding_id": "demo", "status": "dispatched"}
                })],
                ..ReplayTraceRun::default()
            },
            ..ReplayOracleTrace::default()
        }
    }

    #[test]
    fn canonical_comparison_allows_explicit_nondeterministic_fields() {
        let report = run_replay_oracle_trace(&base_trace()).expect("oracle succeeds");
        assert!(report.passed, "{report:?}");
        assert_eq!(report.divergence, None);
    }

    #[test]
    fn persona_runtime_states_are_first_class_replay_material() {
        let mut trace = base_trace();
        trace.first_run.persona_runtime_states = vec![json!({
            "name": "merge_captain",
            "state": "idle",
            "queued_work": [],
            "handoff_inbox": [],
            "budget": {"spent_today_usd": 0.01},
        })];
        trace.second_run.persona_runtime_states = trace.first_run.persona_runtime_states.clone();

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed, "{report:?}");
        assert_eq!(report.first_run_counts.persona_runtime_states, 1);
    }

    #[test]
    fn meaningful_drift_reports_first_divergent_path() {
        let mut trace = base_trace();
        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(!report.passed);
        let divergence = report.divergence.expect("drift is reported");
        assert_eq!(divergence.path, "/event_log_entries/0/payload/status");
        assert_eq!(divergence.left, json!("dispatched"));
        assert_eq!(divergence.right, json!("dlq"));
    }

    #[test]
    fn expected_drift_fixture_passes_only_when_drift_is_detected() {
        let mut trace = base_trace();
        trace.expect = ReplayExpectation::Drift;
        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed);
        assert!(report.divergence.is_some());
    }

    #[test]
    fn expected_drift_fixture_fails_when_runs_match() {
        let mut trace = base_trace();
        trace.expect = ReplayExpectation::Drift;

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(!report.passed);
        assert_eq!(report.divergence, None);
    }

    #[test]
    fn allowlist_paths_must_match_real_fields() {
        let mut trace = base_trace();
        trace.allowlist.push(ReplayAllowlistRule {
            path: "/llm_interactions/*/latency_ms".to_string(),
            reason: "latency is nondeterministic".to_string(),
            replacement: None,
        });

        let error = run_replay_oracle_trace(&trace).expect_err("missing path should fail");

        assert!(matches!(error, ReplayOracleError::AllowlistPathMissing(_)));
    }

    #[test]
    fn unsupported_schema_version_is_rejected() {
        let mut trace = base_trace();
        trace.schema_version = "harn.orchestration.replay_trace.v0".to_string();

        let error = run_replay_oracle_trace(&trace).expect_err("schema mismatch should fail");

        assert!(matches!(error, ReplayOracleError::InvalidTrace(_)));
    }

    #[test]
    fn runs_without_material_are_rejected() {
        let mut trace = base_trace();
        trace.second_run.event_log_entries.clear();

        let error = run_replay_oracle_trace(&trace).expect_err("empty run should fail");

        assert!(matches!(error, ReplayOracleError::InvalidTrace(_)));
    }

    #[test]
    fn malformed_allowlist_paths_are_rejected() {
        for path in ["run_id", "/", "/event_log_entries//event_id"] {
            let mut trace = base_trace();
            trace.allowlist.push(ReplayAllowlistRule {
                path: path.to_string(),
                reason: "exercise path validation".to_string(),
                replacement: None,
            });

            let error = run_replay_oracle_trace(&trace).expect_err("malformed path should fail");

            assert!(
                matches!(error, ReplayOracleError::InvalidAllowlistPath(_)),
                "{path}: {error:?}"
            );
        }
    }

    #[test]
    fn allowlist_rules_must_state_a_reason() {
        let mut trace = base_trace();
        trace.allowlist[0].reason = "   ".to_string();

        let error = run_replay_oracle_trace(&trace).expect_err("blank reason should fail");

        assert!(matches!(error, ReplayOracleError::InvalidAllowlistPath(_)));
    }

    #[test]
    fn pointer_escapes_round_trip() {
        let segment = "a/b~c";
        let pointer = pointer_child("", segment);

        assert_eq!(pointer, "/a~1b~0c");
        assert_eq!(parse_pointer_path(&pointer), Ok(vec![segment.to_string()]));
    }

    #[test]
    fn wildcard_replaces_every_object_value() {
        let mut value = json!({"a": {"ts": 1}, "b": {"ts": 2}, "c": 3});
        let segments = vec!["*".to_string(), "ts".to_string()];

        let replaced = replace_matching_paths(&mut value, &segments, &json!("<ts>"));

        assert_eq!(replaced, 2);
        assert_eq!(
            value,
            json!({"a": {"ts": "<ts>"}, "b": {"ts": "<ts>"}, "c": 3})
        );
    }

    #[test]
    fn missing_fields_are_reported_with_a_null_side() {
        let divergence =
            first_divergence(&json!({"a": 1, "b": 2}), &json!({"a": 1})).expect("objects differ");

        assert_eq!(divergence.path, "/b");
        assert_eq!(divergence.left, json!(2));
        assert_eq!(divergence.right, JsonValue::Null);
        assert_eq!(divergence.message, "right run is missing this field");
    }

    #[test]
    fn extra_array_elements_are_reported_by_index() {
        let divergence = first_divergence(&json!([1]), &json!([1, 2])).expect("arrays differ");

        assert_eq!(divergence.path, "/1");
        assert_eq!(divergence.left, JsonValue::Null);
        assert_eq!(divergence.right, json!(2));
        assert_eq!(divergence.message, "left run is missing this array element");
    }

    #[test]
    fn root_scalar_mismatch_is_reported_at_slash() {
        let divergence = first_divergence(&json!(1), &json!(2)).expect("values differ");

        assert_eq!(divergence.path, "/");
        assert_eq!(divergence.message, "values differ");
    }
}