1use serde::{Deserialize, Serialize};
2use serde_json::{json, Value as JsonValue};
3use std::collections::BTreeSet;
4use std::fmt;
5
6pub const REPLAY_TRACE_SCHEMA_VERSION: &str = "harn.orchestration.replay_trace.v1";
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
9#[serde(default)]
10pub struct ReplayOracleTrace {
11 pub schema_version: String,
12 pub name: String,
13 pub description: Option<String>,
14 pub expect: ReplayExpectation,
15 pub protocol_fixture_refs: Vec<String>,
16 pub allowlist: Vec<ReplayAllowlistRule>,
17 pub first_run: ReplayTraceRun,
18 pub second_run: ReplayTraceRun,
19}
20
21impl Default for ReplayOracleTrace {
22 fn default() -> Self {
23 Self {
24 schema_version: REPLAY_TRACE_SCHEMA_VERSION.to_string(),
25 name: String::new(),
26 description: None,
27 expect: ReplayExpectation::Match,
28 protocol_fixture_refs: Vec::new(),
29 allowlist: Vec::new(),
30 first_run: ReplayTraceRun::default(),
31 second_run: ReplayTraceRun::default(),
32 }
33 }
34}
35
36#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum ReplayExpectation {
39 #[default]
40 Match,
41 Drift,
42}
43
44#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
45#[serde(default)]
46pub struct ReplayAllowlistRule {
47 pub path: String,
49 pub reason: String,
50 pub replacement: Option<JsonValue>,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
54#[serde(default)]
55pub struct ReplayTraceRun {
56 pub run_id: String,
57 pub event_log_entries: Vec<JsonValue>,
58 pub trigger_firings: Vec<JsonValue>,
59 pub llm_interactions: Vec<JsonValue>,
60 pub protocol_interactions: Vec<JsonValue>,
61 pub approval_interactions: Vec<JsonValue>,
62 pub effect_receipts: Vec<JsonValue>,
63 pub agent_transcript_deltas: Vec<JsonValue>,
64 pub final_artifacts: Vec<JsonValue>,
65 pub policy_decisions: Vec<JsonValue>,
66}
67
68impl ReplayTraceRun {
69 pub fn counts(&self) -> ReplayTraceRunCounts {
70 ReplayTraceRunCounts {
71 event_log_entries: self.event_log_entries.len(),
72 trigger_firings: self.trigger_firings.len(),
73 llm_interactions: self.llm_interactions.len(),
74 protocol_interactions: self.protocol_interactions.len(),
75 approval_interactions: self.approval_interactions.len(),
76 effect_receipts: self.effect_receipts.len(),
77 agent_transcript_deltas: self.agent_transcript_deltas.len(),
78 final_artifacts: self.final_artifacts.len(),
79 policy_decisions: self.policy_decisions.len(),
80 }
81 }
82}
83
84#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(default)]
86pub struct ReplayTraceRunCounts {
87 pub event_log_entries: usize,
88 pub trigger_firings: usize,
89 pub llm_interactions: usize,
90 pub protocol_interactions: usize,
91 pub approval_interactions: usize,
92 pub effect_receipts: usize,
93 pub agent_transcript_deltas: usize,
94 pub final_artifacts: usize,
95 pub policy_decisions: usize,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
99#[serde(default)]
100pub struct ReplayOracleReport {
101 pub name: String,
102 pub expectation: ReplayExpectation,
103 pub passed: bool,
104 pub first_run_counts: ReplayTraceRunCounts,
105 pub second_run_counts: ReplayTraceRunCounts,
106 pub protocol_fixture_refs: Vec<String>,
107 pub divergence: Option<ReplayDivergence>,
108}
109
110impl Default for ReplayOracleReport {
111 fn default() -> Self {
112 Self {
113 name: String::new(),
114 expectation: ReplayExpectation::Match,
115 passed: false,
116 first_run_counts: ReplayTraceRunCounts::default(),
117 second_run_counts: ReplayTraceRunCounts::default(),
118 protocol_fixture_refs: Vec::new(),
119 divergence: None,
120 }
121 }
122}
123
124#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
125pub struct ReplayDivergence {
126 pub path: String,
127 pub left: JsonValue,
128 pub right: JsonValue,
129 pub message: String,
130}
131
/// Errors produced while validating or canonicalizing a replay trace.
///
/// Each variant carries a complete, human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ReplayOracleError {
    /// The trace itself is malformed (schema version, name, run ids or
    /// missing trace material).
    InvalidTrace(String),
    /// An allowlist rule has an unusable path or a blank reason.
    InvalidAllowlistPath(String),
    /// An allowlist path matched nothing in a run.
    AllowlistPathMissing(String),
    /// A run could not be converted to JSON.
    Serialization(String),
}

impl fmt::Display for ReplayOracleError {
    /// Writes the carried message unchanged, honouring any width, fill or
    /// precision requested by the caller's format spec.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant wraps exactly one message, so a single irrefutable
        // or-pattern extracts it; a new variant will fail to compile here.
        let (Self::InvalidTrace(message)
        | Self::InvalidAllowlistPath(message)
        | Self::AllowlistPathMissing(message)
        | Self::Serialization(message)) = self;
        // Called by path so resolution does not depend on `Display` being
        // imported for method-call syntax.
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ReplayOracleError {}
152
153pub fn run_replay_oracle_trace(
154 trace: &ReplayOracleTrace,
155) -> Result<ReplayOracleReport, ReplayOracleError> {
156 validate_trace(trace)?;
157 let first_run_counts = trace.first_run.counts();
158 let second_run_counts = trace.second_run.counts();
159 let first = canonicalize_run(&trace.first_run, &trace.allowlist)?;
160 let second = canonicalize_run(&trace.second_run, &trace.allowlist)?;
161 let divergence = first_divergence(&first, &second);
162 let passed = match (trace.expect, divergence.is_some()) {
163 (ReplayExpectation::Match, false) => true,
164 (ReplayExpectation::Match, true) => false,
165 (ReplayExpectation::Drift, true) => true,
166 (ReplayExpectation::Drift, false) => false,
167 };
168
169 Ok(ReplayOracleReport {
170 name: trace.name.clone(),
171 expectation: trace.expect,
172 passed,
173 first_run_counts,
174 second_run_counts,
175 protocol_fixture_refs: trace.protocol_fixture_refs.clone(),
176 divergence,
177 })
178}
179
180pub fn canonicalize_run(
181 run: &ReplayTraceRun,
182 allowlist: &[ReplayAllowlistRule],
183) -> Result<JsonValue, ReplayOracleError> {
184 let mut value = serde_json::to_value(run)
185 .map_err(|error| ReplayOracleError::Serialization(error.to_string()))?;
186 for rule in allowlist {
187 apply_allowlist_rule(&mut value, rule)?;
188 }
189 Ok(value)
190}
191
192pub fn first_divergence(left: &JsonValue, right: &JsonValue) -> Option<ReplayDivergence> {
193 first_divergence_at(left, right, String::new())
194}
195
196fn validate_trace(trace: &ReplayOracleTrace) -> Result<(), ReplayOracleError> {
197 if trace.schema_version != REPLAY_TRACE_SCHEMA_VERSION {
198 return Err(ReplayOracleError::InvalidTrace(format!(
199 "unsupported replay trace schema_version {:?}; expected {REPLAY_TRACE_SCHEMA_VERSION}",
200 trace.schema_version
201 )));
202 }
203 if trace.name.trim().is_empty() {
204 return Err(ReplayOracleError::InvalidTrace(
205 "replay trace name cannot be empty".to_string(),
206 ));
207 }
208 if trace.first_run.run_id.trim().is_empty() || trace.second_run.run_id.trim().is_empty() {
209 return Err(ReplayOracleError::InvalidTrace(format!(
210 "{} must include run ids for both replay executions",
211 trace.name
212 )));
213 }
214 if trace_material_count(&trace.first_run) == 0 || trace_material_count(&trace.second_run) == 0 {
215 return Err(ReplayOracleError::InvalidTrace(format!(
216 "{} must include replay trace material for both executions",
217 trace.name
218 )));
219 }
220 for rule in &trace.allowlist {
221 if rule.path.trim().is_empty() {
222 return Err(ReplayOracleError::InvalidAllowlistPath(
223 "allowlist path cannot be empty".to_string(),
224 ));
225 }
226 if rule.reason.trim().is_empty() {
227 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
228 "allowlist path {} must explain why the field is nondeterministic",
229 rule.path
230 )));
231 }
232 parse_pointer_path(&rule.path)?;
233 }
234 Ok(())
235}
236
237fn trace_material_count(run: &ReplayTraceRun) -> usize {
238 let counts = run.counts();
239 counts.event_log_entries
240 + counts.trigger_firings
241 + counts.llm_interactions
242 + counts.protocol_interactions
243 + counts.approval_interactions
244 + counts.effect_receipts
245 + counts.agent_transcript_deltas
246 + counts.final_artifacts
247 + counts.policy_decisions
248}
249
250fn apply_allowlist_rule(
251 value: &mut JsonValue,
252 rule: &ReplayAllowlistRule,
253) -> Result<(), ReplayOracleError> {
254 let segments = parse_pointer_path(&rule.path)?;
255 let replacement = rule.replacement.clone().unwrap_or_else(|| {
256 json!({
257 "$harn_replay_allowlisted": rule.path,
258 })
259 });
260 let replaced = replace_matching_paths(value, &segments, &replacement);
261 if replaced == 0 {
262 return Err(ReplayOracleError::AllowlistPathMissing(format!(
263 "allowlist path {} did not match any replay field",
264 rule.path
265 )));
266 }
267 Ok(())
268}
269
270fn parse_pointer_path(path: &str) -> Result<Vec<String>, ReplayOracleError> {
271 if path == "/" {
272 return Err(ReplayOracleError::InvalidAllowlistPath(
273 "allowlist path cannot replace the whole run".to_string(),
274 ));
275 }
276 if !path.starts_with('/') {
277 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
278 "allowlist path {path:?} must start with '/'"
279 )));
280 }
281 path.split('/')
282 .skip(1)
283 .map(|segment| {
284 if segment.is_empty() {
285 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
286 "allowlist path {path:?} contains an empty segment"
287 )));
288 }
289 Ok(segment.replace("~1", "/").replace("~0", "~"))
290 })
291 .collect()
292}
293
294fn replace_matching_paths(
295 value: &mut JsonValue,
296 segments: &[String],
297 replacement: &JsonValue,
298) -> usize {
299 if segments.is_empty() {
300 *value = replacement.clone();
301 return 1;
302 }
303
304 let head = segments[0].as_str();
305 let tail = &segments[1..];
306 if head == "*" {
307 return match value {
308 JsonValue::Array(items) => items
309 .iter_mut()
310 .map(|item| replace_matching_paths(item, tail, replacement))
311 .sum(),
312 JsonValue::Object(map) => map
313 .values_mut()
314 .map(|item| replace_matching_paths(item, tail, replacement))
315 .sum(),
316 _ => 0,
317 };
318 }
319
320 match value {
321 JsonValue::Object(map) => map
322 .get_mut(head)
323 .map(|child| replace_matching_paths(child, tail, replacement))
324 .unwrap_or(0),
325 JsonValue::Array(items) => head
326 .parse::<usize>()
327 .ok()
328 .and_then(|index| items.get_mut(index))
329 .map(|child| replace_matching_paths(child, tail, replacement))
330 .unwrap_or(0),
331 _ => 0,
332 }
333}
334
335fn first_divergence_at(
336 left: &JsonValue,
337 right: &JsonValue,
338 path: String,
339) -> Option<ReplayDivergence> {
340 if left == right {
341 return None;
342 }
343 match (left, right) {
344 (JsonValue::Object(left_map), JsonValue::Object(right_map)) => {
345 let keys = left_map
346 .keys()
347 .chain(right_map.keys())
348 .cloned()
349 .collect::<BTreeSet<_>>();
350 for key in keys {
351 let next_path = pointer_child(&path, &key);
352 match (left_map.get(&key), right_map.get(&key)) {
353 (Some(left_child), Some(right_child)) => {
354 if let Some(divergence) =
355 first_divergence_at(left_child, right_child, next_path)
356 {
357 return Some(divergence);
358 }
359 }
360 (Some(left_child), None) => {
361 return Some(divergence(
362 next_path,
363 left_child.clone(),
364 JsonValue::Null,
365 "right run is missing this field",
366 ));
367 }
368 (None, Some(right_child)) => {
369 return Some(divergence(
370 next_path,
371 JsonValue::Null,
372 right_child.clone(),
373 "left run is missing this field",
374 ));
375 }
376 (None, None) => {}
377 }
378 }
379 Some(divergence(
380 display_path(&path),
381 left.clone(),
382 right.clone(),
383 "objects differ",
384 ))
385 }
386 (JsonValue::Array(left_items), JsonValue::Array(right_items)) => {
387 for index in 0..left_items.len().max(right_items.len()) {
388 let next_path = pointer_child(&path, &index.to_string());
389 match (left_items.get(index), right_items.get(index)) {
390 (Some(left_child), Some(right_child)) => {
391 if let Some(divergence) =
392 first_divergence_at(left_child, right_child, next_path)
393 {
394 return Some(divergence);
395 }
396 }
397 (Some(left_child), None) => {
398 return Some(divergence(
399 next_path,
400 left_child.clone(),
401 JsonValue::Null,
402 "right run is missing this array element",
403 ));
404 }
405 (None, Some(right_child)) => {
406 return Some(divergence(
407 next_path,
408 JsonValue::Null,
409 right_child.clone(),
410 "left run is missing this array element",
411 ));
412 }
413 (None, None) => {}
414 }
415 }
416 Some(divergence(
417 display_path(&path),
418 left.clone(),
419 right.clone(),
420 "arrays differ",
421 ))
422 }
423 _ => Some(divergence(
424 display_path(&path),
425 left.clone(),
426 right.clone(),
427 "values differ",
428 )),
429 }
430}
431
/// Appends `child` to `parent` as one more slash-separated path segment.
///
/// Inside the segment `~` is written as `~0` and `/` as `~1`, so the segment
/// can be told apart from the separators. An empty `parent` denotes the
/// root, giving `/<child>`.
fn pointer_child(parent: &str, child: &str) -> String {
    let mut pointer = String::with_capacity(parent.len() + 1 + child.len());
    pointer.push_str(parent);
    pointer.push('/');
    for ch in child.chars() {
        match ch {
            '~' => pointer.push_str("~0"),
            '/' => pointer.push_str("~1"),
            other => pointer.push(other),
        }
    }
    pointer
}
440
/// Renders an internal path for reporting: the empty (root) path becomes
/// `/`, every other path is returned unchanged.
fn display_path(path: &str) -> String {
    match path {
        "" => String::from("/"),
        nested => nested.to_owned(),
    }
}
448
449fn divergence(
450 path: String,
451 left: JsonValue,
452 right: JsonValue,
453 message: impl Into<String>,
454) -> ReplayDivergence {
455 ReplayDivergence {
456 path,
457 left,
458 right,
459 message: message.into(),
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Allowlist rule that replaces its matches with a string placeholder.
    fn placeholder_rule(path: &str, reason: &str, placeholder: &str) -> ReplayAllowlistRule {
        ReplayAllowlistRule {
            path: path.to_string(),
            reason: reason.to_string(),
            replacement: Some(JsonValue::String(placeholder.to_string())),
        }
    }

    /// Run holding a single successful-dispatch event. Only the run id, event
    /// id and timestamp vary between the two runs of the fixture.
    fn dispatch_run(run_id: &str, event_id: u64, occurred_at_ms: u64) -> ReplayTraceRun {
        ReplayTraceRun {
            run_id: run_id.to_string(),
            event_log_entries: vec![json!({
                "event_id": event_id,
                "topic": "trigger.outbox",
                "kind": "dispatch_succeeded",
                "occurred_at_ms": occurred_at_ms,
                "payload": {"binding_id": "demo", "status": "dispatched"}
            })],
            ..ReplayTraceRun::default()
        }
    }

    /// Fixture whose two runs differ only in allowlisted fields.
    fn base_trace() -> ReplayOracleTrace {
        ReplayOracleTrace {
            name: "fixture".to_string(),
            allowlist: vec![
                placeholder_rule(
                    "/run_id",
                    "run ids are allocated per execution",
                    "<run-id>",
                ),
                placeholder_rule(
                    "/event_log_entries/*/event_id",
                    "event log offsets are backend-local",
                    "<event-id>",
                ),
                placeholder_rule(
                    "/event_log_entries/*/occurred_at_ms",
                    "append timestamps are wall-clock observations",
                    "<timestamp-ms>",
                ),
            ],
            first_run: dispatch_run("run-a", 10, 1000),
            second_run: dispatch_run("run-b", 42, 2000),
            ..ReplayOracleTrace::default()
        }
    }

    /// The base fixture with a non-allowlisted payload change in the second
    /// run.
    fn drifted_trace() -> ReplayOracleTrace {
        let mut trace = base_trace();
        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
        trace
    }

    #[test]
    fn canonical_comparison_allows_explicit_nondeterministic_fields() {
        let trace = base_trace();

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed, "{report:?}");
        assert_eq!(report.divergence, None);
    }

    #[test]
    fn meaningful_drift_reports_first_divergent_path() {
        let trace = drifted_trace();

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(!report.passed);
        let found = report.divergence.expect("drift is reported");
        assert_eq!(found.path, "/event_log_entries/0/payload/status");
        assert_eq!(found.left, json!("dispatched"));
        assert_eq!(found.right, json!("dlq"));
    }

    #[test]
    fn expected_drift_fixture_passes_only_when_drift_is_detected() {
        let trace = ReplayOracleTrace {
            expect: ReplayExpectation::Drift,
            ..drifted_trace()
        };

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed);
        assert!(report.divergence.is_some());
    }

    #[test]
    fn allowlist_paths_must_match_real_fields() {
        let mut trace = base_trace();
        // Neither run records any LLM interactions, so this rule cannot match.
        trace.allowlist.push(ReplayAllowlistRule {
            path: "/llm_interactions/*/latency_ms".to_string(),
            reason: "latency is nondeterministic".to_string(),
            replacement: None,
        });

        let error = run_replay_oracle_trace(&trace).expect_err("missing path should fail");

        assert!(matches!(error, ReplayOracleError::AllowlistPathMissing(_)));
    }
}