Skip to main content

pi/
extension_replay.rs

1//! Deterministic replay trace bundle core for extension runtime forensics.
2//!
3//! This module provides a standalone schema + codec surface that records
4//! extension runtime events in a stable order so race/tail anomalies can be
5//! replayed and compared deterministically.
6
7use serde::{Deserialize, Serialize};
8use std::cmp::Ordering;
9use std::collections::{BTreeMap, BTreeSet};
10use thiserror::Error;
11
/// Canonical schema identifier for replay trace bundles.
///
/// `ReplayTraceBuilder::build` stamps this value into every bundle, and
/// `ReplayTraceBundle::validate` rejects any bundle whose `schema` differs.
pub const REPLAY_TRACE_SCHEMA_V1: &str = "pi.ext.replay.trace.v1";
14
/// Kind of extension runtime event captured for deterministic replay.
///
/// Variants serialize in `snake_case` (for example `queue_accepted`).
/// Declaration order matches the ranks returned by `canonical_rank`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayEventKind {
    Scheduled,
    QueueAccepted,
    PolicyDecision,
    /// Leaves a pending cancel for the extension/request pair. Validation
    /// rejects a second `Cancelled` for the same pair until a `Retried`,
    /// `Completed` or `Failed` event has cleared it.
    Cancelled,
    /// Only valid while a cancel is pending for the same extension/request
    /// pair; consumes that pending cancel.
    Retried,
    /// Clears any pending cancel for the extension/request pair.
    Completed,
    /// Clears any pending cancel for the extension/request pair.
    Failed,
}
27
28impl ReplayEventKind {
29    const fn canonical_rank(self) -> u8 {
30        match self {
31            Self::Scheduled => 0,
32            Self::QueueAccepted => 1,
33            Self::PolicyDecision => 2,
34            Self::Cancelled => 3,
35            Self::Retried => 4,
36            Self::Completed => 5,
37            Self::Failed => 6,
38        }
39    }
40}
41
/// Single deterministic replay trace event.
///
/// Field names serialize in `camelCase` (for example `logicalClock`).
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayTraceEvent {
    /// 1-based position of the event inside its bundle; validation requires
    /// the values to be contiguous, starting at 1.
    pub seq: u64,
    /// Logical clock value recorded for the event; primary sort key used by
    /// the builder.
    pub logical_clock: u64,
    /// Identifier of the extension; must not be blank.
    pub extension_id: String,
    /// Identifier of the request; must not be blank.
    pub request_id: String,
    /// What happened at this point of the trace.
    pub kind: ReplayEventKind,
    /// Free-form key/value annotations. Omitted from the JSON when empty and
    /// defaulted to an empty map when absent on decode.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub attributes: BTreeMap<String, String>,
}
54
/// Builder input event before canonical ordering/sequence assignment.
///
/// Carries the same fields as `ReplayTraceEvent` except `seq`, which the
/// builder assigns after sorting.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayEventDraft {
    /// Logical clock value the event was observed at.
    pub logical_clock: u64,
    /// Identifier of the extension the event belongs to.
    pub extension_id: String,
    /// Identifier of the request the event belongs to.
    pub request_id: String,
    /// What happened.
    pub kind: ReplayEventKind,
    /// Free-form key/value annotations copied verbatim into the built event.
    pub attributes: BTreeMap<String, String>,
}
64
65impl ReplayEventDraft {
66    #[must_use]
67    pub fn new(
68        logical_clock: u64,
69        extension_id: impl Into<String>,
70        request_id: impl Into<String>,
71        kind: ReplayEventKind,
72    ) -> Self {
73        Self {
74            logical_clock,
75            extension_id: extension_id.into(),
76            request_id: request_id.into(),
77            kind,
78            attributes: BTreeMap::new(),
79        }
80    }
81}
82
/// Deterministic replay trace bundle.
///
/// Field names serialize in `camelCase` (for example `traceId`).
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayTraceBundle {
    /// Schema identifier; validation only accepts `REPLAY_TRACE_SCHEMA_V1`.
    pub schema: String,
    /// Identifier of the trace; must not be blank.
    pub trace_id: String,
    /// Bundle-level key/value metadata. Omitted from the JSON when empty and
    /// defaulted to an empty map when absent on decode.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<String, String>,
    /// Events in canonical order, with `seq` running 1, 2, 3, ...
    pub events: Vec<ReplayTraceEvent>,
}
93
/// First deterministic mismatch between two replay bundles.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayDivergence {
    /// Sequence number the mismatch was detected at.
    ///
    /// `first_divergence` leaves this as `None` for schema and trace-id
    /// mismatches, sets it to the expected event's `seq` for field
    /// mismatches, and to one past the shared prefix length for event-count
    /// mismatches. Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seq: Option<u64>,
    /// Machine-readable description of what differed.
    pub reason: ReplayDivergenceReason,
}
102
/// Machine-readable mismatch reason for replay comparisons.
///
/// Variant names serialize in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayDivergenceReason {
    /// The two bundles carry different `schema` strings.
    SchemaMismatch {
        expected: String,
        observed: String,
    },
    /// The two bundles carry different `trace_id` strings.
    TraceIdMismatch {
        expected: String,
        observed: String,
    },
    /// The shared prefix of events matched but the bundles differ in length.
    EventCountMismatch {
        expected: u64,
        observed: u64,
    },
    /// A single event field differed.
    ///
    /// `first_divergence` emits `field` as one of `logical_clock`,
    /// `extension_id`, `request_id`, `kind` or `attributes`; `kind` and
    /// `attributes` values are rendered with their `Debug` formatting.
    EventFieldMismatch {
        field: String,
        expected: String,
        observed: String,
    },
}
125
/// Configuration budget for deterministic replay trace capture.
///
/// Both limits are inclusive: an observation that exactly equals a limit
/// still passes the gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayCaptureBudget {
    /// Global kill switch for replay capture in production.
    /// When `false` the gate reports `DisabledByConfig` without measuring
    /// overhead.
    pub capture_enabled: bool,
    /// Maximum allowed overhead in per-mille (1/1000) units.
    pub max_overhead_per_mille: u32,
    /// Maximum allowed serialized trace bytes for a capture window.
    pub max_trace_bytes: u64,
}
137
/// Runtime observation used to evaluate replay capture budget gates.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayCaptureObservation {
    /// Baseline runtime cost without replay capture.
    /// A zero baseline combined with a higher captured cost is reported as
    /// `DisabledByInvalidBaseline`.
    pub baseline_micros: u64,
    /// Measured runtime cost with replay capture active.
    /// Values at or below the baseline count as zero overhead.
    pub captured_micros: u64,
    /// Size of the collected replay trace payload in bytes.
    pub trace_bytes: u64,
}
149
/// Deterministic gate reason emitted by replay capture budget evaluation.
///
/// Variants serialize in `snake_case`. The gate checks the conditions in
/// this order: config switch, invalid baseline, overhead budget, trace
/// budget. The first one that fails determines the reason.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayCaptureGateReason {
    /// Every check passed; capture is allowed.
    Enabled,
    /// `capture_enabled` was `false` in the budget.
    DisabledByConfig,
    /// Observed overhead was strictly above `max_overhead_per_mille`.
    DisabledByOverheadBudget,
    /// Observed trace size was strictly above `max_trace_bytes`.
    DisabledByTraceBudget,
    /// The overhead computation returned its `u32::MAX` sentinel.
    DisabledByInvalidBaseline,
}
160
/// Machine-readable report for replay capture gating decisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayCaptureGateReport {
    /// `true` only when `reason` is `Enabled`.
    pub capture_allowed: bool,
    /// Why capture was allowed or refused.
    pub reason: ReplayCaptureGateReason,
    /// Measured overhead in per-mille; reported as `0` when capture is
    /// disabled by config (overhead is not computed in that case).
    pub observed_overhead_per_mille: u32,
    /// Copied from the budget the gate was evaluated against.
    pub max_overhead_per_mille: u32,
    /// Copied from the observation.
    pub observed_trace_bytes: u64,
    /// Copied from the budget the gate was evaluated against.
    pub max_trace_bytes: u64,
}
172
/// Deterministic hint categories for automated replay triage.
///
/// Variants serialize in `snake_case`. Hints are collected through a
/// `BTreeSet`, so the derived `Ord` (declaration order) fixes the order in
/// which they appear in a snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayRootCauseHint {
    /// Derived from a `SchemaMismatch` divergence.
    TraceSchemaMismatch,
    /// Derived from a `TraceIdMismatch` divergence.
    TraceIdMismatch,
    /// Derived from an `EventCountMismatch` divergence.
    EventCountDrift,
    /// Derived from an `EventFieldMismatch` on any field other than
    /// `logical_clock`.
    EventPayloadDrift,
    /// Derived from an `EventFieldMismatch` on `logical_clock`.
    LogicalClockDrift,
    /// Derived from the `DisabledByConfig` gate reason.
    PolicyGateDisabled,
    /// Derived from the `DisabledByOverheadBudget` gate reason.
    OverheadBudgetExceeded,
    /// Derived from the `DisabledByTraceBudget` gate reason.
    TraceBudgetExceeded,
    /// Derived from the `DisabledByInvalidBaseline` gate reason.
    InvalidBaselineTelemetry,
}
187
/// Structured replay diagnostic snapshot for log/event sinks.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayDiagnosticSnapshot {
    /// Trace id copied from the bundle.
    pub trace_id: String,
    /// Schema identifier copied from the bundle.
    pub schema: String,
    /// Number of events in the bundle.
    pub event_count: u64,
    /// Gate report the snapshot was built with.
    pub capture_gate: ReplayCaptureGateReport,
    /// Divergence supplied by the caller, if any; omitted from the JSON when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub divergence: Option<ReplayDivergence>,
    /// Hints derived from the gate reason and the divergence, de-duplicated
    /// and sorted.
    pub root_cause_hints: Vec<ReplayRootCauseHint>,
}
200
201/// Evaluate replay capture against overhead and size budgets.
202#[must_use]
203pub fn evaluate_replay_capture_gate(
204    budget: ReplayCaptureBudget,
205    observation: ReplayCaptureObservation,
206) -> ReplayCaptureGateReport {
207    if !budget.capture_enabled {
208        return ReplayCaptureGateReport {
209            capture_allowed: false,
210            reason: ReplayCaptureGateReason::DisabledByConfig,
211            observed_overhead_per_mille: 0,
212            max_overhead_per_mille: budget.max_overhead_per_mille,
213            observed_trace_bytes: observation.trace_bytes,
214            max_trace_bytes: budget.max_trace_bytes,
215        };
216    }
217
218    let observed_overhead_per_mille =
219        compute_overhead_per_mille(observation.baseline_micros, observation.captured_micros);
220
221    if observed_overhead_per_mille == u32::MAX {
222        return ReplayCaptureGateReport {
223            capture_allowed: false,
224            reason: ReplayCaptureGateReason::DisabledByInvalidBaseline,
225            observed_overhead_per_mille,
226            max_overhead_per_mille: budget.max_overhead_per_mille,
227            observed_trace_bytes: observation.trace_bytes,
228            max_trace_bytes: budget.max_trace_bytes,
229        };
230    }
231
232    if observed_overhead_per_mille > budget.max_overhead_per_mille {
233        return ReplayCaptureGateReport {
234            capture_allowed: false,
235            reason: ReplayCaptureGateReason::DisabledByOverheadBudget,
236            observed_overhead_per_mille,
237            max_overhead_per_mille: budget.max_overhead_per_mille,
238            observed_trace_bytes: observation.trace_bytes,
239            max_trace_bytes: budget.max_trace_bytes,
240        };
241    }
242
243    if observation.trace_bytes > budget.max_trace_bytes {
244        return ReplayCaptureGateReport {
245            capture_allowed: false,
246            reason: ReplayCaptureGateReason::DisabledByTraceBudget,
247            observed_overhead_per_mille,
248            max_overhead_per_mille: budget.max_overhead_per_mille,
249            observed_trace_bytes: observation.trace_bytes,
250            max_trace_bytes: budget.max_trace_bytes,
251        };
252    }
253
254    ReplayCaptureGateReport {
255        capture_allowed: true,
256        reason: ReplayCaptureGateReason::Enabled,
257        observed_overhead_per_mille,
258        max_overhead_per_mille: budget.max_overhead_per_mille,
259        observed_trace_bytes: observation.trace_bytes,
260        max_trace_bytes: budget.max_trace_bytes,
261    }
262}
263
264/// Build a machine-readable replay diagnostics snapshot.
265///
266/// # Errors
267///
268/// Returns an error when the replay bundle fails deterministic validation.
269pub fn build_replay_diagnostic_snapshot(
270    bundle: &ReplayTraceBundle,
271    capture_gate: ReplayCaptureGateReport,
272    divergence: Option<&ReplayDivergence>,
273) -> Result<ReplayDiagnosticSnapshot, ReplayTraceValidationError> {
274    bundle.validate()?;
275
276    let event_count = u64::try_from(bundle.events.len())
277        .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
278    let root_cause_hints = derive_root_cause_hints(capture_gate.reason, divergence);
279
280    Ok(ReplayDiagnosticSnapshot {
281        trace_id: bundle.trace_id.clone(),
282        schema: bundle.schema.clone(),
283        event_count,
284        capture_gate,
285        divergence: divergence.cloned(),
286        root_cause_hints,
287    })
288}
289
/// Compute capture overhead relative to the baseline, in per-mille units,
/// rounded up.
///
/// Return values:
///
/// * `0` when `captured_micros <= baseline_micros` (no overhead, including
///   the case where both are zero);
/// * `u32::MAX` when the baseline is zero but the captured cost is not. This
///   value is reserved as the "invalid baseline" sentinel;
/// * otherwise `ceil((captured - baseline) * 1000 / baseline)`, clamped to
///   `u32::MAX - 1` so that a very large but *valid* measurement can never be
///   mistaken for the invalid-baseline sentinel.
fn compute_overhead_per_mille(baseline_micros: u64, captured_micros: u64) -> u32 {
    /// Largest value a valid measurement may report; `u32::MAX` is reserved.
    const MAX_MEASURABLE: u32 = u32::MAX - 1;

    if captured_micros <= baseline_micros {
        return 0;
    }
    if baseline_micros == 0 {
        return u32::MAX;
    }

    // Widen to u128: a u64 multiplied by 1000 always fits, so plain
    // arithmetic cannot overflow, and the divisor is known to be non-zero.
    let overhead = u128::from(captured_micros - baseline_micros);
    let baseline = u128::from(baseline_micros);
    let per_mille = (overhead * 1_000).div_ceil(baseline);

    u32::try_from(per_mille).map_or(MAX_MEASURABLE, |value| value.min(MAX_MEASURABLE))
}
307
308fn derive_root_cause_hints(
309    gate_reason: ReplayCaptureGateReason,
310    divergence: Option<&ReplayDivergence>,
311) -> Vec<ReplayRootCauseHint> {
312    let mut hints = BTreeSet::new();
313
314    match gate_reason {
315        ReplayCaptureGateReason::Enabled => {}
316        ReplayCaptureGateReason::DisabledByConfig => {
317            hints.insert(ReplayRootCauseHint::PolicyGateDisabled);
318        }
319        ReplayCaptureGateReason::DisabledByOverheadBudget => {
320            hints.insert(ReplayRootCauseHint::OverheadBudgetExceeded);
321        }
322        ReplayCaptureGateReason::DisabledByTraceBudget => {
323            hints.insert(ReplayRootCauseHint::TraceBudgetExceeded);
324        }
325        ReplayCaptureGateReason::DisabledByInvalidBaseline => {
326            hints.insert(ReplayRootCauseHint::InvalidBaselineTelemetry);
327        }
328    }
329
330    if let Some(divergence) = divergence {
331        match &divergence.reason {
332            ReplayDivergenceReason::SchemaMismatch { .. } => {
333                hints.insert(ReplayRootCauseHint::TraceSchemaMismatch);
334            }
335            ReplayDivergenceReason::TraceIdMismatch { .. } => {
336                hints.insert(ReplayRootCauseHint::TraceIdMismatch);
337            }
338            ReplayDivergenceReason::EventCountMismatch { .. } => {
339                hints.insert(ReplayRootCauseHint::EventCountDrift);
340            }
341            ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
342                if field == "logical_clock" {
343                    hints.insert(ReplayRootCauseHint::LogicalClockDrift);
344                } else {
345                    hints.insert(ReplayRootCauseHint::EventPayloadDrift);
346                }
347            }
348        }
349    }
350
351    hints.into_iter().collect()
352}
353
354impl ReplayTraceBundle {
355    /// Encode bundle as compact JSON.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if serialization fails.
360    pub fn encode_json(&self) -> Result<String, serde_json::Error> {
361        serde_json::to_string(self)
362    }
363
364    /// Decode bundle from JSON and validate deterministic invariants.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if parsing fails or bundle invariants are invalid.
369    pub fn decode_json(input: &str) -> Result<Self, ReplayTraceCodecError> {
370        let bundle: Self = serde_json::from_str(input)?;
371        bundle.validate()?;
372        Ok(bundle)
373    }
374
375    /// Validate schema, sequence continuity, and cancellation/retry ordering.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error when the bundle is malformed or violates replay
380    /// ordering invariants.
381    pub fn validate(&self) -> Result<(), ReplayTraceValidationError> {
382        if self.schema != REPLAY_TRACE_SCHEMA_V1 {
383            return Err(ReplayTraceValidationError::UnknownSchema(
384                self.schema.clone(),
385            ));
386        }
387
388        if self.trace_id.trim().is_empty() {
389            return Err(ReplayTraceValidationError::EmptyTraceId);
390        }
391
392        for (idx, event) in self.events.iter().enumerate() {
393            let seq_index = idx
394                .checked_add(1)
395                .ok_or(ReplayTraceValidationError::TooManyEvents)?;
396            let expected_seq =
397                u64::try_from(seq_index).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
398            if event.seq != expected_seq {
399                return Err(ReplayTraceValidationError::NonContiguousSequence {
400                    expected: expected_seq,
401                    observed: event.seq,
402                });
403            }
404
405            if event.extension_id.trim().is_empty() {
406                return Err(ReplayTraceValidationError::MissingExtensionId { seq: event.seq });
407            }
408            if event.request_id.trim().is_empty() {
409                return Err(ReplayTraceValidationError::MissingRequestId { seq: event.seq });
410            }
411        }
412
413        self.validate_retry_ordering()
414    }
415
416    fn validate_retry_ordering(&self) -> Result<(), ReplayTraceValidationError> {
417        let mut pending_cancel: BTreeSet<(String, String)> = BTreeSet::new();
418        for event in &self.events {
419            let key = (event.extension_id.clone(), event.request_id.clone());
420            match event.kind {
421                ReplayEventKind::Cancelled => {
422                    if !pending_cancel.insert(key) {
423                        return Err(ReplayTraceValidationError::DuplicateCancelWithoutRetry {
424                            seq: event.seq,
425                            extension_id: event.extension_id.clone(),
426                            request_id: event.request_id.clone(),
427                        });
428                    }
429                }
430                ReplayEventKind::Retried => {
431                    if !pending_cancel.remove(&key) {
432                        return Err(ReplayTraceValidationError::RetryWithoutCancel {
433                            seq: event.seq,
434                            extension_id: event.extension_id.clone(),
435                            request_id: event.request_id.clone(),
436                        });
437                    }
438                }
439                ReplayEventKind::Completed | ReplayEventKind::Failed => {
440                    pending_cancel.remove(&key);
441                }
442                ReplayEventKind::Scheduled
443                | ReplayEventKind::QueueAccepted
444                | ReplayEventKind::PolicyDecision => {}
445            }
446        }
447        Ok(())
448    }
449}
450
451/// Compare two bundles and return the first deterministic divergence.
452///
453/// Both bundles are validated before comparison.
454///
455/// # Errors
456///
457/// Returns an error if either bundle fails validation.
458pub fn first_divergence(
459    expected: &ReplayTraceBundle,
460    observed: &ReplayTraceBundle,
461) -> Result<Option<ReplayDivergence>, ReplayTraceValidationError> {
462    expected.validate()?;
463    observed.validate()?;
464
465    if expected.schema != observed.schema {
466        return Ok(Some(ReplayDivergence {
467            seq: None,
468            reason: ReplayDivergenceReason::SchemaMismatch {
469                expected: expected.schema.clone(),
470                observed: observed.schema.clone(),
471            },
472        }));
473    }
474
475    if expected.trace_id != observed.trace_id {
476        return Ok(Some(ReplayDivergence {
477            seq: None,
478            reason: ReplayDivergenceReason::TraceIdMismatch {
479                expected: expected.trace_id.clone(),
480                observed: observed.trace_id.clone(),
481            },
482        }));
483    }
484
485    let max_shared = expected.events.len().min(observed.events.len());
486    for idx in 0..max_shared {
487        let left = &expected.events[idx];
488        let right = &observed.events[idx];
489        if left.logical_clock != right.logical_clock {
490            return Ok(Some(field_mismatch(
491                left.seq,
492                "logical_clock",
493                left.logical_clock.to_string(),
494                right.logical_clock.to_string(),
495            )));
496        }
497        if left.extension_id != right.extension_id {
498            return Ok(Some(field_mismatch(
499                left.seq,
500                "extension_id",
501                left.extension_id.clone(),
502                right.extension_id.clone(),
503            )));
504        }
505        if left.request_id != right.request_id {
506            return Ok(Some(field_mismatch(
507                left.seq,
508                "request_id",
509                left.request_id.clone(),
510                right.request_id.clone(),
511            )));
512        }
513        if left.kind != right.kind {
514            return Ok(Some(field_mismatch(
515                left.seq,
516                "kind",
517                format!("{:?}", left.kind),
518                format!("{:?}", right.kind),
519            )));
520        }
521        if left.attributes != right.attributes {
522            return Ok(Some(field_mismatch(
523                left.seq,
524                "attributes",
525                format!("{:?}", left.attributes),
526                format!("{:?}", right.attributes),
527            )));
528        }
529    }
530
531    if expected.events.len() != observed.events.len() {
532        let next_seq = max_shared
533            .checked_add(1)
534            .ok_or(ReplayTraceValidationError::TooManyEvents)?;
535        let seq = u64::try_from(next_seq).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
536        return Ok(Some(ReplayDivergence {
537            seq: Some(seq),
538            reason: ReplayDivergenceReason::EventCountMismatch {
539                expected: u64::try_from(expected.events.len())
540                    .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
541                observed: u64::try_from(observed.events.len())
542                    .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
543            },
544        }));
545    }
546
547    Ok(None)
548}
549
550fn field_mismatch(seq: u64, field: &str, expected: String, observed: String) -> ReplayDivergence {
551    ReplayDivergence {
552        seq: Some(seq),
553        reason: ReplayDivergenceReason::EventFieldMismatch {
554            field: field.to_string(),
555            expected,
556            observed,
557        },
558    }
559}
560
/// Builder that canonicalizes event ordering and sequence assignment.
///
/// Note that the derived `Default` yields an empty trace id, which `build`
/// rejects during validation; use `new` to supply one.
#[derive(Debug, Clone, Default)]
pub struct ReplayTraceBuilder {
    /// Trace id stamped into the built bundle.
    trace_id: String,
    /// Bundle-level metadata; a later insert for the same key overwrites.
    metadata: BTreeMap<String, String>,
    /// Drafts in insertion order; sorted only when `build` runs.
    drafts: Vec<ReplayEventDraft>,
}
568
569impl ReplayTraceBuilder {
570    #[must_use]
571    pub fn new(trace_id: impl Into<String>) -> Self {
572        Self {
573            trace_id: trace_id.into(),
574            metadata: BTreeMap::new(),
575            drafts: Vec::new(),
576        }
577    }
578
579    pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
580        self.metadata.insert(key.into(), value.into());
581    }
582
583    pub fn push(&mut self, draft: ReplayEventDraft) {
584        self.drafts.push(draft);
585    }
586
587    /// Build a validated, deterministic trace bundle.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if sequence assignment overflows or validation fails.
592    pub fn build(self) -> Result<ReplayTraceBundle, ReplayTraceValidationError> {
593        let mut indexed = self
594            .drafts
595            .into_iter()
596            .enumerate()
597            .map(|(insertion_index, draft)| IndexedDraft {
598                insertion_index,
599                draft,
600            })
601            .collect::<Vec<_>>();
602        indexed.sort_by(compare_indexed_drafts);
603
604        let events = indexed
605            .into_iter()
606            .enumerate()
607            .map(|(idx, entry)| {
608                let seq_index = idx
609                    .checked_add(1)
610                    .ok_or(ReplayTraceValidationError::TooManyEvents)?;
611                let seq = u64::try_from(seq_index)
612                    .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
613                Ok(ReplayTraceEvent {
614                    seq,
615                    logical_clock: entry.draft.logical_clock,
616                    extension_id: entry.draft.extension_id,
617                    request_id: entry.draft.request_id,
618                    kind: entry.draft.kind,
619                    attributes: entry.draft.attributes,
620                })
621            })
622            .collect::<Result<Vec<_>, ReplayTraceValidationError>>()?;
623
624        let bundle = ReplayTraceBundle {
625            schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
626            trace_id: self.trace_id,
627            metadata: self.metadata,
628            events,
629        };
630        bundle.validate()?;
631        Ok(bundle)
632    }
633}
634
/// A draft paired with the position it was pushed at, so the canonical sort
/// can fall back to arrival order when every other key ties.
#[derive(Debug, Clone)]
struct IndexedDraft {
    /// Zero-based position of the draft in the builder's push order.
    insertion_index: usize,
    /// The draft itself.
    draft: ReplayEventDraft,
}
640
641fn compare_indexed_drafts(left: &IndexedDraft, right: &IndexedDraft) -> Ordering {
642    left.draft
643        .logical_clock
644        .cmp(&right.draft.logical_clock)
645        .then_with(|| left.draft.extension_id.cmp(&right.draft.extension_id))
646        .then_with(|| left.draft.request_id.cmp(&right.draft.request_id))
647        .then_with(|| {
648            left.draft
649                .kind
650                .canonical_rank()
651                .cmp(&right.draft.kind.canonical_rank())
652        })
653        .then_with(|| left.insertion_index.cmp(&right.insertion_index))
654}
655
/// Validation failures for replay bundle semantics.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ReplayTraceValidationError {
    /// The bundle's `schema` is not `REPLAY_TRACE_SCHEMA_V1`; carries the
    /// offending value.
    #[error("unknown replay trace schema: {0}")]
    UnknownSchema(String),
    /// The trace id is empty or whitespace-only.
    #[error("trace id must not be empty")]
    EmptyTraceId,
    /// An event position or count could not be represented (index overflow
    /// or a failed conversion to `u64`).
    #[error("replay bundle contains too many events to index")]
    TooManyEvents,
    /// An event's `seq` does not equal its 1-based position in the bundle.
    #[error("non-contiguous sequence: expected {expected}, observed {observed}")]
    NonContiguousSequence { expected: u64, observed: u64 },
    /// The event's extension id is empty or whitespace-only.
    #[error("event seq {seq} missing extension id")]
    MissingExtensionId { seq: u64 },
    /// The event's request id is empty or whitespace-only.
    #[error("event seq {seq} missing request id")]
    MissingRequestId { seq: u64 },
    /// A `Retried` event appeared with no pending cancel for its pair.
    #[error("retry without prior cancel at seq {seq} for {extension_id}/{request_id}")]
    RetryWithoutCancel {
        seq: u64,
        extension_id: String,
        request_id: String,
    },
    /// A `Cancelled` event appeared while a cancel was already pending for
    /// its pair.
    #[error("duplicate cancel without retry at seq {seq} for {extension_id}/{request_id}")]
    DuplicateCancelWithoutRetry {
        seq: u64,
        extension_id: String,
        request_id: String,
    },
}
684
/// Codec-level decode failures.
///
/// Both variants are `#[from]` conversions, so `?` can lift either
/// underlying error into this type.
#[derive(Debug, Error)]
pub enum ReplayTraceCodecError {
    /// The input could not be parsed as a bundle.
    #[error("failed to parse replay trace JSON: {0}")]
    Deserialize(#[from] serde_json::Error),
    /// The input parsed but the bundle failed validation.
    #[error("invalid replay trace bundle: {0}")]
    Validation(#[from] ReplayTraceValidationError),
}
693
/// Configuration for a replay recording lane.
///
/// Field names serialize in `camelCase`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayLaneConfig {
    /// Budget constraints for this lane.
    pub budget: ReplayCaptureBudget,
    /// Static metadata attached to every trace produced by this lane.
    /// Omitted from the JSON when empty and defaulted to an empty map when
    /// absent on decode.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub lane_metadata: BTreeMap<String, String>,
}
704
705impl ReplayLaneConfig {
706    #[must_use]
707    pub const fn new(budget: ReplayCaptureBudget) -> Self {
708        Self {
709            budget,
710            lane_metadata: BTreeMap::new(),
711        }
712    }
713
714    /// Insert a metadata key-value pair into the lane config.
715    pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
716        self.lane_metadata.insert(key.into(), value.into());
717    }
718}
719
/// Outcome of a completed replay recording session.
///
/// Field names serialize in `camelCase` (for example `gateReport`).
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayLaneResult {
    /// The recorded trace bundle (present even when gated, for forensic access).
    pub bundle: ReplayTraceBundle,
    /// Budget gate report for this recording.
    pub gate_report: ReplayCaptureGateReport,
    /// Diagnostic snapshot with root-cause hints.
    pub diagnostic: ReplayDiagnosticSnapshot,
}
731
/// Outcome of comparing a recorded trace against a reference.
///
/// Field names serialize in `camelCase`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayComparisonResult {
    /// Trace id of the reference bundle used for comparison.
    pub reference_trace_id: String,
    /// Trace id of the observed bundle being compared.
    pub observed_trace_id: String,
    /// First divergence found, if any. Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub divergence: Option<ReplayDivergence>,
    /// Root-cause hints derived from the comparison.
    pub root_cause_hints: Vec<ReplayRootCauseHint>,
}
746
/// Stateful recorder that accumulates extension runtime events during a
/// dispatch cycle and produces a deterministic [`ReplayTraceBundle`].
///
/// Events are pushed in arrival order; the recorder's [`finish`](Self::finish)
/// method canonicalizes ordering, applies the budget gate, and builds the
/// diagnostic snapshot.
#[derive(Debug)]
pub struct ReplayRecorder {
    /// Lane configuration; its budget is applied when the recording finishes.
    config: ReplayLaneConfig,
    /// Accumulates drafts and the lane metadata copied in at construction.
    builder: ReplayTraceBuilder,
    /// Current logical clock; starts at 0 and only moves via `tick`.
    logical_clock: u64,
    /// Number of events recorded so far (saturating).
    event_count: u64,
}
760
761impl ReplayRecorder {
762    /// Create a new recorder for a single dispatch cycle.
763    #[must_use]
764    pub fn new(trace_id: impl Into<String>, config: ReplayLaneConfig) -> Self {
765        let mut builder = ReplayTraceBuilder::new(trace_id);
766        for (key, value) in &config.lane_metadata {
767            builder.insert_metadata(key.clone(), value.clone());
768        }
769        Self {
770            config,
771            builder,
772            logical_clock: 0,
773            event_count: 0,
774        }
775    }
776
    /// Current logical clock value.
    ///
    /// This is the value that the next recorded event will be stamped with.
    #[must_use]
    pub const fn logical_clock(&self) -> u64 {
        self.logical_clock
    }
782
    /// Number of events recorded so far.
    ///
    /// The counter saturates at `u64::MAX` instead of wrapping.
    #[must_use]
    pub const fn event_count(&self) -> u64 {
        self.event_count
    }
788
789    /// Advance the logical clock and return the new value.
790    pub const fn tick(&mut self) -> u64 {
791        self.logical_clock = self.logical_clock.saturating_add(1);
792        self.logical_clock
793    }
794
795    /// Record an event at the current logical clock.
796    pub fn record(
797        &mut self,
798        extension_id: impl Into<String>,
799        request_id: impl Into<String>,
800        kind: ReplayEventKind,
801        attributes: BTreeMap<String, String>,
802    ) {
803        let mut draft = ReplayEventDraft::new(self.logical_clock, extension_id, request_id, kind);
804        draft.attributes = attributes;
805        self.builder.push(draft);
806        self.event_count = self.event_count.saturating_add(1);
807    }
808
809    /// Record a `Scheduled` event.
810    pub fn record_scheduled(
811        &mut self,
812        extension_id: impl Into<String>,
813        request_id: impl Into<String>,
814        attributes: BTreeMap<String, String>,
815    ) {
816        self.record(
817            extension_id,
818            request_id,
819            ReplayEventKind::Scheduled,
820            attributes,
821        );
822    }
823
824    /// Record a `QueueAccepted` event.
825    pub fn record_queue_accepted(
826        &mut self,
827        extension_id: impl Into<String>,
828        request_id: impl Into<String>,
829        attributes: BTreeMap<String, String>,
830    ) {
831        self.record(
832            extension_id,
833            request_id,
834            ReplayEventKind::QueueAccepted,
835            attributes,
836        );
837    }
838
839    /// Record a `PolicyDecision` event.
840    pub fn record_policy_decision(
841        &mut self,
842        extension_id: impl Into<String>,
843        request_id: impl Into<String>,
844        attributes: BTreeMap<String, String>,
845    ) {
846        self.record(
847            extension_id,
848            request_id,
849            ReplayEventKind::PolicyDecision,
850            attributes,
851        );
852    }
853
854    /// Record a `Cancelled` event.
855    pub fn record_cancelled(
856        &mut self,
857        extension_id: impl Into<String>,
858        request_id: impl Into<String>,
859        attributes: BTreeMap<String, String>,
860    ) {
861        self.record(
862            extension_id,
863            request_id,
864            ReplayEventKind::Cancelled,
865            attributes,
866        );
867    }
868
869    /// Record a `Retried` event.
870    pub fn record_retried(
871        &mut self,
872        extension_id: impl Into<String>,
873        request_id: impl Into<String>,
874        attributes: BTreeMap<String, String>,
875    ) {
876        self.record(
877            extension_id,
878            request_id,
879            ReplayEventKind::Retried,
880            attributes,
881        );
882    }
883
884    /// Record a `Completed` event.
885    pub fn record_completed(
886        &mut self,
887        extension_id: impl Into<String>,
888        request_id: impl Into<String>,
889        attributes: BTreeMap<String, String>,
890    ) {
891        self.record(
892            extension_id,
893            request_id,
894            ReplayEventKind::Completed,
895            attributes,
896        );
897    }
898
899    /// Record a `Failed` event.
900    pub fn record_failed(
901        &mut self,
902        extension_id: impl Into<String>,
903        request_id: impl Into<String>,
904        attributes: BTreeMap<String, String>,
905    ) {
906        self.record(
907            extension_id,
908            request_id,
909            ReplayEventKind::Failed,
910            attributes,
911        );
912    }
913
914    /// Finalize the recording: canonicalize event ordering, apply budget gate,
915    /// and build the diagnostic snapshot.
916    ///
917    /// The `observation` provides runtime telemetry needed for budget gating.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the trace bundle fails validation.
922    pub fn finish(
923        self,
924        observation: ReplayCaptureObservation,
925    ) -> Result<ReplayLaneResult, ReplayTraceValidationError> {
926        let bundle = self.builder.build()?;
927        let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
928        let diagnostic = build_replay_diagnostic_snapshot(&bundle, gate_report, None)?;
929
930        Ok(ReplayLaneResult {
931            bundle,
932            gate_report,
933            diagnostic,
934        })
935    }
936
937    /// Finalize and compare against a reference bundle.
938    ///
939    /// Returns the lane result together with a comparison result that
940    /// includes the first divergence (if any) and derived root-cause hints.
941    ///
942    /// # Errors
943    ///
944    /// Returns an error if either bundle fails validation.
945    pub fn finish_and_compare(
946        self,
947        observation: ReplayCaptureObservation,
948        reference: &ReplayTraceBundle,
949    ) -> Result<(ReplayLaneResult, ReplayComparisonResult), ReplayTraceValidationError> {
950        let bundle = self.builder.build()?;
951        let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
952        let divergence_opt = first_divergence(reference, &bundle)?;
953        let diagnostic =
954            build_replay_diagnostic_snapshot(&bundle, gate_report, divergence_opt.as_ref())?;
955
956        let comparison = ReplayComparisonResult {
957            reference_trace_id: reference.trace_id.clone(),
958            observed_trace_id: bundle.trace_id.clone(),
959            divergence: divergence_opt,
960            root_cause_hints: diagnostic.root_cause_hints.clone(),
961        };
962
963        let result = ReplayLaneResult {
964            bundle,
965            gate_report,
966            diagnostic,
967        };
968
969        Ok((result, comparison))
970    }
971}
972
973/// Compare two previously-recorded bundles without an active recorder.
974///
975/// # Errors
976///
977/// Returns an error if either bundle fails validation.
978pub fn compare_replay_bundles(
979    reference: &ReplayTraceBundle,
980    observed: &ReplayTraceBundle,
981    gate_report: ReplayCaptureGateReport,
982) -> Result<(ReplayDiagnosticSnapshot, ReplayComparisonResult), ReplayTraceValidationError> {
983    let divergence_opt = first_divergence(reference, observed)?;
984    let diagnostic =
985        build_replay_diagnostic_snapshot(observed, gate_report, divergence_opt.as_ref())?;
986
987    let comparison = ReplayComparisonResult {
988        reference_trace_id: reference.trace_id.clone(),
989        observed_trace_id: observed.trace_id.clone(),
990        divergence: divergence_opt,
991        root_cause_hints: diagnostic.root_cause_hints.clone(),
992    };
993
994    Ok((diagnostic, comparison))
995}
996
997#[cfg(test)]
998mod tests {
999    use super::{
1000        REPLAY_TRACE_SCHEMA_V1, ReplayCaptureBudget, ReplayCaptureGateReason,
1001        ReplayCaptureObservation, ReplayDivergenceReason, ReplayEventDraft, ReplayEventKind,
1002        ReplayRootCauseHint, ReplayTraceBuilder, ReplayTraceBundle, ReplayTraceCodecError,
1003        ReplayTraceValidationError, build_replay_diagnostic_snapshot, evaluate_replay_capture_gate,
1004        first_divergence,
1005    };
1006    use std::collections::BTreeMap;
1007
1008    fn draft(
1009        logical_clock: u64,
1010        extension_id: &str,
1011        request_id: &str,
1012        kind: ReplayEventKind,
1013    ) -> ReplayEventDraft {
1014        ReplayEventDraft::new(
1015            logical_clock,
1016            extension_id.to_string(),
1017            request_id.to_string(),
1018            kind,
1019        )
1020    }
1021
1022    const fn standard_capture_budget() -> ReplayCaptureBudget {
1023        ReplayCaptureBudget {
1024            capture_enabled: true,
1025            max_overhead_per_mille: 120,
1026            max_trace_bytes: 8_192,
1027        }
1028    }
1029
1030    fn standard_bundle() -> ReplayTraceBundle {
1031        let mut builder = ReplayTraceBuilder::new("trace-diagnostic");
1032        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1033        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
1034        builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Completed));
1035        builder.build().expect("bundle should build")
1036    }
1037
1038    #[test]
1039    fn deterministic_build_is_order_stable_across_input_permutations() {
1040        let mut left = ReplayTraceBuilder::new("trace-a");
1041        left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1042        left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1043        left.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1044        left.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1045
1046        let mut right = ReplayTraceBuilder::new("trace-a");
1047        right.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1048        right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1049        right.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1050        right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1051
1052        let left_bundle = left.build().expect("left bundle should build");
1053        let right_bundle = right.build().expect("right bundle should build");
1054
1055        assert_eq!(left_bundle, right_bundle);
1056        assert_eq!(left_bundle.events[0].kind, ReplayEventKind::Cancelled);
1057        assert_eq!(left_bundle.events[1].kind, ReplayEventKind::Retried);
1058    }
1059
1060    #[test]
1061    fn json_roundtrip_preserves_replay_bundle() {
1062        let mut builder = ReplayTraceBuilder::new("trace-roundtrip");
1063        builder.insert_metadata("lane", "shadow");
1064        let mut event = draft(20, "ext.gamma", "req-7", ReplayEventKind::PolicyDecision);
1065        event
1066            .attributes
1067            .insert("decision".to_string(), "fast_lane".to_string());
1068        builder.push(draft(19, "ext.gamma", "req-7", ReplayEventKind::Scheduled));
1069        builder.push(event);
1070        builder.push(draft(21, "ext.gamma", "req-7", ReplayEventKind::Completed));
1071
1072        let bundle = builder.build().expect("bundle should build");
1073        let encoded = bundle.encode_json().expect("bundle should encode");
1074        let decoded = ReplayTraceBundle::decode_json(&encoded).expect("bundle should decode");
1075        assert_eq!(decoded, bundle);
1076    }
1077
1078    #[test]
1079    fn decode_rejects_retry_without_prior_cancel() {
1080        let bundle = ReplayTraceBundle {
1081            schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1082            trace_id: "trace-invalid".to_string(),
1083            metadata: BTreeMap::new(),
1084            events: vec![super::ReplayTraceEvent {
1085                seq: 1,
1086                logical_clock: 1,
1087                extension_id: "ext.a".to_string(),
1088                request_id: "req".to_string(),
1089                kind: ReplayEventKind::Retried,
1090                attributes: BTreeMap::new(),
1091            }],
1092        };
1093        let encoded = bundle
1094            .encode_json()
1095            .expect("invalid bundle should serialize");
1096
1097        let error = ReplayTraceBundle::decode_json(&encoded).expect_err("retry without cancel");
1098        match error {
1099            ReplayTraceCodecError::Validation(ReplayTraceValidationError::RetryWithoutCancel {
1100                ..
1101            }) => {}
1102            other => panic!("unexpected error: {other:?}"),
1103        }
1104    }
1105
1106    #[test]
1107    fn decode_rejects_non_contiguous_sequence() {
1108        let bundle = ReplayTraceBundle {
1109            schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1110            trace_id: "trace-seq".to_string(),
1111            metadata: BTreeMap::new(),
1112            events: vec![
1113                super::ReplayTraceEvent {
1114                    seq: 1,
1115                    logical_clock: 1,
1116                    extension_id: "ext.a".to_string(),
1117                    request_id: "req-1".to_string(),
1118                    kind: ReplayEventKind::Scheduled,
1119                    attributes: BTreeMap::new(),
1120                },
1121                super::ReplayTraceEvent {
1122                    seq: 3,
1123                    logical_clock: 2,
1124                    extension_id: "ext.a".to_string(),
1125                    request_id: "req-1".to_string(),
1126                    kind: ReplayEventKind::Completed,
1127                    attributes: BTreeMap::new(),
1128                },
1129            ],
1130        };
1131        let encoded = bundle
1132            .encode_json()
1133            .expect("invalid bundle should serialize");
1134
1135        let error = ReplayTraceBundle::decode_json(&encoded).expect_err("non-contiguous seq");
1136        match error {
1137            ReplayTraceCodecError::Validation(
1138                ReplayTraceValidationError::NonContiguousSequence { expected, observed },
1139            ) => {
1140                assert_eq!(expected, 2);
1141                assert_eq!(observed, 3);
1142            }
1143            other => panic!("unexpected error: {other:?}"),
1144        }
1145    }
1146
1147    #[test]
1148    fn divergence_reports_kind_mismatch_with_seq() {
1149        let mut expected_builder = ReplayTraceBuilder::new("trace-divergence");
1150        expected_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1151        expected_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1152        let expected = expected_builder.build().expect("expected bundle");
1153
1154        let mut observed_builder = ReplayTraceBuilder::new("trace-divergence");
1155        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1156        observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Failed));
1157        let observed = observed_builder.build().expect("observed bundle");
1158
1159        let divergence = first_divergence(&expected, &observed)
1160            .expect("comparison should succeed")
1161            .expect("divergence expected");
1162        assert_eq!(divergence.seq, Some(2));
1163        match divergence.reason {
1164            ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
1165                assert_eq!(field, "kind");
1166            }
1167            other => panic!("unexpected divergence reason: {other:?}"),
1168        }
1169    }
1170
1171    #[test]
1172    fn divergence_reports_event_count_mismatch() {
1173        let mut expected_builder = ReplayTraceBuilder::new("trace-length");
1174        expected_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1175        expected_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1176        let expected = expected_builder.build().expect("expected bundle");
1177
1178        let mut observed_builder = ReplayTraceBuilder::new("trace-length");
1179        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1180        let observed = observed_builder.build().expect("observed bundle");
1181
1182        let divergence = first_divergence(&expected, &observed)
1183            .expect("comparison should succeed")
1184            .expect("divergence expected");
1185        assert_eq!(divergence.seq, Some(2));
1186        match divergence.reason {
1187            ReplayDivergenceReason::EventCountMismatch { expected, observed } => {
1188                assert_eq!(expected, 2);
1189                assert_eq!(observed, 1);
1190            }
1191            other => panic!("unexpected divergence reason: {other:?}"),
1192        }
1193    }
1194
1195    #[test]
1196    fn divergence_returns_none_for_identical_bundles() {
1197        let mut builder = ReplayTraceBuilder::new("trace-identical");
1198        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1199        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1200        let bundle = builder.build().expect("bundle");
1201
1202        let divergence =
1203            first_divergence(&bundle, &bundle).expect("comparison should validate identical");
1204        assert!(divergence.is_none());
1205    }
1206
1207    #[test]
1208    fn capture_gate_disables_when_config_switch_is_off() {
1209        let mut budget = standard_capture_budget();
1210        budget.capture_enabled = false;
1211        let observation = ReplayCaptureObservation {
1212            baseline_micros: 1_000,
1213            captured_micros: 1_010,
1214            trace_bytes: 128,
1215        };
1216
1217        let report = evaluate_replay_capture_gate(budget, observation);
1218        assert!(!report.capture_allowed);
1219        assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
1220    }
1221
1222    #[test]
1223    fn capture_gate_disables_when_overhead_exceeds_budget() {
1224        let budget = standard_capture_budget();
1225        let observation = ReplayCaptureObservation {
1226            baseline_micros: 1_000,
1227            captured_micros: 1_140,
1228            trace_bytes: 512,
1229        };
1230
1231        let report = evaluate_replay_capture_gate(budget, observation);
1232        assert!(!report.capture_allowed);
1233        assert_eq!(
1234            report.reason,
1235            ReplayCaptureGateReason::DisabledByOverheadBudget
1236        );
1237        assert_eq!(report.observed_overhead_per_mille, 140);
1238    }
1239
1240    #[test]
1241    fn capture_gate_disables_when_trace_budget_exceeded() {
1242        let budget = standard_capture_budget();
1243        let observation = ReplayCaptureObservation {
1244            baseline_micros: 1_000,
1245            captured_micros: 1_050,
1246            trace_bytes: 9_000,
1247        };
1248
1249        let report = evaluate_replay_capture_gate(budget, observation);
1250        assert!(!report.capture_allowed);
1251        assert_eq!(
1252            report.reason,
1253            ReplayCaptureGateReason::DisabledByTraceBudget
1254        );
1255        assert_eq!(report.observed_overhead_per_mille, 50);
1256    }
1257
1258    #[test]
1259    fn capture_gate_fails_closed_on_invalid_baseline() {
1260        let budget = standard_capture_budget();
1261        let observation = ReplayCaptureObservation {
1262            baseline_micros: 0,
1263            captured_micros: 1,
1264            trace_bytes: 64,
1265        };
1266
1267        let report = evaluate_replay_capture_gate(budget, observation);
1268        assert!(!report.capture_allowed);
1269        assert_eq!(
1270            report.reason,
1271            ReplayCaptureGateReason::DisabledByInvalidBaseline
1272        );
1273        assert_eq!(report.observed_overhead_per_mille, u32::MAX);
1274    }
1275
1276    #[test]
1277    fn capture_gate_reports_deterministic_within_budget_enablement() {
1278        let budget = standard_capture_budget();
1279        let observation = ReplayCaptureObservation {
1280            baseline_micros: 1_000,
1281            captured_micros: 1_080,
1282            trace_bytes: 4_096,
1283        };
1284
1285        let first = evaluate_replay_capture_gate(budget, observation);
1286        let second = evaluate_replay_capture_gate(budget, observation);
1287
1288        assert_eq!(first, second);
1289        assert!(first.capture_allowed);
1290        assert_eq!(first.reason, ReplayCaptureGateReason::Enabled);
1291        assert_eq!(first.observed_overhead_per_mille, 80);
1292    }
1293
1294    #[test]
1295    fn diagnostic_snapshot_emits_hint_codes_for_gate_and_payload_drift() {
1296        let expected = standard_bundle();
1297
1298        let mut observed_builder = ReplayTraceBuilder::new("trace-diagnostic");
1299        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1300        observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
1301        observed_builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Failed));
1302        let observed = observed_builder.build().expect("observed bundle");
1303
1304        let divergence = first_divergence(&expected, &observed)
1305            .expect("comparison should succeed")
1306            .expect("divergence expected");
1307        let capture_gate = evaluate_replay_capture_gate(
1308            standard_capture_budget(),
1309            ReplayCaptureObservation {
1310                baseline_micros: 1_000,
1311                captured_micros: 1_150,
1312                trace_bytes: 64,
1313            },
1314        );
1315
1316        let snapshot = build_replay_diagnostic_snapshot(&expected, capture_gate, Some(&divergence))
1317            .expect("snapshot should build");
1318        assert_eq!(snapshot.event_count, 3);
1319        assert_eq!(
1320            snapshot.root_cause_hints,
1321            vec![
1322                ReplayRootCauseHint::EventPayloadDrift,
1323                ReplayRootCauseHint::OverheadBudgetExceeded,
1324            ]
1325        );
1326    }
1327
1328    #[test]
1329    fn diagnostic_snapshot_maps_logical_clock_drift_hint() {
1330        let expected = standard_bundle();
1331        let mut observed = expected.clone();
1332        observed.events[1].logical_clock = 77;
1333
1334        let divergence = first_divergence(&expected, &observed)
1335            .expect("comparison should succeed")
1336            .expect("divergence expected");
1337        let capture_gate = evaluate_replay_capture_gate(
1338            standard_capture_budget(),
1339            ReplayCaptureObservation {
1340                baseline_micros: 1_000,
1341                captured_micros: 1_010,
1342                trace_bytes: 64,
1343            },
1344        );
1345
1346        let snapshot = build_replay_diagnostic_snapshot(&expected, capture_gate, Some(&divergence))
1347            .expect("snapshot should build");
1348        assert_eq!(
1349            snapshot.root_cause_hints,
1350            vec![ReplayRootCauseHint::LogicalClockDrift]
1351        );
1352    }
1353
1354    #[test]
1355    fn diagnostic_snapshot_is_deterministic_for_same_inputs() {
1356        let bundle = standard_bundle();
1357        let capture_gate = evaluate_replay_capture_gate(
1358            standard_capture_budget(),
1359            ReplayCaptureObservation {
1360                baseline_micros: 1_000,
1361                captured_micros: 1_020,
1362                trace_bytes: 256,
1363            },
1364        );
1365
1366        let first =
1367            build_replay_diagnostic_snapshot(&bundle, capture_gate, None).expect("first snapshot");
1368        let second =
1369            build_replay_diagnostic_snapshot(&bundle, capture_gate, None).expect("second snapshot");
1370        assert_eq!(first, second);
1371    }
1372
1373    #[test]
1374    fn diagnostic_snapshot_rejects_invalid_bundle() {
1375        let invalid = ReplayTraceBundle {
1376            schema: "invalid.schema".to_string(),
1377            trace_id: "trace-bad".to_string(),
1378            metadata: BTreeMap::new(),
1379            events: Vec::new(),
1380        };
1381        let capture_gate = evaluate_replay_capture_gate(
1382            standard_capture_budget(),
1383            ReplayCaptureObservation {
1384                baseline_micros: 1_000,
1385                captured_micros: 1_000,
1386                trace_bytes: 0,
1387            },
1388        );
1389
1390        let error = build_replay_diagnostic_snapshot(&invalid, capture_gate, None)
1391            .expect_err("invalid schema should fail");
1392        assert!(matches!(
1393            error,
1394            ReplayTraceValidationError::UnknownSchema(_)
1395        ));
1396    }
1397
1398    // ── Builder edge cases ──
1399
1400    #[test]
1401    fn builder_empty_events_produces_valid_bundle() {
1402        let builder = ReplayTraceBuilder::new("trace-empty");
1403        let bundle = builder.build().expect("empty bundle should be valid");
1404        assert!(bundle.events.is_empty());
1405        assert_eq!(bundle.schema, REPLAY_TRACE_SCHEMA_V1);
1406        assert_eq!(bundle.trace_id, "trace-empty");
1407    }
1408
1409    #[test]
1410    fn builder_metadata_preserved_in_output() {
1411        let mut builder = ReplayTraceBuilder::new("trace-meta");
1412        builder.insert_metadata("env", "production");
1413        builder.insert_metadata("version", "1.2.3");
1414        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1415        let bundle = builder.build().expect("bundle with metadata");
1416        assert_eq!(
1417            bundle.metadata.get("env").map(String::as_str),
1418            Some("production")
1419        );
1420        assert_eq!(
1421            bundle.metadata.get("version").map(String::as_str),
1422            Some("1.2.3")
1423        );
1424    }
1425
1426    #[test]
1427    fn builder_metadata_overwrite_works() {
1428        let mut builder = ReplayTraceBuilder::new("trace-meta-ow");
1429        builder.insert_metadata("key", "old");
1430        builder.insert_metadata("key", "new");
1431        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1432        let bundle = builder.build().expect("metadata overwrite");
1433        assert_eq!(bundle.metadata.get("key").map(String::as_str), Some("new"));
1434    }
1435
1436    #[test]
1437    fn draft_attributes_carried_through_build() {
1438        let mut d = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
1439        d.attributes
1440            .insert("policy".to_string(), "fast_lane".to_string());
1441        d.attributes
1442            .insert("latency_ms".to_string(), "12".to_string());
1443        let mut builder = ReplayTraceBuilder::new("trace-attrs");
1444        builder.push(d);
1445        let bundle = builder.build().expect("bundle with attrs");
1446        assert_eq!(bundle.events[0].attributes.len(), 2);
1447        assert_eq!(
1448            bundle.events[0]
1449                .attributes
1450                .get("policy")
1451                .map(String::as_str),
1452            Some("fast_lane")
1453        );
1454    }
1455
1456    // ── Validation error paths ──
1457
1458    #[test]
1459    fn validate_rejects_empty_trace_id() {
1460        let mut builder = ReplayTraceBuilder::new("");
1461        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1462        let err = builder.build().expect_err("empty trace_id should fail");
1463        assert!(matches!(err, ReplayTraceValidationError::EmptyTraceId));
1464    }
1465
1466    #[test]
1467    fn validate_rejects_whitespace_only_trace_id() {
1468        let mut builder = ReplayTraceBuilder::new("   ");
1469        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1470        let err = builder
1471            .build()
1472            .expect_err("whitespace trace_id should fail");
1473        assert!(matches!(err, ReplayTraceValidationError::EmptyTraceId));
1474    }
1475
1476    #[test]
1477    fn validate_rejects_empty_extension_id() {
1478        let mut builder = ReplayTraceBuilder::new("trace-val");
1479        builder.push(draft(1, "", "req-1", ReplayEventKind::Scheduled));
1480        let err = builder.build().expect_err("empty extension_id should fail");
1481        assert!(matches!(
1482            err,
1483            ReplayTraceValidationError::MissingExtensionId { .. }
1484        ));
1485    }
1486
1487    #[test]
1488    fn validate_rejects_empty_request_id() {
1489        let mut builder = ReplayTraceBuilder::new("trace-val");
1490        builder.push(draft(1, "ext.a", "", ReplayEventKind::Scheduled));
1491        let err = builder.build().expect_err("empty request_id should fail");
1492        assert!(matches!(
1493            err,
1494            ReplayTraceValidationError::MissingRequestId { .. }
1495        ));
1496    }
1497
1498    #[test]
1499    fn validate_rejects_duplicate_cancel_without_retry() {
1500        let mut builder = ReplayTraceBuilder::new("trace-dup-cancel");
1501        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Cancelled));
1502        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Cancelled));
1503        let err = builder.build().expect_err("duplicate cancel should fail");
1504        assert!(matches!(
1505            err,
1506            ReplayTraceValidationError::DuplicateCancelWithoutRetry { .. }
1507        ));
1508    }
1509
1510    #[test]
1511    fn cancel_then_retry_then_cancel_is_valid() {
1512        let mut builder = ReplayTraceBuilder::new("trace-cancel-retry-cancel");
1513        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Cancelled));
1514        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Retried));
1515        builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Cancelled));
1516        let bundle = builder
1517            .build()
1518            .expect("cancel-retry-cancel should be valid");
1519        assert_eq!(bundle.events.len(), 3);
1520    }
1521
1522    #[test]
1523    fn completed_clears_pending_cancel() {
1524        let mut builder = ReplayTraceBuilder::new("trace-complete-clear");
1525        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Cancelled));
1526        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1527        builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Cancelled));
1528        let bundle = builder
1529            .build()
1530            .expect("completed should clear cancel state");
1531        assert_eq!(bundle.events.len(), 3);
1532    }
1533
1534    // ── ReplayEventKind ordering ──
1535
1536    #[test]
1537    fn event_kind_canonical_rank_is_monotonic() {
1538        let kinds = [
1539            ReplayEventKind::Scheduled,
1540            ReplayEventKind::QueueAccepted,
1541            ReplayEventKind::PolicyDecision,
1542            ReplayEventKind::Cancelled,
1543            ReplayEventKind::Retried,
1544            ReplayEventKind::Completed,
1545            ReplayEventKind::Failed,
1546        ];
1547        for pair in kinds.windows(2) {
1548            assert!(
1549                pair[0].canonical_rank() < pair[1].canonical_rank(),
1550                "{:?} should have lower rank than {:?}",
1551                pair[0],
1552                pair[1]
1553            );
1554        }
1555    }
1556
1557    #[test]
1558    fn event_kind_serde_roundtrip() {
1559        let kinds = [
1560            ReplayEventKind::Scheduled,
1561            ReplayEventKind::QueueAccepted,
1562            ReplayEventKind::PolicyDecision,
1563            ReplayEventKind::Cancelled,
1564            ReplayEventKind::Retried,
1565            ReplayEventKind::Completed,
1566            ReplayEventKind::Failed,
1567        ];
1568        for kind in kinds {
1569            let json = serde_json::to_string(&kind).expect("serialize kind");
1570            let roundtrip: ReplayEventKind = serde_json::from_str(&json).expect("deserialize kind");
1571            assert_eq!(kind, roundtrip);
1572        }
1573    }
1574
1575    // ── Divergence edge cases ──
1576
1577    #[test]
1578    fn divergence_detects_schema_mismatch() {
1579        let mut observed = standard_bundle();
1580        observed.schema = "pi.ext.replay.trace.v2".to_string();
1581
1582        // We can't use first_divergence because validate() would reject v2.
1583        // Instead test the divergence reason enum directly.
1584        let d = super::ReplayDivergence {
1585            seq: None,
1586            reason: ReplayDivergenceReason::SchemaMismatch {
1587                expected: REPLAY_TRACE_SCHEMA_V1.to_string(),
1588                observed: "pi.ext.replay.trace.v2".to_string(),
1589            },
1590        };
1591        let json = serde_json::to_string(&d).expect("serialize divergence");
1592        let roundtrip: super::ReplayDivergence =
1593            serde_json::from_str(&json).expect("deserialize divergence");
1594        assert_eq!(d, roundtrip);
1595    }
1596
1597    #[test]
1598    fn divergence_detects_attribute_mismatch() {
1599        let mut builder_a = ReplayTraceBuilder::new("trace-attrs-cmp");
1600        let mut d1 = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
1601        d1.attributes
1602            .insert("decision".to_string(), "fast".to_string());
1603        builder_a.push(d1);
1604        let expected = builder_a.build().expect("bundle a");
1605
1606        let mut builder_b = ReplayTraceBuilder::new("trace-attrs-cmp");
1607        let mut d2 = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
1608        d2.attributes
1609            .insert("decision".to_string(), "slow".to_string());
1610        builder_b.push(d2);
1611        let observed = builder_b.build().expect("bundle b");
1612
1613        let divergence = first_divergence(&expected, &observed)
1614            .expect("comparison should succeed")
1615            .expect("attribute mismatch expected");
1616        assert_eq!(divergence.seq, Some(1));
1617        match divergence.reason {
1618            ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
1619                assert_eq!(field, "attributes");
1620            }
1621            other => panic!("unexpected: {other:?}"),
1622        }
1623    }
1624
1625    // ── Capture gate boundary cases ──
1626
1627    #[test]
1628    fn capture_gate_zero_overhead_when_captured_equals_baseline() {
1629        let budget = standard_capture_budget();
1630        let observation = ReplayCaptureObservation {
1631            baseline_micros: 1_000,
1632            captured_micros: 1_000,
1633            trace_bytes: 100,
1634        };
1635        let report = evaluate_replay_capture_gate(budget, observation);
1636        assert!(report.capture_allowed);
1637        assert_eq!(report.observed_overhead_per_mille, 0);
1638    }
1639
1640    #[test]
1641    fn capture_gate_zero_overhead_when_captured_less_than_baseline() {
1642        let budget = standard_capture_budget();
1643        let observation = ReplayCaptureObservation {
1644            baseline_micros: 1_000,
1645            captured_micros: 900,
1646            trace_bytes: 100,
1647        };
1648        let report = evaluate_replay_capture_gate(budget, observation);
1649        assert!(report.capture_allowed);
1650        assert_eq!(report.observed_overhead_per_mille, 0);
1651    }
1652
1653    #[test]
1654    fn capture_gate_exact_boundary_at_max_overhead() {
1655        let budget = ReplayCaptureBudget {
1656            capture_enabled: true,
1657            max_overhead_per_mille: 100,
1658            max_trace_bytes: 10_000,
1659        };
1660        // 100/1000 = 100 per mille — exactly at budget
1661        let observation = ReplayCaptureObservation {
1662            baseline_micros: 1_000,
1663            captured_micros: 1_100,
1664            trace_bytes: 100,
1665        };
1666        let report = evaluate_replay_capture_gate(budget, observation);
1667        assert!(report.capture_allowed);
1668        assert_eq!(report.observed_overhead_per_mille, 100);
1669    }
1670
1671    #[test]
1672    fn capture_gate_exact_boundary_at_max_trace_bytes() {
1673        let budget = ReplayCaptureBudget {
1674            capture_enabled: true,
1675            max_overhead_per_mille: 1_000,
1676            max_trace_bytes: 500,
1677        };
1678        // Exactly at budget
1679        let at_limit = ReplayCaptureObservation {
1680            baseline_micros: 1_000,
1681            captured_micros: 1_010,
1682            trace_bytes: 500,
1683        };
1684        let report = evaluate_replay_capture_gate(budget, at_limit);
1685        assert!(report.capture_allowed);
1686
1687        // One over budget
1688        let over_limit = ReplayCaptureObservation {
1689            baseline_micros: 1_000,
1690            captured_micros: 1_010,
1691            trace_bytes: 501,
1692        };
1693        let report = evaluate_replay_capture_gate(budget, over_limit);
1694        assert!(!report.capture_allowed);
1695        assert_eq!(
1696            report.reason,
1697            ReplayCaptureGateReason::DisabledByTraceBudget
1698        );
1699    }
1700
1701    // ── Diagnostic snapshot root cause hints ──
1702
1703    #[test]
1704    fn diagnostic_snapshot_maps_config_disabled_hint() {
1705        let bundle = standard_bundle();
1706        let budget = ReplayCaptureBudget {
1707            capture_enabled: false,
1708            max_overhead_per_mille: 100,
1709            max_trace_bytes: 1_000,
1710        };
1711        let gate = evaluate_replay_capture_gate(
1712            budget,
1713            ReplayCaptureObservation {
1714                baseline_micros: 100,
1715                captured_micros: 100,
1716                trace_bytes: 0,
1717            },
1718        );
1719        let snapshot = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");
1720        assert_eq!(
1721            snapshot.root_cause_hints,
1722            vec![ReplayRootCauseHint::PolicyGateDisabled]
1723        );
1724    }
1725
1726    #[test]
1727    fn diagnostic_snapshot_maps_trace_budget_hint() {
1728        let bundle = standard_bundle();
1729        let budget = ReplayCaptureBudget {
1730            capture_enabled: true,
1731            max_overhead_per_mille: 1_000,
1732            max_trace_bytes: 100,
1733        };
1734        let gate = evaluate_replay_capture_gate(
1735            budget,
1736            ReplayCaptureObservation {
1737                baseline_micros: 1_000,
1738                captured_micros: 1_010,
1739                trace_bytes: 200,
1740            },
1741        );
1742        let snapshot = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");
1743        assert_eq!(
1744            snapshot.root_cause_hints,
1745            vec![ReplayRootCauseHint::TraceBudgetExceeded]
1746        );
1747    }
1748
1749    #[test]
1750    fn diagnostic_snapshot_serde_roundtrip() {
1751        let bundle = standard_bundle();
1752        let gate = evaluate_replay_capture_gate(
1753            standard_capture_budget(),
1754            ReplayCaptureObservation {
1755                baseline_micros: 1_000,
1756                captured_micros: 1_010,
1757                trace_bytes: 64,
1758            },
1759        );
1760        let snapshot = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");
1761        let json = serde_json::to_string(&snapshot).expect("serialize");
1762        let roundtrip: super::ReplayDiagnosticSnapshot =
1763            serde_json::from_str(&json).expect("deserialize");
1764        assert_eq!(snapshot, roundtrip);
1765    }
1766
1767    // ── compute_overhead_per_mille edge cases ──
1768
1769    #[test]
1770    fn overhead_per_mille_exact_computation() {
1771        // 50 overhead on 1000 baseline = 50 per mille
1772        assert_eq!(super::compute_overhead_per_mille(1_000, 1_050), 50);
1773        // 200 overhead on 1000 baseline = 200 per mille
1774        assert_eq!(super::compute_overhead_per_mille(1_000, 1_200), 200);
1775        // 0 overhead
1776        assert_eq!(super::compute_overhead_per_mille(1_000, 1_000), 0);
1777        // Captured < baseline
1778        assert_eq!(super::compute_overhead_per_mille(1_000, 500), 0);
1779    }
1780
1781    #[test]
1782    fn overhead_per_mille_rounding_up() {
1783        // 1 overhead on 3 baseline = 333.3... per mille → rounds up to 334
1784        assert_eq!(super::compute_overhead_per_mille(3, 4), 334);
1785    }
1786
1787    #[test]
1788    fn overhead_per_mille_zero_baseline_returns_max() {
1789        assert_eq!(super::compute_overhead_per_mille(0, 1), u32::MAX);
1790        assert_eq!(super::compute_overhead_per_mille(0, 0), 0);
1791    }
1792
1793    // ── ReplayRecorder tests ──
1794
1795    fn within_budget_observation() -> ReplayCaptureObservation {
1796        ReplayCaptureObservation {
1797            baseline_micros: 1_000,
1798            captured_micros: 1_050,
1799            trace_bytes: 256,
1800        }
1801    }
1802
1803    fn standard_lane_config() -> super::ReplayLaneConfig {
1804        super::ReplayLaneConfig::new(standard_capture_budget())
1805    }
1806
1807    #[test]
1808    fn recorder_empty_produces_valid_bundle() {
1809        let recorder = super::ReplayRecorder::new("trace-empty-rec", standard_lane_config());
1810        assert_eq!(recorder.event_count(), 0);
1811        assert_eq!(recorder.logical_clock(), 0);
1812
1813        let result = recorder
1814            .finish(within_budget_observation())
1815            .expect("finish");
1816        assert!(result.bundle.events.is_empty());
1817        assert!(result.gate_report.capture_allowed);
1818        assert_eq!(result.diagnostic.event_count, 0);
1819    }
1820
1821    #[test]
1822    fn recorder_captures_events_in_sequence() {
1823        let mut recorder = super::ReplayRecorder::new("trace-seq-rec", standard_lane_config());
1824        recorder.tick();
1825        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1826        recorder.tick();
1827        recorder.record_queue_accepted("ext.a", "req-1", BTreeMap::new());
1828        recorder.tick();
1829        recorder.record_policy_decision("ext.a", "req-1", BTreeMap::new());
1830        recorder.tick();
1831        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
1832
1833        assert_eq!(recorder.event_count(), 4);
1834        assert_eq!(recorder.logical_clock(), 4);
1835
1836        let result = recorder
1837            .finish(within_budget_observation())
1838            .expect("finish");
1839        assert_eq!(result.bundle.events.len(), 4);
1840        assert_eq!(result.bundle.events[0].kind, ReplayEventKind::Scheduled);
1841        assert_eq!(result.bundle.events[1].kind, ReplayEventKind::QueueAccepted);
1842        assert_eq!(
1843            result.bundle.events[2].kind,
1844            ReplayEventKind::PolicyDecision
1845        );
1846        assert_eq!(result.bundle.events[3].kind, ReplayEventKind::Completed);
1847
1848        // Sequences are 1-based contiguous
1849        for (i, event) in result.bundle.events.iter().enumerate() {
1850            assert_eq!(event.seq, (i + 1) as u64);
1851        }
1852    }
1853
1854    #[test]
1855    fn recorder_attributes_flow_through() {
1856        let mut recorder = super::ReplayRecorder::new("trace-attrs-rec", standard_lane_config());
1857        recorder.tick();
1858        let mut attrs = BTreeMap::new();
1859        attrs.insert("lane".to_string(), "fast".to_string());
1860        attrs.insert("capability".to_string(), "tool".to_string());
1861        recorder.record_policy_decision("ext.a", "req-1", attrs);
1862
1863        let result = recorder
1864            .finish(within_budget_observation())
1865            .expect("finish");
1866        let event = &result.bundle.events[0];
1867        assert_eq!(
1868            event.attributes.get("lane").map(String::as_str),
1869            Some("fast")
1870        );
1871        assert_eq!(
1872            event.attributes.get("capability").map(String::as_str),
1873            Some("tool")
1874        );
1875    }
1876
1877    #[test]
1878    fn recorder_lane_metadata_propagated() {
1879        let mut config = standard_lane_config();
1880        config.insert_metadata("env", "staging");
1881        config.insert_metadata("worker", "w-3");
1882        let mut recorder = super::ReplayRecorder::new("trace-meta-rec", config);
1883        recorder.tick();
1884        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1885
1886        let result = recorder
1887            .finish(within_budget_observation())
1888            .expect("finish");
1889        assert_eq!(
1890            result.bundle.metadata.get("env").map(String::as_str),
1891            Some("staging")
1892        );
1893        assert_eq!(
1894            result.bundle.metadata.get("worker").map(String::as_str),
1895            Some("w-3")
1896        );
1897    }
1898
1899    #[test]
1900    fn recorder_cancel_retry_lifecycle() {
1901        let mut recorder = super::ReplayRecorder::new("trace-cancel-retry", standard_lane_config());
1902        recorder.tick();
1903        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1904        recorder.tick();
1905        recorder.record_cancelled("ext.a", "req-1", BTreeMap::new());
1906        recorder.tick();
1907        recorder.record_retried("ext.a", "req-1", BTreeMap::new());
1908        recorder.tick();
1909        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
1910
1911        let result = recorder
1912            .finish(within_budget_observation())
1913            .expect("finish");
1914        assert_eq!(result.bundle.events.len(), 4);
1915        assert_eq!(result.bundle.events[1].kind, ReplayEventKind::Cancelled);
1916        assert_eq!(result.bundle.events[2].kind, ReplayEventKind::Retried);
1917    }
1918
1919    #[test]
1920    fn recorder_failed_event() {
1921        let mut recorder = super::ReplayRecorder::new("trace-fail", standard_lane_config());
1922        recorder.tick();
1923        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1924        recorder.tick();
1925        let mut attrs = BTreeMap::new();
1926        attrs.insert("error".to_string(), "timeout".to_string());
1927        recorder.record_failed("ext.a", "req-1", attrs);
1928
1929        let result = recorder
1930            .finish(within_budget_observation())
1931            .expect("finish");
1932        assert_eq!(result.bundle.events[1].kind, ReplayEventKind::Failed);
1933        assert_eq!(
1934            result.bundle.events[1]
1935                .attributes
1936                .get("error")
1937                .map(String::as_str),
1938            Some("timeout")
1939        );
1940    }
1941
1942    #[test]
1943    fn recorder_gate_report_reflects_budget() {
1944        let mut config = super::ReplayLaneConfig::new(ReplayCaptureBudget {
1945            capture_enabled: true,
1946            max_overhead_per_mille: 50,
1947            max_trace_bytes: 10_000,
1948        });
1949        config.insert_metadata("lane", "shadow");
1950        let mut recorder = super::ReplayRecorder::new("trace-gated", config);
1951        recorder.tick();
1952        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1953
1954        // Overhead 100 per mille > budget 50 per mille
1955        let result = recorder
1956            .finish(ReplayCaptureObservation {
1957                baseline_micros: 1_000,
1958                captured_micros: 1_100,
1959                trace_bytes: 64,
1960            })
1961            .expect("finish");
1962
1963        assert!(!result.gate_report.capture_allowed);
1964        assert_eq!(
1965            result.gate_report.reason,
1966            ReplayCaptureGateReason::DisabledByOverheadBudget
1967        );
1968        // Bundle is still present even when gated
1969        assert_eq!(result.bundle.events.len(), 1);
1970    }
1971
1972    #[test]
1973    fn recorder_diagnostic_snapshot_populated() {
1974        let mut recorder = super::ReplayRecorder::new("trace-diag", standard_lane_config());
1975        recorder.tick();
1976        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1977        recorder.tick();
1978        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
1979
1980        let result = recorder
1981            .finish(within_budget_observation())
1982            .expect("finish");
1983        assert_eq!(result.diagnostic.trace_id, "trace-diag");
1984        assert_eq!(result.diagnostic.schema, REPLAY_TRACE_SCHEMA_V1);
1985        assert_eq!(result.diagnostic.event_count, 2);
1986        assert!(result.diagnostic.divergence.is_none());
1987        assert!(result.diagnostic.root_cause_hints.is_empty());
1988    }
1989
1990    #[test]
1991    fn recorder_finish_and_compare_identical() {
1992        let mut rec1 = super::ReplayRecorder::new("trace-cmp", standard_lane_config());
1993        rec1.tick();
1994        rec1.record_scheduled("ext.a", "req-1", BTreeMap::new());
1995        rec1.tick();
1996        rec1.record_completed("ext.a", "req-1", BTreeMap::new());
1997        let reference = rec1
1998            .finish(within_budget_observation())
1999            .expect("ref")
2000            .bundle;
2001
2002        let mut rec2 = super::ReplayRecorder::new("trace-cmp", standard_lane_config());
2003        rec2.tick();
2004        rec2.record_scheduled("ext.a", "req-1", BTreeMap::new());
2005        rec2.tick();
2006        rec2.record_completed("ext.a", "req-1", BTreeMap::new());
2007
2008        let (result, comparison) = rec2
2009            .finish_and_compare(within_budget_observation(), &reference)
2010            .expect("compare");
2011        assert!(comparison.divergence.is_none());
2012        assert!(comparison.root_cause_hints.is_empty());
2013        assert_eq!(comparison.reference_trace_id, "trace-cmp");
2014        assert_eq!(comparison.observed_trace_id, "trace-cmp");
2015        assert!(result.diagnostic.divergence.is_none());
2016    }
2017
2018    #[test]
2019    fn recorder_finish_and_compare_detects_divergence() {
2020        let mut rec1 = super::ReplayRecorder::new("trace-div", standard_lane_config());
2021        rec1.tick();
2022        rec1.record_scheduled("ext.a", "req-1", BTreeMap::new());
2023        rec1.tick();
2024        rec1.record_completed("ext.a", "req-1", BTreeMap::new());
2025        let reference = rec1
2026            .finish(within_budget_observation())
2027            .expect("ref")
2028            .bundle;
2029
2030        let mut rec2 = super::ReplayRecorder::new("trace-div", standard_lane_config());
2031        rec2.tick();
2032        rec2.record_scheduled("ext.a", "req-1", BTreeMap::new());
2033        rec2.tick();
2034        rec2.record_failed("ext.a", "req-1", BTreeMap::new());
2035
2036        let (result, comparison) = rec2
2037            .finish_and_compare(within_budget_observation(), &reference)
2038            .expect("compare");
2039        assert!(comparison.divergence.is_some());
2040        let div = comparison.divergence.as_ref().unwrap();
2041        assert_eq!(div.seq, Some(2));
2042        assert!(matches!(
2043            div.reason,
2044            ReplayDivergenceReason::EventFieldMismatch { ref field, .. } if field == "kind"
2045        ));
2046        assert!(result.diagnostic.divergence.is_some());
2047        assert!(!result.diagnostic.root_cause_hints.is_empty());
2048    }
2049
2050    #[test]
2051    fn recorder_multi_extension_interleaving() {
2052        let mut recorder = super::ReplayRecorder::new("trace-multi", standard_lane_config());
2053        recorder.tick();
2054        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
2055        recorder.record_scheduled("ext.b", "req-2", BTreeMap::new());
2056        recorder.tick();
2057        recorder.record_policy_decision("ext.a", "req-1", BTreeMap::new());
2058        recorder.record_policy_decision("ext.b", "req-2", BTreeMap::new());
2059        recorder.tick();
2060        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
2061        recorder.record_completed("ext.b", "req-2", BTreeMap::new());
2062
2063        let result = recorder
2064            .finish(within_budget_observation())
2065            .expect("finish");
2066        assert_eq!(result.bundle.events.len(), 6);
2067
2068        // Canonical ordering: at same clock, ext.a < ext.b
2069        let clock_1_events: Vec<_> = result
2070            .bundle
2071            .events
2072            .iter()
2073            .filter(|e| e.logical_clock == 1)
2074            .collect();
2075        assert_eq!(clock_1_events.len(), 2);
2076        assert_eq!(clock_1_events[0].extension_id, "ext.a");
2077        assert_eq!(clock_1_events[1].extension_id, "ext.b");
2078    }
2079
2080    // ── compare_replay_bundles standalone tests ──
2081
2082    #[test]
2083    fn compare_replay_bundles_no_divergence() {
2084        let bundle = standard_bundle();
2085        let gate =
2086            evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
2087
2088        let (diagnostic, comparison) =
2089            super::compare_replay_bundles(&bundle, &bundle, gate).expect("compare");
2090        assert!(comparison.divergence.is_none());
2091        assert!(comparison.root_cause_hints.is_empty());
2092        assert!(diagnostic.divergence.is_none());
2093    }
2094
2095    #[test]
2096    fn compare_replay_bundles_with_divergence() {
2097        let reference = standard_bundle();
2098        let mut observed_builder = ReplayTraceBuilder::new("trace-diagnostic");
2099        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
2100        observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
2101        observed_builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Failed));
2102        let observed = observed_builder.build().expect("observed bundle");
2103
2104        let gate =
2105            evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
2106
2107        let (diagnostic, comparison) =
2108            super::compare_replay_bundles(&reference, &observed, gate).expect("compare");
2109        assert!(comparison.divergence.is_some());
2110        assert!(!comparison.root_cause_hints.is_empty());
2111        assert!(diagnostic.divergence.is_some());
2112    }
2113
2114    // ── ReplayLaneConfig tests ──
2115
2116    #[test]
2117    fn lane_config_serde_roundtrip() {
2118        let mut config = super::ReplayLaneConfig::new(standard_capture_budget());
2119        config.insert_metadata("env", "prod");
2120
2121        let json = serde_json::to_string(&config).expect("serialize");
2122        let roundtrip: super::ReplayLaneConfig = serde_json::from_str(&json).expect("deserialize");
2123        assert_eq!(config, roundtrip);
2124    }
2125
2126    #[test]
2127    fn lane_config_empty_metadata_omitted_in_json() {
2128        let config = super::ReplayLaneConfig::new(standard_capture_budget());
2129        let json = serde_json::to_string(&config).expect("serialize");
2130        assert!(!json.contains("laneMetadata"));
2131    }
2132
2133    #[test]
2134    fn lane_result_serde_roundtrip() {
2135        let mut recorder = super::ReplayRecorder::new("trace-serde", standard_lane_config());
2136        recorder.tick();
2137        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
2138        recorder.tick();
2139        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
2140
2141        let result = recorder
2142            .finish(within_budget_observation())
2143            .expect("finish");
2144        let json = serde_json::to_string(&result).expect("serialize");
2145        let roundtrip: super::ReplayLaneResult = serde_json::from_str(&json).expect("deserialize");
2146        assert_eq!(result, roundtrip);
2147    }
2148
2149    #[test]
2150    fn comparison_result_serde_roundtrip() {
2151        let comparison = super::ReplayComparisonResult {
2152            reference_trace_id: "ref-1".to_string(),
2153            observed_trace_id: "obs-1".to_string(),
2154            divergence: None,
2155            root_cause_hints: vec![],
2156        };
2157        let json = serde_json::to_string(&comparison).expect("serialize");
2158        let roundtrip: super::ReplayComparisonResult =
2159            serde_json::from_str(&json).expect("deserialize");
2160        assert_eq!(comparison, roundtrip);
2161    }
2162
2163    #[test]
2164    fn recorder_tick_is_monotonic() {
2165        let mut recorder = super::ReplayRecorder::new("trace-tick", standard_lane_config());
2166        let t1 = recorder.tick();
2167        let t2 = recorder.tick();
2168        let t3 = recorder.tick();
2169        assert_eq!(t1, 1);
2170        assert_eq!(t2, 2);
2171        assert_eq!(t3, 3);
2172    }
2173
2174    // ── Property tests ──────────────────────────────────────────────────
2175
2176    mod proptest_extension_replay {
2177        use super::*;
2178        use proptest::prelude::*;
2179
2180        fn arb_event_kind() -> impl Strategy<Value = ReplayEventKind> {
2181            prop::sample::select(vec![
2182                ReplayEventKind::Scheduled,
2183                ReplayEventKind::QueueAccepted,
2184                ReplayEventKind::PolicyDecision,
2185                ReplayEventKind::Completed,
2186                ReplayEventKind::Failed,
2187            ])
2188        }
2189
2190        fn arb_ext_id() -> impl Strategy<Value = String> {
2191            "ext\\.[a-z]{1,5}"
2192        }
2193
2194        fn arb_req_id() -> impl Strategy<Value = String> {
2195            "req-[0-9]{1,4}"
2196        }
2197
2198        fn arb_simple_draft() -> impl Strategy<Value = ReplayEventDraft> {
2199            (1..100u64, arb_ext_id(), arb_req_id(), arb_event_kind())
2200                .prop_map(|(clock, ext, req, kind)| ReplayEventDraft::new(clock, ext, req, kind))
2201        }
2202
2203        proptest! {
2204            #[test]
2205            fn compute_overhead_per_mille_zero_when_captured_leq_baseline(
2206                baseline in 1..10_000u64,
2207                captured in 0..10_000u64,
2208            ) {
2209                if captured <= baseline {
2210                    let result = super::super::compute_overhead_per_mille(baseline, captured);
2211                    assert_eq!(
2212                        result, 0,
2213                        "captured <= baseline should yield 0 overhead"
2214                    );
2215                }
2216            }
2217
2218            #[test]
2219            fn compute_overhead_per_mille_zero_baseline_returns_max(
2220                captured in 1..10_000u64,
2221            ) {
2222                let result = super::super::compute_overhead_per_mille(0, captured);
2223                assert_eq!(
2224                    result, u32::MAX,
2225                    "zero baseline with positive captured should be MAX"
2226                );
2227            }
2228
2229            #[test]
2230            fn compute_overhead_per_mille_is_non_negative(
2231                baseline in 0..10_000u64,
2232                captured in 0..10_000u64,
2233            ) {
2234                let result = super::super::compute_overhead_per_mille(baseline, captured);
2235                // u32 is always non-negative, but verify we never panic
2236                let _ = result;
2237            }
2238
2239            #[test]
2240            fn builder_produces_contiguous_sequences(
2241                drafts in prop::collection::vec(arb_simple_draft(), 0..10),
2242            ) {
2243                let mut builder = ReplayTraceBuilder::new("trace-prop");
2244                for d in drafts {
2245                    builder.push(d);
2246                }
2247                let bundle = builder.build().expect("build should succeed");
2248                for (idx, event) in bundle.events.iter().enumerate() {
2249                    assert_eq!(
2250                        event.seq,
2251                        (idx + 1) as u64,
2252                        "sequence should be 1-based contiguous"
2253                    );
2254                }
2255            }
2256
2257            #[test]
2258            fn builder_is_deterministic_regardless_of_push_order(
2259                drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2260            ) {
2261                let mut builder1 = ReplayTraceBuilder::new("trace-det");
2262                for d in &drafts {
2263                    builder1.push(d.clone());
2264                }
2265                let bundle1 = builder1.build().expect("build1");
2266
2267                let mut reversed = drafts;
2268                reversed.reverse();
2269                let mut builder2 = ReplayTraceBuilder::new("trace-det");
2270                for d in &reversed {
2271                    builder2.push(d.clone());
2272                }
2273                let bundle2 = builder2.build().expect("build2");
2274
2275                assert_eq!(
2276                    bundle1, bundle2,
2277                    "canonical ordering should be same regardless of push order"
2278                );
2279            }
2280
2281            #[test]
2282            fn identical_bundles_have_no_divergence(
2283                drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2284            ) {
2285                let mut builder = ReplayTraceBuilder::new("trace-id");
2286                for d in &drafts {
2287                    builder.push(d.clone());
2288                }
2289                let bundle = builder.build().expect("build");
2290                let divergence = first_divergence(&bundle, &bundle)
2291                    .expect("comparison should succeed");
2292                assert!(
2293                    divergence.is_none(),
2294                    "identical bundles should have no divergence"
2295                );
2296            }
2297
2298            #[test]
2299            fn json_roundtrip_preserves_bundle(
2300                drafts in prop::collection::vec(arb_simple_draft(), 0..6),
2301            ) {
2302                let mut builder = ReplayTraceBuilder::new("trace-rt");
2303                for d in drafts {
2304                    builder.push(d);
2305                }
2306                let bundle = builder.build().expect("build");
2307                let json = bundle.encode_json().expect("encode");
2308                let decoded = ReplayTraceBundle::decode_json(&json).expect("decode");
2309                assert_eq!(bundle, decoded, "JSON roundtrip should preserve bundle");
2310            }
2311
2312            #[test]
2313            fn capture_gate_disabled_config_always_rejects(
2314                baseline in 1..10_000u64,
2315                captured in 1..10_000u64,
2316                trace_bytes in 0..10_000u64,
2317                max_overhead in 0..1_000u32,
2318                max_bytes in 0..10_000u64,
2319            ) {
2320                let budget = ReplayCaptureBudget {
2321                    capture_enabled: false,
2322                    max_overhead_per_mille: max_overhead,
2323                    max_trace_bytes: max_bytes,
2324                };
2325                let observation = ReplayCaptureObservation {
2326                    baseline_micros: baseline,
2327                    captured_micros: captured,
2328                    trace_bytes,
2329                };
2330                let report = evaluate_replay_capture_gate(budget, observation);
2331                assert!(
2332                    !report.capture_allowed,
2333                    "disabled config should always reject"
2334                );
2335                assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
2336            }
2337
2338            #[test]
2339            fn capture_gate_is_deterministic(
2340                baseline in 0..5_000u64,
2341                captured in 0..5_000u64,
2342                trace_bytes in 0..5_000u64,
2343                enabled in any::<bool>(),
2344                max_overhead in 0..500u32,
2345                max_bytes in 0..5_000u64,
2346            ) {
2347                let budget = ReplayCaptureBudget {
2348                    capture_enabled: enabled,
2349                    max_overhead_per_mille: max_overhead,
2350                    max_trace_bytes: max_bytes,
2351                };
2352                let observation = ReplayCaptureObservation {
2353                    baseline_micros: baseline,
2354                    captured_micros: captured,
2355                    trace_bytes,
2356                };
2357                let r1 = evaluate_replay_capture_gate(budget, observation);
2358                let r2 = evaluate_replay_capture_gate(budget, observation);
2359                assert_eq!(r1, r2, "capture gate must be deterministic");
2360            }
2361
2362            #[test]
2363            fn event_kind_canonical_rank_all_distinct(
2364                a_idx in 0..7usize,
2365                b_idx in 0..7usize,
2366            ) {
2367                let kinds = [
2368                    ReplayEventKind::Scheduled,
2369                    ReplayEventKind::QueueAccepted,
2370                    ReplayEventKind::PolicyDecision,
2371                    ReplayEventKind::Cancelled,
2372                    ReplayEventKind::Retried,
2373                    ReplayEventKind::Completed,
2374                    ReplayEventKind::Failed,
2375                ];
2376                if a_idx != b_idx {
2377                    assert_ne!(
2378                        kinds[a_idx].canonical_rank(),
2379                        kinds[b_idx].canonical_rank(),
2380                        "distinct kinds should have distinct ranks"
2381                    );
2382                }
2383            }
2384
2385            #[test]
2386            fn builder_events_sorted_by_logical_clock(
2387                clocks in prop::collection::vec(0..50u64, 1..10),
2388            ) {
2389                let mut builder = ReplayTraceBuilder::new("trace-clock");
2390                for (i, clock) in clocks.iter().enumerate() {
2391                    builder.push(ReplayEventDraft::new(
2392                        *clock,
2393                        format!("ext.{i}"),
2394                        format!("req-{i}"),
2395                        ReplayEventKind::Scheduled,
2396                    ));
2397                }
2398                let bundle = builder.build().expect("build");
2399                for pair in bundle.events.windows(2) {
2400                    assert!(
2401                        pair[0].logical_clock <= pair[1].logical_clock,
2402                        "events should be sorted by logical clock: {} > {}",
2403                        pair[0].logical_clock,
2404                        pair[1].logical_clock,
2405                    );
2406                }
2407            }
2408        }
2409    }
2410}