1use serde::{Deserialize, Serialize};
2use serde_json::{json, Value as JsonValue};
3use std::collections::BTreeSet;
4use std::fmt;
5
6pub const REPLAY_TRACE_SCHEMA_VERSION: &str = "harn.orchestration.replay_trace.v1";
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
9#[serde(default)]
10pub struct ReplayOracleTrace {
11 pub schema_version: String,
12 pub name: String,
13 pub description: Option<String>,
14 pub expect: ReplayExpectation,
15 pub protocol_fixture_refs: Vec<String>,
16 pub allowlist: Vec<ReplayAllowlistRule>,
17 pub first_run: ReplayTraceRun,
18 pub second_run: ReplayTraceRun,
19}
20
21impl Default for ReplayOracleTrace {
22 fn default() -> Self {
23 Self {
24 schema_version: REPLAY_TRACE_SCHEMA_VERSION.to_string(),
25 name: String::new(),
26 description: None,
27 expect: ReplayExpectation::Match,
28 protocol_fixture_refs: Vec::new(),
29 allowlist: Vec::new(),
30 first_run: ReplayTraceRun::default(),
31 second_run: ReplayTraceRun::default(),
32 }
33 }
34}
35
36#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38pub enum ReplayExpectation {
39 #[default]
40 Match,
41 Drift,
42}
43
44#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
45#[serde(default)]
46pub struct ReplayAllowlistRule {
47 pub path: String,
49 pub reason: String,
50 pub replacement: Option<JsonValue>,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
54#[serde(default)]
55pub struct ReplayTraceRun {
56 pub run_id: String,
57 pub event_log_entries: Vec<JsonValue>,
58 pub trigger_firings: Vec<JsonValue>,
59 pub llm_interactions: Vec<JsonValue>,
60 pub protocol_interactions: Vec<JsonValue>,
61 pub approval_interactions: Vec<JsonValue>,
62 pub effect_receipts: Vec<JsonValue>,
63 pub persona_runtime_states: Vec<JsonValue>,
64 pub agent_transcript_deltas: Vec<JsonValue>,
65 pub final_artifacts: Vec<JsonValue>,
66 pub policy_decisions: Vec<JsonValue>,
67 pub channel_receipts: Vec<JsonValue>,
74 pub lifecycle_receipts: Vec<JsonValue>,
80}
81
82impl ReplayTraceRun {
83 pub fn counts(&self) -> ReplayTraceRunCounts {
84 ReplayTraceRunCounts {
85 event_log_entries: self.event_log_entries.len(),
86 trigger_firings: self.trigger_firings.len(),
87 llm_interactions: self.llm_interactions.len(),
88 protocol_interactions: self.protocol_interactions.len(),
89 approval_interactions: self.approval_interactions.len(),
90 effect_receipts: self.effect_receipts.len(),
91 persona_runtime_states: self.persona_runtime_states.len(),
92 agent_transcript_deltas: self.agent_transcript_deltas.len(),
93 final_artifacts: self.final_artifacts.len(),
94 policy_decisions: self.policy_decisions.len(),
95 channel_receipts: self.channel_receipts.len(),
96 lifecycle_receipts: self.lifecycle_receipts.len(),
97 }
98 }
99}
100
101#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
102#[serde(default)]
103pub struct ReplayTraceRunCounts {
104 pub event_log_entries: usize,
105 pub trigger_firings: usize,
106 pub llm_interactions: usize,
107 pub protocol_interactions: usize,
108 pub approval_interactions: usize,
109 pub effect_receipts: usize,
110 pub persona_runtime_states: usize,
111 pub agent_transcript_deltas: usize,
112 pub final_artifacts: usize,
113 pub policy_decisions: usize,
114 pub channel_receipts: usize,
116 pub lifecycle_receipts: usize,
117}
118
119#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
120#[serde(default)]
121pub struct ReplayOracleReport {
122 pub name: String,
123 pub expectation: ReplayExpectation,
124 pub passed: bool,
125 pub first_run_counts: ReplayTraceRunCounts,
126 pub second_run_counts: ReplayTraceRunCounts,
127 pub protocol_fixture_refs: Vec<String>,
128 pub divergence: Option<ReplayDivergence>,
129}
130
131impl Default for ReplayOracleReport {
132 fn default() -> Self {
133 Self {
134 name: String::new(),
135 expectation: ReplayExpectation::Match,
136 passed: false,
137 first_run_counts: ReplayTraceRunCounts::default(),
138 second_run_counts: ReplayTraceRunCounts::default(),
139 protocol_fixture_refs: Vec::new(),
140 divergence: None,
141 }
142 }
143}
144
145#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
146pub struct ReplayDivergence {
147 pub path: String,
148 pub left: JsonValue,
149 pub right: JsonValue,
150 pub message: String,
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
154pub enum ReplayOracleError {
155 InvalidTrace(String),
156 InvalidAllowlistPath(String),
157 AllowlistPathMissing(String),
158 Serialization(String),
159}
160
161impl fmt::Display for ReplayOracleError {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 match self {
164 Self::InvalidTrace(message)
165 | Self::InvalidAllowlistPath(message)
166 | Self::AllowlistPathMissing(message)
167 | Self::Serialization(message) => message.fmt(f),
168 }
169 }
170}
171
172impl std::error::Error for ReplayOracleError {}
173
174pub fn run_replay_oracle_trace(
175 trace: &ReplayOracleTrace,
176) -> Result<ReplayOracleReport, ReplayOracleError> {
177 validate_trace(trace)?;
178 let first_run_counts = trace.first_run.counts();
179 let second_run_counts = trace.second_run.counts();
180 let first = canonicalize_run(&trace.first_run, &trace.allowlist)?;
181 let second = canonicalize_run(&trace.second_run, &trace.allowlist)?;
182 let divergence = first_divergence(&first, &second);
183 let passed = match (trace.expect, divergence.is_some()) {
184 (ReplayExpectation::Match, false) => true,
185 (ReplayExpectation::Match, true) => false,
186 (ReplayExpectation::Drift, true) => true,
187 (ReplayExpectation::Drift, false) => false,
188 };
189
190 Ok(ReplayOracleReport {
191 name: trace.name.clone(),
192 expectation: trace.expect,
193 passed,
194 first_run_counts,
195 second_run_counts,
196 protocol_fixture_refs: trace.protocol_fixture_refs.clone(),
197 divergence,
198 })
199}
200
201pub fn canonicalize_run(
202 run: &ReplayTraceRun,
203 allowlist: &[ReplayAllowlistRule],
204) -> Result<JsonValue, ReplayOracleError> {
205 let mut value = serde_json::to_value(run)
206 .map_err(|error| ReplayOracleError::Serialization(error.to_string()))?;
207 for rule in allowlist {
208 apply_allowlist_rule(&mut value, rule)?;
209 }
210 Ok(value)
211}
212
213pub fn first_divergence(left: &JsonValue, right: &JsonValue) -> Option<ReplayDivergence> {
214 first_divergence_at(left, right, String::new())
215}
216
217fn validate_trace(trace: &ReplayOracleTrace) -> Result<(), ReplayOracleError> {
218 if trace.schema_version != REPLAY_TRACE_SCHEMA_VERSION {
219 return Err(ReplayOracleError::InvalidTrace(format!(
220 "unsupported replay trace schema_version {:?}; expected {REPLAY_TRACE_SCHEMA_VERSION}",
221 trace.schema_version
222 )));
223 }
224 if trace.name.trim().is_empty() {
225 return Err(ReplayOracleError::InvalidTrace(
226 "replay trace name cannot be empty".to_string(),
227 ));
228 }
229 if trace.first_run.run_id.trim().is_empty() || trace.second_run.run_id.trim().is_empty() {
230 return Err(ReplayOracleError::InvalidTrace(format!(
231 "{} must include run ids for both replay executions",
232 trace.name
233 )));
234 }
235 if trace_material_count(&trace.first_run) == 0 || trace_material_count(&trace.second_run) == 0 {
236 return Err(ReplayOracleError::InvalidTrace(format!(
237 "{} must include replay trace material for both executions",
238 trace.name
239 )));
240 }
241 for rule in &trace.allowlist {
242 if rule.path.trim().is_empty() {
243 return Err(ReplayOracleError::InvalidAllowlistPath(
244 "allowlist path cannot be empty".to_string(),
245 ));
246 }
247 if rule.reason.trim().is_empty() {
248 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
249 "allowlist path {} must explain why the field is nondeterministic",
250 rule.path
251 )));
252 }
253 parse_pointer_path(&rule.path)?;
254 }
255 Ok(())
256}
257
258fn trace_material_count(run: &ReplayTraceRun) -> usize {
259 let counts = run.counts();
260 counts.event_log_entries
261 + counts.trigger_firings
262 + counts.llm_interactions
263 + counts.protocol_interactions
264 + counts.approval_interactions
265 + counts.effect_receipts
266 + counts.persona_runtime_states
267 + counts.agent_transcript_deltas
268 + counts.final_artifacts
269 + counts.policy_decisions
270 + counts.channel_receipts
271 + counts.lifecycle_receipts
272}
273
274fn apply_allowlist_rule(
275 value: &mut JsonValue,
276 rule: &ReplayAllowlistRule,
277) -> Result<(), ReplayOracleError> {
278 let segments = parse_pointer_path(&rule.path)?;
279 let replacement = rule.replacement.clone().unwrap_or_else(|| {
280 json!({
281 "$harn_replay_allowlisted": rule.path,
282 })
283 });
284 let replaced = replace_matching_paths(value, &segments, &replacement);
285 if replaced == 0 {
286 return Err(ReplayOracleError::AllowlistPathMissing(format!(
287 "allowlist path {} did not match any replay field",
288 rule.path
289 )));
290 }
291 Ok(())
292}
293
294fn parse_pointer_path(path: &str) -> Result<Vec<String>, ReplayOracleError> {
295 if path == "/" {
296 return Err(ReplayOracleError::InvalidAllowlistPath(
297 "allowlist path cannot replace the whole run".to_string(),
298 ));
299 }
300 if !path.starts_with('/') {
301 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
302 "allowlist path {path:?} must start with '/'"
303 )));
304 }
305 path.split('/')
306 .skip(1)
307 .map(|segment| {
308 if segment.is_empty() {
309 return Err(ReplayOracleError::InvalidAllowlistPath(format!(
310 "allowlist path {path:?} contains an empty segment"
311 )));
312 }
313 Ok(segment.replace("~1", "/").replace("~0", "~"))
314 })
315 .collect()
316}
317
318fn replace_matching_paths(
319 value: &mut JsonValue,
320 segments: &[String],
321 replacement: &JsonValue,
322) -> usize {
323 if segments.is_empty() {
324 *value = replacement.clone();
325 return 1;
326 }
327
328 let head = segments[0].as_str();
329 let tail = &segments[1..];
330 if head == "*" {
331 return match value {
332 JsonValue::Array(items) => items
333 .iter_mut()
334 .map(|item| replace_matching_paths(item, tail, replacement))
335 .sum(),
336 JsonValue::Object(map) => map
337 .values_mut()
338 .map(|item| replace_matching_paths(item, tail, replacement))
339 .sum(),
340 _ => 0,
341 };
342 }
343
344 match value {
345 JsonValue::Object(map) => map
346 .get_mut(head)
347 .map(|child| replace_matching_paths(child, tail, replacement))
348 .unwrap_or(0),
349 JsonValue::Array(items) => head
350 .parse::<usize>()
351 .ok()
352 .and_then(|index| items.get_mut(index))
353 .map(|child| replace_matching_paths(child, tail, replacement))
354 .unwrap_or(0),
355 _ => 0,
356 }
357}
358
359fn first_divergence_at(
360 left: &JsonValue,
361 right: &JsonValue,
362 path: String,
363) -> Option<ReplayDivergence> {
364 if left == right {
365 return None;
366 }
367 match (left, right) {
368 (JsonValue::Object(left_map), JsonValue::Object(right_map)) => {
369 let keys = left_map
370 .keys()
371 .chain(right_map.keys())
372 .cloned()
373 .collect::<BTreeSet<_>>();
374 for key in keys {
375 let next_path = pointer_child(&path, &key);
376 match (left_map.get(&key), right_map.get(&key)) {
377 (Some(left_child), Some(right_child)) => {
378 if let Some(divergence) =
379 first_divergence_at(left_child, right_child, next_path)
380 {
381 return Some(divergence);
382 }
383 }
384 (Some(left_child), None) => {
385 return Some(divergence(
386 next_path,
387 left_child.clone(),
388 JsonValue::Null,
389 "right run is missing this field",
390 ));
391 }
392 (None, Some(right_child)) => {
393 return Some(divergence(
394 next_path,
395 JsonValue::Null,
396 right_child.clone(),
397 "left run is missing this field",
398 ));
399 }
400 (None, None) => {}
401 }
402 }
403 Some(divergence(
404 display_path(&path),
405 left.clone(),
406 right.clone(),
407 "objects differ",
408 ))
409 }
410 (JsonValue::Array(left_items), JsonValue::Array(right_items)) => {
411 for index in 0..left_items.len().max(right_items.len()) {
412 let next_path = pointer_child(&path, &index.to_string());
413 match (left_items.get(index), right_items.get(index)) {
414 (Some(left_child), Some(right_child)) => {
415 if let Some(divergence) =
416 first_divergence_at(left_child, right_child, next_path)
417 {
418 return Some(divergence);
419 }
420 }
421 (Some(left_child), None) => {
422 return Some(divergence(
423 next_path,
424 left_child.clone(),
425 JsonValue::Null,
426 "right run is missing this array element",
427 ));
428 }
429 (None, Some(right_child)) => {
430 return Some(divergence(
431 next_path,
432 JsonValue::Null,
433 right_child.clone(),
434 "left run is missing this array element",
435 ));
436 }
437 (None, None) => {}
438 }
439 }
440 Some(divergence(
441 display_path(&path),
442 left.clone(),
443 right.clone(),
444 "arrays differ",
445 ))
446 }
447 _ => Some(divergence(
448 display_path(&path),
449 left.clone(),
450 right.clone(),
451 "values differ",
452 )),
453 }
454}
455
456fn pointer_child(parent: &str, child: &str) -> String {
457 let escaped = child.replace('~', "~0").replace('/', "~1");
458 if parent.is_empty() {
459 format!("/{escaped}")
460 } else {
461 format!("{parent}/{escaped}")
462 }
463}
464
465fn display_path(path: &str) -> String {
466 if path.is_empty() {
467 "/".to_string()
468 } else {
469 path.to_string()
470 }
471}
472
473fn divergence(
474 path: String,
475 left: JsonValue,
476 right: JsonValue,
477 message: impl Into<String>,
478) -> ReplayDivergence {
479 ReplayDivergence {
480 path,
481 left,
482 right,
483 message: message.into(),
484 }
485}
486
487#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
492#[serde(tag = "code")]
493pub enum ChannelReplayDiagnostic {
494 #[serde(rename = "HARN-REP-CHN-001")]
499 MatchWithoutEmit {
500 event_id: String,
501 trigger_id: String,
502 },
503 #[serde(rename = "HARN-REP-CHN-002")]
507 PayloadHashMismatch {
508 event_id: String,
509 recorded_hash: String,
510 replay_hash: String,
511 },
512 #[serde(rename = "HARN-REP-CHN-003")]
516 BatchCompositionDrift {
517 event_id: String,
518 trigger_id: String,
519 recorded: Vec<String>,
520 replay: Vec<String>,
521 },
522}
523
524impl ChannelReplayDiagnostic {
525 pub fn code(&self) -> &'static str {
526 match self {
527 Self::MatchWithoutEmit { .. } => "HARN-REP-CHN-001",
528 Self::PayloadHashMismatch { .. } => "HARN-REP-CHN-002",
529 Self::BatchCompositionDrift { .. } => "HARN-REP-CHN-003",
530 }
531 }
532
533 pub fn message(&self) -> String {
534 match self {
535 Self::MatchWithoutEmit {
536 event_id,
537 trigger_id,
538 } => format!(
539 "HARN-REP-CHN-001: replay matched event {event_id} for trigger \
540 {trigger_id} but no corresponding emit receipt was recorded"
541 ),
542 Self::PayloadHashMismatch {
543 event_id,
544 recorded_hash,
545 replay_hash,
546 } => format!(
547 "HARN-REP-CHN-002: emit payload drift for event {event_id} \
548 (recorded {recorded_hash}, replay {replay_hash})"
549 ),
550 Self::BatchCompositionDrift {
551 event_id,
552 trigger_id,
553 recorded,
554 replay,
555 } => format!(
556 "HARN-REP-CHN-003: batched-match composition drift for trigger {trigger_id} \
557 anchor event {event_id} (recorded {recorded:?}, replay {replay:?})"
558 ),
559 }
560 }
561}
562
563pub fn diagnose_channel_replay_drift(
574 recorded_receipts: &[JsonValue],
575 replay_receipts: &[JsonValue],
576) -> Result<Option<ChannelReplayDiagnostic>, ReplayOracleError> {
577 let recorded = ChannelReceiptIndex::from_entries(recorded_receipts)?;
578 let replay = ChannelReceiptIndex::from_entries(replay_receipts)?;
579
580 for (event_id, replay_hash) in &replay.emit_hashes {
583 if let Some(recorded_hash) = recorded.emit_hashes.get(event_id) {
584 if recorded_hash != replay_hash {
585 return Ok(Some(ChannelReplayDiagnostic::PayloadHashMismatch {
586 event_id: event_id.clone(),
587 recorded_hash: recorded_hash.clone(),
588 replay_hash: replay_hash.clone(),
589 }));
590 }
591 }
592 }
593
594 for (event_id, trigger_id) in &replay.match_triggers {
598 if !recorded.emit_hashes.contains_key(event_id) {
599 return Ok(Some(ChannelReplayDiagnostic::MatchWithoutEmit {
600 event_id: event_id.clone(),
601 trigger_id: trigger_id.clone(),
602 }));
603 }
604 }
605
606 for ((event_id, trigger_id), recorded_batch) in &recorded.match_batches {
609 if let Some(replay_batch) = replay
610 .match_batches
611 .get(&(event_id.clone(), trigger_id.clone()))
612 {
613 if recorded_batch != replay_batch {
614 return Ok(Some(ChannelReplayDiagnostic::BatchCompositionDrift {
615 event_id: event_id.clone(),
616 trigger_id: trigger_id.clone(),
617 recorded: recorded_batch.clone(),
618 replay: replay_batch.clone(),
619 }));
620 }
621 }
622 }
623
624 Ok(None)
625}
626
627struct ChannelReceiptIndex {
631 emit_hashes: std::collections::BTreeMap<String, String>,
633 match_triggers: std::collections::BTreeMap<String, String>,
637 match_batches: std::collections::BTreeMap<(String, String), Vec<String>>,
641}
642
643impl ChannelReceiptIndex {
644 fn from_entries(entries: &[JsonValue]) -> Result<Self, ReplayOracleError> {
645 let mut emit_hashes = std::collections::BTreeMap::new();
646 let mut match_triggers = std::collections::BTreeMap::new();
647 let mut match_batches = std::collections::BTreeMap::new();
648 for entry in entries {
649 let map = entry.as_object().ok_or_else(|| {
650 ReplayOracleError::Serialization(format!(
651 "channel receipt entry is not an object: {entry}"
652 ))
653 })?;
654 let (kind, payload) = if let Some(kind) = map.get("kind").and_then(|v| v.as_str()) {
659 let payload = map.get("payload").cloned().unwrap_or_else(|| entry.clone());
660 (Some(kind.to_string()), payload)
661 } else if map.contains_key("payload_hash") {
662 (Some("channel_emit_receipt".to_string()), entry.clone())
663 } else if map.contains_key("matched_at") {
664 (Some("channel_match_receipt".to_string()), entry.clone())
665 } else {
666 (None, entry.clone())
667 };
668 let Some(kind) = kind else {
669 continue;
670 };
671 let payload_map = payload.as_object();
672 match kind.as_str() {
673 "channel_emit_receipt" => {
674 let Some(payload_map) = payload_map else {
675 continue;
676 };
677 let event_id = match payload_map.get("event_id").and_then(|v| v.as_str()) {
678 Some(value) => value.to_string(),
679 None => continue,
680 };
681 let hash = payload_map
682 .get("payload_hash")
683 .and_then(|v| v.as_str())
684 .unwrap_or_default()
685 .to_string();
686 emit_hashes.entry(event_id).or_insert(hash);
687 }
688 "channel_match_receipt" => {
689 let Some(payload_map) = payload_map else {
690 continue;
691 };
692 let event_id = match payload_map.get("event_id").and_then(|v| v.as_str()) {
693 Some(value) => value.to_string(),
694 None => continue,
695 };
696 let trigger_id = payload_map
697 .get("trigger_id")
698 .and_then(|v| v.as_str())
699 .unwrap_or_default()
700 .to_string();
701 match_triggers
702 .entry(event_id.clone())
703 .or_insert(trigger_id.clone());
704 let batch_ids = payload_map
705 .get("batch")
706 .and_then(|v| v.as_object())
707 .and_then(|b| b.get("constituent_event_ids"))
708 .and_then(|v| v.as_array())
709 .map(|arr| {
710 let mut ids: Vec<String> = arr
711 .iter()
712 .filter_map(|v| v.as_str().map(|s| s.to_string()))
713 .collect();
714 ids.sort();
715 ids
716 })
717 .unwrap_or_default();
718 if !batch_ids.is_empty() {
719 match_batches
720 .entry((event_id, trigger_id))
721 .or_insert(batch_ids);
722 }
723 }
724 _ => {}
725 }
726 }
727 Ok(Self {
728 emit_hashes,
729 match_triggers,
730 match_batches,
731 })
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use super::*;
738
739 fn base_trace() -> ReplayOracleTrace {
740 ReplayOracleTrace {
741 name: "fixture".to_string(),
742 allowlist: vec![
743 ReplayAllowlistRule {
744 path: "/run_id".to_string(),
745 reason: "run ids are allocated per execution".to_string(),
746 replacement: Some(JsonValue::String("<run-id>".to_string())),
747 },
748 ReplayAllowlistRule {
749 path: "/event_log_entries/*/event_id".to_string(),
750 reason: "event log offsets are backend-local".to_string(),
751 replacement: Some(JsonValue::String("<event-id>".to_string())),
752 },
753 ReplayAllowlistRule {
754 path: "/event_log_entries/*/occurred_at_ms".to_string(),
755 reason: "append timestamps are wall-clock observations".to_string(),
756 replacement: Some(JsonValue::String("<timestamp-ms>".to_string())),
757 },
758 ],
759 first_run: ReplayTraceRun {
760 run_id: "run-a".to_string(),
761 event_log_entries: vec![json!({
762 "event_id": 10,
763 "topic": "trigger.outbox",
764 "kind": "dispatch_succeeded",
765 "occurred_at_ms": 1000,
766 "payload": {"binding_id": "demo", "status": "dispatched"}
767 })],
768 ..ReplayTraceRun::default()
769 },
770 second_run: ReplayTraceRun {
771 run_id: "run-b".to_string(),
772 event_log_entries: vec![json!({
773 "event_id": 42,
774 "topic": "trigger.outbox",
775 "kind": "dispatch_succeeded",
776 "occurred_at_ms": 2000,
777 "payload": {"binding_id": "demo", "status": "dispatched"}
778 })],
779 ..ReplayTraceRun::default()
780 },
781 ..ReplayOracleTrace::default()
782 }
783 }
784
785 #[test]
786 fn canonical_comparison_allows_explicit_nondeterministic_fields() {
787 let report = run_replay_oracle_trace(&base_trace()).expect("oracle succeeds");
788 assert!(report.passed, "{report:?}");
789 assert_eq!(report.divergence, None);
790 }
791
792 #[test]
793 fn persona_runtime_states_are_first_class_replay_material() {
794 let mut trace = base_trace();
795 trace.first_run.persona_runtime_states = vec![json!({
796 "name": "merge_captain",
797 "state": "idle",
798 "queued_work": [],
799 "handoff_inbox": [],
800 "budget": {"spent_today_usd": 0.01},
801 })];
802 trace.second_run.persona_runtime_states = trace.first_run.persona_runtime_states.clone();
803
804 let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
805
806 assert!(report.passed, "{report:?}");
807 assert_eq!(report.first_run_counts.persona_runtime_states, 1);
808 }
809
810 #[test]
811 fn meaningful_drift_reports_first_divergent_path() {
812 let mut trace = base_trace();
813 trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
814
815 let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
816
817 assert!(!report.passed);
818 let divergence = report.divergence.expect("drift is reported");
819 assert_eq!(divergence.path, "/event_log_entries/0/payload/status");
820 assert_eq!(divergence.left, json!("dispatched"));
821 assert_eq!(divergence.right, json!("dlq"));
822 }
823
824 #[test]
825 fn expected_drift_fixture_passes_only_when_drift_is_detected() {
826 let mut trace = base_trace();
827 trace.expect = ReplayExpectation::Drift;
828 trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
829
830 let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
831
832 assert!(report.passed);
833 assert!(report.divergence.is_some());
834 }
835
836 fn channel_emit_receipt(event_id: &str, payload_hash: &str) -> JsonValue {
837 json!({
838 "kind": "channel_emit_receipt",
839 "payload": {
840 "event_id": event_id,
841 "payload_hash": payload_hash,
842 "name_resolved": "tenant:default:ch.test",
843 "scope": "tenant",
844 "scope_id": "default",
845 "topic": "channels.tenant.default.ch.test",
846 "inserted": true,
847 },
848 })
849 }
850
851 fn channel_match_receipt(
852 event_id: &str,
853 trigger_id: &str,
854 constituent_ids: Option<Vec<&str>>,
855 ) -> JsonValue {
856 let mut payload = serde_json::Map::new();
857 payload.insert("event_id".to_string(), json!(event_id));
858 payload.insert("trigger_id".to_string(), json!(trigger_id));
859 payload.insert("handler_kind".to_string(), json!("local"));
860 if let Some(ids) = constituent_ids {
861 payload.insert(
862 "batch".to_string(),
863 json!({
864 "count": ids.len(),
865 "constituent_event_ids": ids,
866 }),
867 );
868 }
869 json!({
870 "kind": "channel_match_receipt",
871 "payload": JsonValue::Object(payload),
872 })
873 }
874
875 #[test]
876 fn channel_replay_diagnostic_clean_runs_have_no_drift() {
877 let recorded = vec![
878 channel_emit_receipt("evt-1", "sha256:a"),
879 channel_match_receipt("evt-1", "trig-x", None),
880 ];
881 let replay = recorded.clone();
882 let diagnostic = diagnose_channel_replay_drift(&recorded, &replay).unwrap();
883 assert!(diagnostic.is_none(), "{diagnostic:?}");
884 }
885
886 #[test]
887 fn channel_replay_diagnostic_001_match_without_emit() {
888 let recorded = vec![channel_emit_receipt("evt-1", "sha256:a")];
889 let replay = vec![
891 channel_emit_receipt("evt-1", "sha256:a"),
892 channel_match_receipt("evt-2", "trig-x", None),
893 ];
894 let diagnostic = diagnose_channel_replay_drift(&recorded, &replay)
895 .unwrap()
896 .expect("drift");
897 assert_eq!(diagnostic.code(), "HARN-REP-CHN-001");
898 assert!(matches!(
899 diagnostic,
900 ChannelReplayDiagnostic::MatchWithoutEmit { ref event_id, .. } if event_id == "evt-2"
901 ));
902 }
903
904 #[test]
905 fn channel_replay_diagnostic_002_payload_hash_drift() {
906 let recorded = vec![channel_emit_receipt("evt-1", "sha256:a")];
907 let replay = vec![channel_emit_receipt("evt-1", "sha256:b")];
908 let diagnostic = diagnose_channel_replay_drift(&recorded, &replay)
909 .unwrap()
910 .expect("drift");
911 assert_eq!(diagnostic.code(), "HARN-REP-CHN-002");
912 let message = diagnostic.message();
913 assert!(message.contains("HARN-REP-CHN-002"));
914 assert!(message.contains("evt-1"));
915 }
916
917 #[test]
918 fn channel_replay_diagnostic_003_batch_composition_drift() {
919 let recorded = vec![
920 channel_emit_receipt("evt-1", "sha256:a"),
921 channel_emit_receipt("evt-2", "sha256:b"),
922 channel_emit_receipt("evt-3", "sha256:c"),
923 channel_match_receipt("evt-1", "trig-x", Some(vec!["evt-1", "evt-2", "evt-3"])),
924 ];
925 let replay = vec![
926 channel_emit_receipt("evt-1", "sha256:a"),
927 channel_emit_receipt("evt-2", "sha256:b"),
928 channel_emit_receipt("evt-3", "sha256:c"),
929 channel_match_receipt("evt-1", "trig-x", Some(vec!["evt-1", "evt-2"])),
931 ];
932 let diagnostic = diagnose_channel_replay_drift(&recorded, &replay)
933 .unwrap()
934 .expect("drift");
935 assert_eq!(diagnostic.code(), "HARN-REP-CHN-003");
936 }
937
938 #[test]
939 fn channel_receipts_count_first_class_replay_material() {
940 let mut trace = base_trace();
941 trace.first_run.channel_receipts = vec![channel_emit_receipt("evt-1", "sha256:a")];
942 trace.second_run.channel_receipts = trace.first_run.channel_receipts.clone();
943 let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
944 assert!(report.passed, "{report:?}");
945 assert_eq!(report.first_run_counts.channel_receipts, 1);
946 assert_eq!(report.second_run_counts.channel_receipts, 1);
947 }
948
949 #[test]
950 fn lifecycle_receipts_are_first_class_replay_material() {
951 let mut trace = base_trace();
952 trace.allowlist.push(ReplayAllowlistRule {
953 path: "/lifecycle_receipts/*/payload/suspended_at/signature".to_string(),
954 reason: "per-process signing salt".to_string(),
955 replacement: Some(JsonValue::String("<signature>".to_string())),
956 });
957 trace.allowlist.push(ReplayAllowlistRule {
958 path: "/lifecycle_receipts/*/payload/suspended_at/at_ms".to_string(),
959 reason: "wall-clock at_ms varies per record".to_string(),
960 replacement: Some(JsonValue::String("<at-ms>".to_string())),
961 });
962 trace.allowlist.push(ReplayAllowlistRule {
963 path: "/lifecycle_receipts/*/payload/suspended_at/at".to_string(),
964 reason: "wall-clock at varies per record".to_string(),
965 replacement: Some(JsonValue::String("<at>".to_string())),
966 });
967 let receipt = json!({
968 "seq": 1,
969 "kind": "suspension_receipt",
970 "payload": {
971 "handle": "worker://x/1",
972 "session_id": null,
973 "initiator": "operator",
974 "initiator_id": "op-1",
975 "reason": "stop",
976 "suspended_at": {
977 "at_ms": 100,
978 "at": "1970-01-01T00:00:00.1Z",
979 "algorithm": "hmac-sha256",
980 "key_id": "local-session",
981 "signature": "sha256:deadbeef",
982 },
983 },
984 });
985 trace.first_run.lifecycle_receipts = vec![receipt.clone()];
986 trace.second_run.lifecycle_receipts = vec![receipt];
987
988 let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
989
990 assert!(report.passed, "{report:?}");
991 assert_eq!(report.first_run_counts.lifecycle_receipts, 1);
992 assert_eq!(report.second_run_counts.lifecycle_receipts, 1);
993 }
994
995 #[test]
996 fn lifecycle_receipt_input_hash_drift_is_detected() {
997 let mut trace = base_trace();
998 trace.first_run.lifecycle_receipts = vec![json!({
999 "seq": 1,
1000 "kind": "resumption_receipt",
1001 "payload": {
1002 "handle": "worker://x/1",
1003 "initiator": "operator",
1004 "initiator_id": "op-1",
1005 "input_hash": "sha256:aaaa",
1006 "continue_transcript": true,
1007 },
1008 })];
1009 trace.second_run.lifecycle_receipts = vec![json!({
1010 "seq": 1,
1011 "kind": "resumption_receipt",
1012 "payload": {
1013 "handle": "worker://x/1",
1014 "initiator": "operator",
1015 "initiator_id": "op-1",
1016 "input_hash": "sha256:bbbb",
1017 "continue_transcript": true,
1018 },
1019 })];
1020
1021 let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
1022
1023 assert!(!report.passed);
1024 let divergence = report.divergence.expect("drift is reported");
1025 assert_eq!(divergence.path, "/lifecycle_receipts/0/payload/input_hash");
1026 }
1027
1028 #[test]
1029 fn allowlist_paths_must_match_real_fields() {
1030 let mut trace = base_trace();
1031 trace.allowlist.push(ReplayAllowlistRule {
1032 path: "/llm_interactions/*/latency_ms".to_string(),
1033 reason: "latency is nondeterministic".to_string(),
1034 replacement: None,
1035 });
1036
1037 let error = run_replay_oracle_trace(&trace).expect_err("missing path should fail");
1038
1039 assert!(matches!(error, ReplayOracleError::AllowlistPathMissing(_)));
1040 }
1041}