Skip to main content

harn_vm/orchestration/
replay_oracle.rs

1use serde::{Deserialize, Serialize};
2use serde_json::{json, Value as JsonValue};
3use std::collections::BTreeSet;
4use std::fmt;
5
/// Schema identifier a replay trace must carry. `validate_trace` rejects any
/// other `schema_version`, and `ReplayOracleTrace::default()` stamps this value.
pub const REPLAY_TRACE_SCHEMA_VERSION: &str = "harn.orchestration.replay_trace.v1";
7
/// A recorded pair of executions of the same workload, plus the rules used to
/// compare them. Consumed by [`run_replay_oracle_trace`].
///
/// `#[serde(default)]` lets a serialized trace omit any field; missing fields
/// take their values from this type's `Default` impl.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ReplayOracleTrace {
    /// Must equal [`REPLAY_TRACE_SCHEMA_VERSION`].
    pub schema_version: String,
    /// Fixture name; must be non-blank. Echoed into the report and error messages.
    pub name: String,
    pub description: Option<String>,
    /// Whether the two runs are expected to match or to drift.
    pub expect: ReplayExpectation,
    /// Not interpreted by the oracle; copied verbatim into the report.
    pub protocol_fixture_refs: Vec<String>,
    /// Nondeterministic fields masked in both runs before comparison.
    pub allowlist: Vec<ReplayAllowlistRule>,
    /// Baseline execution; the "left" side of any reported divergence.
    pub first_run: ReplayTraceRun,
    /// Replay execution; the "right" side of any reported divergence.
    pub second_run: ReplayTraceRun,
}
20
21impl Default for ReplayOracleTrace {
22    fn default() -> Self {
23        Self {
24            schema_version: REPLAY_TRACE_SCHEMA_VERSION.to_string(),
25            name: String::new(),
26            description: None,
27            expect: ReplayExpectation::Match,
28            protocol_fixture_refs: Vec::new(),
29            allowlist: Vec::new(),
30            first_run: ReplayTraceRun::default(),
31            second_run: ReplayTraceRun::default(),
32        }
33    }
34}
35
/// What a replay trace asserts about its two canonicalized runs.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ReplayExpectation {
    /// The runs must be identical after allowlist masking (serialized as `"match"`).
    #[default]
    Match,
    /// The runs must differ somewhere after allowlist masking (serialized as
    /// `"drift"`); the trace passes only when a divergence is found.
    Drift,
}
43
/// Marks one (possibly wildcarded) field as nondeterministic so it is
/// overwritten in both runs before they are compared.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ReplayAllowlistRule {
    /// JSON-pointer-like path. `*` matches every array element or object value.
    /// The path must start with `/`, may not be `/` alone or contain empty
    /// segments, and supports the RFC 6901 escapes `~1` (`/`) and `~0` (`~`).
    /// It must match at least one field in each run, or canonicalization fails.
    pub path: String,
    /// Required, non-blank justification for why the field is nondeterministic.
    pub reason: String,
    /// Value written over every matched field. When `None`, matched fields are
    /// replaced with `{"$harn_replay_allowlisted": <path>}`.
    pub replacement: Option<JsonValue>,
}
52
/// Everything captured from one execution of a workload.
///
/// The oracle serializes the whole struct, including `run_id`, masks
/// allowlisted fields, and compares the result structurally against the other
/// run. Each material list holds raw JSON records.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ReplayTraceRun {
    /// Identifier of this execution; must be non-blank. It is part of the
    /// compared JSON, so two runs with different ids need `/run_id`
    /// allowlisted for a `Match` expectation to pass.
    pub run_id: String,
    pub event_log_entries: Vec<JsonValue>,
    pub trigger_firings: Vec<JsonValue>,
    pub llm_interactions: Vec<JsonValue>,
    pub protocol_interactions: Vec<JsonValue>,
    pub approval_interactions: Vec<JsonValue>,
    pub effect_receipts: Vec<JsonValue>,
    pub persona_runtime_states: Vec<JsonValue>,
    pub agent_transcript_deltas: Vec<JsonValue>,
    pub final_artifacts: Vec<JsonValue>,
    pub policy_decisions: Vec<JsonValue>,
    /// CH-07 (#1878): durable channel emit/match receipts captured from
    /// the `lifecycle.channel.audit` topic. First-class replay material
    /// alongside `effect_receipts` so multi-agent channel exchanges
    /// round-trip byte-identically across two runs of the same
    /// workload. Each entry is the JSON-encoded
    /// `ChannelEmitReceipt` / `ChannelMatchReceipt`.
    pub channel_receipts: Vec<JsonValue>,
    /// Lifecycle receipts (suspension / resumption / drain decisions) as
    /// journaled by `crate::orchestration::lifecycle_receipts`. First-class
    /// replay material per #1861 P-08 so the oracle treats a drift in
    /// `input_hash`, `action`, or signed timestamps as a determinism
    /// failure.
    pub lifecycle_receipts: Vec<JsonValue>,
}
81
82impl ReplayTraceRun {
83    pub fn counts(&self) -> ReplayTraceRunCounts {
84        ReplayTraceRunCounts {
85            event_log_entries: self.event_log_entries.len(),
86            trigger_firings: self.trigger_firings.len(),
87            llm_interactions: self.llm_interactions.len(),
88            protocol_interactions: self.protocol_interactions.len(),
89            approval_interactions: self.approval_interactions.len(),
90            effect_receipts: self.effect_receipts.len(),
91            persona_runtime_states: self.persona_runtime_states.len(),
92            agent_transcript_deltas: self.agent_transcript_deltas.len(),
93            final_artifacts: self.final_artifacts.len(),
94            policy_decisions: self.policy_decisions.len(),
95            channel_receipts: self.channel_receipts.len(),
96            lifecycle_receipts: self.lifecycle_receipts.len(),
97        }
98    }
99}
100
/// Per-category record counts for one [`ReplayTraceRun`], as returned by
/// [`ReplayTraceRun::counts`]. Each field is the length of the same-named list.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayTraceRunCounts {
    pub event_log_entries: usize,
    pub trigger_firings: usize,
    pub llm_interactions: usize,
    pub protocol_interactions: usize,
    pub approval_interactions: usize,
    pub effect_receipts: usize,
    pub persona_runtime_states: usize,
    pub agent_transcript_deltas: usize,
    pub final_artifacts: usize,
    pub policy_decisions: usize,
    /// CH-07 (#1878).
    pub channel_receipts: usize,
    /// Count of `ReplayTraceRun::lifecycle_receipts`.
    pub lifecycle_receipts: usize,
}
118
/// Outcome of [`run_replay_oracle_trace`] for one trace.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ReplayOracleReport {
    /// Copied from the trace's `name`.
    pub name: String,
    /// The trace's `expect` value.
    pub expectation: ReplayExpectation,
    /// `true` when the presence or absence of `divergence` agrees with `expectation`.
    pub passed: bool,
    /// Record counts of the raw (unmasked) first run.
    pub first_run_counts: ReplayTraceRunCounts,
    /// Record counts of the raw (unmasked) second run.
    pub second_run_counts: ReplayTraceRunCounts,
    /// Copied from the trace's `protocol_fixture_refs`.
    pub protocol_fixture_refs: Vec<String>,
    /// First difference between the canonicalized runs, if any.
    pub divergence: Option<ReplayDivergence>,
}
130
131impl Default for ReplayOracleReport {
132    fn default() -> Self {
133        Self {
134            name: String::new(),
135            expectation: ReplayExpectation::Match,
136            passed: false,
137            first_run_counts: ReplayTraceRunCounts::default(),
138            second_run_counts: ReplayTraceRunCounts::default(),
139            protocol_fixture_refs: Vec::new(),
140            divergence: None,
141        }
142    }
143}
144
/// The first point at which two canonicalized runs differ.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ReplayDivergence {
    /// JSON pointer to the differing value (`/` for the document root).
    pub path: String,
    /// Value from the first run; `null` when the first run lacks the field or element.
    pub left: JsonValue,
    /// Value from the second run; `null` when the second run lacks the field or element.
    pub right: JsonValue,
    /// Short explanation, e.g. `"values differ"` or `"right run is missing this field"`.
    pub message: String,
}
152
/// Errors raised while validating or canonicalizing replay material.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplayOracleError {
    /// The trace itself is unusable: wrong schema version, blank name, blank
    /// run id, or a run with no replay material.
    InvalidTrace(String),
    /// An allowlist rule has a blank or malformed path, or a blank reason.
    InvalidAllowlistPath(String),
    /// An allowlist path matched nothing in a run.
    AllowlistPathMissing(String),
    /// A run could not be converted to JSON, or a channel receipt entry was
    /// not a JSON object.
    Serialization(String),
}

impl fmt::Display for ReplayOracleError {
    /// Writes only the carried message (honouring width/fill flags); the
    /// variant name is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidTrace(text) => text,
            Self::InvalidAllowlistPath(text) => text,
            Self::AllowlistPathMissing(text) => text,
            Self::Serialization(text) => text,
        };
        fmt::Display::fmt(text, f)
    }
}

impl std::error::Error for ReplayOracleError {}
173
174pub fn run_replay_oracle_trace(
175    trace: &ReplayOracleTrace,
176) -> Result<ReplayOracleReport, ReplayOracleError> {
177    validate_trace(trace)?;
178    let first_run_counts = trace.first_run.counts();
179    let second_run_counts = trace.second_run.counts();
180    let first = canonicalize_run(&trace.first_run, &trace.allowlist)?;
181    let second = canonicalize_run(&trace.second_run, &trace.allowlist)?;
182    let divergence = first_divergence(&first, &second);
183    let passed = match (trace.expect, divergence.is_some()) {
184        (ReplayExpectation::Match, false) => true,
185        (ReplayExpectation::Match, true) => false,
186        (ReplayExpectation::Drift, true) => true,
187        (ReplayExpectation::Drift, false) => false,
188    };
189
190    Ok(ReplayOracleReport {
191        name: trace.name.clone(),
192        expectation: trace.expect,
193        passed,
194        first_run_counts,
195        second_run_counts,
196        protocol_fixture_refs: trace.protocol_fixture_refs.clone(),
197        divergence,
198    })
199}
200
201pub fn canonicalize_run(
202    run: &ReplayTraceRun,
203    allowlist: &[ReplayAllowlistRule],
204) -> Result<JsonValue, ReplayOracleError> {
205    let mut value = serde_json::to_value(run)
206        .map_err(|error| ReplayOracleError::Serialization(error.to_string()))?;
207    for rule in allowlist {
208        apply_allowlist_rule(&mut value, rule)?;
209    }
210    Ok(value)
211}
212
213pub fn first_divergence(left: &JsonValue, right: &JsonValue) -> Option<ReplayDivergence> {
214    first_divergence_at(left, right, String::new())
215}
216
217fn validate_trace(trace: &ReplayOracleTrace) -> Result<(), ReplayOracleError> {
218    if trace.schema_version != REPLAY_TRACE_SCHEMA_VERSION {
219        return Err(ReplayOracleError::InvalidTrace(format!(
220            "unsupported replay trace schema_version {:?}; expected {REPLAY_TRACE_SCHEMA_VERSION}",
221            trace.schema_version
222        )));
223    }
224    if trace.name.trim().is_empty() {
225        return Err(ReplayOracleError::InvalidTrace(
226            "replay trace name cannot be empty".to_string(),
227        ));
228    }
229    if trace.first_run.run_id.trim().is_empty() || trace.second_run.run_id.trim().is_empty() {
230        return Err(ReplayOracleError::InvalidTrace(format!(
231            "{} must include run ids for both replay executions",
232            trace.name
233        )));
234    }
235    if trace_material_count(&trace.first_run) == 0 || trace_material_count(&trace.second_run) == 0 {
236        return Err(ReplayOracleError::InvalidTrace(format!(
237            "{} must include replay trace material for both executions",
238            trace.name
239        )));
240    }
241    for rule in &trace.allowlist {
242        if rule.path.trim().is_empty() {
243            return Err(ReplayOracleError::InvalidAllowlistPath(
244                "allowlist path cannot be empty".to_string(),
245            ));
246        }
247        if rule.reason.trim().is_empty() {
248            return Err(ReplayOracleError::InvalidAllowlistPath(format!(
249                "allowlist path {} must explain why the field is nondeterministic",
250                rule.path
251            )));
252        }
253        parse_pointer_path(&rule.path)?;
254    }
255    Ok(())
256}
257
258fn trace_material_count(run: &ReplayTraceRun) -> usize {
259    let counts = run.counts();
260    counts.event_log_entries
261        + counts.trigger_firings
262        + counts.llm_interactions
263        + counts.protocol_interactions
264        + counts.approval_interactions
265        + counts.effect_receipts
266        + counts.persona_runtime_states
267        + counts.agent_transcript_deltas
268        + counts.final_artifacts
269        + counts.policy_decisions
270        + counts.channel_receipts
271        + counts.lifecycle_receipts
272}
273
274fn apply_allowlist_rule(
275    value: &mut JsonValue,
276    rule: &ReplayAllowlistRule,
277) -> Result<(), ReplayOracleError> {
278    let segments = parse_pointer_path(&rule.path)?;
279    let replacement = rule.replacement.clone().unwrap_or_else(|| {
280        json!({
281            "$harn_replay_allowlisted": rule.path,
282        })
283    });
284    let replaced = replace_matching_paths(value, &segments, &replacement);
285    if replaced == 0 {
286        return Err(ReplayOracleError::AllowlistPathMissing(format!(
287            "allowlist path {} did not match any replay field",
288            rule.path
289        )));
290    }
291    Ok(())
292}
293
294fn parse_pointer_path(path: &str) -> Result<Vec<String>, ReplayOracleError> {
295    if path == "/" {
296        return Err(ReplayOracleError::InvalidAllowlistPath(
297            "allowlist path cannot replace the whole run".to_string(),
298        ));
299    }
300    if !path.starts_with('/') {
301        return Err(ReplayOracleError::InvalidAllowlistPath(format!(
302            "allowlist path {path:?} must start with '/'"
303        )));
304    }
305    path.split('/')
306        .skip(1)
307        .map(|segment| {
308            if segment.is_empty() {
309                return Err(ReplayOracleError::InvalidAllowlistPath(format!(
310                    "allowlist path {path:?} contains an empty segment"
311                )));
312            }
313            Ok(segment.replace("~1", "/").replace("~0", "~"))
314        })
315        .collect()
316}
317
318fn replace_matching_paths(
319    value: &mut JsonValue,
320    segments: &[String],
321    replacement: &JsonValue,
322) -> usize {
323    if segments.is_empty() {
324        *value = replacement.clone();
325        return 1;
326    }
327
328    let head = segments[0].as_str();
329    let tail = &segments[1..];
330    if head == "*" {
331        return match value {
332            JsonValue::Array(items) => items
333                .iter_mut()
334                .map(|item| replace_matching_paths(item, tail, replacement))
335                .sum(),
336            JsonValue::Object(map) => map
337                .values_mut()
338                .map(|item| replace_matching_paths(item, tail, replacement))
339                .sum(),
340            _ => 0,
341        };
342    }
343
344    match value {
345        JsonValue::Object(map) => map
346            .get_mut(head)
347            .map(|child| replace_matching_paths(child, tail, replacement))
348            .unwrap_or(0),
349        JsonValue::Array(items) => head
350            .parse::<usize>()
351            .ok()
352            .and_then(|index| items.get_mut(index))
353            .map(|child| replace_matching_paths(child, tail, replacement))
354            .unwrap_or(0),
355        _ => 0,
356    }
357}
358
359fn first_divergence_at(
360    left: &JsonValue,
361    right: &JsonValue,
362    path: String,
363) -> Option<ReplayDivergence> {
364    if left == right {
365        return None;
366    }
367    match (left, right) {
368        (JsonValue::Object(left_map), JsonValue::Object(right_map)) => {
369            let keys = left_map
370                .keys()
371                .chain(right_map.keys())
372                .cloned()
373                .collect::<BTreeSet<_>>();
374            for key in keys {
375                let next_path = pointer_child(&path, &key);
376                match (left_map.get(&key), right_map.get(&key)) {
377                    (Some(left_child), Some(right_child)) => {
378                        if let Some(divergence) =
379                            first_divergence_at(left_child, right_child, next_path)
380                        {
381                            return Some(divergence);
382                        }
383                    }
384                    (Some(left_child), None) => {
385                        return Some(divergence(
386                            next_path,
387                            left_child.clone(),
388                            JsonValue::Null,
389                            "right run is missing this field",
390                        ));
391                    }
392                    (None, Some(right_child)) => {
393                        return Some(divergence(
394                            next_path,
395                            JsonValue::Null,
396                            right_child.clone(),
397                            "left run is missing this field",
398                        ));
399                    }
400                    (None, None) => {}
401                }
402            }
403            Some(divergence(
404                display_path(&path),
405                left.clone(),
406                right.clone(),
407                "objects differ",
408            ))
409        }
410        (JsonValue::Array(left_items), JsonValue::Array(right_items)) => {
411            for index in 0..left_items.len().max(right_items.len()) {
412                let next_path = pointer_child(&path, &index.to_string());
413                match (left_items.get(index), right_items.get(index)) {
414                    (Some(left_child), Some(right_child)) => {
415                        if let Some(divergence) =
416                            first_divergence_at(left_child, right_child, next_path)
417                        {
418                            return Some(divergence);
419                        }
420                    }
421                    (Some(left_child), None) => {
422                        return Some(divergence(
423                            next_path,
424                            left_child.clone(),
425                            JsonValue::Null,
426                            "right run is missing this array element",
427                        ));
428                    }
429                    (None, Some(right_child)) => {
430                        return Some(divergence(
431                            next_path,
432                            JsonValue::Null,
433                            right_child.clone(),
434                            "left run is missing this array element",
435                        ));
436                    }
437                    (None, None) => {}
438                }
439            }
440            Some(divergence(
441                display_path(&path),
442                left.clone(),
443                right.clone(),
444                "arrays differ",
445            ))
446        }
447        _ => Some(divergence(
448            display_path(&path),
449            left.clone(),
450            right.clone(),
451            "values differ",
452        )),
453    }
454}
455
/// Appends `child` as one segment to the JSON pointer `parent`, escaping `~`
/// as `~0` and `/` as `~1` (RFC 6901). An empty `parent` denotes the root.
fn pointer_child(parent: &str, child: &str) -> String {
    let mut pointer = String::with_capacity(parent.len() + child.len() + 1);
    pointer.push_str(parent);
    pointer.push('/');
    for ch in child.chars() {
        match ch {
            '~' => pointer.push_str("~0"),
            '/' => pointer.push_str("~1"),
            other => pointer.push(other),
        }
    }
    pointer
}
464
/// Renders an internal pointer for reports; the root (empty pointer) is shown as `/`.
fn display_path(path: &str) -> String {
    match path {
        "" => String::from("/"),
        other => other.to_owned(),
    }
}
472
473fn divergence(
474    path: String,
475    left: JsonValue,
476    right: JsonValue,
477    message: impl Into<String>,
478) -> ReplayDivergence {
479    ReplayDivergence {
480        path,
481        left,
482        right,
483        message: message.into(),
484    }
485}
486
/// CH-07 (#1878): typed channel replay diagnostic codes. Surfaced by
/// `diagnose_channel_replay_drift` so audit/compliance tooling can
/// distinguish "missing match cache" from "producer drift" from
/// "batch composition drift" without parsing free-form messages.
///
/// Serialized as an internally tagged object: a `"code"` field holding the
/// `HARN-REP-CHN-00x` string, next to the variant's own fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "code")]
pub enum ChannelReplayDiagnostic {
    /// `HARN-REP-CHN-001`: replay encountered a match for an `event_id`
    /// that isn't in the recorded emit log. Either the replay generated a
    /// new emit between baseline and replay, or the baseline lost an
    /// emit receipt mid-flight.
    #[serde(rename = "HARN-REP-CHN-001")]
    MatchWithoutEmit {
        event_id: String,
        trigger_id: String,
    },
    /// `HARN-REP-CHN-002`: the replay's emit `payload_hash` doesn't
    /// match the recorded hash for the same `event_id`. Producer code
    /// drifted between runs.
    #[serde(rename = "HARN-REP-CHN-002")]
    PayloadHashMismatch {
        event_id: String,
        recorded_hash: String,
        replay_hash: String,
    },
    /// `HARN-REP-CHN-003`: the replay's batched-match composition
    /// (`constituent_event_ids`) doesn't match the recorded batch.
    /// Aggregation policy or upstream emit ordering drifted. When produced
    /// by `diagnose_channel_replay_drift`, `recorded` and `replay` are
    /// sorted.
    #[serde(rename = "HARN-REP-CHN-003")]
    BatchCompositionDrift {
        event_id: String,
        trigger_id: String,
        recorded: Vec<String>,
        replay: Vec<String>,
    },
}
523
524impl ChannelReplayDiagnostic {
525    pub fn code(&self) -> &'static str {
526        match self {
527            Self::MatchWithoutEmit { .. } => "HARN-REP-CHN-001",
528            Self::PayloadHashMismatch { .. } => "HARN-REP-CHN-002",
529            Self::BatchCompositionDrift { .. } => "HARN-REP-CHN-003",
530        }
531    }
532
533    pub fn message(&self) -> String {
534        match self {
535            Self::MatchWithoutEmit {
536                event_id,
537                trigger_id,
538            } => format!(
539                "HARN-REP-CHN-001: replay matched event {event_id} for trigger \
540                 {trigger_id} but no corresponding emit receipt was recorded"
541            ),
542            Self::PayloadHashMismatch {
543                event_id,
544                recorded_hash,
545                replay_hash,
546            } => format!(
547                "HARN-REP-CHN-002: emit payload drift for event {event_id} \
548                 (recorded {recorded_hash}, replay {replay_hash})"
549            ),
550            Self::BatchCompositionDrift {
551                event_id,
552                trigger_id,
553                recorded,
554                replay,
555            } => format!(
556                "HARN-REP-CHN-003: batched-match composition drift for trigger {trigger_id} \
557                 anchor event {event_id} (recorded {recorded:?}, replay {replay:?})"
558            ),
559        }
560    }
561}
562
563/// CH-07 (#1878): compare two captured channel-receipt sequences and
564/// return the first replay-determinism violation, if any. Designed to
565/// run after `run_replay_oracle_trace` so the canonical JSON diff catches
566/// structural drift first; this function adds the channel-specific typed
567/// codes (HARN-REP-CHN-001/002/003) that downstream audit tooling
568/// consumes.
569///
570/// Returns `Ok(None)` when the two runs are channel-determinism
571/// equivalent. Returns `Err(ReplayOracleError::Serialization)` only
572/// when a receipt entry can't be parsed.
573pub fn diagnose_channel_replay_drift(
574    recorded_receipts: &[JsonValue],
575    replay_receipts: &[JsonValue],
576) -> Result<Option<ChannelReplayDiagnostic>, ReplayOracleError> {
577    let recorded = ChannelReceiptIndex::from_entries(recorded_receipts)?;
578    let replay = ChannelReceiptIndex::from_entries(replay_receipts)?;
579
580    // HARN-REP-CHN-002: producer drift. Walk each replay emit and check
581    // its hash against the recorded one.
582    for (event_id, replay_hash) in &replay.emit_hashes {
583        if let Some(recorded_hash) = recorded.emit_hashes.get(event_id) {
584            if recorded_hash != replay_hash {
585                return Ok(Some(ChannelReplayDiagnostic::PayloadHashMismatch {
586                    event_id: event_id.clone(),
587                    recorded_hash: recorded_hash.clone(),
588                    replay_hash: replay_hash.clone(),
589                }));
590            }
591        }
592    }
593
594    // HARN-REP-CHN-001: replay matched an event_id that no recorded
595    // emit announced. Indicates either an extra emit on replay or a lost
596    // emit receipt in the baseline.
597    for (event_id, trigger_id) in &replay.match_triggers {
598        if !recorded.emit_hashes.contains_key(event_id) {
599            return Ok(Some(ChannelReplayDiagnostic::MatchWithoutEmit {
600                event_id: event_id.clone(),
601                trigger_id: trigger_id.clone(),
602            }));
603        }
604    }
605
606    // HARN-REP-CHN-003: batched-match composition drift. Compare the
607    // `constituent_event_ids` for matching (event_id, trigger_id) pairs.
608    for ((event_id, trigger_id), recorded_batch) in &recorded.match_batches {
609        if let Some(replay_batch) = replay
610            .match_batches
611            .get(&(event_id.clone(), trigger_id.clone()))
612        {
613            if recorded_batch != replay_batch {
614                return Ok(Some(ChannelReplayDiagnostic::BatchCompositionDrift {
615                    event_id: event_id.clone(),
616                    trigger_id: trigger_id.clone(),
617                    recorded: recorded_batch.clone(),
618                    replay: replay_batch.clone(),
619                }));
620            }
621        }
622    }
623
624    Ok(None)
625}
626
/// Internal index for `diagnose_channel_replay_drift`. Separates the
/// emit-hash lookup from the match-trigger / match-batch lookups so each
/// diagnostic walks the relevant slice only.
struct ChannelReceiptIndex {
    /// `event_id` -> `payload_hash` for every emit receipt in the run. The
    /// first receipt wins for a repeated `event_id`; a missing or non-string
    /// hash is stored as the empty string.
    emit_hashes: std::collections::BTreeMap<String, String>,
    /// `event_id` -> first observed `trigger_id` for every match
    /// receipt. Used by HARN-REP-CHN-001 — we only need to know that
    /// *some* match fired, not which one.
    match_triggers: std::collections::BTreeMap<String, String>,
    /// `(event_id, trigger_id)` -> sorted `constituent_event_ids` for
    /// every batched-match receipt (the first batch wins per pair). Matches
    /// without a non-empty `batch.constituent_event_ids` array get no
    /// entry, so HARN-REP-CHN-003 only compares pairs that are batched in
    /// both runs.
    match_batches: std::collections::BTreeMap<(String, String), Vec<String>>,
}
642
643impl ChannelReceiptIndex {
644    fn from_entries(entries: &[JsonValue]) -> Result<Self, ReplayOracleError> {
645        let mut emit_hashes = std::collections::BTreeMap::new();
646        let mut match_triggers = std::collections::BTreeMap::new();
647        let mut match_batches = std::collections::BTreeMap::new();
648        for entry in entries {
649            let map = entry.as_object().ok_or_else(|| {
650                ReplayOracleError::Serialization(format!(
651                    "channel receipt entry is not an object: {entry}"
652                ))
653            })?;
654            // Tolerate two shapes: the raw receipt JSON
655            // (`ChannelEmitReceipt`/`ChannelMatchReceipt`), and an
656            // `event_log.subscribe(...)` envelope where the receipt
657            // lives under `payload` and `kind` classifies the variant.
658            let (kind, payload) = if let Some(kind) = map.get("kind").and_then(|v| v.as_str()) {
659                let payload = map.get("payload").cloned().unwrap_or_else(|| entry.clone());
660                (Some(kind.to_string()), payload)
661            } else if map.contains_key("payload_hash") {
662                (Some("channel_emit_receipt".to_string()), entry.clone())
663            } else if map.contains_key("matched_at") {
664                (Some("channel_match_receipt".to_string()), entry.clone())
665            } else {
666                (None, entry.clone())
667            };
668            let Some(kind) = kind else {
669                continue;
670            };
671            let payload_map = payload.as_object();
672            match kind.as_str() {
673                "channel_emit_receipt" => {
674                    let Some(payload_map) = payload_map else {
675                        continue;
676                    };
677                    let event_id = match payload_map.get("event_id").and_then(|v| v.as_str()) {
678                        Some(value) => value.to_string(),
679                        None => continue,
680                    };
681                    let hash = payload_map
682                        .get("payload_hash")
683                        .and_then(|v| v.as_str())
684                        .unwrap_or_default()
685                        .to_string();
686                    emit_hashes.entry(event_id).or_insert(hash);
687                }
688                "channel_match_receipt" => {
689                    let Some(payload_map) = payload_map else {
690                        continue;
691                    };
692                    let event_id = match payload_map.get("event_id").and_then(|v| v.as_str()) {
693                        Some(value) => value.to_string(),
694                        None => continue,
695                    };
696                    let trigger_id = payload_map
697                        .get("trigger_id")
698                        .and_then(|v| v.as_str())
699                        .unwrap_or_default()
700                        .to_string();
701                    match_triggers
702                        .entry(event_id.clone())
703                        .or_insert(trigger_id.clone());
704                    let batch_ids = payload_map
705                        .get("batch")
706                        .and_then(|v| v.as_object())
707                        .and_then(|b| b.get("constituent_event_ids"))
708                        .and_then(|v| v.as_array())
709                        .map(|arr| {
710                            let mut ids: Vec<String> = arr
711                                .iter()
712                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
713                                .collect();
714                            ids.sort();
715                            ids
716                        })
717                        .unwrap_or_default();
718                    if !batch_ids.is_empty() {
719                        match_batches
720                            .entry((event_id, trigger_id))
721                            .or_insert(batch_ids);
722                    }
723                }
724                _ => {}
725            }
726        }
727        Ok(Self {
728            emit_hashes,
729            match_triggers,
730            match_batches,
731        })
732    }
733}
734
735#[cfg(test)]
736mod tests {
737    use super::*;
738
739    fn base_trace() -> ReplayOracleTrace {
740        ReplayOracleTrace {
741            name: "fixture".to_string(),
742            allowlist: vec![
743                ReplayAllowlistRule {
744                    path: "/run_id".to_string(),
745                    reason: "run ids are allocated per execution".to_string(),
746                    replacement: Some(JsonValue::String("<run-id>".to_string())),
747                },
748                ReplayAllowlistRule {
749                    path: "/event_log_entries/*/event_id".to_string(),
750                    reason: "event log offsets are backend-local".to_string(),
751                    replacement: Some(JsonValue::String("<event-id>".to_string())),
752                },
753                ReplayAllowlistRule {
754                    path: "/event_log_entries/*/occurred_at_ms".to_string(),
755                    reason: "append timestamps are wall-clock observations".to_string(),
756                    replacement: Some(JsonValue::String("<timestamp-ms>".to_string())),
757                },
758            ],
759            first_run: ReplayTraceRun {
760                run_id: "run-a".to_string(),
761                event_log_entries: vec![json!({
762                    "event_id": 10,
763                    "topic": "trigger.outbox",
764                    "kind": "dispatch_succeeded",
765                    "occurred_at_ms": 1000,
766                    "payload": {"binding_id": "demo", "status": "dispatched"}
767                })],
768                ..ReplayTraceRun::default()
769            },
770            second_run: ReplayTraceRun {
771                run_id: "run-b".to_string(),
772                event_log_entries: vec![json!({
773                    "event_id": 42,
774                    "topic": "trigger.outbox",
775                    "kind": "dispatch_succeeded",
776                    "occurred_at_ms": 2000,
777                    "payload": {"binding_id": "demo", "status": "dispatched"}
778                })],
779                ..ReplayTraceRun::default()
780            },
781            ..ReplayOracleTrace::default()
782        }
783    }
784
785    #[test]
786    fn canonical_comparison_allows_explicit_nondeterministic_fields() {
787        let report = run_replay_oracle_trace(&base_trace()).expect("oracle succeeds");
788        assert!(report.passed, "{report:?}");
789        assert_eq!(report.divergence, None);
790    }
791
792    #[test]
793    fn persona_runtime_states_are_first_class_replay_material() {
794        let mut trace = base_trace();
795        trace.first_run.persona_runtime_states = vec![json!({
796            "name": "merge_captain",
797            "state": "idle",
798            "queued_work": [],
799            "handoff_inbox": [],
800            "budget": {"spent_today_usd": 0.01},
801        })];
802        trace.second_run.persona_runtime_states = trace.first_run.persona_runtime_states.clone();
803
804        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
805
806        assert!(report.passed, "{report:?}");
807        assert_eq!(report.first_run_counts.persona_runtime_states, 1);
808    }
809
810    #[test]
811    fn meaningful_drift_reports_first_divergent_path() {
812        let mut trace = base_trace();
813        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
814
815        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
816
817        assert!(!report.passed);
818        let divergence = report.divergence.expect("drift is reported");
819        assert_eq!(divergence.path, "/event_log_entries/0/payload/status");
820        assert_eq!(divergence.left, json!("dispatched"));
821        assert_eq!(divergence.right, json!("dlq"));
822    }
823
824    #[test]
825    fn expected_drift_fixture_passes_only_when_drift_is_detected() {
826        let mut trace = base_trace();
827        trace.expect = ReplayExpectation::Drift;
828        trace.second_run.event_log_entries[0]["payload"]["status"] = json!("dlq");
829
830        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
831
832        assert!(report.passed);
833        assert!(report.divergence.is_some());
834    }
835
836    fn channel_emit_receipt(event_id: &str, payload_hash: &str) -> JsonValue {
837        json!({
838            "kind": "channel_emit_receipt",
839            "payload": {
840                "event_id": event_id,
841                "payload_hash": payload_hash,
842                "name_resolved": "tenant:default:ch.test",
843                "scope": "tenant",
844                "scope_id": "default",
845                "topic": "channels.tenant.default.ch.test",
846                "inserted": true,
847            },
848        })
849    }
850
851    fn channel_match_receipt(
852        event_id: &str,
853        trigger_id: &str,
854        constituent_ids: Option<Vec<&str>>,
855    ) -> JsonValue {
856        let mut payload = serde_json::Map::new();
857        payload.insert("event_id".to_string(), json!(event_id));
858        payload.insert("trigger_id".to_string(), json!(trigger_id));
859        payload.insert("handler_kind".to_string(), json!("local"));
860        if let Some(ids) = constituent_ids {
861            payload.insert(
862                "batch".to_string(),
863                json!({
864                    "count": ids.len(),
865                    "constituent_event_ids": ids,
866                }),
867            );
868        }
869        json!({
870            "kind": "channel_match_receipt",
871            "payload": JsonValue::Object(payload),
872        })
873    }
874
875    #[test]
876    fn channel_replay_diagnostic_clean_runs_have_no_drift() {
877        let recorded = vec![
878            channel_emit_receipt("evt-1", "sha256:a"),
879            channel_match_receipt("evt-1", "trig-x", None),
880        ];
881        let replay = recorded.clone();
882        let diagnostic = diagnose_channel_replay_drift(&recorded, &replay).unwrap();
883        assert!(diagnostic.is_none(), "{diagnostic:?}");
884    }
885
886    #[test]
887    fn channel_replay_diagnostic_001_match_without_emit() {
888        let recorded = vec![channel_emit_receipt("evt-1", "sha256:a")];
889        // Replay matched a different event id with no emit recorded for it.
890        let replay = vec![
891            channel_emit_receipt("evt-1", "sha256:a"),
892            channel_match_receipt("evt-2", "trig-x", None),
893        ];
894        let diagnostic = diagnose_channel_replay_drift(&recorded, &replay)
895            .unwrap()
896            .expect("drift");
897        assert_eq!(diagnostic.code(), "HARN-REP-CHN-001");
898        assert!(matches!(
899            diagnostic,
900            ChannelReplayDiagnostic::MatchWithoutEmit { ref event_id, .. } if event_id == "evt-2"
901        ));
902    }
903
904    #[test]
905    fn channel_replay_diagnostic_002_payload_hash_drift() {
906        let recorded = vec![channel_emit_receipt("evt-1", "sha256:a")];
907        let replay = vec![channel_emit_receipt("evt-1", "sha256:b")];
908        let diagnostic = diagnose_channel_replay_drift(&recorded, &replay)
909            .unwrap()
910            .expect("drift");
911        assert_eq!(diagnostic.code(), "HARN-REP-CHN-002");
912        let message = diagnostic.message();
913        assert!(message.contains("HARN-REP-CHN-002"));
914        assert!(message.contains("evt-1"));
915    }
916
917    #[test]
918    fn channel_replay_diagnostic_003_batch_composition_drift() {
919        let recorded = vec![
920            channel_emit_receipt("evt-1", "sha256:a"),
921            channel_emit_receipt("evt-2", "sha256:b"),
922            channel_emit_receipt("evt-3", "sha256:c"),
923            channel_match_receipt("evt-1", "trig-x", Some(vec!["evt-1", "evt-2", "evt-3"])),
924        ];
925        let replay = vec![
926            channel_emit_receipt("evt-1", "sha256:a"),
927            channel_emit_receipt("evt-2", "sha256:b"),
928            channel_emit_receipt("evt-3", "sha256:c"),
929            // Replay aggregated a different subset.
930            channel_match_receipt("evt-1", "trig-x", Some(vec!["evt-1", "evt-2"])),
931        ];
932        let diagnostic = diagnose_channel_replay_drift(&recorded, &replay)
933            .unwrap()
934            .expect("drift");
935        assert_eq!(diagnostic.code(), "HARN-REP-CHN-003");
936    }
937
938    #[test]
939    fn channel_receipts_count_first_class_replay_material() {
940        let mut trace = base_trace();
941        trace.first_run.channel_receipts = vec![channel_emit_receipt("evt-1", "sha256:a")];
942        trace.second_run.channel_receipts = trace.first_run.channel_receipts.clone();
943        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
944        assert!(report.passed, "{report:?}");
945        assert_eq!(report.first_run_counts.channel_receipts, 1);
946        assert_eq!(report.second_run_counts.channel_receipts, 1);
947    }
948
949    #[test]
950    fn lifecycle_receipts_are_first_class_replay_material() {
951        let mut trace = base_trace();
952        trace.allowlist.push(ReplayAllowlistRule {
953            path: "/lifecycle_receipts/*/payload/suspended_at/signature".to_string(),
954            reason: "per-process signing salt".to_string(),
955            replacement: Some(JsonValue::String("<signature>".to_string())),
956        });
957        trace.allowlist.push(ReplayAllowlistRule {
958            path: "/lifecycle_receipts/*/payload/suspended_at/at_ms".to_string(),
959            reason: "wall-clock at_ms varies per record".to_string(),
960            replacement: Some(JsonValue::String("<at-ms>".to_string())),
961        });
962        trace.allowlist.push(ReplayAllowlistRule {
963            path: "/lifecycle_receipts/*/payload/suspended_at/at".to_string(),
964            reason: "wall-clock at varies per record".to_string(),
965            replacement: Some(JsonValue::String("<at>".to_string())),
966        });
967        let receipt = json!({
968            "seq": 1,
969            "kind": "suspension_receipt",
970            "payload": {
971                "handle": "worker://x/1",
972                "session_id": null,
973                "initiator": "operator",
974                "initiator_id": "op-1",
975                "reason": "stop",
976                "suspended_at": {
977                    "at_ms": 100,
978                    "at": "1970-01-01T00:00:00.1Z",
979                    "algorithm": "hmac-sha256",
980                    "key_id": "local-session",
981                    "signature": "sha256:deadbeef",
982                },
983            },
984        });
985        trace.first_run.lifecycle_receipts = vec![receipt.clone()];
986        trace.second_run.lifecycle_receipts = vec![receipt];
987
988        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
989
990        assert!(report.passed, "{report:?}");
991        assert_eq!(report.first_run_counts.lifecycle_receipts, 1);
992        assert_eq!(report.second_run_counts.lifecycle_receipts, 1);
993    }
994
995    #[test]
996    fn lifecycle_receipt_input_hash_drift_is_detected() {
997        let mut trace = base_trace();
998        trace.first_run.lifecycle_receipts = vec![json!({
999            "seq": 1,
1000            "kind": "resumption_receipt",
1001            "payload": {
1002                "handle": "worker://x/1",
1003                "initiator": "operator",
1004                "initiator_id": "op-1",
1005                "input_hash": "sha256:aaaa",
1006                "continue_transcript": true,
1007            },
1008        })];
1009        trace.second_run.lifecycle_receipts = vec![json!({
1010            "seq": 1,
1011            "kind": "resumption_receipt",
1012            "payload": {
1013                "handle": "worker://x/1",
1014                "initiator": "operator",
1015                "initiator_id": "op-1",
1016                "input_hash": "sha256:bbbb",
1017                "continue_transcript": true,
1018            },
1019        })];
1020
1021        let report = run_replay_oracle_trace(&trace).expect("oracle succeeds");
1022
1023        assert!(!report.passed);
1024        let divergence = report.divergence.expect("drift is reported");
1025        assert_eq!(divergence.path, "/lifecycle_receipts/0/payload/input_hash");
1026    }
1027
1028    #[test]
1029    fn allowlist_paths_must_match_real_fields() {
1030        let mut trace = base_trace();
1031        trace.allowlist.push(ReplayAllowlistRule {
1032            path: "/llm_interactions/*/latency_ms".to_string(),
1033            reason: "latency is nondeterministic".to_string(),
1034            replacement: None,
1035        });
1036
1037        let error = run_replay_oracle_trace(&trace).expect_err("missing path should fail");
1038
1039        assert!(matches!(error, ReplayOracleError::AllowlistPathMissing(_)));
1040    }
1041}