1use serde::{Deserialize, Serialize};
2use serde_json::{json, Value as JsonValue};
3use std::collections::BTreeSet;
4use std::fmt;
5
/// Schema identifier a replay oracle trace must carry in its `schema_version` field.
///
/// `validate_trace` rejects any trace whose `schema_version` differs from this value, and
/// `ReplayOracleTrace::default()` stamps new traces with it.
pub const REPLAY_TRACE_SCHEMA_VERSION: &str = "harn.orchestration.replay_trace.v1";
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
9#[serde(default)]
10pub struct ReplayOracleTrace {
11 pub schema_version: String,
12 pub name: String,
13 pub description: Option<String>,
14 pub expect: ReplayExpectation,
15 pub protocol_fixture_refs: Vec<String>,
16 pub allowlist: Vec<ReplayAllowlistRule>,
17 pub first_run: ReplayTraceRun,
18 pub second_run: ReplayTraceRun,
19}
20
21impl Default for ReplayOracleTrace {
22 fn default() -> Self {
23 Self {
24 schema_version: REPLAY_TRACE_SCHEMA_VERSION.to_string(),
25 name: String::new(),
26 description: None,
27 expect: ReplayExpectation::Match,
28 protocol_fixture_refs: Vec::new(),
29 allowlist: Vec::new(),
30 first_run: ReplayTraceRun::default(),
31 second_run: ReplayTraceRun::default(),
32 }
33 }
34}
35
36#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum ReplayExpectation {
39 #[default]
40 Match,
41 Drift,
42}
43
44#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
45#[serde(default)]
46pub struct ReplayAllowlistRule {
47 pub path: String,
49 pub reason: String,
50 pub replacement: Option<JsonValue>,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
54#[serde(default)]
55pub struct ReplayTraceRun {
56 pub run_id: String,
57 pub event_log_entries: Vec<JsonValue>,
58 pub trigger_firings: Vec<JsonValue>,
59 pub llm_interactions: Vec<JsonValue>,
60 pub protocol_interactions: Vec<JsonValue>,
61 pub approval_interactions: Vec<JsonValue>,
62 pub effect_receipts: Vec<JsonValue>,
63 pub persona_runtime_states: Vec<JsonValue>,
64 pub agent_transcript_deltas: Vec<JsonValue>,
65 pub final_artifacts: Vec<JsonValue>,
66 pub policy_decisions: Vec<JsonValue>,
67}
68
69impl ReplayTraceRun {
70 pub fn counts(&self) -> ReplayTraceRunCounts {
71 ReplayTraceRunCounts {
72 event_log_entries: self.event_log_entries.len(),
73 trigger_firings: self.trigger_firings.len(),
74 llm_interactions: self.llm_interactions.len(),
75 protocol_interactions: self.protocol_interactions.len(),
76 approval_interactions: self.approval_interactions.len(),
77 effect_receipts: self.effect_receipts.len(),
78 persona_runtime_states: self.persona_runtime_states.len(),
79 agent_transcript_deltas: self.agent_transcript_deltas.len(),
80 final_artifacts: self.final_artifacts.len(),
81 policy_decisions: self.policy_decisions.len(),
82 }
83 }
84}
85
86#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
87#[serde(default)]
88pub struct ReplayTraceRunCounts {
89 pub event_log_entries: usize,
90 pub trigger_firings: usize,
91 pub llm_interactions: usize,
92 pub protocol_interactions: usize,
93 pub approval_interactions: usize,
94 pub effect_receipts: usize,
95 pub persona_runtime_states: usize,
96 pub agent_transcript_deltas: usize,
97 pub final_artifacts: usize,
98 pub policy_decisions: usize,
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
102#[serde(default)]
103pub struct ReplayOracleReport {
104 pub name: String,
105 pub expectation: ReplayExpectation,
106 pub passed: bool,
107 pub first_run_counts: ReplayTraceRunCounts,
108 pub second_run_counts: ReplayTraceRunCounts,
109 pub protocol_fixture_refs: Vec<String>,
110 pub divergence: Option<ReplayDivergence>,
111}
112
113impl Default for ReplayOracleReport {
114 fn default() -> Self {
115 Self {
116 name: String::new(),
117 expectation: ReplayExpectation::Match,
118 passed: false,
119 first_run_counts: ReplayTraceRunCounts::default(),
120 second_run_counts: ReplayTraceRunCounts::default(),
121 protocol_fixture_refs: Vec::new(),
122 divergence: None,
123 }
124 }
125}
126
127#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
128pub struct ReplayDivergence {
129 pub path: String,
130 pub left: JsonValue,
131 pub right: JsonValue,
132 pub message: String,
133}
134
/// Failures that prevent the oracle from producing a report.
///
/// Each variant carries the complete human-readable message shown by `Display`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ReplayOracleError {
    /// The trace failed validation (schema version, name, run ids or missing material).
    InvalidTrace(String),
    /// An allowlist rule has a blank path, a blank reason or a malformed path.
    InvalidAllowlistPath(String),
    /// An allowlist rule's path matched no field in a run.
    AllowlistPathMissing(String),
    /// A run could not be converted into a JSON value.
    Serialization(String),
}
142
143impl fmt::Display for ReplayOracleError {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 match self {
146 Self::InvalidTrace(message)
147 | Self::InvalidAllowlistPath(message)
148 | Self::AllowlistPathMissing(message)
149 | Self::Serialization(message) => message.fmt(f),
150 }
151 }
152}
153
// Uses the default `Error` methods: no underlying `source` is exposed, because every
// variant stores only a message string.
impl std::error::Error for ReplayOracleError {}
155
156pub fn run_replay_oracle_trace(
157 trace: &ReplayOracleTrace,
158) -> Result<ReplayOracleReport, ReplayOracleError> {
159 validate_trace(trace)?;
160 let first_run_counts = trace.first_run.counts();
161 let second_run_counts = trace.second_run.counts();
162 let first = canonicalize_run(&trace.first_run, &trace.allowlist)?;
163 let second = canonicalize_run(&trace.second_run, &trace.allowlist)?;
164 let divergence = first_divergence(&first, &second);
165 let passed = match (trace.expect, divergence.is_some()) {
166 (ReplayExpectation::Match, false) => true,
167 (ReplayExpectation::Match, true) => false,
168 (ReplayExpectation::Drift, true) => true,
169 (ReplayExpectation::Drift, false) => false,
170 };
171
172 Ok(ReplayOracleReport {
173 name: trace.name.clone(),
174 expectation: trace.expect,
175 passed,
176 first_run_counts,
177 second_run_counts,
178 protocol_fixture_refs: trace.protocol_fixture_refs.clone(),
179 divergence,
180 })
181}
182
183pub fn canonicalize_run(
184 run: &ReplayTraceRun,
185 allowlist: &[ReplayAllowlistRule],
186) -> Result<JsonValue, ReplayOracleError> {
187 let mut value = serde_json::to_value(run)
188 .map_err(|error| ReplayOracleError::Serialization(error.to_string()))?;
189 for rule in allowlist {
190 apply_allowlist_rule(&mut value, rule)?;
191 }
192 Ok(value)
193}
194
195pub fn first_divergence(left: &JsonValue, right: &JsonValue) -> Option<ReplayDivergence> {
196 first_divergence_at(left, right, String::new())
197}
198
199fn validate_trace(trace: &ReplayOracleTrace) -> Result<(), ReplayOracleError> {
200 if trace.schema_version != REPLAY_TRACE_SCHEMA_VERSION {
201 return Err(ReplayOracleError::InvalidTrace(format!(
202 "unsupported replay trace schema_version {:?}; expected {REPLAY_TRACE_SCHEMA_VERSION}",
203 trace.schema_version
204 )));
205 }
206 if trace.name.trim().is_empty() {
207 return Err(ReplayOracleError::InvalidTrace(
208 "replay trace name cannot be empty".to_string(),
209 ));
210 }
211 if trace.first_run.run_id.trim().is_empty() || trace.second_run.run_id.trim().is_empty() {
212 return Err(ReplayOracleError::InvalidTrace(format!(
213 "{} must include run ids for both replay executions",
214 trace.name
215 )));
216 }
217 if trace_material_count(&trace.first_run) == 0 || trace_material_count(&trace.second_run) == 0 {
218 return Err(ReplayOracleError::InvalidTrace(format!(
219 "{} must include replay trace material for both executions",
220 trace.name
221 )));
222 }
223 for rule in &trace.allowlist {
224 if rule.path.trim().is_empty() {
225 return Err(ReplayOracleError::InvalidAllowlistPath(
226 "allowlist path cannot be empty".to_string(),
227 ));
228 }
229 if rule.reason.trim().is_empty() {
230 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
231 "allowlist path {} must explain why the field is nondeterministic",
232 rule.path
233 )));
234 }
235 parse_pointer_path(&rule.path)?;
236 }
237 Ok(())
238}
239
240fn trace_material_count(run: &ReplayTraceRun) -> usize {
241 let counts = run.counts();
242 counts.event_log_entries
243 + counts.trigger_firings
244 + counts.llm_interactions
245 + counts.protocol_interactions
246 + counts.approval_interactions
247 + counts.effect_receipts
248 + counts.persona_runtime_states
249 + counts.agent_transcript_deltas
250 + counts.final_artifacts
251 + counts.policy_decisions
252}
253
254fn apply_allowlist_rule(
255 value: &mut JsonValue,
256 rule: &ReplayAllowlistRule,
257) -> Result<(), ReplayOracleError> {
258 let segments = parse_pointer_path(&rule.path)?;
259 let replacement = rule.replacement.clone().unwrap_or_else(|| {
260 json!({
261 "$harn_replay_allowlisted": rule.path,
262 })
263 });
264 let replaced = replace_matching_paths(value, &segments, &replacement);
265 if replaced == 0 {
266 return Err(ReplayOracleError::AllowlistPathMissing(format!(
267 "allowlist path {} did not match any replay field",
268 rule.path
269 )));
270 }
271 Ok(())
272}
273
274fn parse_pointer_path(path: &str) -> Result<Vec<String>, ReplayOracleError> {
275 if path == "/" {
276 return Err(ReplayOracleError::InvalidAllowlistPath(
277 "allowlist path cannot replace the whole run".to_string(),
278 ));
279 }
280 if !path.starts_with('/') {
281 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
282 "allowlist path {path:?} must start with '/'"
283 )));
284 }
285 path.split('/')
286 .skip(1)
287 .map(|segment| {
288 if segment.is_empty() {
289 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
290 "allowlist path {path:?} contains an empty segment"
291 )));
292 }
293 Ok(segment.replace("~1", "/").replace("~0", "~"))
294 })
295 .collect()
296}
297
298fn replace_matching_paths(
299 value: &mut JsonValue,
300 segments: &[String],
301 replacement: &JsonValue,
302) -> usize {
303 if segments.is_empty() {
304 *value = replacement.clone();
305 return 1;
306 }
307
308 let head = segments[0].as_str();
309 let tail = &segments[1..];
310 if head == "*" {
311 return match value {
312 JsonValue::Array(items) => items
313 .iter_mut()
314 .map(|item| replace_matching_paths(item, tail, replacement))
315 .sum(),
316 JsonValue::Object(map) => map
317 .values_mut()
318 .map(|item| replace_matching_paths(item, tail, replacement))
319 .sum(),
320 _ => 0,
321 };
322 }
323
324 match value {
325 JsonValue::Object(map) => map
326 .get_mut(head)
327 .map(|child| replace_matching_paths(child, tail, replacement))
328 .unwrap_or(0),
329 JsonValue::Array(items) => head
330 .parse::<usize>()
331 .ok()
332 .and_then(|index| items.get_mut(index))
333 .map(|child| replace_matching_paths(child, tail, replacement))
334 .unwrap_or(0),
335 _ => 0,
336 }
337}
338
339fn first_divergence_at(
340 left: &JsonValue,
341 right: &JsonValue,
342 path: String,
343) -> Option<ReplayDivergence> {
344 if left == right {
345 return None;
346 }
347 match (left, right) {
348 (JsonValue::Object(left_map), JsonValue::Object(right_map)) => {
349 let keys = left_map
350 .keys()
351 .chain(right_map.keys())
352 .cloned()
353 .collect::<BTreeSet<_>>();
354 for key in keys {
355 let next_path = pointer_child(&path, &key);
356 match (left_map.get(&key), right_map.get(&key)) {
357 (Some(left_child), Some(right_child)) => {
358 if let Some(divergence) =
359 first_divergence_at(left_child, right_child, next_path)
360 {
361 return Some(divergence);
362 }
363 }
364 (Some(left_child), None) => {
365 return Some(divergence(
366 next_path,
367 left_child.clone(),
368 JsonValue::Null,
369 "right run is missing this field",
370 ));
371 }
372 (None, Some(right_child)) => {
373 return Some(divergence(
374 next_path,
375 JsonValue::Null,
376 right_child.clone(),
377 "left run is missing this field",
378 ));
379 }
380 (None, None) => {}
381 }
382 }
383 Some(divergence(
384 display_path(&path),
385 left.clone(),
386 right.clone(),
387 "objects differ",
388 ))
389 }
390 (JsonValue::Array(left_items), JsonValue::Array(right_items)) => {
391 for index in 0..left_items.len().max(right_items.len()) {
392 let next_path = pointer_child(&path, &index.to_string());
393 match (left_items.get(index), right_items.get(index)) {
394 (Some(left_child), Some(right_child)) => {
395 if let Some(divergence) =
396 first_divergence_at(left_child, right_child, next_path)
397 {
398 return Some(divergence);
399 }
400 }
401 (Some(left_child), None) => {
402 return Some(divergence(
403 next_path,
404 left_child.clone(),
405 JsonValue::Null,
406 "right run is missing this array element",
407 ));
408 }
409 (None, Some(right_child)) => {
410 return Some(divergence(
411 next_path,
412 JsonValue::Null,
413 right_child.clone(),
414 "left run is missing this array element",
415 ));
416 }
417 (None, None) => {}
418 }
419 }
420 Some(divergence(
421 display_path(&path),
422 left.clone(),
423 right.clone(),
424 "arrays differ",
425 ))
426 }
427 _ => Some(divergence(
428 display_path(&path),
429 left.clone(),
430 right.clone(),
431 "values differ",
432 )),
433 }
434}
435
/// Appends `child` to the pointer `parent`, escaping `~` as `~0` and `/` as `~1`.
///
/// An empty `parent` denotes the root, so the result is always `/`-prefixed.
fn pointer_child(parent: &str, child: &str) -> String {
    let mut pointer = String::with_capacity(parent.len() + child.len() + 1);
    pointer.push_str(parent);
    pointer.push('/');
    // Escaping in a single pass is safe because neither escape sequence contains a
    // character that would itself need escaping afterwards.
    for ch in child.chars() {
        match ch {
            '~' => pointer.push_str("~0"),
            '/' => pointer.push_str("~1"),
            other => pointer.push(other),
        }
    }
    pointer
}
444
/// Renders a pointer for reporting: the empty (root) pointer is shown as `/`, and any other
/// pointer is returned unchanged.
fn display_path(path: &str) -> String {
    match path {
        "" => String::from("/"),
        other => other.to_owned(),
    }
}
452
453fn divergence(
454 path: String,
455 left: JsonValue,
456 right: JsonValue,
457 message: impl Into<String>,
458) -> ReplayDivergence {
459 ReplayDivergence {
460 path,
461 left,
462 right,
463 message: message.into(),
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an allowlist rule that swaps each matched field for a string placeholder.
    fn placeholder_rule(path: &str, reason: &str, placeholder: &str) -> ReplayAllowlistRule {
        ReplayAllowlistRule {
            path: path.to_string(),
            reason: reason.to_string(),
            replacement: Some(JsonValue::String(placeholder.to_string())),
        }
    }

    /// Builds a run holding a single successful dispatch event. Only the run id, event id
    /// and timestamp vary between the two runs of the base fixture.
    fn dispatch_run(run_id: &str, event_id: u64, occurred_at_ms: u64) -> ReplayTraceRun {
        ReplayTraceRun {
            run_id: run_id.to_string(),
            event_log_entries: vec![json!({
                "event_id": event_id,
                "topic": "trigger.outbox",
                "kind": "dispatch_succeeded",
                "occurred_at_ms": occurred_at_ms,
                "payload": {"binding_id": "demo", "status": "dispatched"}
            })],
            ..ReplayTraceRun::default()
        }
    }

    /// Fixture whose runs differ only in fields covered by the allowlist.
    fn base_trace() -> ReplayOracleTrace {
        ReplayOracleTrace {
            name: "fixture".to_string(),
            allowlist: vec![
                placeholder_rule(
                    "/run_id",
                    "run ids are allocated per execution",
                    "<run-id>",
                ),
                placeholder_rule(
                    "/event_log_entries/*/event_id",
                    "event log offsets are backend-local",
                    "<event-id>",
                ),
                placeholder_rule(
                    "/event_log_entries/*/occurred_at_ms",
                    "append timestamps are wall-clock observations",
                    "<timestamp-ms>",
                ),
            ],
            first_run: dispatch_run("run-a", 10, 1000),
            second_run: dispatch_run("run-b", 42, 2000),
            ..ReplayOracleTrace::default()
        }
    }

    /// Same as [`base_trace`], but the second run's dispatch ends up with status `dlq`.
    fn trace_with_status_drift() -> ReplayOracleTrace {
        let mut trace = base_trace();
        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
        trace
    }

    #[test]
    fn canonical_comparison_allows_explicit_nondeterministic_fields() {
        let trace = base_trace();

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed, "{report:?}");
        assert_eq!(report.divergence, None);
    }

    #[test]
    fn persona_runtime_states_are_first_class_replay_material() {
        let persona_states = vec![json!({
            "name": "merge_captain",
            "state": "idle",
            "queued_work": [],
            "handoff_inbox": [],
            "budget": {"spent_today_usd": 0.01},
        })];
        let mut trace = base_trace();
        trace.first_run.persona_runtime_states = persona_states.clone();
        trace.second_run.persona_runtime_states = persona_states;

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed, "{report:?}");
        assert_eq!(report.first_run_counts.persona_runtime_states, 1);
    }

    #[test]
    fn meaningful_drift_reports_first_divergent_path() {
        let trace = trace_with_status_drift();

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(!report.passed);
        let divergence = report.divergence.expect("drift is reported");
        assert_eq!(divergence.path, "/event_log_entries/0/payload/status");
        assert_eq!(divergence.left, json!("dispatched"));
        assert_eq!(divergence.right, json!("dlq"));
    }

    #[test]
    fn expected_drift_fixture_passes_only_when_drift_is_detected() {
        let mut trace = trace_with_status_drift();
        trace.expect = ReplayExpectation::Drift;

        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");

        assert!(report.passed);
        assert!(report.divergence.is_some());
    }

    #[test]
    fn allowlist_paths_must_match_real_fields() {
        let mut trace = base_trace();
        // Neither run records any LLM interactions, so this wildcard matches nothing.
        trace.allowlist.push(ReplayAllowlistRule {
            path: "/llm_interactions/*/latency_ms".to_string(),
            reason: "latency is nondeterministic".to_string(),
            replacement: None,
        });

        let error = run_replay_oracle_trace(&trace).expect_err("missing path should fail");

        assert!(matches!(error, ReplayOracleError::AllowlistPathMissing(_)));
    }
}