Skip to main content

pi/
extension_replay.rs

1//! Deterministic replay trace bundle core for extension runtime forensics.
2//!
3//! This module provides a standalone schema + codec surface that records
4//! extension runtime events in a stable order so race/tail anomalies can be
5//! replayed and compared deterministically.
6
7use serde::{Deserialize, Serialize};
8use std::cmp::Ordering;
9use std::collections::{BTreeMap, BTreeSet};
10use thiserror::Error;
11
12/// Canonical schema identifier for replay trace bundles.
13pub const REPLAY_TRACE_SCHEMA_V1: &str = "pi.ext.replay.trace.v1";
14
15/// Kind of extension runtime event captured for deterministic replay.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
17#[serde(rename_all = "snake_case")]
18pub enum ReplayEventKind {
19    Scheduled,
20    QueueAccepted,
21    PolicyDecision,
22    Cancelled,
23    Retried,
24    Completed,
25    Failed,
26}
27
28impl ReplayEventKind {
29    const fn canonical_rank(self) -> u8 {
30        match self {
31            Self::Scheduled => 0,
32            Self::QueueAccepted => 1,
33            Self::PolicyDecision => 2,
34            Self::Cancelled => 3,
35            Self::Retried => 4,
36            Self::Completed => 5,
37            Self::Failed => 6,
38        }
39    }
40}
41
42/// Single deterministic replay trace event.
43#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
44#[serde(rename_all = "camelCase")]
45pub struct ReplayTraceEvent {
46    pub seq: u64,
47    pub logical_clock: u64,
48    pub extension_id: String,
49    pub request_id: String,
50    pub kind: ReplayEventKind,
51    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
52    pub attributes: BTreeMap<String, String>,
53}
54
55/// Builder input event before canonical ordering/sequence assignment.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct ReplayEventDraft {
58    pub logical_clock: u64,
59    pub extension_id: String,
60    pub request_id: String,
61    pub kind: ReplayEventKind,
62    pub attributes: BTreeMap<String, String>,
63}
64
65impl ReplayEventDraft {
66    #[must_use]
67    pub fn new(
68        logical_clock: u64,
69        extension_id: impl Into<String>,
70        request_id: impl Into<String>,
71        kind: ReplayEventKind,
72    ) -> Self {
73        Self {
74            logical_clock,
75            extension_id: extension_id.into(),
76            request_id: request_id.into(),
77            kind,
78            attributes: BTreeMap::new(),
79        }
80    }
81}
82
83/// Deterministic replay trace bundle.
84#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
85#[serde(rename_all = "camelCase")]
86pub struct ReplayTraceBundle {
87    pub schema: String,
88    pub trace_id: String,
89    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
90    pub metadata: BTreeMap<String, String>,
91    pub events: Vec<ReplayTraceEvent>,
92}
93
94/// First deterministic mismatch between two replay bundles.
95#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
96#[serde(rename_all = "camelCase")]
97pub struct ReplayDivergence {
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub seq: Option<u64>,
100    pub reason: ReplayDivergenceReason,
101}
102
103/// Machine-readable mismatch reason for replay comparisons.
104#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ReplayDivergenceReason {
107    SchemaMismatch {
108        expected: String,
109        observed: String,
110    },
111    TraceIdMismatch {
112        expected: String,
113        observed: String,
114    },
115    EventCountMismatch {
116        expected: u64,
117        observed: u64,
118    },
119    EventFieldMismatch {
120        field: String,
121        expected: String,
122        observed: String,
123    },
124}
125
126/// Configuration budget for deterministic replay trace capture.
127#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
128#[serde(rename_all = "camelCase")]
129pub struct ReplayCaptureBudget {
130    /// Global kill switch for replay capture in production.
131    pub capture_enabled: bool,
132    /// Maximum allowed overhead in per-mille (1/1000) units.
133    pub max_overhead_per_mille: u32,
134    /// Maximum allowed serialized trace bytes for a capture window.
135    pub max_trace_bytes: u64,
136}
137
138/// Runtime observation used to evaluate replay capture budget gates.
139#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
140#[serde(rename_all = "camelCase")]
141pub struct ReplayCaptureObservation {
142    /// Baseline runtime cost without replay capture.
143    pub baseline_micros: u64,
144    /// Measured runtime cost with replay capture active.
145    pub captured_micros: u64,
146    /// Size of the collected replay trace payload in bytes.
147    pub trace_bytes: u64,
148}
149
150/// Deterministic gate reason emitted by replay capture budget evaluation.
151#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
152#[serde(rename_all = "snake_case")]
153pub enum ReplayCaptureGateReason {
154    Enabled,
155    DisabledByConfig,
156    DisabledByOverheadBudget,
157    DisabledByTraceBudget,
158    DisabledByInvalidBaseline,
159}
160
161/// Machine-readable report for replay capture gating decisions.
162#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
163#[serde(rename_all = "camelCase")]
164pub struct ReplayCaptureGateReport {
165    pub capture_allowed: bool,
166    pub reason: ReplayCaptureGateReason,
167    pub observed_overhead_per_mille: u32,
168    pub max_overhead_per_mille: u32,
169    pub observed_trace_bytes: u64,
170    pub max_trace_bytes: u64,
171}
172
173/// Deterministic hint categories for automated replay triage.
174#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
175#[serde(rename_all = "snake_case")]
176pub enum ReplayRootCauseHint {
177    TraceSchemaMismatch,
178    TraceIdMismatch,
179    EventCountDrift,
180    EventPayloadDrift,
181    LogicalClockDrift,
182    PolicyGateDisabled,
183    OverheadBudgetExceeded,
184    TraceBudgetExceeded,
185    InvalidBaselineTelemetry,
186}
187
188/// Structured replay diagnostic snapshot for log/event sinks.
189#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
190#[serde(rename_all = "camelCase")]
191pub struct ReplayDiagnosticSnapshot {
192    pub trace_id: String,
193    pub schema: String,
194    pub event_count: u64,
195    pub capture_gate: ReplayCaptureGateReport,
196    #[serde(skip_serializing_if = "Option::is_none")]
197    pub divergence: Option<ReplayDivergence>,
198    pub root_cause_hints: Vec<ReplayRootCauseHint>,
199}
200
201/// Evaluate replay capture against overhead and size budgets.
202#[must_use]
203pub fn evaluate_replay_capture_gate(
204    budget: ReplayCaptureBudget,
205    observation: ReplayCaptureObservation,
206) -> ReplayCaptureGateReport {
207    if !budget.capture_enabled {
208        return ReplayCaptureGateReport {
209            capture_allowed: false,
210            reason: ReplayCaptureGateReason::DisabledByConfig,
211            observed_overhead_per_mille: 0,
212            max_overhead_per_mille: budget.max_overhead_per_mille,
213            observed_trace_bytes: observation.trace_bytes,
214            max_trace_bytes: budget.max_trace_bytes,
215        };
216    }
217
218    let observed_overhead_per_mille =
219        compute_overhead_per_mille(observation.baseline_micros, observation.captured_micros);
220
221    if observed_overhead_per_mille == u32::MAX {
222        return ReplayCaptureGateReport {
223            capture_allowed: false,
224            reason: ReplayCaptureGateReason::DisabledByInvalidBaseline,
225            observed_overhead_per_mille,
226            max_overhead_per_mille: budget.max_overhead_per_mille,
227            observed_trace_bytes: observation.trace_bytes,
228            max_trace_bytes: budget.max_trace_bytes,
229        };
230    }
231
232    if observed_overhead_per_mille > budget.max_overhead_per_mille {
233        return ReplayCaptureGateReport {
234            capture_allowed: false,
235            reason: ReplayCaptureGateReason::DisabledByOverheadBudget,
236            observed_overhead_per_mille,
237            max_overhead_per_mille: budget.max_overhead_per_mille,
238            observed_trace_bytes: observation.trace_bytes,
239            max_trace_bytes: budget.max_trace_bytes,
240        };
241    }
242
243    if observation.trace_bytes > budget.max_trace_bytes {
244        return ReplayCaptureGateReport {
245            capture_allowed: false,
246            reason: ReplayCaptureGateReason::DisabledByTraceBudget,
247            observed_overhead_per_mille,
248            max_overhead_per_mille: budget.max_overhead_per_mille,
249            observed_trace_bytes: observation.trace_bytes,
250            max_trace_bytes: budget.max_trace_bytes,
251        };
252    }
253
254    ReplayCaptureGateReport {
255        capture_allowed: true,
256        reason: ReplayCaptureGateReason::Enabled,
257        observed_overhead_per_mille,
258        max_overhead_per_mille: budget.max_overhead_per_mille,
259        observed_trace_bytes: observation.trace_bytes,
260        max_trace_bytes: budget.max_trace_bytes,
261    }
262}
263
264/// Build a machine-readable replay diagnostics snapshot.
265///
266/// # Errors
267///
268/// Returns an error when the replay bundle fails deterministic validation.
269/// Schema mismatches relax only the schema-version check so diagnostics can
270/// still be emitted for otherwise well-formed foreign bundles.
271pub fn build_replay_diagnostic_snapshot(
272    bundle: &ReplayTraceBundle,
273    capture_gate: ReplayCaptureGateReport,
274    divergence: Option<&ReplayDivergence>,
275) -> Result<ReplayDiagnosticSnapshot, ReplayTraceValidationError> {
276    if matches!(
277        divergence,
278        Some(ReplayDivergence {
279            reason: ReplayDivergenceReason::SchemaMismatch { .. },
280            ..
281        })
282    ) {
283        bundle.validate_structure()?;
284    } else {
285        bundle.validate()?;
286    }
287
288    let event_count = u64::try_from(bundle.events.len())
289        .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
290    let root_cause_hints = derive_root_cause_hints(capture_gate.reason, divergence);
291
292    Ok(ReplayDiagnosticSnapshot {
293        trace_id: bundle.trace_id.clone(),
294        schema: bundle.schema.clone(),
295        event_count,
296        capture_gate,
297        divergence: divergence.cloned(),
298        root_cause_hints,
299    })
300}
301
302fn compute_overhead_per_mille(baseline_micros: u64, captured_micros: u64) -> u32 {
303    if baseline_micros == 0 {
304        return u32::MAX;
305    }
306    if captured_micros <= baseline_micros {
307        return 0;
308    }
309
310    let overhead = u128::from(captured_micros - baseline_micros);
311    let baseline = u128::from(baseline_micros);
312    let scaled = overhead.saturating_mul(1_000);
313    let rounded_up = scaled
314        .saturating_add(baseline - 1)
315        .checked_div(baseline)
316        .unwrap_or(u128::MAX);
317    u32::try_from(rounded_up).unwrap_or(u32::MAX)
318}
319
320fn derive_root_cause_hints(
321    gate_reason: ReplayCaptureGateReason,
322    divergence: Option<&ReplayDivergence>,
323) -> Vec<ReplayRootCauseHint> {
324    let mut hints = BTreeSet::new();
325
326    match gate_reason {
327        ReplayCaptureGateReason::Enabled => {}
328        ReplayCaptureGateReason::DisabledByConfig => {
329            hints.insert(ReplayRootCauseHint::PolicyGateDisabled);
330        }
331        ReplayCaptureGateReason::DisabledByOverheadBudget => {
332            hints.insert(ReplayRootCauseHint::OverheadBudgetExceeded);
333        }
334        ReplayCaptureGateReason::DisabledByTraceBudget => {
335            hints.insert(ReplayRootCauseHint::TraceBudgetExceeded);
336        }
337        ReplayCaptureGateReason::DisabledByInvalidBaseline => {
338            hints.insert(ReplayRootCauseHint::InvalidBaselineTelemetry);
339        }
340    }
341
342    if let Some(divergence) = divergence {
343        match &divergence.reason {
344            ReplayDivergenceReason::SchemaMismatch { .. } => {
345                hints.insert(ReplayRootCauseHint::TraceSchemaMismatch);
346            }
347            ReplayDivergenceReason::TraceIdMismatch { .. } => {
348                hints.insert(ReplayRootCauseHint::TraceIdMismatch);
349            }
350            ReplayDivergenceReason::EventCountMismatch { .. } => {
351                hints.insert(ReplayRootCauseHint::EventCountDrift);
352            }
353            ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
354                if field == "logical_clock" {
355                    hints.insert(ReplayRootCauseHint::LogicalClockDrift);
356                } else {
357                    hints.insert(ReplayRootCauseHint::EventPayloadDrift);
358                }
359            }
360        }
361    }
362
363    hints.into_iter().collect()
364}
365
366impl ReplayTraceBundle {
367    /// Encode bundle as compact JSON.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if serialization fails.
372    pub fn encode_json(&self) -> Result<String, serde_json::Error> {
373        serde_json::to_string(self)
374    }
375
376    /// Decode bundle from JSON and validate deterministic invariants.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if parsing fails or bundle invariants are invalid.
381    pub fn decode_json(input: &str) -> Result<Self, ReplayTraceCodecError> {
382        let bundle: Self = serde_json::from_str(input)?;
383        bundle.validate()?;
384        Ok(bundle)
385    }
386
387    /// Validate schema, sequence continuity, and cancellation/retry ordering.
388    ///
389    /// # Errors
390    ///
391    /// Returns an error when the bundle is malformed or violates replay
392    /// ordering invariants.
393    pub fn validate(&self) -> Result<(), ReplayTraceValidationError> {
394        self.validate_schema()?;
395        self.validate_structure()
396    }
397
398    fn validate_schema(&self) -> Result<(), ReplayTraceValidationError> {
399        if self.schema != REPLAY_TRACE_SCHEMA_V1 {
400            return Err(ReplayTraceValidationError::UnknownSchema(
401                self.schema.clone(),
402            ));
403        }
404        Ok(())
405    }
406
407    fn validate_structure(&self) -> Result<(), ReplayTraceValidationError> {
408        if self.trace_id.trim().is_empty() {
409            return Err(ReplayTraceValidationError::EmptyTraceId);
410        }
411
412        for (idx, event) in self.events.iter().enumerate() {
413            let seq_index = idx
414                .checked_add(1)
415                .ok_or(ReplayTraceValidationError::TooManyEvents)?;
416            let expected_seq =
417                u64::try_from(seq_index).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
418            if event.seq != expected_seq {
419                return Err(ReplayTraceValidationError::NonContiguousSequence {
420                    expected: expected_seq,
421                    observed: event.seq,
422                });
423            }
424
425            if event.extension_id.trim().is_empty() {
426                return Err(ReplayTraceValidationError::MissingExtensionId { seq: event.seq });
427            }
428            if event.request_id.trim().is_empty() {
429                return Err(ReplayTraceValidationError::MissingRequestId { seq: event.seq });
430            }
431        }
432
433        self.validate_retry_ordering()
434    }
435
436    fn validate_retry_ordering(&self) -> Result<(), ReplayTraceValidationError> {
437        let mut pending_cancel: BTreeSet<(String, String)> = BTreeSet::new();
438        for event in &self.events {
439            let key = (event.extension_id.clone(), event.request_id.clone());
440            match event.kind {
441                ReplayEventKind::Cancelled => {
442                    if !pending_cancel.insert(key) {
443                        return Err(ReplayTraceValidationError::DuplicateCancelWithoutRetry {
444                            seq: event.seq,
445                            extension_id: event.extension_id.clone(),
446                            request_id: event.request_id.clone(),
447                        });
448                    }
449                }
450                ReplayEventKind::Retried => {
451                    if !pending_cancel.remove(&key) {
452                        return Err(ReplayTraceValidationError::RetryWithoutCancel {
453                            seq: event.seq,
454                            extension_id: event.extension_id.clone(),
455                            request_id: event.request_id.clone(),
456                        });
457                    }
458                }
459                ReplayEventKind::Completed | ReplayEventKind::Failed => {
460                    pending_cancel.remove(&key);
461                }
462                ReplayEventKind::Scheduled
463                | ReplayEventKind::QueueAccepted
464                | ReplayEventKind::PolicyDecision => {}
465            }
466        }
467        Ok(())
468    }
469}
470
471/// Compare two bundles and return the first deterministic divergence.
472///
473/// Both bundles must be structurally valid before comparison. If the schemas
474/// differ, the comparison reports a schema mismatch without requiring support
475/// for the foreign schema version.
476///
477/// # Errors
478///
479/// Returns an error if either bundle fails validation.
480pub fn first_divergence(
481    expected: &ReplayTraceBundle,
482    observed: &ReplayTraceBundle,
483) -> Result<Option<ReplayDivergence>, ReplayTraceValidationError> {
484    expected.validate_structure()?;
485    observed.validate_structure()?;
486
487    if expected.schema != observed.schema {
488        return Ok(Some(ReplayDivergence {
489            seq: None,
490            reason: ReplayDivergenceReason::SchemaMismatch {
491                expected: expected.schema.clone(),
492                observed: observed.schema.clone(),
493            },
494        }));
495    }
496
497    expected.validate_schema()?;
498    observed.validate_schema()?;
499
500    if expected.trace_id != observed.trace_id {
501        return Ok(Some(ReplayDivergence {
502            seq: None,
503            reason: ReplayDivergenceReason::TraceIdMismatch {
504                expected: expected.trace_id.clone(),
505                observed: observed.trace_id.clone(),
506            },
507        }));
508    }
509
510    let max_shared = expected.events.len().min(observed.events.len());
511    for idx in 0..max_shared {
512        let left = &expected.events[idx];
513        let right = &observed.events[idx];
514        if left.logical_clock != right.logical_clock {
515            return Ok(Some(field_mismatch(
516                left.seq,
517                "logical_clock",
518                left.logical_clock.to_string(),
519                right.logical_clock.to_string(),
520            )));
521        }
522        if left.extension_id != right.extension_id {
523            return Ok(Some(field_mismatch(
524                left.seq,
525                "extension_id",
526                left.extension_id.clone(),
527                right.extension_id.clone(),
528            )));
529        }
530        if left.request_id != right.request_id {
531            return Ok(Some(field_mismatch(
532                left.seq,
533                "request_id",
534                left.request_id.clone(),
535                right.request_id.clone(),
536            )));
537        }
538        if left.kind != right.kind {
539            return Ok(Some(field_mismatch(
540                left.seq,
541                "kind",
542                format!("{:?}", left.kind),
543                format!("{:?}", right.kind),
544            )));
545        }
546        if left.attributes != right.attributes {
547            return Ok(Some(field_mismatch(
548                left.seq,
549                "attributes",
550                format!("{:?}", left.attributes),
551                format!("{:?}", right.attributes),
552            )));
553        }
554    }
555
556    if expected.events.len() != observed.events.len() {
557        let next_seq = max_shared
558            .checked_add(1)
559            .ok_or(ReplayTraceValidationError::TooManyEvents)?;
560        let seq = u64::try_from(next_seq).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
561        return Ok(Some(ReplayDivergence {
562            seq: Some(seq),
563            reason: ReplayDivergenceReason::EventCountMismatch {
564                expected: u64::try_from(expected.events.len())
565                    .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
566                observed: u64::try_from(observed.events.len())
567                    .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
568            },
569        }));
570    }
571
572    Ok(None)
573}
574
575fn field_mismatch(seq: u64, field: &str, expected: String, observed: String) -> ReplayDivergence {
576    ReplayDivergence {
577        seq: Some(seq),
578        reason: ReplayDivergenceReason::EventFieldMismatch {
579            field: field.to_string(),
580            expected,
581            observed,
582        },
583    }
584}
585
586/// Builder that canonicalizes event ordering and sequence assignment.
587#[derive(Debug, Clone, Default)]
588pub struct ReplayTraceBuilder {
589    trace_id: String,
590    metadata: BTreeMap<String, String>,
591    drafts: Vec<ReplayEventDraft>,
592}
593
594impl ReplayTraceBuilder {
595    #[must_use]
596    pub fn new(trace_id: impl Into<String>) -> Self {
597        Self {
598            trace_id: trace_id.into(),
599            metadata: BTreeMap::new(),
600            drafts: Vec::new(),
601        }
602    }
603
604    pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
605        self.metadata.insert(key.into(), value.into());
606    }
607
608    pub fn push(&mut self, draft: ReplayEventDraft) {
609        self.drafts.push(draft);
610    }
611
612    /// Build a validated, deterministic trace bundle.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if sequence assignment overflows or validation fails.
617    pub fn build(self) -> Result<ReplayTraceBundle, ReplayTraceValidationError> {
618        let mut indexed = self
619            .drafts
620            .into_iter()
621            .enumerate()
622            .map(|(insertion_index, draft)| IndexedDraft {
623                insertion_index,
624                draft,
625            })
626            .collect::<Vec<_>>();
627        indexed.sort_by(compare_indexed_drafts);
628
629        let events = indexed
630            .into_iter()
631            .enumerate()
632            .map(|(idx, entry)| {
633                let seq_index = idx
634                    .checked_add(1)
635                    .ok_or(ReplayTraceValidationError::TooManyEvents)?;
636                let seq = u64::try_from(seq_index)
637                    .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
638                Ok(ReplayTraceEvent {
639                    seq,
640                    logical_clock: entry.draft.logical_clock,
641                    extension_id: entry.draft.extension_id,
642                    request_id: entry.draft.request_id,
643                    kind: entry.draft.kind,
644                    attributes: entry.draft.attributes,
645                })
646            })
647            .collect::<Result<Vec<_>, ReplayTraceValidationError>>()?;
648
649        let bundle = ReplayTraceBundle {
650            schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
651            trace_id: self.trace_id,
652            metadata: self.metadata,
653            events,
654        };
655        bundle.validate()?;
656        Ok(bundle)
657    }
658}
659
660#[derive(Debug, Clone)]
661struct IndexedDraft {
662    insertion_index: usize,
663    draft: ReplayEventDraft,
664}
665
666fn compare_indexed_drafts(left: &IndexedDraft, right: &IndexedDraft) -> Ordering {
667    left.draft
668        .logical_clock
669        .cmp(&right.draft.logical_clock)
670        .then_with(|| left.draft.extension_id.cmp(&right.draft.extension_id))
671        .then_with(|| left.draft.request_id.cmp(&right.draft.request_id))
672        .then_with(|| {
673            left.draft
674                .kind
675                .canonical_rank()
676                .cmp(&right.draft.kind.canonical_rank())
677        })
678        .then_with(|| left.insertion_index.cmp(&right.insertion_index))
679}
680
681/// Validation failures for replay bundle semantics.
682#[derive(Debug, Clone, PartialEq, Eq, Error)]
683pub enum ReplayTraceValidationError {
684    #[error("unknown replay trace schema: {0}")]
685    UnknownSchema(String),
686    #[error("trace id must not be empty")]
687    EmptyTraceId,
688    #[error("replay bundle contains too many events to index")]
689    TooManyEvents,
690    #[error("non-contiguous sequence: expected {expected}, observed {observed}")]
691    NonContiguousSequence { expected: u64, observed: u64 },
692    #[error("event seq {seq} missing extension id")]
693    MissingExtensionId { seq: u64 },
694    #[error("event seq {seq} missing request id")]
695    MissingRequestId { seq: u64 },
696    #[error("retry without prior cancel at seq {seq} for {extension_id}/{request_id}")]
697    RetryWithoutCancel {
698        seq: u64,
699        extension_id: String,
700        request_id: String,
701    },
702    #[error("duplicate cancel without retry at seq {seq} for {extension_id}/{request_id}")]
703    DuplicateCancelWithoutRetry {
704        seq: u64,
705        extension_id: String,
706        request_id: String,
707    },
708}
709
710/// Codec-level decode failures.
711#[derive(Debug, Error)]
712pub enum ReplayTraceCodecError {
713    #[error("failed to parse replay trace JSON: {0}")]
714    Deserialize(#[from] serde_json::Error),
715    #[error("invalid replay trace bundle: {0}")]
716    Validation(#[from] ReplayTraceValidationError),
717}
718
719/// Configuration for a replay recording lane.
720#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
721#[serde(rename_all = "camelCase")]
722pub struct ReplayLaneConfig {
723    /// Budget constraints for this lane.
724    pub budget: ReplayCaptureBudget,
725    /// Static metadata attached to every trace produced by this lane.
726    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
727    pub lane_metadata: BTreeMap<String, String>,
728}
729
730impl ReplayLaneConfig {
731    #[must_use]
732    pub const fn new(budget: ReplayCaptureBudget) -> Self {
733        Self {
734            budget,
735            lane_metadata: BTreeMap::new(),
736        }
737    }
738
739    /// Insert a metadata key-value pair into the lane config.
740    pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
741        self.lane_metadata.insert(key.into(), value.into());
742    }
743}
744
745/// Outcome of a completed replay recording session.
746#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
747#[serde(rename_all = "camelCase")]
748pub struct ReplayLaneResult {
749    /// The recorded trace bundle (present even when gated, for forensic access).
750    pub bundle: ReplayTraceBundle,
751    /// Budget gate report for this recording.
752    pub gate_report: ReplayCaptureGateReport,
753    /// Diagnostic snapshot with root-cause hints.
754    pub diagnostic: ReplayDiagnosticSnapshot,
755}
756
757/// Outcome of comparing a recorded trace against a reference.
758#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
759#[serde(rename_all = "camelCase")]
760pub struct ReplayComparisonResult {
761    /// The reference bundle used for comparison.
762    pub reference_trace_id: String,
763    /// The observed bundle being compared.
764    pub observed_trace_id: String,
765    /// First divergence found, if any.
766    #[serde(skip_serializing_if = "Option::is_none")]
767    pub divergence: Option<ReplayDivergence>,
768    /// Root-cause hints derived from the comparison.
769    pub root_cause_hints: Vec<ReplayRootCauseHint>,
770}
771
772/// Stateful recorder that accumulates extension runtime events during a
773/// dispatch cycle and produces a deterministic [`ReplayTraceBundle`].
774///
775/// Events are pushed in arrival order; the recorder's [`finish`](Self::finish)
776/// method canonicalizes ordering, applies the budget gate, and builds the
777/// diagnostic snapshot.
778#[derive(Debug)]
779pub struct ReplayRecorder {
780    config: ReplayLaneConfig,
781    builder: ReplayTraceBuilder,
782    logical_clock: u64,
783    event_count: u64,
784}
785
786impl ReplayRecorder {
787    /// Create a new recorder for a single dispatch cycle.
788    #[must_use]
789    pub fn new(trace_id: impl Into<String>, config: ReplayLaneConfig) -> Self {
790        let mut builder = ReplayTraceBuilder::new(trace_id);
791        for (key, value) in &config.lane_metadata {
792            builder.insert_metadata(key.clone(), value.clone());
793        }
794        Self {
795            config,
796            builder,
797            logical_clock: 0,
798            event_count: 0,
799        }
800    }
801
802    /// Current logical clock value.
803    #[must_use]
804    pub const fn logical_clock(&self) -> u64 {
805        self.logical_clock
806    }
807
808    /// Number of events recorded so far.
809    #[must_use]
810    pub const fn event_count(&self) -> u64 {
811        self.event_count
812    }
813
814    /// Advance the logical clock and return the new value.
815    pub const fn tick(&mut self) -> u64 {
816        self.logical_clock = self.logical_clock.saturating_add(1);
817        self.logical_clock
818    }
819
820    /// Record an event at the current logical clock.
821    pub fn record(
822        &mut self,
823        extension_id: impl Into<String>,
824        request_id: impl Into<String>,
825        kind: ReplayEventKind,
826        attributes: BTreeMap<String, String>,
827    ) {
828        let mut draft = ReplayEventDraft::new(self.logical_clock, extension_id, request_id, kind);
829        draft.attributes = attributes;
830        self.builder.push(draft);
831        self.event_count = self.event_count.saturating_add(1);
832    }
833
834    /// Record a `Scheduled` event.
835    pub fn record_scheduled(
836        &mut self,
837        extension_id: impl Into<String>,
838        request_id: impl Into<String>,
839        attributes: BTreeMap<String, String>,
840    ) {
841        self.record(
842            extension_id,
843            request_id,
844            ReplayEventKind::Scheduled,
845            attributes,
846        );
847    }
848
849    /// Record a `QueueAccepted` event.
850    pub fn record_queue_accepted(
851        &mut self,
852        extension_id: impl Into<String>,
853        request_id: impl Into<String>,
854        attributes: BTreeMap<String, String>,
855    ) {
856        self.record(
857            extension_id,
858            request_id,
859            ReplayEventKind::QueueAccepted,
860            attributes,
861        );
862    }
863
864    /// Record a `PolicyDecision` event.
865    pub fn record_policy_decision(
866        &mut self,
867        extension_id: impl Into<String>,
868        request_id: impl Into<String>,
869        attributes: BTreeMap<String, String>,
870    ) {
871        self.record(
872            extension_id,
873            request_id,
874            ReplayEventKind::PolicyDecision,
875            attributes,
876        );
877    }
878
879    /// Record a `Cancelled` event.
880    pub fn record_cancelled(
881        &mut self,
882        extension_id: impl Into<String>,
883        request_id: impl Into<String>,
884        attributes: BTreeMap<String, String>,
885    ) {
886        self.record(
887            extension_id,
888            request_id,
889            ReplayEventKind::Cancelled,
890            attributes,
891        );
892    }
893
894    /// Record a `Retried` event.
895    pub fn record_retried(
896        &mut self,
897        extension_id: impl Into<String>,
898        request_id: impl Into<String>,
899        attributes: BTreeMap<String, String>,
900    ) {
901        self.record(
902            extension_id,
903            request_id,
904            ReplayEventKind::Retried,
905            attributes,
906        );
907    }
908
909    /// Record a `Completed` event.
910    pub fn record_completed(
911        &mut self,
912        extension_id: impl Into<String>,
913        request_id: impl Into<String>,
914        attributes: BTreeMap<String, String>,
915    ) {
916        self.record(
917            extension_id,
918            request_id,
919            ReplayEventKind::Completed,
920            attributes,
921        );
922    }
923
924    /// Record a `Failed` event.
925    pub fn record_failed(
926        &mut self,
927        extension_id: impl Into<String>,
928        request_id: impl Into<String>,
929        attributes: BTreeMap<String, String>,
930    ) {
931        self.record(
932            extension_id,
933            request_id,
934            ReplayEventKind::Failed,
935            attributes,
936        );
937    }
938
939    /// Finalize the recording: canonicalize event ordering, apply budget gate,
940    /// and build the diagnostic snapshot.
941    ///
942    /// The `observation` provides runtime telemetry needed for budget gating.
943    ///
944    /// # Errors
945    ///
946    /// Returns an error if the trace bundle fails validation.
947    pub fn finish(
948        self,
949        observation: ReplayCaptureObservation,
950    ) -> Result<ReplayLaneResult, ReplayTraceValidationError> {
951        let bundle = self.builder.build()?;
952        let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
953        let diagnostic = build_replay_diagnostic_snapshot(&bundle, gate_report, None)?;
954
955        Ok(ReplayLaneResult {
956            bundle,
957            gate_report,
958            diagnostic,
959        })
960    }
961
962    /// Finalize and compare against a reference bundle.
963    ///
964    /// Returns the lane result together with a comparison result that
965    /// includes the first divergence (if any) and derived root-cause hints.
966    ///
967    /// # Errors
968    ///
969    /// Returns an error if either bundle fails validation.
970    pub fn finish_and_compare(
971        self,
972        observation: ReplayCaptureObservation,
973        reference: &ReplayTraceBundle,
974    ) -> Result<(ReplayLaneResult, ReplayComparisonResult), ReplayTraceValidationError> {
975        let bundle = self.builder.build()?;
976        let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
977        let divergence_opt = first_divergence(reference, &bundle)?;
978        let diagnostic =
979            build_replay_diagnostic_snapshot(&bundle, gate_report, divergence_opt.as_ref())?;
980
981        let comparison = ReplayComparisonResult {
982            reference_trace_id: reference.trace_id.clone(),
983            observed_trace_id: bundle.trace_id.clone(),
984            divergence: divergence_opt,
985            root_cause_hints: diagnostic.root_cause_hints.clone(),
986        };
987
988        let result = ReplayLaneResult {
989            bundle,
990            gate_report,
991            diagnostic,
992        };
993
994        Ok((result, comparison))
995    }
996}
997
998/// Compare two previously-recorded bundles without an active recorder.
999///
1000/// # Errors
1001///
1002/// Returns an error if either bundle fails structural validation, or if a
1003/// same-schema comparison uses an unsupported schema version.
1004pub fn compare_replay_bundles(
1005    reference: &ReplayTraceBundle,
1006    observed: &ReplayTraceBundle,
1007    gate_report: ReplayCaptureGateReport,
1008) -> Result<(ReplayDiagnosticSnapshot, ReplayComparisonResult), ReplayTraceValidationError> {
1009    let divergence_opt = first_divergence(reference, observed)?;
1010    let diagnostic =
1011        build_replay_diagnostic_snapshot(observed, gate_report, divergence_opt.as_ref())?;
1012
1013    let comparison = ReplayComparisonResult {
1014        reference_trace_id: reference.trace_id.clone(),
1015        observed_trace_id: observed.trace_id.clone(),
1016        divergence: divergence_opt,
1017        root_cause_hints: diagnostic.root_cause_hints.clone(),
1018    };
1019
1020    Ok((diagnostic, comparison))
1021}
1022
1023#[cfg(test)]
1024mod tests {
1025    use super::{
1026        REPLAY_TRACE_SCHEMA_V1, ReplayCaptureBudget, ReplayCaptureGateReason,
1027        ReplayCaptureObservation, ReplayDivergenceReason, ReplayEventDraft, ReplayEventKind,
1028        ReplayRootCauseHint, ReplayTraceBuilder, ReplayTraceBundle, ReplayTraceCodecError,
1029        ReplayTraceValidationError, build_replay_diagnostic_snapshot, evaluate_replay_capture_gate,
1030        first_divergence,
1031    };
1032    use std::collections::BTreeMap;
1033
1034    fn draft(
1035        logical_clock: u64,
1036        extension_id: &str,
1037        request_id: &str,
1038        kind: ReplayEventKind,
1039    ) -> ReplayEventDraft {
1040        ReplayEventDraft::new(
1041            logical_clock,
1042            extension_id.to_string(),
1043            request_id.to_string(),
1044            kind,
1045        )
1046    }
1047
1048    const fn standard_capture_budget() -> ReplayCaptureBudget {
1049        ReplayCaptureBudget {
1050            capture_enabled: true,
1051            max_overhead_per_mille: 120,
1052            max_trace_bytes: 8_192,
1053        }
1054    }
1055
1056    fn standard_bundle() -> ReplayTraceBundle {
1057        let mut builder = ReplayTraceBuilder::new("trace-diagnostic");
1058        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1059        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
1060        builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Completed));
1061        builder.build().expect("bundle should build")
1062    }
1063
1064    #[test]
1065    fn deterministic_build_is_order_stable_across_input_permutations() {
1066        let mut left = ReplayTraceBuilder::new("trace-a");
1067        left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1068        left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1069        left.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1070        left.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1071
1072        let mut right = ReplayTraceBuilder::new("trace-a");
1073        right.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1074        right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1075        right.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1076        right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1077
1078        let left_bundle = left.build().expect("left bundle should build");
1079        let right_bundle = right.build().expect("right bundle should build");
1080
1081        assert_eq!(left_bundle, right_bundle);
1082        assert_eq!(left_bundle.events[0].kind, ReplayEventKind::Cancelled);
1083        assert_eq!(left_bundle.events[1].kind, ReplayEventKind::Retried);
1084    }
1085
1086    #[test]
1087    fn json_roundtrip_preserves_replay_bundle() {
1088        let mut builder = ReplayTraceBuilder::new("trace-roundtrip");
1089        builder.insert_metadata("lane", "shadow");
1090        let mut event = draft(20, "ext.gamma", "req-7", ReplayEventKind::PolicyDecision);
1091        event
1092            .attributes
1093            .insert("decision".to_string(), "fast_lane".to_string());
1094        builder.push(draft(19, "ext.gamma", "req-7", ReplayEventKind::Scheduled));
1095        builder.push(event);
1096        builder.push(draft(21, "ext.gamma", "req-7", ReplayEventKind::Completed));
1097
1098        let bundle = builder.build().expect("bundle should build");
1099        let encoded = bundle.encode_json().expect("bundle should encode");
1100        let decoded = ReplayTraceBundle::decode_json(&encoded).expect("bundle should decode");
1101        assert_eq!(decoded, bundle);
1102    }
1103
1104    #[test]
1105    fn decode_rejects_retry_without_prior_cancel() {
1106        let bundle = ReplayTraceBundle {
1107            schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1108            trace_id: "trace-invalid".to_string(),
1109            metadata: BTreeMap::new(),
1110            events: vec![super::ReplayTraceEvent {
1111                seq: 1,
1112                logical_clock: 1,
1113                extension_id: "ext.a".to_string(),
1114                request_id: "req".to_string(),
1115                kind: ReplayEventKind::Retried,
1116                attributes: BTreeMap::new(),
1117            }],
1118        };
1119        let encoded = bundle
1120            .encode_json()
1121            .expect("invalid bundle should serialize");
1122
1123        let error = ReplayTraceBundle::decode_json(&encoded).expect_err("retry without cancel");
1124        match error {
1125            ReplayTraceCodecError::Validation(ReplayTraceValidationError::RetryWithoutCancel {
1126                ..
1127            }) => {}
1128            other => unreachable!("expected RetryWithoutCancel, got: {other:?}"),
1129        }
1130    }
1131
1132    #[test]
1133    fn decode_rejects_non_contiguous_sequence() {
1134        let bundle = ReplayTraceBundle {
1135            schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1136            trace_id: "trace-seq".to_string(),
1137            metadata: BTreeMap::new(),
1138            events: vec![
1139                super::ReplayTraceEvent {
1140                    seq: 1,
1141                    logical_clock: 1,
1142                    extension_id: "ext.a".to_string(),
1143                    request_id: "req-1".to_string(),
1144                    kind: ReplayEventKind::Scheduled,
1145                    attributes: BTreeMap::new(),
1146                },
1147                super::ReplayTraceEvent {
1148                    seq: 3,
1149                    logical_clock: 2,
1150                    extension_id: "ext.a".to_string(),
1151                    request_id: "req-1".to_string(),
1152                    kind: ReplayEventKind::Completed,
1153                    attributes: BTreeMap::new(),
1154                },
1155            ],
1156        };
1157        let encoded = bundle
1158            .encode_json()
1159            .expect("invalid bundle should serialize");
1160
1161        let error = ReplayTraceBundle::decode_json(&encoded).expect_err("non-contiguous seq");
1162        match error {
1163            ReplayTraceCodecError::Validation(
1164                ReplayTraceValidationError::NonContiguousSequence { expected, observed },
1165            ) => {
1166                assert_eq!(expected, 2);
1167                assert_eq!(observed, 3);
1168            }
1169            other => unreachable!("expected NonContiguousSequence, got: {other:?}"),
1170        }
1171    }
1172
1173    #[test]
1174    fn divergence_reports_kind_mismatch_with_seq() {
1175        let mut expected_builder = ReplayTraceBuilder::new("trace-divergence");
1176        expected_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1177        expected_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1178        let expected = expected_builder.build().expect("expected bundle");
1179
1180        let mut observed_builder = ReplayTraceBuilder::new("trace-divergence");
1181        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1182        observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Failed));
1183        let observed = observed_builder.build().expect("observed bundle");
1184
1185        let divergence = first_divergence(&expected, &observed)
1186            .expect("comparison should succeed")
1187            .expect("divergence expected");
1188        assert_eq!(divergence.seq, Some(2));
1189        match divergence.reason {
1190            ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
1191                assert_eq!(field, "kind");
1192            }
1193            other => unreachable!("expected EventFieldMismatch, got: {other:?}"),
1194        }
1195    }
1196
1197    #[test]
1198    fn divergence_reports_event_count_mismatch() {
1199        let mut expected_builder = ReplayTraceBuilder::new("trace-length");
1200        expected_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1201        expected_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1202        let expected = expected_builder.build().expect("expected bundle");
1203
1204        let mut observed_builder = ReplayTraceBuilder::new("trace-length");
1205        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1206        let observed = observed_builder.build().expect("observed bundle");
1207
1208        let divergence = first_divergence(&expected, &observed)
1209            .expect("comparison should succeed")
1210            .expect("divergence expected");
1211        assert_eq!(divergence.seq, Some(2));
1212        match divergence.reason {
1213            ReplayDivergenceReason::EventCountMismatch { expected, observed } => {
1214                assert_eq!(expected, 2);
1215                assert_eq!(observed, 1);
1216            }
1217            other => unreachable!("expected EventCountMismatch, got: {other:?}"),
1218        }
1219    }
1220
1221    #[test]
1222    fn divergence_returns_none_for_identical_bundles() {
1223        let mut builder = ReplayTraceBuilder::new("trace-identical");
1224        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1225        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1226        let bundle = builder.build().expect("bundle");
1227
1228        let divergence =
1229            first_divergence(&bundle, &bundle).expect("comparison should validate identical");
1230        assert!(divergence.is_none());
1231    }
1232
1233    #[test]
1234    fn capture_gate_disables_when_config_switch_is_off() {
1235        let mut budget = standard_capture_budget();
1236        budget.capture_enabled = false;
1237        let observation = ReplayCaptureObservation {
1238            baseline_micros: 1_000,
1239            captured_micros: 1_010,
1240            trace_bytes: 128,
1241        };
1242
1243        let report = evaluate_replay_capture_gate(budget, observation);
1244        assert!(!report.capture_allowed);
1245        assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
1246    }
1247
1248    #[test]
1249    fn capture_gate_disables_when_overhead_exceeds_budget() {
1250        let budget = standard_capture_budget();
1251        let observation = ReplayCaptureObservation {
1252            baseline_micros: 1_000,
1253            captured_micros: 1_140,
1254            trace_bytes: 512,
1255        };
1256
1257        let report = evaluate_replay_capture_gate(budget, observation);
1258        assert!(!report.capture_allowed);
1259        assert_eq!(
1260            report.reason,
1261            ReplayCaptureGateReason::DisabledByOverheadBudget
1262        );
1263        assert_eq!(report.observed_overhead_per_mille, 140);
1264    }
1265
1266    #[test]
1267    fn capture_gate_disables_when_trace_budget_exceeded() {
1268        let budget = standard_capture_budget();
1269        let observation = ReplayCaptureObservation {
1270            baseline_micros: 1_000,
1271            captured_micros: 1_050,
1272            trace_bytes: 9_000,
1273        };
1274
1275        let report = evaluate_replay_capture_gate(budget, observation);
1276        assert!(!report.capture_allowed);
1277        assert_eq!(
1278            report.reason,
1279            ReplayCaptureGateReason::DisabledByTraceBudget
1280        );
1281        assert_eq!(report.observed_overhead_per_mille, 50);
1282    }
1283
1284    #[test]
1285    fn capture_gate_fails_closed_on_invalid_baseline() {
1286        let budget = standard_capture_budget();
1287        let observation = ReplayCaptureObservation {
1288            baseline_micros: 0,
1289            captured_micros: 1,
1290            trace_bytes: 64,
1291        };
1292
1293        let report = evaluate_replay_capture_gate(budget, observation);
1294        assert!(!report.capture_allowed);
1295        assert_eq!(
1296            report.reason,
1297            ReplayCaptureGateReason::DisabledByInvalidBaseline
1298        );
1299        assert_eq!(report.observed_overhead_per_mille, u32::MAX);
1300    }
1301
1302    #[test]
1303    fn capture_gate_fails_closed_when_zero_baseline_reports_zero_capture_cost() {
1304        let budget = standard_capture_budget();
1305        let observation = ReplayCaptureObservation {
1306            baseline_micros: 0,
1307            captured_micros: 0,
1308            trace_bytes: 64,
1309        };
1310
1311        let report = evaluate_replay_capture_gate(budget, observation);
1312        assert!(!report.capture_allowed);
1313        assert_eq!(
1314            report.reason,
1315            ReplayCaptureGateReason::DisabledByInvalidBaseline
1316        );
1317        assert_eq!(report.observed_overhead_per_mille, u32::MAX);
1318    }
1319
1320    #[test]
1321    fn capture_gate_reports_deterministic_within_budget_enablement() {
1322        let budget = standard_capture_budget();
1323        let observation = ReplayCaptureObservation {
1324            baseline_micros: 1_000,
1325            captured_micros: 1_080,
1326            trace_bytes: 4_096,
1327        };
1328
1329        let first = evaluate_replay_capture_gate(budget, observation);
1330        let second = evaluate_replay_capture_gate(budget, observation);
1331
1332        assert_eq!(first, second);
1333        assert!(first.capture_allowed);
1334        assert_eq!(first.reason, ReplayCaptureGateReason::Enabled);
1335        assert_eq!(first.observed_overhead_per_mille, 80);
1336    }
1337
1338    #[test]
1339    fn diagnostic_snapshot_emits_hint_codes_for_gate_and_payload_drift() {
1340        let expected = standard_bundle();
1341
1342        let mut observed_builder = ReplayTraceBuilder::new("trace-diagnostic");
1343        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1344        observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
1345        observed_builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Failed));
1346        let observed = observed_builder.build().expect("observed bundle");
1347
1348        let divergence = first_divergence(&expected, &observed)
1349            .expect("comparison should succeed")
1350            .expect("divergence expected");
1351        let capture_gate = evaluate_replay_capture_gate(
1352            standard_capture_budget(),
1353            ReplayCaptureObservation {
1354                baseline_micros: 1_000,
1355                captured_micros: 1_150,
1356                trace_bytes: 64,
1357            },
1358        );
1359
1360        let snapshot = build_replay_diagnostic_snapshot(&expected, capture_gate, Some(&divergence))
1361            .expect("snapshot should build");
1362        assert_eq!(snapshot.event_count, 3);
1363        assert_eq!(
1364            snapshot.root_cause_hints,
1365            vec![
1366                ReplayRootCauseHint::EventPayloadDrift,
1367                ReplayRootCauseHint::OverheadBudgetExceeded,
1368            ]
1369        );
1370    }
1371
1372    #[test]
1373    fn diagnostic_snapshot_maps_logical_clock_drift_hint() {
1374        let expected = standard_bundle();
1375        let mut observed = expected.clone();
1376        observed.events[1].logical_clock = 77;
1377
1378        let divergence = first_divergence(&expected, &observed)
1379            .expect("comparison should succeed")
1380            .expect("divergence expected");
1381        let capture_gate = evaluate_replay_capture_gate(
1382            standard_capture_budget(),
1383            ReplayCaptureObservation {
1384                baseline_micros: 1_000,
1385                captured_micros: 1_010,
1386                trace_bytes: 64,
1387            },
1388        );
1389
1390        let snapshot = build_replay_diagnostic_snapshot(&expected, capture_gate, Some(&divergence))
1391            .expect("snapshot should build");
1392        assert_eq!(
1393            snapshot.root_cause_hints,
1394            vec![ReplayRootCauseHint::LogicalClockDrift]
1395        );
1396    }
1397
1398    #[test]
1399    fn diagnostic_snapshot_is_deterministic_for_same_inputs() {
1400        let bundle = standard_bundle();
1401        let capture_gate = evaluate_replay_capture_gate(
1402            standard_capture_budget(),
1403            ReplayCaptureObservation {
1404                baseline_micros: 1_000,
1405                captured_micros: 1_020,
1406                trace_bytes: 256,
1407            },
1408        );
1409
1410        let first =
1411            build_replay_diagnostic_snapshot(&bundle, capture_gate, None).expect("first snapshot");
1412        let second =
1413            build_replay_diagnostic_snapshot(&bundle, capture_gate, None).expect("second snapshot");
1414        assert_eq!(first, second);
1415    }
1416
1417    #[test]
1418    fn diagnostic_snapshot_rejects_invalid_bundle() {
1419        let invalid = ReplayTraceBundle {
1420            schema: "invalid.schema".to_string(),
1421            trace_id: "trace-bad".to_string(),
1422            metadata: BTreeMap::new(),
1423            events: Vec::new(),
1424        };
1425        let capture_gate = evaluate_replay_capture_gate(
1426            standard_capture_budget(),
1427            ReplayCaptureObservation {
1428                baseline_micros: 1_000,
1429                captured_micros: 1_000,
1430                trace_bytes: 0,
1431            },
1432        );
1433
1434        let error = build_replay_diagnostic_snapshot(&invalid, capture_gate, None)
1435            .expect_err("invalid schema should fail");
1436        assert!(matches!(
1437            error,
1438            ReplayTraceValidationError::UnknownSchema(_)
1439        ));
1440    }
1441
1442    // ── Builder edge cases ──
1443
1444    #[test]
1445    fn builder_empty_events_produces_valid_bundle() {
1446        let builder = ReplayTraceBuilder::new("trace-empty");
1447        let bundle = builder.build().expect("empty bundle should be valid");
1448        assert!(bundle.events.is_empty());
1449        assert_eq!(bundle.schema, REPLAY_TRACE_SCHEMA_V1);
1450        assert_eq!(bundle.trace_id, "trace-empty");
1451    }
1452
1453    #[test]
1454    fn builder_metadata_preserved_in_output() {
1455        let mut builder = ReplayTraceBuilder::new("trace-meta");
1456        builder.insert_metadata("env", "production");
1457        builder.insert_metadata("version", "1.2.3");
1458        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1459        let bundle = builder.build().expect("bundle with metadata");
1460        assert_eq!(
1461            bundle.metadata.get("env").map(String::as_str),
1462            Some("production")
1463        );
1464        assert_eq!(
1465            bundle.metadata.get("version").map(String::as_str),
1466            Some("1.2.3")
1467        );
1468    }
1469
1470    #[test]
1471    fn builder_metadata_overwrite_works() {
1472        let mut builder = ReplayTraceBuilder::new("trace-meta-ow");
1473        builder.insert_metadata("key", "old");
1474        builder.insert_metadata("key", "new");
1475        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1476        let bundle = builder.build().expect("metadata overwrite");
1477        assert_eq!(bundle.metadata.get("key").map(String::as_str), Some("new"));
1478    }
1479
1480    #[test]
1481    fn draft_attributes_carried_through_build() {
1482        let mut d = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
1483        d.attributes
1484            .insert("policy".to_string(), "fast_lane".to_string());
1485        d.attributes
1486            .insert("latency_ms".to_string(), "12".to_string());
1487        let mut builder = ReplayTraceBuilder::new("trace-attrs");
1488        builder.push(d);
1489        let bundle = builder.build().expect("bundle with attrs");
1490        assert_eq!(bundle.events[0].attributes.len(), 2);
1491        assert_eq!(
1492            bundle.events[0]
1493                .attributes
1494                .get("policy")
1495                .map(String::as_str),
1496            Some("fast_lane")
1497        );
1498    }
1499
1500    // ── Validation error paths ──
1501
1502    #[test]
1503    fn validate_rejects_empty_trace_id() {
1504        let mut builder = ReplayTraceBuilder::new("");
1505        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1506        let err = builder.build().expect_err("empty trace_id should fail");
1507        assert!(matches!(err, ReplayTraceValidationError::EmptyTraceId));
1508    }
1509
1510    #[test]
1511    fn validate_rejects_whitespace_only_trace_id() {
1512        let mut builder = ReplayTraceBuilder::new("   ");
1513        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1514        let err = builder
1515            .build()
1516            .expect_err("whitespace trace_id should fail");
1517        assert!(matches!(err, ReplayTraceValidationError::EmptyTraceId));
1518    }
1519
1520    #[test]
1521    fn validate_rejects_empty_extension_id() {
1522        let mut builder = ReplayTraceBuilder::new("trace-val");
1523        builder.push(draft(1, "", "req-1", ReplayEventKind::Scheduled));
1524        let err = builder.build().expect_err("empty extension_id should fail");
1525        assert!(matches!(
1526            err,
1527            ReplayTraceValidationError::MissingExtensionId { .. }
1528        ));
1529    }
1530
1531    #[test]
1532    fn validate_rejects_empty_request_id() {
1533        let mut builder = ReplayTraceBuilder::new("trace-val");
1534        builder.push(draft(1, "ext.a", "", ReplayEventKind::Scheduled));
1535        let err = builder.build().expect_err("empty request_id should fail");
1536        assert!(matches!(
1537            err,
1538            ReplayTraceValidationError::MissingRequestId { .. }
1539        ));
1540    }
1541
1542    #[test]
1543    fn validate_rejects_duplicate_cancel_without_retry() {
1544        let mut builder = ReplayTraceBuilder::new("trace-dup-cancel");
1545        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Cancelled));
1546        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Cancelled));
1547        let err = builder.build().expect_err("duplicate cancel should fail");
1548        assert!(matches!(
1549            err,
1550            ReplayTraceValidationError::DuplicateCancelWithoutRetry { .. }
1551        ));
1552    }
1553
1554    #[test]
1555    fn cancel_then_retry_then_cancel_is_valid() {
1556        let mut builder = ReplayTraceBuilder::new("trace-cancel-retry-cancel");
1557        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Cancelled));
1558        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Retried));
1559        builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Cancelled));
1560        let bundle = builder
1561            .build()
1562            .expect("cancel-retry-cancel should be valid");
1563        assert_eq!(bundle.events.len(), 3);
1564    }
1565
1566    #[test]
1567    fn completed_clears_pending_cancel() {
1568        let mut builder = ReplayTraceBuilder::new("trace-complete-clear");
1569        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Cancelled));
1570        builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1571        builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Cancelled));
1572        let bundle = builder
1573            .build()
1574            .expect("completed should clear cancel state");
1575        assert_eq!(bundle.events.len(), 3);
1576    }
1577
1578    // ── ReplayEventKind ordering ──
1579
1580    #[test]
1581    fn event_kind_canonical_rank_is_monotonic() {
1582        let kinds = [
1583            ReplayEventKind::Scheduled,
1584            ReplayEventKind::QueueAccepted,
1585            ReplayEventKind::PolicyDecision,
1586            ReplayEventKind::Cancelled,
1587            ReplayEventKind::Retried,
1588            ReplayEventKind::Completed,
1589            ReplayEventKind::Failed,
1590        ];
1591        for pair in kinds.windows(2) {
1592            assert!(
1593                pair[0].canonical_rank() < pair[1].canonical_rank(),
1594                "{:?} should have lower rank than {:?}",
1595                pair[0],
1596                pair[1]
1597            );
1598        }
1599    }
1600
1601    #[test]
1602    fn event_kind_serde_roundtrip() {
1603        let kinds = [
1604            ReplayEventKind::Scheduled,
1605            ReplayEventKind::QueueAccepted,
1606            ReplayEventKind::PolicyDecision,
1607            ReplayEventKind::Cancelled,
1608            ReplayEventKind::Retried,
1609            ReplayEventKind::Completed,
1610            ReplayEventKind::Failed,
1611        ];
1612        for kind in kinds {
1613            let json = serde_json::to_string(&kind).expect("serialize kind");
1614            let roundtrip: ReplayEventKind = serde_json::from_str(&json).expect("deserialize kind");
1615            assert_eq!(kind, roundtrip);
1616        }
1617    }
1618
1619    // ── Divergence edge cases ──
1620
1621    #[test]
1622    fn divergence_detects_schema_mismatch() {
1623        let mut observed = standard_bundle();
1624        observed.schema = "pi.ext.replay.trace.v2".to_string();
1625
1626        let divergence = first_divergence(&standard_bundle(), &observed)
1627            .expect("comparison should succeed")
1628            .expect("schema mismatch expected");
1629
1630        assert_eq!(divergence.seq, None);
1631        match divergence.reason {
1632            ReplayDivergenceReason::SchemaMismatch { expected, observed } => {
1633                assert_eq!(expected, REPLAY_TRACE_SCHEMA_V1);
1634                assert_eq!(observed, "pi.ext.replay.trace.v2");
1635            }
1636            other => unreachable!("expected SchemaMismatch, got: {other:?}"),
1637        }
1638    }
1639
1640    #[test]
1641    fn divergence_rejects_same_unknown_schema() {
1642        let mut expected = standard_bundle();
1643        expected.schema = "pi.ext.replay.trace.v2".to_string();
1644        let observed = expected.clone();
1645
1646        let error = first_divergence(&expected, &observed).expect_err("unsupported schema");
1647        assert!(matches!(
1648            error,
1649            ReplayTraceValidationError::UnknownSchema(schema) if schema == "pi.ext.replay.trace.v2"
1650        ));
1651    }
1652
1653    #[test]
1654    fn divergence_detects_attribute_mismatch() {
1655        let mut builder_a = ReplayTraceBuilder::new("trace-attrs-cmp");
1656        let mut d1 = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
1657        d1.attributes
1658            .insert("decision".to_string(), "fast".to_string());
1659        builder_a.push(d1);
1660        let expected = builder_a.build().expect("bundle a");
1661
1662        let mut builder_b = ReplayTraceBuilder::new("trace-attrs-cmp");
1663        let mut d2 = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
1664        d2.attributes
1665            .insert("decision".to_string(), "slow".to_string());
1666        builder_b.push(d2);
1667        let observed = builder_b.build().expect("bundle b");
1668
1669        let divergence = first_divergence(&expected, &observed)
1670            .expect("comparison should succeed")
1671            .expect("attribute mismatch expected");
1672        assert_eq!(divergence.seq, Some(1));
1673        match divergence.reason {
1674            ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
1675                assert_eq!(field, "attributes");
1676            }
1677            other => unreachable!("expected EventFieldMismatch for attributes, got: {other:?}"),
1678        }
1679    }
1680
1681    // ── Capture gate boundary cases ──
1682
1683    #[test]
1684    fn capture_gate_zero_overhead_when_captured_equals_baseline() {
1685        let budget = standard_capture_budget();
1686        let observation = ReplayCaptureObservation {
1687            baseline_micros: 1_000,
1688            captured_micros: 1_000,
1689            trace_bytes: 100,
1690        };
1691        let report = evaluate_replay_capture_gate(budget, observation);
1692        assert!(report.capture_allowed);
1693        assert_eq!(report.observed_overhead_per_mille, 0);
1694    }
1695
1696    #[test]
1697    fn capture_gate_zero_overhead_when_captured_less_than_baseline() {
1698        let budget = standard_capture_budget();
1699        let observation = ReplayCaptureObservation {
1700            baseline_micros: 1_000,
1701            captured_micros: 900,
1702            trace_bytes: 100,
1703        };
1704        let report = evaluate_replay_capture_gate(budget, observation);
1705        assert!(report.capture_allowed);
1706        assert_eq!(report.observed_overhead_per_mille, 0);
1707    }
1708
1709    #[test]
1710    fn capture_gate_exact_boundary_at_max_overhead() {
1711        let budget = ReplayCaptureBudget {
1712            capture_enabled: true,
1713            max_overhead_per_mille: 100,
1714            max_trace_bytes: 10_000,
1715        };
1716        // 100/1000 = 100 per mille — exactly at budget
1717        let observation = ReplayCaptureObservation {
1718            baseline_micros: 1_000,
1719            captured_micros: 1_100,
1720            trace_bytes: 100,
1721        };
1722        let report = evaluate_replay_capture_gate(budget, observation);
1723        assert!(report.capture_allowed);
1724        assert_eq!(report.observed_overhead_per_mille, 100);
1725    }
1726
1727    #[test]
1728    fn capture_gate_exact_boundary_at_max_trace_bytes() {
1729        let budget = ReplayCaptureBudget {
1730            capture_enabled: true,
1731            max_overhead_per_mille: 1_000,
1732            max_trace_bytes: 500,
1733        };
1734        // Exactly at budget
1735        let at_limit = ReplayCaptureObservation {
1736            baseline_micros: 1_000,
1737            captured_micros: 1_010,
1738            trace_bytes: 500,
1739        };
1740        let report = evaluate_replay_capture_gate(budget, at_limit);
1741        assert!(report.capture_allowed);
1742
1743        // One over budget
1744        let over_limit = ReplayCaptureObservation {
1745            baseline_micros: 1_000,
1746            captured_micros: 1_010,
1747            trace_bytes: 501,
1748        };
1749        let report = evaluate_replay_capture_gate(budget, over_limit);
1750        assert!(!report.capture_allowed);
1751        assert_eq!(
1752            report.reason,
1753            ReplayCaptureGateReason::DisabledByTraceBudget
1754        );
1755    }
1756
1757    // ── Diagnostic snapshot root cause hints ──
1758
1759    #[test]
1760    fn diagnostic_snapshot_maps_config_disabled_hint() {
1761        let bundle = standard_bundle();
1762        let budget = ReplayCaptureBudget {
1763            capture_enabled: false,
1764            max_overhead_per_mille: 100,
1765            max_trace_bytes: 1_000,
1766        };
1767        let gate = evaluate_replay_capture_gate(
1768            budget,
1769            ReplayCaptureObservation {
1770                baseline_micros: 100,
1771                captured_micros: 100,
1772                trace_bytes: 0,
1773            },
1774        );
1775        let snapshot = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");
1776        assert_eq!(
1777            snapshot.root_cause_hints,
1778            vec![ReplayRootCauseHint::PolicyGateDisabled]
1779        );
1780    }
1781
1782    #[test]
1783    fn diagnostic_snapshot_maps_trace_budget_hint() {
1784        let bundle = standard_bundle();
1785        let budget = ReplayCaptureBudget {
1786            capture_enabled: true,
1787            max_overhead_per_mille: 1_000,
1788            max_trace_bytes: 100,
1789        };
1790        let gate = evaluate_replay_capture_gate(
1791            budget,
1792            ReplayCaptureObservation {
1793                baseline_micros: 1_000,
1794                captured_micros: 1_010,
1795                trace_bytes: 200,
1796            },
1797        );
1798        let snapshot = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");
1799        assert_eq!(
1800            snapshot.root_cause_hints,
1801            vec![ReplayRootCauseHint::TraceBudgetExceeded]
1802        );
1803    }
1804
1805    #[test]
1806    fn diagnostic_snapshot_serde_roundtrip() {
1807        let bundle = standard_bundle();
1808        let gate = evaluate_replay_capture_gate(
1809            standard_capture_budget(),
1810            ReplayCaptureObservation {
1811                baseline_micros: 1_000,
1812                captured_micros: 1_010,
1813                trace_bytes: 64,
1814            },
1815        );
1816        let snapshot = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");
1817        let json = serde_json::to_string(&snapshot).expect("serialize");
1818        let roundtrip: super::ReplayDiagnosticSnapshot =
1819            serde_json::from_str(&json).expect("deserialize");
1820        assert_eq!(snapshot, roundtrip);
1821    }
1822
1823    // ── compute_overhead_per_mille edge cases ──
1824
1825    #[test]
1826    fn overhead_per_mille_exact_computation() {
1827        // 50 overhead on 1000 baseline = 50 per mille
1828        assert_eq!(super::compute_overhead_per_mille(1_000, 1_050), 50);
1829        // 200 overhead on 1000 baseline = 200 per mille
1830        assert_eq!(super::compute_overhead_per_mille(1_000, 1_200), 200);
1831        // 0 overhead
1832        assert_eq!(super::compute_overhead_per_mille(1_000, 1_000), 0);
1833        // Captured < baseline
1834        assert_eq!(super::compute_overhead_per_mille(1_000, 500), 0);
1835    }
1836
1837    #[test]
1838    fn overhead_per_mille_rounding_up() {
1839        // 1 overhead on 3 baseline = 333.3... per mille → rounds up to 334
1840        assert_eq!(super::compute_overhead_per_mille(3, 4), 334);
1841    }
1842
1843    #[test]
1844    fn overhead_per_mille_zero_baseline_returns_max() {
1845        assert_eq!(super::compute_overhead_per_mille(0, 1), u32::MAX);
1846        assert_eq!(super::compute_overhead_per_mille(0, 0), u32::MAX);
1847    }
1848
1849    // ── ReplayRecorder tests ──
1850
1851    fn within_budget_observation() -> ReplayCaptureObservation {
1852        ReplayCaptureObservation {
1853            baseline_micros: 1_000,
1854            captured_micros: 1_050,
1855            trace_bytes: 256,
1856        }
1857    }
1858
1859    fn standard_lane_config() -> super::ReplayLaneConfig {
1860        super::ReplayLaneConfig::new(standard_capture_budget())
1861    }
1862
1863    #[test]
1864    fn recorder_empty_produces_valid_bundle() {
1865        let recorder = super::ReplayRecorder::new("trace-empty-rec", standard_lane_config());
1866        assert_eq!(recorder.event_count(), 0);
1867        assert_eq!(recorder.logical_clock(), 0);
1868
1869        let result = recorder
1870            .finish(within_budget_observation())
1871            .expect("finish");
1872        assert!(result.bundle.events.is_empty());
1873        assert!(result.gate_report.capture_allowed);
1874        assert_eq!(result.diagnostic.event_count, 0);
1875    }
1876
1877    #[test]
1878    fn recorder_captures_events_in_sequence() {
1879        let mut recorder = super::ReplayRecorder::new("trace-seq-rec", standard_lane_config());
1880        recorder.tick();
1881        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1882        recorder.tick();
1883        recorder.record_queue_accepted("ext.a", "req-1", BTreeMap::new());
1884        recorder.tick();
1885        recorder.record_policy_decision("ext.a", "req-1", BTreeMap::new());
1886        recorder.tick();
1887        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
1888
1889        assert_eq!(recorder.event_count(), 4);
1890        assert_eq!(recorder.logical_clock(), 4);
1891
1892        let result = recorder
1893            .finish(within_budget_observation())
1894            .expect("finish");
1895        assert_eq!(result.bundle.events.len(), 4);
1896        assert_eq!(result.bundle.events[0].kind, ReplayEventKind::Scheduled);
1897        assert_eq!(result.bundle.events[1].kind, ReplayEventKind::QueueAccepted);
1898        assert_eq!(
1899            result.bundle.events[2].kind,
1900            ReplayEventKind::PolicyDecision
1901        );
1902        assert_eq!(result.bundle.events[3].kind, ReplayEventKind::Completed);
1903
1904        // Sequences are 1-based contiguous
1905        for (i, event) in result.bundle.events.iter().enumerate() {
1906            assert_eq!(event.seq, (i + 1) as u64);
1907        }
1908    }
1909
1910    #[test]
1911    fn recorder_attributes_flow_through() {
1912        let mut recorder = super::ReplayRecorder::new("trace-attrs-rec", standard_lane_config());
1913        recorder.tick();
1914        let mut attrs = BTreeMap::new();
1915        attrs.insert("lane".to_string(), "fast".to_string());
1916        attrs.insert("capability".to_string(), "tool".to_string());
1917        recorder.record_policy_decision("ext.a", "req-1", attrs);
1918
1919        let result = recorder
1920            .finish(within_budget_observation())
1921            .expect("finish");
1922        let event = &result.bundle.events[0];
1923        assert_eq!(
1924            event.attributes.get("lane").map(String::as_str),
1925            Some("fast")
1926        );
1927        assert_eq!(
1928            event.attributes.get("capability").map(String::as_str),
1929            Some("tool")
1930        );
1931    }
1932
1933    #[test]
1934    fn recorder_lane_metadata_propagated() {
1935        let mut config = standard_lane_config();
1936        config.insert_metadata("env", "staging");
1937        config.insert_metadata("worker", "w-3");
1938        let mut recorder = super::ReplayRecorder::new("trace-meta-rec", config);
1939        recorder.tick();
1940        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1941
1942        let result = recorder
1943            .finish(within_budget_observation())
1944            .expect("finish");
1945        assert_eq!(
1946            result.bundle.metadata.get("env").map(String::as_str),
1947            Some("staging")
1948        );
1949        assert_eq!(
1950            result.bundle.metadata.get("worker").map(String::as_str),
1951            Some("w-3")
1952        );
1953    }
1954
1955    #[test]
1956    fn recorder_cancel_retry_lifecycle() {
1957        let mut recorder = super::ReplayRecorder::new("trace-cancel-retry", standard_lane_config());
1958        recorder.tick();
1959        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1960        recorder.tick();
1961        recorder.record_cancelled("ext.a", "req-1", BTreeMap::new());
1962        recorder.tick();
1963        recorder.record_retried("ext.a", "req-1", BTreeMap::new());
1964        recorder.tick();
1965        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
1966
1967        let result = recorder
1968            .finish(within_budget_observation())
1969            .expect("finish");
1970        assert_eq!(result.bundle.events.len(), 4);
1971        assert_eq!(result.bundle.events[1].kind, ReplayEventKind::Cancelled);
1972        assert_eq!(result.bundle.events[2].kind, ReplayEventKind::Retried);
1973    }
1974
1975    #[test]
1976    fn recorder_failed_event() {
1977        let mut recorder = super::ReplayRecorder::new("trace-fail", standard_lane_config());
1978        recorder.tick();
1979        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
1980        recorder.tick();
1981        let mut attrs = BTreeMap::new();
1982        attrs.insert("error".to_string(), "timeout".to_string());
1983        recorder.record_failed("ext.a", "req-1", attrs);
1984
1985        let result = recorder
1986            .finish(within_budget_observation())
1987            .expect("finish");
1988        assert_eq!(result.bundle.events[1].kind, ReplayEventKind::Failed);
1989        assert_eq!(
1990            result.bundle.events[1]
1991                .attributes
1992                .get("error")
1993                .map(String::as_str),
1994            Some("timeout")
1995        );
1996    }
1997
1998    #[test]
1999    fn recorder_gate_report_reflects_budget() {
2000        let mut config = super::ReplayLaneConfig::new(ReplayCaptureBudget {
2001            capture_enabled: true,
2002            max_overhead_per_mille: 50,
2003            max_trace_bytes: 10_000,
2004        });
2005        config.insert_metadata("lane", "shadow");
2006        let mut recorder = super::ReplayRecorder::new("trace-gated", config);
2007        recorder.tick();
2008        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
2009
2010        // Overhead 100 per mille > budget 50 per mille
2011        let result = recorder
2012            .finish(ReplayCaptureObservation {
2013                baseline_micros: 1_000,
2014                captured_micros: 1_100,
2015                trace_bytes: 64,
2016            })
2017            .expect("finish");
2018
2019        assert!(!result.gate_report.capture_allowed);
2020        assert_eq!(
2021            result.gate_report.reason,
2022            ReplayCaptureGateReason::DisabledByOverheadBudget
2023        );
2024        // Bundle is still present even when gated
2025        assert_eq!(result.bundle.events.len(), 1);
2026    }
2027
2028    #[test]
2029    fn recorder_diagnostic_snapshot_populated() {
2030        let mut recorder = super::ReplayRecorder::new("trace-diag", standard_lane_config());
2031        recorder.tick();
2032        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
2033        recorder.tick();
2034        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
2035
2036        let result = recorder
2037            .finish(within_budget_observation())
2038            .expect("finish");
2039        assert_eq!(result.diagnostic.trace_id, "trace-diag");
2040        assert_eq!(result.diagnostic.schema, REPLAY_TRACE_SCHEMA_V1);
2041        assert_eq!(result.diagnostic.event_count, 2);
2042        assert!(result.diagnostic.divergence.is_none());
2043        assert!(result.diagnostic.root_cause_hints.is_empty());
2044    }
2045
2046    #[test]
2047    fn recorder_finish_and_compare_identical() {
2048        let mut rec1 = super::ReplayRecorder::new("trace-cmp", standard_lane_config());
2049        rec1.tick();
2050        rec1.record_scheduled("ext.a", "req-1", BTreeMap::new());
2051        rec1.tick();
2052        rec1.record_completed("ext.a", "req-1", BTreeMap::new());
2053        let reference = rec1
2054            .finish(within_budget_observation())
2055            .expect("ref")
2056            .bundle;
2057
2058        let mut rec2 = super::ReplayRecorder::new("trace-cmp", standard_lane_config());
2059        rec2.tick();
2060        rec2.record_scheduled("ext.a", "req-1", BTreeMap::new());
2061        rec2.tick();
2062        rec2.record_completed("ext.a", "req-1", BTreeMap::new());
2063
2064        let (result, comparison) = rec2
2065            .finish_and_compare(within_budget_observation(), &reference)
2066            .expect("compare");
2067        assert!(comparison.divergence.is_none());
2068        assert!(comparison.root_cause_hints.is_empty());
2069        assert_eq!(comparison.reference_trace_id, "trace-cmp");
2070        assert_eq!(comparison.observed_trace_id, "trace-cmp");
2071        assert!(result.diagnostic.divergence.is_none());
2072    }
2073
2074    #[test]
2075    fn recorder_finish_and_compare_detects_divergence() {
2076        let mut rec1 = super::ReplayRecorder::new("trace-div", standard_lane_config());
2077        rec1.tick();
2078        rec1.record_scheduled("ext.a", "req-1", BTreeMap::new());
2079        rec1.tick();
2080        rec1.record_completed("ext.a", "req-1", BTreeMap::new());
2081        let reference = rec1
2082            .finish(within_budget_observation())
2083            .expect("ref")
2084            .bundle;
2085
2086        let mut rec2 = super::ReplayRecorder::new("trace-div", standard_lane_config());
2087        rec2.tick();
2088        rec2.record_scheduled("ext.a", "req-1", BTreeMap::new());
2089        rec2.tick();
2090        rec2.record_failed("ext.a", "req-1", BTreeMap::new());
2091
2092        let (result, comparison) = rec2
2093            .finish_and_compare(within_budget_observation(), &reference)
2094            .expect("compare");
2095        assert!(comparison.divergence.is_some());
2096        let div = comparison.divergence.as_ref().unwrap();
2097        assert_eq!(div.seq, Some(2));
2098        assert!(matches!(
2099            div.reason,
2100            ReplayDivergenceReason::EventFieldMismatch { ref field, .. } if field == "kind"
2101        ));
2102        assert!(result.diagnostic.divergence.is_some());
2103        assert!(!result.diagnostic.root_cause_hints.is_empty());
2104    }
2105
2106    #[test]
2107    fn recorder_multi_extension_interleaving() {
2108        let mut recorder = super::ReplayRecorder::new("trace-multi", standard_lane_config());
2109        recorder.tick();
2110        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
2111        recorder.record_scheduled("ext.b", "req-2", BTreeMap::new());
2112        recorder.tick();
2113        recorder.record_policy_decision("ext.a", "req-1", BTreeMap::new());
2114        recorder.record_policy_decision("ext.b", "req-2", BTreeMap::new());
2115        recorder.tick();
2116        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
2117        recorder.record_completed("ext.b", "req-2", BTreeMap::new());
2118
2119        let result = recorder
2120            .finish(within_budget_observation())
2121            .expect("finish");
2122        assert_eq!(result.bundle.events.len(), 6);
2123
2124        // Canonical ordering: at same clock, ext.a < ext.b
2125        let clock_1_events: Vec<_> = result
2126            .bundle
2127            .events
2128            .iter()
2129            .filter(|e| e.logical_clock == 1)
2130            .collect();
2131        assert_eq!(clock_1_events.len(), 2);
2132        assert_eq!(clock_1_events[0].extension_id, "ext.a");
2133        assert_eq!(clock_1_events[1].extension_id, "ext.b");
2134    }
2135
2136    // ── compare_replay_bundles standalone tests ──
2137
2138    #[test]
2139    fn compare_replay_bundles_no_divergence() {
2140        let bundle = standard_bundle();
2141        let gate =
2142            evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
2143
2144        let (diagnostic, comparison) =
2145            super::compare_replay_bundles(&bundle, &bundle, gate).expect("compare");
2146        assert!(comparison.divergence.is_none());
2147        assert!(comparison.root_cause_hints.is_empty());
2148        assert!(diagnostic.divergence.is_none());
2149    }
2150
2151    #[test]
2152    fn compare_replay_bundles_with_divergence() {
2153        let reference = standard_bundle();
2154        let mut observed_builder = ReplayTraceBuilder::new("trace-diagnostic");
2155        observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
2156        observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
2157        observed_builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Failed));
2158        let observed = observed_builder.build().expect("observed bundle");
2159
2160        let gate =
2161            evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
2162
2163        let (diagnostic, comparison) =
2164            super::compare_replay_bundles(&reference, &observed, gate).expect("compare");
2165        assert!(comparison.divergence.is_some());
2166        assert!(!comparison.root_cause_hints.is_empty());
2167        assert!(diagnostic.divergence.is_some());
2168    }
2169
2170    #[test]
2171    fn compare_replay_bundles_reports_schema_mismatch_with_hint() {
2172        let reference = standard_bundle();
2173        let mut observed = standard_bundle();
2174        observed.schema = "pi.ext.replay.trace.v2".to_string();
2175        let gate =
2176            evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
2177
2178        let (diagnostic, comparison) =
2179            super::compare_replay_bundles(&reference, &observed, gate).expect("compare");
2180
2181        assert_eq!(
2182            comparison.root_cause_hints,
2183            vec![ReplayRootCauseHint::TraceSchemaMismatch]
2184        );
2185        assert_eq!(
2186            diagnostic.root_cause_hints,
2187            vec![ReplayRootCauseHint::TraceSchemaMismatch]
2188        );
2189        match comparison
2190            .divergence
2191            .expect("schema mismatch divergence")
2192            .reason
2193        {
2194            ReplayDivergenceReason::SchemaMismatch { expected, observed } => {
2195                assert_eq!(expected, REPLAY_TRACE_SCHEMA_V1);
2196                assert_eq!(observed, "pi.ext.replay.trace.v2");
2197            }
2198            other => unreachable!("expected SchemaMismatch, got: {other:?}"),
2199        }
2200    }
2201
2202    // ── ReplayLaneConfig tests ──
2203
2204    #[test]
2205    fn lane_config_serde_roundtrip() {
2206        let mut config = super::ReplayLaneConfig::new(standard_capture_budget());
2207        config.insert_metadata("env", "prod");
2208
2209        let json = serde_json::to_string(&config).expect("serialize");
2210        let roundtrip: super::ReplayLaneConfig = serde_json::from_str(&json).expect("deserialize");
2211        assert_eq!(config, roundtrip);
2212    }
2213
2214    #[test]
2215    fn lane_config_empty_metadata_omitted_in_json() {
2216        let config = super::ReplayLaneConfig::new(standard_capture_budget());
2217        let json = serde_json::to_string(&config).expect("serialize");
2218        assert!(!json.contains("laneMetadata"));
2219    }
2220
2221    #[test]
2222    fn lane_result_serde_roundtrip() {
2223        let mut recorder = super::ReplayRecorder::new("trace-serde", standard_lane_config());
2224        recorder.tick();
2225        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
2226        recorder.tick();
2227        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
2228
2229        let result = recorder
2230            .finish(within_budget_observation())
2231            .expect("finish");
2232        let json = serde_json::to_string(&result).expect("serialize");
2233        let roundtrip: super::ReplayLaneResult = serde_json::from_str(&json).expect("deserialize");
2234        assert_eq!(result, roundtrip);
2235    }
2236
2237    #[test]
2238    fn comparison_result_serde_roundtrip() {
2239        let comparison = super::ReplayComparisonResult {
2240            reference_trace_id: "ref-1".to_string(),
2241            observed_trace_id: "obs-1".to_string(),
2242            divergence: None,
2243            root_cause_hints: vec![],
2244        };
2245        let json = serde_json::to_string(&comparison).expect("serialize");
2246        let roundtrip: super::ReplayComparisonResult =
2247            serde_json::from_str(&json).expect("deserialize");
2248        assert_eq!(comparison, roundtrip);
2249    }
2250
2251    #[test]
2252    fn recorder_tick_is_monotonic() {
2253        let mut recorder = super::ReplayRecorder::new("trace-tick", standard_lane_config());
2254        let t1 = recorder.tick();
2255        let t2 = recorder.tick();
2256        let t3 = recorder.tick();
2257        assert_eq!(t1, 1);
2258        assert_eq!(t2, 2);
2259        assert_eq!(t3, 3);
2260    }
2261
2262    // ── Property tests ──────────────────────────────────────────────────
2263
2264    mod proptest_extension_replay {
2265        use super::*;
2266        use proptest::prelude::*;
2267
2268        fn arb_event_kind() -> impl Strategy<Value = ReplayEventKind> {
2269            prop::sample::select(vec![
2270                ReplayEventKind::Scheduled,
2271                ReplayEventKind::QueueAccepted,
2272                ReplayEventKind::PolicyDecision,
2273                ReplayEventKind::Completed,
2274                ReplayEventKind::Failed,
2275            ])
2276        }
2277
2278        fn arb_ext_id() -> impl Strategy<Value = String> {
2279            "ext\\.[a-z]{1,5}"
2280        }
2281
2282        fn arb_req_id() -> impl Strategy<Value = String> {
2283            "req-[0-9]{1,4}"
2284        }
2285
2286        fn arb_simple_draft() -> impl Strategy<Value = ReplayEventDraft> {
2287            (1..100u64, arb_ext_id(), arb_req_id(), arb_event_kind())
2288                .prop_map(|(clock, ext, req, kind)| ReplayEventDraft::new(clock, ext, req, kind))
2289        }
2290
2291        proptest! {
2292            #[test]
2293            fn compute_overhead_per_mille_zero_when_captured_leq_baseline(
2294                baseline in 1..10_000u64,
2295                captured in 0..10_000u64,
2296            ) {
2297                if captured <= baseline {
2298                    let result = super::super::compute_overhead_per_mille(baseline, captured);
2299                    assert_eq!(
2300                        result, 0,
2301                        "captured <= baseline should yield 0 overhead"
2302                    );
2303                }
2304            }
2305
2306            #[test]
2307            fn compute_overhead_per_mille_zero_baseline_returns_max(
2308                captured in 0..10_000u64,
2309            ) {
2310                let result = super::super::compute_overhead_per_mille(0, captured);
2311                assert_eq!(
2312                    result, u32::MAX,
2313                    "zero baseline should always be treated as invalid telemetry"
2314                );
2315            }
2316
2317            #[test]
2318            fn compute_overhead_per_mille_is_non_negative(
2319                baseline in 0..10_000u64,
2320                captured in 0..10_000u64,
2321            ) {
2322                let result = super::super::compute_overhead_per_mille(baseline, captured);
2323                // u32 is always non-negative, but verify we never panic
2324                let _ = result;
2325            }
2326
2327            #[test]
2328            fn builder_produces_contiguous_sequences(
2329                drafts in prop::collection::vec(arb_simple_draft(), 0..10),
2330            ) {
2331                let mut builder = ReplayTraceBuilder::new("trace-prop");
2332                for d in drafts {
2333                    builder.push(d);
2334                }
2335                let bundle = builder.build().expect("build should succeed");
2336                for (idx, event) in bundle.events.iter().enumerate() {
2337                    assert_eq!(
2338                        event.seq,
2339                        (idx + 1) as u64,
2340                        "sequence should be 1-based contiguous"
2341                    );
2342                }
2343            }
2344
2345            #[test]
2346            fn builder_is_deterministic_regardless_of_push_order(
2347                drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2348            ) {
2349                let mut builder1 = ReplayTraceBuilder::new("trace-det");
2350                for d in &drafts {
2351                    builder1.push(d.clone());
2352                }
2353                let bundle1 = builder1.build().expect("build1");
2354
2355                let mut reversed = drafts;
2356                reversed.reverse();
2357                let mut builder2 = ReplayTraceBuilder::new("trace-det");
2358                for d in &reversed {
2359                    builder2.push(d.clone());
2360                }
2361                let bundle2 = builder2.build().expect("build2");
2362
2363                assert_eq!(
2364                    bundle1, bundle2,
2365                    "canonical ordering should be same regardless of push order"
2366                );
2367            }
2368
2369            #[test]
2370            fn identical_bundles_have_no_divergence(
2371                drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2372            ) {
2373                let mut builder = ReplayTraceBuilder::new("trace-id");
2374                for d in &drafts {
2375                    builder.push(d.clone());
2376                }
2377                let bundle = builder.build().expect("build");
2378                let divergence = first_divergence(&bundle, &bundle)
2379                    .expect("comparison should succeed");
2380                assert!(
2381                    divergence.is_none(),
2382                    "identical bundles should have no divergence"
2383                );
2384            }
2385
2386            #[test]
2387            fn json_roundtrip_preserves_bundle(
2388                drafts in prop::collection::vec(arb_simple_draft(), 0..6),
2389            ) {
2390                let mut builder = ReplayTraceBuilder::new("trace-rt");
2391                for d in drafts {
2392                    builder.push(d);
2393                }
2394                let bundle = builder.build().expect("build");
2395                let json = bundle.encode_json().expect("encode");
2396                let decoded = ReplayTraceBundle::decode_json(&json).expect("decode");
2397                assert_eq!(bundle, decoded, "JSON roundtrip should preserve bundle");
2398            }
2399
2400            #[test]
2401            fn capture_gate_disabled_config_always_rejects(
2402                baseline in 1..10_000u64,
2403                captured in 1..10_000u64,
2404                trace_bytes in 0..10_000u64,
2405                max_overhead in 0..1_000u32,
2406                max_bytes in 0..10_000u64,
2407            ) {
2408                let budget = ReplayCaptureBudget {
2409                    capture_enabled: false,
2410                    max_overhead_per_mille: max_overhead,
2411                    max_trace_bytes: max_bytes,
2412                };
2413                let observation = ReplayCaptureObservation {
2414                    baseline_micros: baseline,
2415                    captured_micros: captured,
2416                    trace_bytes,
2417                };
2418                let report = evaluate_replay_capture_gate(budget, observation);
2419                assert!(
2420                    !report.capture_allowed,
2421                    "disabled config should always reject"
2422                );
2423                assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
2424            }
2425
2426            #[test]
2427            fn capture_gate_is_deterministic(
2428                baseline in 0..5_000u64,
2429                captured in 0..5_000u64,
2430                trace_bytes in 0..5_000u64,
2431                enabled in any::<bool>(),
2432                max_overhead in 0..500u32,
2433                max_bytes in 0..5_000u64,
2434            ) {
2435                let budget = ReplayCaptureBudget {
2436                    capture_enabled: enabled,
2437                    max_overhead_per_mille: max_overhead,
2438                    max_trace_bytes: max_bytes,
2439                };
2440                let observation = ReplayCaptureObservation {
2441                    baseline_micros: baseline,
2442                    captured_micros: captured,
2443                    trace_bytes,
2444                };
2445                let r1 = evaluate_replay_capture_gate(budget, observation);
2446                let r2 = evaluate_replay_capture_gate(budget, observation);
2447                assert_eq!(r1, r2, "capture gate must be deterministic");
2448            }
2449
2450            #[test]
2451            fn event_kind_canonical_rank_all_distinct(
2452                a_idx in 0..7usize,
2453                b_idx in 0..7usize,
2454            ) {
2455                let kinds = [
2456                    ReplayEventKind::Scheduled,
2457                    ReplayEventKind::QueueAccepted,
2458                    ReplayEventKind::PolicyDecision,
2459                    ReplayEventKind::Cancelled,
2460                    ReplayEventKind::Retried,
2461                    ReplayEventKind::Completed,
2462                    ReplayEventKind::Failed,
2463                ];
2464                if a_idx != b_idx {
2465                    assert_ne!(
2466                        kinds[a_idx].canonical_rank(),
2467                        kinds[b_idx].canonical_rank(),
2468                        "distinct kinds should have distinct ranks"
2469                    );
2470                }
2471            }
2472
2473            #[test]
2474            fn builder_events_sorted_by_logical_clock(
2475                clocks in prop::collection::vec(0..50u64, 1..10),
2476            ) {
2477                let mut builder = ReplayTraceBuilder::new("trace-clock");
2478                for (i, clock) in clocks.iter().enumerate() {
2479                    builder.push(ReplayEventDraft::new(
2480                        *clock,
2481                        format!("ext.{i}"),
2482                        format!("req-{i}"),
2483                        ReplayEventKind::Scheduled,
2484                    ));
2485                }
2486                let bundle = builder.build().expect("build");
2487                for pair in bundle.events.windows(2) {
2488                    assert!(
2489                        pair[0].logical_clock <= pair[1].logical_clock,
2490                        "events should be sorted by logical clock: {} > {}",
2491                        pair[0].logical_clock,
2492                        pair[1].logical_clock,
2493                    );
2494                }
2495            }
2496        }
2497    }
2498}