Skip to main content

harn_vm/orchestration/
replay_oracle.rs

1use serde::{Deserialize, Serialize};
2use serde_json::{json, Value as JsonValue};
3use std::collections::BTreeSet;
4use std::fmt;
5
/// Schema identifier every replay oracle trace must carry in its `schema_version` field.
pub const REPLAY_TRACE_SCHEMA_VERSION: &str = "harn.orchestration.replay_trace.v1";
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
9#[serde(default)]
10pub struct ReplayOracleTrace {
11    pub schema_version: String,
12    pub name: String,
13    pub description: Option<String>,
14    pub expect: ReplayExpectation,
15    pub protocol_fixture_refs: Vec<String>,
16    pub allowlist: Vec<ReplayAllowlistRule>,
17    pub first_run: ReplayTraceRun,
18    pub second_run: ReplayTraceRun,
19}
20
21impl Default for ReplayOracleTrace {
22    fn default() -> Self {
23        Self {
24            schema_version: REPLAY_TRACE_SCHEMA_VERSION.to_string(),
25            name: String::new(),
26            description: None,
27            expect: ReplayExpectation::Match,
28            protocol_fixture_refs: Vec::new(),
29            allowlist: Vec::new(),
30            first_run: ReplayTraceRun::default(),
31            second_run: ReplayTraceRun::default(),
32        }
33    }
34}
35
36#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum ReplayExpectation {
39    #[default]
40    Match,
41    Drift,
42}
43
44#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
45#[serde(default)]
46pub struct ReplayAllowlistRule {
47    /// JSON-pointer-like path. `*` matches every array element or object value.
48    pub path: String,
49    pub reason: String,
50    pub replacement: Option<JsonValue>,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
54#[serde(default)]
55pub struct ReplayTraceRun {
56    pub run_id: String,
57    pub event_log_entries: Vec<JsonValue>,
58    pub trigger_firings: Vec<JsonValue>,
59    pub llm_interactions: Vec<JsonValue>,
60    pub protocol_interactions: Vec<JsonValue>,
61    pub approval_interactions: Vec<JsonValue>,
62    pub effect_receipts: Vec<JsonValue>,
63    pub agent_transcript_deltas: Vec<JsonValue>,
64    pub final_artifacts: Vec<JsonValue>,
65    pub policy_decisions: Vec<JsonValue>,
66}
67
68impl ReplayTraceRun {
69    pub fn counts(&self) -> ReplayTraceRunCounts {
70        ReplayTraceRunCounts {
71            event_log_entries: self.event_log_entries.len(),
72            trigger_firings: self.trigger_firings.len(),
73            llm_interactions: self.llm_interactions.len(),
74            protocol_interactions: self.protocol_interactions.len(),
75            approval_interactions: self.approval_interactions.len(),
76            effect_receipts: self.effect_receipts.len(),
77            agent_transcript_deltas: self.agent_transcript_deltas.len(),
78            final_artifacts: self.final_artifacts.len(),
79            policy_decisions: self.policy_decisions.len(),
80        }
81    }
82}
83
84#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(default)]
86pub struct ReplayTraceRunCounts {
87    pub event_log_entries: usize,
88    pub trigger_firings: usize,
89    pub llm_interactions: usize,
90    pub protocol_interactions: usize,
91    pub approval_interactions: usize,
92    pub effect_receipts: usize,
93    pub agent_transcript_deltas: usize,
94    pub final_artifacts: usize,
95    pub policy_decisions: usize,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
99#[serde(default)]
100pub struct ReplayOracleReport {
101    pub name: String,
102    pub expectation: ReplayExpectation,
103    pub passed: bool,
104    pub first_run_counts: ReplayTraceRunCounts,
105    pub second_run_counts: ReplayTraceRunCounts,
106    pub protocol_fixture_refs: Vec<String>,
107    pub divergence: Option<ReplayDivergence>,
108}
109
110impl Default for ReplayOracleReport {
111    fn default() -> Self {
112        Self {
113            name: String::new(),
114            expectation: ReplayExpectation::Match,
115            passed: false,
116            first_run_counts: ReplayTraceRunCounts::default(),
117            second_run_counts: ReplayTraceRunCounts::default(),
118            protocol_fixture_refs: Vec::new(),
119            divergence: None,
120        }
121    }
122}
123
124#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
125pub struct ReplayDivergence {
126    pub path: String,
127    pub left: JsonValue,
128    pub right: JsonValue,
129    pub message: String,
130}
131
/// Errors raised while validating or canonicalizing a replay trace.
///
/// Each variant carries its complete, human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplayOracleError {
    /// The trace is malformed: schema version, name, run ids, or missing material.
    InvalidTrace(String),
    /// An allowlist rule has a blank or malformed path, or a blank reason.
    InvalidAllowlistPath(String),
    /// An allowlist rule matched nothing in a run.
    AllowlistPathMissing(String),
    /// A run could not be converted to JSON.
    Serialization(String),
}

impl fmt::Display for ReplayOracleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant already holds the full message; forward it through `Display` so
        // caller-supplied width/alignment flags still apply.
        let message = match self {
            Self::InvalidTrace(message) => message,
            Self::InvalidAllowlistPath(message) => message,
            Self::AllowlistPathMissing(message) => message,
            Self::Serialization(message) => message,
        };
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ReplayOracleError {}
152
153pub fn run_replay_oracle_trace(
154    trace: &ReplayOracleTrace,
155) -> Result<ReplayOracleReport, ReplayOracleError> {
156    validate_trace(trace)?;
157    let first_run_counts = trace.first_run.counts();
158    let second_run_counts = trace.second_run.counts();
159    let first = canonicalize_run(&trace.first_run, &trace.allowlist)?;
160    let second = canonicalize_run(&trace.second_run, &trace.allowlist)?;
161    let divergence = first_divergence(&first, &second);
162    let passed = match (trace.expect, divergence.is_some()) {
163        (ReplayExpectation::Match, false) => true,
164        (ReplayExpectation::Match, true) => false,
165        (ReplayExpectation::Drift, true) => true,
166        (ReplayExpectation::Drift, false) => false,
167    };
168
169    Ok(ReplayOracleReport {
170        name: trace.name.clone(),
171        expectation: trace.expect,
172        passed,
173        first_run_counts,
174        second_run_counts,
175        protocol_fixture_refs: trace.protocol_fixture_refs.clone(),
176        divergence,
177    })
178}
179
180pub fn canonicalize_run(
181    run: &ReplayTraceRun,
182    allowlist: &[ReplayAllowlistRule],
183) -> Result<JsonValue, ReplayOracleError> {
184    let mut value = serde_json::to_value(run)
185        .map_err(|error| ReplayOracleError::Serialization(error.to_string()))?;
186    for rule in allowlist {
187        apply_allowlist_rule(&mut value, rule)?;
188    }
189    Ok(value)
190}
191
192pub fn first_divergence(left: &JsonValue, right: &JsonValue) -> Option<ReplayDivergence> {
193    first_divergence_at(left, right, String::new())
194}
195
196fn validate_trace(trace: &ReplayOracleTrace) -> Result<(), ReplayOracleError> {
197    if trace.schema_version != REPLAY_TRACE_SCHEMA_VERSION {
198        return Err(ReplayOracleError::InvalidTrace(format!(
199            "unsupported replay trace schema_version {:?}; expected {REPLAY_TRACE_SCHEMA_VERSION}",
200            trace.schema_version
201        )));
202    }
203    if trace.name.trim().is_empty() {
204        return Err(ReplayOracleError::InvalidTrace(
205            "replay trace name cannot be empty".to_string(),
206        ));
207    }
208    if trace.first_run.run_id.trim().is_empty() || trace.second_run.run_id.trim().is_empty() {
209        return Err(ReplayOracleError::InvalidTrace(format!(
210            "{} must include run ids for both replay executions",
211            trace.name
212        )));
213    }
214    if trace_material_count(&trace.first_run) == 0 || trace_material_count(&trace.second_run) == 0 {
215        return Err(ReplayOracleError::InvalidTrace(format!(
216            "{} must include replay trace material for both executions",
217            trace.name
218        )));
219    }
220    for rule in &trace.allowlist {
221        if rule.path.trim().is_empty() {
222            return Err(ReplayOracleError::InvalidAllowlistPath(
223                "allowlist path cannot be empty".to_string(),
224            ));
225        }
226        if rule.reason.trim().is_empty() {
227            return Err(ReplayOracleError::InvalidAllowlistPath(format!(
228                "allowlist path {} must explain why the field is nondeterministic",
229                rule.path
230            )));
231        }
232        parse_pointer_path(&rule.path)?;
233    }
234    Ok(())
235}
236
237fn trace_material_count(run: &ReplayTraceRun) -> usize {
238    let counts = run.counts();
239    counts.event_log_entries
240        + counts.trigger_firings
241        + counts.llm_interactions
242        + counts.protocol_interactions
243        + counts.approval_interactions
244        + counts.effect_receipts
245        + counts.agent_transcript_deltas
246        + counts.final_artifacts
247        + counts.policy_decisions
248}
249
250fn apply_allowlist_rule(
251    value: &mut JsonValue,
252    rule: &ReplayAllowlistRule,
253) -> Result<(), ReplayOracleError> {
254    let segments = parse_pointer_path(&rule.path)?;
255    let replacement = rule.replacement.clone().unwrap_or_else(|| {
256        json!({
257            "$harn_replay_allowlisted": rule.path,
258        })
259    });
260    let replaced = replace_matching_paths(value, &segments, &replacement);
261    if replaced == 0 {
262        return Err(ReplayOracleError::AllowlistPathMissing(format!(
263            "allowlist path {} did not match any replay field",
264            rule.path
265        )));
266    }
267    Ok(())
268}
269
270fn parse_pointer_path(path: &str) -> Result<Vec<String>, ReplayOracleError> {
271    if path == "/" {
272        return Err(ReplayOracleError::InvalidAllowlistPath(
273            "allowlist path cannot replace the whole run".to_string(),
274        ));
275    }
276    if !path.starts_with('/') {
277        return Err(ReplayOracleError::InvalidAllowlistPath(format!(
278            "allowlist path {path:?} must start with '/'"
279        )));
280    }
281    path.split('/')
282        .skip(1)
283        .map(|segment| {
284            if segment.is_empty() {
285                return Err(ReplayOracleError::InvalidAllowlistPath(format!(
286                    "allowlist path {path:?} contains an empty segment"
287                )));
288            }
289            Ok(segment.replace("~1", "/").replace("~0", "~"))
290        })
291        .collect()
292}
293
294fn replace_matching_paths(
295    value: &mut JsonValue,
296    segments: &[String],
297    replacement: &JsonValue,
298) -> usize {
299    if segments.is_empty() {
300        *value = replacement.clone();
301        return 1;
302    }
303
304    let head = segments[0].as_str();
305    let tail = &segments[1..];
306    if head == "*" {
307        return match value {
308            JsonValue::Array(items) => items
309                .iter_mut()
310                .map(|item| replace_matching_paths(item, tail, replacement))
311                .sum(),
312            JsonValue::Object(map) => map
313                .values_mut()
314                .map(|item| replace_matching_paths(item, tail, replacement))
315                .sum(),
316            _ => 0,
317        };
318    }
319
320    match value {
321        JsonValue::Object(map) => map
322            .get_mut(head)
323            .map(|child| replace_matching_paths(child, tail, replacement))
324            .unwrap_or(0),
325        JsonValue::Array(items) => head
326            .parse::<usize>()
327            .ok()
328            .and_then(|index| items.get_mut(index))
329            .map(|child| replace_matching_paths(child, tail, replacement))
330            .unwrap_or(0),
331        _ => 0,
332    }
333}
334
335fn first_divergence_at(
336    left: &JsonValue,
337    right: &JsonValue,
338    path: String,
339) -> Option<ReplayDivergence> {
340    if left == right {
341        return None;
342    }
343    match (left, right) {
344        (JsonValue::Object(left_map), JsonValue::Object(right_map)) => {
345            let keys = left_map
346                .keys()
347                .chain(right_map.keys())
348                .cloned()
349                .collect::<BTreeSet<_>>();
350            for key in keys {
351                let next_path = pointer_child(&path, &key);
352                match (left_map.get(&key), right_map.get(&key)) {
353                    (Some(left_child), Some(right_child)) => {
354                        if let Some(divergence) =
355                            first_divergence_at(left_child, right_child, next_path)
356                        {
357                            return Some(divergence);
358                        }
359                    }
360                    (Some(left_child), None) => {
361                        return Some(divergence(
362                            next_path,
363                            left_child.clone(),
364                            JsonValue::Null,
365                            "right run is missing this field",
366                        ));
367                    }
368                    (None, Some(right_child)) => {
369                        return Some(divergence(
370                            next_path,
371                            JsonValue::Null,
372                            right_child.clone(),
373                            "left run is missing this field",
374                        ));
375                    }
376                    (None, None) => {}
377                }
378            }
379            Some(divergence(
380                display_path(&path),
381                left.clone(),
382                right.clone(),
383                "objects differ",
384            ))
385        }
386        (JsonValue::Array(left_items), JsonValue::Array(right_items)) => {
387            for index in 0..left_items.len().max(right_items.len()) {
388                let next_path = pointer_child(&path, &index.to_string());
389                match (left_items.get(index), right_items.get(index)) {
390                    (Some(left_child), Some(right_child)) => {
391                        if let Some(divergence) =
392                            first_divergence_at(left_child, right_child, next_path)
393                        {
394                            return Some(divergence);
395                        }
396                    }
397                    (Some(left_child), None) => {
398                        return Some(divergence(
399                            next_path,
400                            left_child.clone(),
401                            JsonValue::Null,
402                            "right run is missing this array element",
403                        ));
404                    }
405                    (None, Some(right_child)) => {
406                        return Some(divergence(
407                            next_path,
408                            JsonValue::Null,
409                            right_child.clone(),
410                            "left run is missing this array element",
411                        ));
412                    }
413                    (None, None) => {}
414                }
415            }
416            Some(divergence(
417                display_path(&path),
418                left.clone(),
419                right.clone(),
420                "arrays differ",
421            ))
422        }
423        _ => Some(divergence(
424            display_path(&path),
425            left.clone(),
426            right.clone(),
427            "values differ",
428        )),
429    }
430}
431
/// Appends `child` as a new segment of the JSON pointer `parent`.
///
/// `~` is escaped as `~0` and `/` as `~1` (RFC 6901). An empty `parent` denotes the
/// root, so the result is `/<child>`.
fn pointer_child(parent: &str, child: &str) -> String {
    let mut pointer = String::with_capacity(parent.len() + child.len() + 1);
    pointer.push_str(parent);
    pointer.push('/');
    for ch in child.chars() {
        match ch {
            '~' => pointer.push_str("~0"),
            '/' => pointer.push_str("~1"),
            other => pointer.push(other),
        }
    }
    pointer
}
440
/// Renders a JSON pointer for reports, showing the root (the empty pointer) as `/`.
fn display_path(path: &str) -> String {
    match path {
        "" => String::from("/"),
        non_root => non_root.to_owned(),
    }
}
448
449fn divergence(
450    path: String,
451    left: JsonValue,
452    right: JsonValue,
453    message: impl Into<String>,
454) -> ReplayDivergence {
455    ReplayDivergence {
456        path,
457        left,
458        right,
459        message: message.into(),
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an allowlist rule that swaps the matched field for a fixed placeholder.
    fn placeholder_rule(path: &str, reason: &str, placeholder: &str) -> ReplayAllowlistRule {
        ReplayAllowlistRule {
            path: path.to_string(),
            reason: reason.to_string(),
            replacement: Some(JsonValue::String(placeholder.to_string())),
        }
    }

    /// Builds a run holding one successful outbox dispatch.
    ///
    /// The backend-local fields `event_id` and `occurred_at_ms` are supplied by the caller.
    fn dispatch_run(run_id: &str, event_id: u64, occurred_at_ms: u64) -> ReplayTraceRun {
        ReplayTraceRun {
            run_id: run_id.to_string(),
            event_log_entries: vec![json!({
                "event_id": event_id,
                "topic": "trigger.outbox",
                "kind": "dispatch_succeeded",
                "occurred_at_ms": occurred_at_ms,
                "payload": {"binding_id": "demo", "status": "dispatched"}
            })],
            ..ReplayTraceRun::default()
        }
    }

    /// A trace whose runs differ only in fields covered by the allowlist.
    fn base_trace() -> ReplayOracleTrace {
        let allowlist = vec![
            placeholder_rule("/run_id", "run ids are allocated per execution", "<run-id>"),
            placeholder_rule(
                "/event_log_entries/*/event_id",
                "event log offsets are backend-local",
                "<event-id>",
            ),
            placeholder_rule(
                "/event_log_entries/*/occurred_at_ms",
                "append timestamps are wall-clock observations",
                "<timestamp-ms>",
            ),
        ];
        ReplayOracleTrace {
            name: "fixture".to_string(),
            allowlist,
            first_run: dispatch_run("run-a", 10, 1000),
            second_run: dispatch_run("run-b", 42, 2000),
            ..ReplayOracleTrace::default()
        }
    }

    /// Makes the second run's dispatch land in the DLQ, a meaningful (non-allowlisted) drift.
    fn dead_letter_second_run(trace: &mut ReplayOracleTrace) {
        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
    }

    #[test]
    fn canonical_comparison_allows_explicit_nondeterministic_fields() {
        let report = run_replay_oracle_trace(&base_trace()).expect("oracle succeeds");
        assert!(report.passed, "{report:?}");
        assert!(report.divergence.is_none());
    }

    #[test]
    fn meaningful_drift_reports_first_divergent_path() {
        let mut trace = base_trace();
        dead_letter_second_run(&mut trace);

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
        assert!(!report.passed);

        let ReplayDivergence {
            path, left, right, ..
        } = report.divergence.expect("drift is reported");
        assert_eq!(path, "/event_log_entries/0/payload/status");
        assert_eq!(left, json!("dispatched"));
        assert_eq!(right, json!("dlq"));
    }

    #[test]
    fn expected_drift_fixture_passes_only_when_drift_is_detected() {
        let mut trace = base_trace();
        trace.expect = ReplayExpectation::Drift;
        dead_letter_second_run(&mut trace);

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
        assert!(report.passed);
        assert!(report.divergence.is_some());
    }

    #[test]
    fn allowlist_paths_must_match_real_fields() {
        let mut trace = base_trace();
        trace.allowlist.push(ReplayAllowlistRule {
            path: "/llm_interactions/*/latency_ms".to_string(),
            reason: "latency is nondeterministic".to_string(),
            replacement: None,
        });

        let outcome = run_replay_oracle_trace(&trace);
        assert!(
            matches!(outcome, Err(ReplayOracleError::AllowlistPathMissing(_))),
            "missing path should fail: {outcome:?}"
        );
    }
}