1use serde::{Deserialize, Serialize};
8use std::cmp::Ordering;
9use std::collections::{BTreeMap, BTreeSet};
10use thiserror::Error;
11
/// Schema identifier for version 1 replay trace bundles.
///
/// [`ReplayTraceBuilder::build`] stamps this value on every bundle it produces, and
/// [`ReplayTraceBundle::validate`] rejects bundles whose `schema` differs from it.
pub const REPLAY_TRACE_SCHEMA_V1: &str = "pi.ext.replay.trace.v1";
14
/// Kind of lifecycle event recorded in a replay trace.
///
/// Variant names are serialized in `snake_case` (for example `queue_accepted`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayEventKind {
    Scheduled,
    QueueAccepted,
    PolicyDecision,
    /// Bundle validation rejects a second `Cancelled` for the same extension/request pair
    /// unless a `Retried`, `Completed`, or `Failed` event for that pair came in between.
    Cancelled,
    /// Bundle validation requires a pending `Cancelled` for the same extension/request pair.
    Retried,
    /// Clears any pending cancel for the extension/request pair during validation.
    Completed,
    /// Clears any pending cancel for the extension/request pair during validation.
    Failed,
}
27
28impl ReplayEventKind {
29 const fn canonical_rank(self) -> u8 {
30 match self {
31 Self::Scheduled => 0,
32 Self::QueueAccepted => 1,
33 Self::PolicyDecision => 2,
34 Self::Cancelled => 3,
35 Self::Retried => 4,
36 Self::Completed => 5,
37 Self::Failed => 6,
38 }
39 }
40}
41
/// A single sequenced event inside a [`ReplayTraceBundle`].
///
/// Field names are serialized in `camelCase`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayTraceEvent {
    /// 1-based position of the event within its bundle; validation requires the sequence
    /// to be contiguous (1, 2, 3, ...).
    pub seq: u64,
    /// Logical clock value the event was recorded at.
    pub logical_clock: u64,
    /// Identifier of the extension the event belongs to; must not be blank.
    pub extension_id: String,
    /// Identifier of the request the event belongs to; must not be blank.
    pub request_id: String,
    /// What happened.
    pub kind: ReplayEventKind,
    /// Free-form key/value attributes. Omitted from the serialized form when empty and
    /// defaulted to empty when absent on input.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub attributes: BTreeMap<String, String>,
}
54
/// An event that has not yet been assigned a sequence number.
///
/// Drafts are pushed into a [`ReplayTraceBuilder`], which sorts them and assigns `seq`
/// values when the bundle is built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayEventDraft {
    /// Logical clock value the event was recorded at; primary sort key when building.
    pub logical_clock: u64,
    /// Identifier of the extension the event belongs to.
    pub extension_id: String,
    /// Identifier of the request the event belongs to.
    pub request_id: String,
    /// What happened.
    pub kind: ReplayEventKind,
    /// Free-form key/value attributes carried over to the built event unchanged.
    pub attributes: BTreeMap<String, String>,
}
64
65impl ReplayEventDraft {
66 #[must_use]
67 pub fn new(
68 logical_clock: u64,
69 extension_id: impl Into<String>,
70 request_id: impl Into<String>,
71 kind: ReplayEventKind,
72 ) -> Self {
73 Self {
74 logical_clock,
75 extension_id: extension_id.into(),
76 request_id: request_id.into(),
77 kind,
78 attributes: BTreeMap::new(),
79 }
80 }
81}
82
/// A complete replay trace: schema tag, trace id, optional metadata, and ordered events.
///
/// Field names are serialized in `camelCase`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayTraceBundle {
    /// Schema identifier; validation requires it to equal [`REPLAY_TRACE_SCHEMA_V1`].
    pub schema: String,
    /// Identifier of the trace; validation rejects blank values.
    pub trace_id: String,
    /// Free-form key/value metadata. Omitted from the serialized form when empty and
    /// defaulted to empty when absent on input.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub metadata: BTreeMap<String, String>,
    /// Events in sequence order (`seq` starts at 1 and is contiguous in a valid bundle).
    pub events: Vec<ReplayTraceEvent>,
}
93
/// The first point at which an observed trace differs from an expected one.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayDivergence {
    /// Sequence number the divergence is attributed to. `first_divergence` leaves this as
    /// `None` for schema and trace-id mismatches. Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seq: Option<u64>,
    /// What differed.
    pub reason: ReplayDivergenceReason,
}
102
/// Why two traces were judged to diverge.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayDivergenceReason {
    /// The bundles carry different `schema` strings.
    SchemaMismatch {
        expected: String,
        observed: String,
    },
    /// The bundles carry different `trace_id` strings.
    TraceIdMismatch {
        expected: String,
        observed: String,
    },
    /// All shared events matched but the bundles hold different numbers of events.
    EventCountMismatch {
        expected: u64,
        observed: u64,
    },
    /// A specific field of an event differed. `field` names the event field (for example
    /// `logical_clock` or `kind`); `expected` and `observed` are string renderings of the
    /// two values.
    EventFieldMismatch {
        field: String,
        expected: String,
        observed: String,
    },
}
125
/// Limits under which replay capture is allowed to stay enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayCaptureBudget {
    /// Master switch; when `false` the gate reports `DisabledByConfig` without looking at
    /// the observation's timings.
    pub capture_enabled: bool,
    /// Maximum tolerated capture overhead, in parts per thousand of the baseline duration.
    pub max_overhead_per_mille: u32,
    /// Maximum tolerated trace size in bytes.
    pub max_trace_bytes: u64,
}
137
/// Measurements fed into the capture gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayCaptureObservation {
    /// Duration without capture (microseconds, going by the field name). A zero baseline
    /// combined with a positive captured duration is treated as invalid telemetry.
    pub baseline_micros: u64,
    /// Duration with capture enabled (microseconds, going by the field name).
    pub captured_micros: u64,
    /// Size of the captured trace in bytes.
    pub trace_bytes: u64,
}
149
/// Outcome category produced by the capture gate.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayCaptureGateReason {
    /// Capture is allowed; no budget was exceeded.
    Enabled,
    /// The budget's `capture_enabled` switch is off.
    DisabledByConfig,
    /// The observed overhead exceeds `max_overhead_per_mille`.
    DisabledByOverheadBudget,
    /// The observed trace size exceeds `max_trace_bytes`.
    DisabledByTraceBudget,
    /// The baseline measurement cannot be used to compute an overhead ratio.
    DisabledByInvalidBaseline,
}
160
/// Result of evaluating a [`ReplayCaptureBudget`] against a [`ReplayCaptureObservation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayCaptureGateReport {
    /// `true` only when `reason` is [`ReplayCaptureGateReason::Enabled`].
    pub capture_allowed: bool,
    /// Why capture is allowed or disabled.
    pub reason: ReplayCaptureGateReason,
    /// Overhead computed from the observation, in parts per thousand of the baseline.
    /// Reported as 0 when capture is disabled by config.
    pub observed_overhead_per_mille: u32,
    /// Copied from the budget.
    pub max_overhead_per_mille: u32,
    /// Copied from the observation.
    pub observed_trace_bytes: u64,
    /// Copied from the budget.
    pub max_trace_bytes: u64,
}
172
/// Coarse hint about why a replay diverged or why capture was disabled.
///
/// The derived `Ord` follows declaration order; hint lists produced by this module are
/// collected through an ordered set and therefore come out in this order.
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayRootCauseHint {
    /// Derived from a schema-mismatch divergence.
    TraceSchemaMismatch,
    /// Derived from a trace-id-mismatch divergence.
    TraceIdMismatch,
    /// Derived from an event-count-mismatch divergence.
    EventCountDrift,
    /// Derived from an event-field mismatch on any field other than `logical_clock`.
    EventPayloadDrift,
    /// Derived from an event-field mismatch on `logical_clock`.
    LogicalClockDrift,
    /// Derived from the gate reason `DisabledByConfig`.
    PolicyGateDisabled,
    /// Derived from the gate reason `DisabledByOverheadBudget`.
    OverheadBudgetExceeded,
    /// Derived from the gate reason `DisabledByTraceBudget`.
    TraceBudgetExceeded,
    /// Derived from the gate reason `DisabledByInvalidBaseline`.
    InvalidBaselineTelemetry,
}
187
/// Summary of a bundle together with its gate report, divergence, and derived hints.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayDiagnosticSnapshot {
    /// Trace id copied from the bundle.
    pub trace_id: String,
    /// Schema string copied from the bundle.
    pub schema: String,
    /// Number of events in the bundle.
    pub event_count: u64,
    /// Gate report the snapshot was built with.
    pub capture_gate: ReplayCaptureGateReport,
    /// Divergence the snapshot was built with, if any. Omitted from the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub divergence: Option<ReplayDivergence>,
    /// Hints derived from the gate reason and the divergence.
    pub root_cause_hints: Vec<ReplayRootCauseHint>,
}
200
201#[must_use]
203pub fn evaluate_replay_capture_gate(
204 budget: ReplayCaptureBudget,
205 observation: ReplayCaptureObservation,
206) -> ReplayCaptureGateReport {
207 if !budget.capture_enabled {
208 return ReplayCaptureGateReport {
209 capture_allowed: false,
210 reason: ReplayCaptureGateReason::DisabledByConfig,
211 observed_overhead_per_mille: 0,
212 max_overhead_per_mille: budget.max_overhead_per_mille,
213 observed_trace_bytes: observation.trace_bytes,
214 max_trace_bytes: budget.max_trace_bytes,
215 };
216 }
217
218 let observed_overhead_per_mille =
219 compute_overhead_per_mille(observation.baseline_micros, observation.captured_micros);
220
221 if observed_overhead_per_mille == u32::MAX {
222 return ReplayCaptureGateReport {
223 capture_allowed: false,
224 reason: ReplayCaptureGateReason::DisabledByInvalidBaseline,
225 observed_overhead_per_mille,
226 max_overhead_per_mille: budget.max_overhead_per_mille,
227 observed_trace_bytes: observation.trace_bytes,
228 max_trace_bytes: budget.max_trace_bytes,
229 };
230 }
231
232 if observed_overhead_per_mille > budget.max_overhead_per_mille {
233 return ReplayCaptureGateReport {
234 capture_allowed: false,
235 reason: ReplayCaptureGateReason::DisabledByOverheadBudget,
236 observed_overhead_per_mille,
237 max_overhead_per_mille: budget.max_overhead_per_mille,
238 observed_trace_bytes: observation.trace_bytes,
239 max_trace_bytes: budget.max_trace_bytes,
240 };
241 }
242
243 if observation.trace_bytes > budget.max_trace_bytes {
244 return ReplayCaptureGateReport {
245 capture_allowed: false,
246 reason: ReplayCaptureGateReason::DisabledByTraceBudget,
247 observed_overhead_per_mille,
248 max_overhead_per_mille: budget.max_overhead_per_mille,
249 observed_trace_bytes: observation.trace_bytes,
250 max_trace_bytes: budget.max_trace_bytes,
251 };
252 }
253
254 ReplayCaptureGateReport {
255 capture_allowed: true,
256 reason: ReplayCaptureGateReason::Enabled,
257 observed_overhead_per_mille,
258 max_overhead_per_mille: budget.max_overhead_per_mille,
259 observed_trace_bytes: observation.trace_bytes,
260 max_trace_bytes: budget.max_trace_bytes,
261 }
262}
263
264pub fn build_replay_diagnostic_snapshot(
270 bundle: &ReplayTraceBundle,
271 capture_gate: ReplayCaptureGateReport,
272 divergence: Option<&ReplayDivergence>,
273) -> Result<ReplayDiagnosticSnapshot, ReplayTraceValidationError> {
274 bundle.validate()?;
275
276 let event_count = u64::try_from(bundle.events.len())
277 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
278 let root_cause_hints = derive_root_cause_hints(capture_gate.reason, divergence);
279
280 Ok(ReplayDiagnosticSnapshot {
281 trace_id: bundle.trace_id.clone(),
282 schema: bundle.schema.clone(),
283 event_count,
284 capture_gate,
285 divergence: divergence.cloned(),
286 root_cause_hints,
287 })
288}
289
/// Computes the capture overhead relative to the baseline, in parts per thousand.
///
/// * Returns 0 when the captured duration does not exceed the baseline.
/// * Returns `u32::MAX` when the baseline is zero but the captured duration is positive.
/// * Otherwise returns `ceil((captured - baseline) * 1000 / baseline)`, saturating at
///   `u32::MAX`.
fn compute_overhead_per_mille(baseline_micros: u64, captured_micros: u64) -> u32 {
    match captured_micros.checked_sub(baseline_micros) {
        // Capture was no slower than the baseline.
        None | Some(0) => 0,
        // Positive overhead against a zero baseline: the ratio is undefined.
        Some(_) if baseline_micros == 0 => u32::MAX,
        Some(extra_micros) => {
            // A u64 times 1000 always fits in u128, so the multiplication cannot overflow.
            let numerator = u128::from(extra_micros) * 1_000;
            let per_mille = numerator.div_ceil(u128::from(baseline_micros));
            u32::try_from(per_mille).unwrap_or(u32::MAX)
        }
    }
}
307
308fn derive_root_cause_hints(
309 gate_reason: ReplayCaptureGateReason,
310 divergence: Option<&ReplayDivergence>,
311) -> Vec<ReplayRootCauseHint> {
312 let mut hints = BTreeSet::new();
313
314 match gate_reason {
315 ReplayCaptureGateReason::Enabled => {}
316 ReplayCaptureGateReason::DisabledByConfig => {
317 hints.insert(ReplayRootCauseHint::PolicyGateDisabled);
318 }
319 ReplayCaptureGateReason::DisabledByOverheadBudget => {
320 hints.insert(ReplayRootCauseHint::OverheadBudgetExceeded);
321 }
322 ReplayCaptureGateReason::DisabledByTraceBudget => {
323 hints.insert(ReplayRootCauseHint::TraceBudgetExceeded);
324 }
325 ReplayCaptureGateReason::DisabledByInvalidBaseline => {
326 hints.insert(ReplayRootCauseHint::InvalidBaselineTelemetry);
327 }
328 }
329
330 if let Some(divergence) = divergence {
331 match &divergence.reason {
332 ReplayDivergenceReason::SchemaMismatch { .. } => {
333 hints.insert(ReplayRootCauseHint::TraceSchemaMismatch);
334 }
335 ReplayDivergenceReason::TraceIdMismatch { .. } => {
336 hints.insert(ReplayRootCauseHint::TraceIdMismatch);
337 }
338 ReplayDivergenceReason::EventCountMismatch { .. } => {
339 hints.insert(ReplayRootCauseHint::EventCountDrift);
340 }
341 ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
342 if field == "logical_clock" {
343 hints.insert(ReplayRootCauseHint::LogicalClockDrift);
344 } else {
345 hints.insert(ReplayRootCauseHint::EventPayloadDrift);
346 }
347 }
348 }
349 }
350
351 hints.into_iter().collect()
352}
353
354impl ReplayTraceBundle {
355 pub fn encode_json(&self) -> Result<String, serde_json::Error> {
361 serde_json::to_string(self)
362 }
363
364 pub fn decode_json(input: &str) -> Result<Self, ReplayTraceCodecError> {
370 let bundle: Self = serde_json::from_str(input)?;
371 bundle.validate()?;
372 Ok(bundle)
373 }
374
375 pub fn validate(&self) -> Result<(), ReplayTraceValidationError> {
382 if self.schema != REPLAY_TRACE_SCHEMA_V1 {
383 return Err(ReplayTraceValidationError::UnknownSchema(
384 self.schema.clone(),
385 ));
386 }
387
388 if self.trace_id.trim().is_empty() {
389 return Err(ReplayTraceValidationError::EmptyTraceId);
390 }
391
392 for (idx, event) in self.events.iter().enumerate() {
393 let seq_index = idx
394 .checked_add(1)
395 .ok_or(ReplayTraceValidationError::TooManyEvents)?;
396 let expected_seq =
397 u64::try_from(seq_index).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
398 if event.seq != expected_seq {
399 return Err(ReplayTraceValidationError::NonContiguousSequence {
400 expected: expected_seq,
401 observed: event.seq,
402 });
403 }
404
405 if event.extension_id.trim().is_empty() {
406 return Err(ReplayTraceValidationError::MissingExtensionId { seq: event.seq });
407 }
408 if event.request_id.trim().is_empty() {
409 return Err(ReplayTraceValidationError::MissingRequestId { seq: event.seq });
410 }
411 }
412
413 self.validate_retry_ordering()
414 }
415
416 fn validate_retry_ordering(&self) -> Result<(), ReplayTraceValidationError> {
417 let mut pending_cancel: BTreeSet<(String, String)> = BTreeSet::new();
418 for event in &self.events {
419 let key = (event.extension_id.clone(), event.request_id.clone());
420 match event.kind {
421 ReplayEventKind::Cancelled => {
422 if !pending_cancel.insert(key) {
423 return Err(ReplayTraceValidationError::DuplicateCancelWithoutRetry {
424 seq: event.seq,
425 extension_id: event.extension_id.clone(),
426 request_id: event.request_id.clone(),
427 });
428 }
429 }
430 ReplayEventKind::Retried => {
431 if !pending_cancel.remove(&key) {
432 return Err(ReplayTraceValidationError::RetryWithoutCancel {
433 seq: event.seq,
434 extension_id: event.extension_id.clone(),
435 request_id: event.request_id.clone(),
436 });
437 }
438 }
439 ReplayEventKind::Completed | ReplayEventKind::Failed => {
440 pending_cancel.remove(&key);
441 }
442 ReplayEventKind::Scheduled
443 | ReplayEventKind::QueueAccepted
444 | ReplayEventKind::PolicyDecision => {}
445 }
446 }
447 Ok(())
448 }
449}
450
451pub fn first_divergence(
459 expected: &ReplayTraceBundle,
460 observed: &ReplayTraceBundle,
461) -> Result<Option<ReplayDivergence>, ReplayTraceValidationError> {
462 expected.validate()?;
463 observed.validate()?;
464
465 if expected.schema != observed.schema {
466 return Ok(Some(ReplayDivergence {
467 seq: None,
468 reason: ReplayDivergenceReason::SchemaMismatch {
469 expected: expected.schema.clone(),
470 observed: observed.schema.clone(),
471 },
472 }));
473 }
474
475 if expected.trace_id != observed.trace_id {
476 return Ok(Some(ReplayDivergence {
477 seq: None,
478 reason: ReplayDivergenceReason::TraceIdMismatch {
479 expected: expected.trace_id.clone(),
480 observed: observed.trace_id.clone(),
481 },
482 }));
483 }
484
485 let max_shared = expected.events.len().min(observed.events.len());
486 for idx in 0..max_shared {
487 let left = &expected.events[idx];
488 let right = &observed.events[idx];
489 if left.logical_clock != right.logical_clock {
490 return Ok(Some(field_mismatch(
491 left.seq,
492 "logical_clock",
493 left.logical_clock.to_string(),
494 right.logical_clock.to_string(),
495 )));
496 }
497 if left.extension_id != right.extension_id {
498 return Ok(Some(field_mismatch(
499 left.seq,
500 "extension_id",
501 left.extension_id.clone(),
502 right.extension_id.clone(),
503 )));
504 }
505 if left.request_id != right.request_id {
506 return Ok(Some(field_mismatch(
507 left.seq,
508 "request_id",
509 left.request_id.clone(),
510 right.request_id.clone(),
511 )));
512 }
513 if left.kind != right.kind {
514 return Ok(Some(field_mismatch(
515 left.seq,
516 "kind",
517 format!("{:?}", left.kind),
518 format!("{:?}", right.kind),
519 )));
520 }
521 if left.attributes != right.attributes {
522 return Ok(Some(field_mismatch(
523 left.seq,
524 "attributes",
525 format!("{:?}", left.attributes),
526 format!("{:?}", right.attributes),
527 )));
528 }
529 }
530
531 if expected.events.len() != observed.events.len() {
532 let next_seq = max_shared
533 .checked_add(1)
534 .ok_or(ReplayTraceValidationError::TooManyEvents)?;
535 let seq = u64::try_from(next_seq).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
536 return Ok(Some(ReplayDivergence {
537 seq: Some(seq),
538 reason: ReplayDivergenceReason::EventCountMismatch {
539 expected: u64::try_from(expected.events.len())
540 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
541 observed: u64::try_from(observed.events.len())
542 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
543 },
544 }));
545 }
546
547 Ok(None)
548}
549
550fn field_mismatch(seq: u64, field: &str, expected: String, observed: String) -> ReplayDivergence {
551 ReplayDivergence {
552 seq: Some(seq),
553 reason: ReplayDivergenceReason::EventFieldMismatch {
554 field: field.to_string(),
555 expected,
556 observed,
557 },
558 }
559}
560
/// Accumulates event drafts and metadata, then produces a validated [`ReplayTraceBundle`].
#[derive(Debug, Clone, Default)]
pub struct ReplayTraceBuilder {
    /// Trace id given to the built bundle.
    trace_id: String,
    /// Metadata given to the built bundle.
    metadata: BTreeMap<String, String>,
    /// Drafts in insertion order; they are sorted when the bundle is built.
    drafts: Vec<ReplayEventDraft>,
}
568
569impl ReplayTraceBuilder {
570 #[must_use]
571 pub fn new(trace_id: impl Into<String>) -> Self {
572 Self {
573 trace_id: trace_id.into(),
574 metadata: BTreeMap::new(),
575 drafts: Vec::new(),
576 }
577 }
578
579 pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
580 self.metadata.insert(key.into(), value.into());
581 }
582
583 pub fn push(&mut self, draft: ReplayEventDraft) {
584 self.drafts.push(draft);
585 }
586
587 pub fn build(self) -> Result<ReplayTraceBundle, ReplayTraceValidationError> {
593 let mut indexed = self
594 .drafts
595 .into_iter()
596 .enumerate()
597 .map(|(insertion_index, draft)| IndexedDraft {
598 insertion_index,
599 draft,
600 })
601 .collect::<Vec<_>>();
602 indexed.sort_by(compare_indexed_drafts);
603
604 let events = indexed
605 .into_iter()
606 .enumerate()
607 .map(|(idx, entry)| {
608 let seq_index = idx
609 .checked_add(1)
610 .ok_or(ReplayTraceValidationError::TooManyEvents)?;
611 let seq = u64::try_from(seq_index)
612 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
613 Ok(ReplayTraceEvent {
614 seq,
615 logical_clock: entry.draft.logical_clock,
616 extension_id: entry.draft.extension_id,
617 request_id: entry.draft.request_id,
618 kind: entry.draft.kind,
619 attributes: entry.draft.attributes,
620 })
621 })
622 .collect::<Result<Vec<_>, ReplayTraceValidationError>>()?;
623
624 let bundle = ReplayTraceBundle {
625 schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
626 trace_id: self.trace_id,
627 metadata: self.metadata,
628 events,
629 };
630 bundle.validate()?;
631 Ok(bundle)
632 }
633}
634
/// A draft paired with the position it was pushed at, used while sorting in the builder.
#[derive(Debug, Clone)]
struct IndexedDraft {
    /// Zero-based position of the draft in the builder's push order.
    insertion_index: usize,
    /// The draft itself.
    draft: ReplayEventDraft,
}
640
641fn compare_indexed_drafts(left: &IndexedDraft, right: &IndexedDraft) -> Ordering {
642 left.draft
643 .logical_clock
644 .cmp(&right.draft.logical_clock)
645 .then_with(|| left.draft.extension_id.cmp(&right.draft.extension_id))
646 .then_with(|| left.draft.request_id.cmp(&right.draft.request_id))
647 .then_with(|| {
648 left.draft
649 .kind
650 .canonical_rank()
651 .cmp(&right.draft.kind.canonical_rank())
652 })
653 .then_with(|| left.insertion_index.cmp(&right.insertion_index))
654}
655
/// Reasons a [`ReplayTraceBundle`] can fail validation.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ReplayTraceValidationError {
    /// The bundle's schema string is not the supported one; carries the offending value.
    #[error("unknown replay trace schema: {0}")]
    UnknownSchema(String),
    /// The trace id is empty or consists only of whitespace.
    #[error("trace id must not be empty")]
    EmptyTraceId,
    /// An event index or count could not be converted to a sequence number.
    #[error("replay bundle contains too many events to index")]
    TooManyEvents,
    /// An event's `seq` is not the next value in the 1-based contiguous sequence.
    #[error("non-contiguous sequence: expected {expected}, observed {observed}")]
    NonContiguousSequence { expected: u64, observed: u64 },
    /// The event's extension id is empty or consists only of whitespace.
    #[error("event seq {seq} missing extension id")]
    MissingExtensionId { seq: u64 },
    /// The event's request id is empty or consists only of whitespace.
    #[error("event seq {seq} missing request id")]
    MissingRequestId { seq: u64 },
    /// A `Retried` event appeared with no pending `Cancelled` for the same pair.
    #[error("retry without prior cancel at seq {seq} for {extension_id}/{request_id}")]
    RetryWithoutCancel {
        seq: u64,
        extension_id: String,
        request_id: String,
    },
    /// A `Cancelled` event appeared while a cancel for the same pair was still pending.
    #[error("duplicate cancel without retry at seq {seq} for {extension_id}/{request_id}")]
    DuplicateCancelWithoutRetry {
        seq: u64,
        extension_id: String,
        request_id: String,
    },
}
684
/// Errors returned when decoding a bundle from JSON.
#[derive(Debug, Error)]
pub enum ReplayTraceCodecError {
    /// The input could not be parsed into a bundle.
    #[error("failed to parse replay trace JSON: {0}")]
    Deserialize(#[from] serde_json::Error),
    /// The input parsed, but the resulting bundle failed validation.
    #[error("invalid replay trace bundle: {0}")]
    Validation(#[from] ReplayTraceValidationError),
}
693
/// Configuration for a [`ReplayRecorder`].
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayLaneConfig {
    /// Capture budget evaluated when the recorder finishes.
    pub budget: ReplayCaptureBudget,
    /// Metadata copied into every bundle built by a recorder using this config. Omitted
    /// from the serialized form when empty and defaulted to empty when absent on input.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub lane_metadata: BTreeMap<String, String>,
}
704
705impl ReplayLaneConfig {
706 #[must_use]
707 pub const fn new(budget: ReplayCaptureBudget) -> Self {
708 Self {
709 budget,
710 lane_metadata: BTreeMap::new(),
711 }
712 }
713
714 pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
716 self.lane_metadata.insert(key.into(), value.into());
717 }
718}
719
/// Everything a [`ReplayRecorder`] produces when it finishes.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayLaneResult {
    /// The built, validated trace bundle.
    pub bundle: ReplayTraceBundle,
    /// Capture gate evaluation for the recorder's budget and the supplied observation.
    pub gate_report: ReplayCaptureGateReport,
    /// Diagnostic snapshot built from the bundle and the gate report.
    pub diagnostic: ReplayDiagnosticSnapshot,
}
731
/// Outcome of comparing an observed bundle against a reference bundle.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReplayComparisonResult {
    /// Trace id of the reference bundle.
    pub reference_trace_id: String,
    /// Trace id of the observed bundle.
    pub observed_trace_id: String,
    /// First divergence found, if any. Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub divergence: Option<ReplayDivergence>,
    /// Hints copied from the diagnostic snapshot built for the comparison.
    pub root_cause_hints: Vec<ReplayRootCauseHint>,
}
746
/// Records events against a manually advanced logical clock and turns them into a bundle.
#[derive(Debug)]
pub struct ReplayRecorder {
    /// Lane configuration; its budget is evaluated when the recorder finishes.
    config: ReplayLaneConfig,
    /// Builder that collects the recorded drafts.
    builder: ReplayTraceBuilder,
    /// Current logical clock; starts at 0 and is stamped on each recorded event.
    logical_clock: u64,
    /// Number of events recorded so far (saturating).
    event_count: u64,
}
760
761impl ReplayRecorder {
762 #[must_use]
764 pub fn new(trace_id: impl Into<String>, config: ReplayLaneConfig) -> Self {
765 let mut builder = ReplayTraceBuilder::new(trace_id);
766 for (key, value) in &config.lane_metadata {
767 builder.insert_metadata(key.clone(), value.clone());
768 }
769 Self {
770 config,
771 builder,
772 logical_clock: 0,
773 event_count: 0,
774 }
775 }
776
777 #[must_use]
779 pub const fn logical_clock(&self) -> u64 {
780 self.logical_clock
781 }
782
783 #[must_use]
785 pub const fn event_count(&self) -> u64 {
786 self.event_count
787 }
788
789 pub const fn tick(&mut self) -> u64 {
791 self.logical_clock = self.logical_clock.saturating_add(1);
792 self.logical_clock
793 }
794
795 pub fn record(
797 &mut self,
798 extension_id: impl Into<String>,
799 request_id: impl Into<String>,
800 kind: ReplayEventKind,
801 attributes: BTreeMap<String, String>,
802 ) {
803 let mut draft = ReplayEventDraft::new(self.logical_clock, extension_id, request_id, kind);
804 draft.attributes = attributes;
805 self.builder.push(draft);
806 self.event_count = self.event_count.saturating_add(1);
807 }
808
809 pub fn record_scheduled(
811 &mut self,
812 extension_id: impl Into<String>,
813 request_id: impl Into<String>,
814 attributes: BTreeMap<String, String>,
815 ) {
816 self.record(
817 extension_id,
818 request_id,
819 ReplayEventKind::Scheduled,
820 attributes,
821 );
822 }
823
824 pub fn record_queue_accepted(
826 &mut self,
827 extension_id: impl Into<String>,
828 request_id: impl Into<String>,
829 attributes: BTreeMap<String, String>,
830 ) {
831 self.record(
832 extension_id,
833 request_id,
834 ReplayEventKind::QueueAccepted,
835 attributes,
836 );
837 }
838
839 pub fn record_policy_decision(
841 &mut self,
842 extension_id: impl Into<String>,
843 request_id: impl Into<String>,
844 attributes: BTreeMap<String, String>,
845 ) {
846 self.record(
847 extension_id,
848 request_id,
849 ReplayEventKind::PolicyDecision,
850 attributes,
851 );
852 }
853
854 pub fn record_cancelled(
856 &mut self,
857 extension_id: impl Into<String>,
858 request_id: impl Into<String>,
859 attributes: BTreeMap<String, String>,
860 ) {
861 self.record(
862 extension_id,
863 request_id,
864 ReplayEventKind::Cancelled,
865 attributes,
866 );
867 }
868
869 pub fn record_retried(
871 &mut self,
872 extension_id: impl Into<String>,
873 request_id: impl Into<String>,
874 attributes: BTreeMap<String, String>,
875 ) {
876 self.record(
877 extension_id,
878 request_id,
879 ReplayEventKind::Retried,
880 attributes,
881 );
882 }
883
884 pub fn record_completed(
886 &mut self,
887 extension_id: impl Into<String>,
888 request_id: impl Into<String>,
889 attributes: BTreeMap<String, String>,
890 ) {
891 self.record(
892 extension_id,
893 request_id,
894 ReplayEventKind::Completed,
895 attributes,
896 );
897 }
898
899 pub fn record_failed(
901 &mut self,
902 extension_id: impl Into<String>,
903 request_id: impl Into<String>,
904 attributes: BTreeMap<String, String>,
905 ) {
906 self.record(
907 extension_id,
908 request_id,
909 ReplayEventKind::Failed,
910 attributes,
911 );
912 }
913
914 pub fn finish(
923 self,
924 observation: ReplayCaptureObservation,
925 ) -> Result<ReplayLaneResult, ReplayTraceValidationError> {
926 let bundle = self.builder.build()?;
927 let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
928 let diagnostic = build_replay_diagnostic_snapshot(&bundle, gate_report, None)?;
929
930 Ok(ReplayLaneResult {
931 bundle,
932 gate_report,
933 diagnostic,
934 })
935 }
936
937 pub fn finish_and_compare(
946 self,
947 observation: ReplayCaptureObservation,
948 reference: &ReplayTraceBundle,
949 ) -> Result<(ReplayLaneResult, ReplayComparisonResult), ReplayTraceValidationError> {
950 let bundle = self.builder.build()?;
951 let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
952 let divergence_opt = first_divergence(reference, &bundle)?;
953 let diagnostic =
954 build_replay_diagnostic_snapshot(&bundle, gate_report, divergence_opt.as_ref())?;
955
956 let comparison = ReplayComparisonResult {
957 reference_trace_id: reference.trace_id.clone(),
958 observed_trace_id: bundle.trace_id.clone(),
959 divergence: divergence_opt,
960 root_cause_hints: diagnostic.root_cause_hints.clone(),
961 };
962
963 let result = ReplayLaneResult {
964 bundle,
965 gate_report,
966 diagnostic,
967 };
968
969 Ok((result, comparison))
970 }
971}
972
973pub fn compare_replay_bundles(
979 reference: &ReplayTraceBundle,
980 observed: &ReplayTraceBundle,
981 gate_report: ReplayCaptureGateReport,
982) -> Result<(ReplayDiagnosticSnapshot, ReplayComparisonResult), ReplayTraceValidationError> {
983 let divergence_opt = first_divergence(reference, observed)?;
984 let diagnostic =
985 build_replay_diagnostic_snapshot(observed, gate_report, divergence_opt.as_ref())?;
986
987 let comparison = ReplayComparisonResult {
988 reference_trace_id: reference.trace_id.clone(),
989 observed_trace_id: observed.trace_id.clone(),
990 divergence: divergence_opt,
991 root_cause_hints: diagnostic.root_cause_hints.clone(),
992 };
993
994 Ok((diagnostic, comparison))
995}
996
997#[cfg(test)]
998mod tests {
999 use super::{
1000 REPLAY_TRACE_SCHEMA_V1, ReplayCaptureBudget, ReplayCaptureGateReason,
1001 ReplayCaptureObservation, ReplayDivergenceReason, ReplayEventDraft, ReplayEventKind,
1002 ReplayRootCauseHint, ReplayTraceBuilder, ReplayTraceBundle, ReplayTraceCodecError,
1003 ReplayTraceValidationError, build_replay_diagnostic_snapshot, evaluate_replay_capture_gate,
1004 first_divergence,
1005 };
1006 use std::collections::BTreeMap;
1007
1008 fn draft(
1009 logical_clock: u64,
1010 extension_id: &str,
1011 request_id: &str,
1012 kind: ReplayEventKind,
1013 ) -> ReplayEventDraft {
1014 ReplayEventDraft::new(
1015 logical_clock,
1016 extension_id.to_string(),
1017 request_id.to_string(),
1018 kind,
1019 )
1020 }
1021
1022 const fn standard_capture_budget() -> ReplayCaptureBudget {
1023 ReplayCaptureBudget {
1024 capture_enabled: true,
1025 max_overhead_per_mille: 120,
1026 max_trace_bytes: 8_192,
1027 }
1028 }
1029
1030 fn standard_bundle() -> ReplayTraceBundle {
1031 let mut builder = ReplayTraceBuilder::new("trace-diagnostic");
1032 builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1033 builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
1034 builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Completed));
1035 builder.build().expect("bundle should build")
1036 }
1037
1038 #[test]
1039 fn deterministic_build_is_order_stable_across_input_permutations() {
1040 let mut left = ReplayTraceBuilder::new("trace-a");
1041 left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1042 left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1043 left.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1044 left.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1045
1046 let mut right = ReplayTraceBuilder::new("trace-a");
1047 right.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1048 right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1049 right.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1050 right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1051
1052 let left_bundle = left.build().expect("left bundle should build");
1053 let right_bundle = right.build().expect("right bundle should build");
1054
1055 assert_eq!(left_bundle, right_bundle);
1056 assert_eq!(left_bundle.events[0].kind, ReplayEventKind::Cancelled);
1057 assert_eq!(left_bundle.events[1].kind, ReplayEventKind::Retried);
1058 }
1059
1060 #[test]
1061 fn json_roundtrip_preserves_replay_bundle() {
1062 let mut builder = ReplayTraceBuilder::new("trace-roundtrip");
1063 builder.insert_metadata("lane", "shadow");
1064 let mut event = draft(20, "ext.gamma", "req-7", ReplayEventKind::PolicyDecision);
1065 event
1066 .attributes
1067 .insert("decision".to_string(), "fast_lane".to_string());
1068 builder.push(draft(19, "ext.gamma", "req-7", ReplayEventKind::Scheduled));
1069 builder.push(event);
1070 builder.push(draft(21, "ext.gamma", "req-7", ReplayEventKind::Completed));
1071
1072 let bundle = builder.build().expect("bundle should build");
1073 let encoded = bundle.encode_json().expect("bundle should encode");
1074 let decoded = ReplayTraceBundle::decode_json(&encoded).expect("bundle should decode");
1075 assert_eq!(decoded, bundle);
1076 }
1077
1078 #[test]
1079 fn decode_rejects_retry_without_prior_cancel() {
1080 let bundle = ReplayTraceBundle {
1081 schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1082 trace_id: "trace-invalid".to_string(),
1083 metadata: BTreeMap::new(),
1084 events: vec![super::ReplayTraceEvent {
1085 seq: 1,
1086 logical_clock: 1,
1087 extension_id: "ext.a".to_string(),
1088 request_id: "req".to_string(),
1089 kind: ReplayEventKind::Retried,
1090 attributes: BTreeMap::new(),
1091 }],
1092 };
1093 let encoded = bundle
1094 .encode_json()
1095 .expect("invalid bundle should serialize");
1096
1097 let error = ReplayTraceBundle::decode_json(&encoded).expect_err("retry without cancel");
1098 match error {
1099 ReplayTraceCodecError::Validation(ReplayTraceValidationError::RetryWithoutCancel {
1100 ..
1101 }) => {}
1102 other => panic!("unexpected error: {other:?}"),
1103 }
1104 }
1105
1106 #[test]
1107 fn decode_rejects_non_contiguous_sequence() {
1108 let bundle = ReplayTraceBundle {
1109 schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1110 trace_id: "trace-seq".to_string(),
1111 metadata: BTreeMap::new(),
1112 events: vec![
1113 super::ReplayTraceEvent {
1114 seq: 1,
1115 logical_clock: 1,
1116 extension_id: "ext.a".to_string(),
1117 request_id: "req-1".to_string(),
1118 kind: ReplayEventKind::Scheduled,
1119 attributes: BTreeMap::new(),
1120 },
1121 super::ReplayTraceEvent {
1122 seq: 3,
1123 logical_clock: 2,
1124 extension_id: "ext.a".to_string(),
1125 request_id: "req-1".to_string(),
1126 kind: ReplayEventKind::Completed,
1127 attributes: BTreeMap::new(),
1128 },
1129 ],
1130 };
1131 let encoded = bundle
1132 .encode_json()
1133 .expect("invalid bundle should serialize");
1134
1135 let error = ReplayTraceBundle::decode_json(&encoded).expect_err("non-contiguous seq");
1136 match error {
1137 ReplayTraceCodecError::Validation(
1138 ReplayTraceValidationError::NonContiguousSequence { expected, observed },
1139 ) => {
1140 assert_eq!(expected, 2);
1141 assert_eq!(observed, 3);
1142 }
1143 other => panic!("unexpected error: {other:?}"),
1144 }
1145 }
1146
1147 #[test]
1148 fn divergence_reports_kind_mismatch_with_seq() {
1149 let mut expected_builder = ReplayTraceBuilder::new("trace-divergence");
1150 expected_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1151 expected_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Completed));
1152 let expected = expected_builder.build().expect("expected bundle");
1153
1154 let mut observed_builder = ReplayTraceBuilder::new("trace-divergence");
1155 observed_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1156 observed_builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::Failed));
1157 let observed = observed_builder.build().expect("observed bundle");
1158
1159 let divergence = first_divergence(&expected, &observed)
1160 .expect("comparison should succeed")
1161 .expect("divergence expected");
1162 assert_eq!(divergence.seq, Some(2));
1163 match divergence.reason {
1164 ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
1165 assert_eq!(field, "kind");
1166 }
1167 other => panic!("unexpected divergence reason: {other:?}"),
1168 }
1169 }
1170
/// A two-event reference compared against an observed trace holding only the
/// first event must report an event-count mismatch (2 vs 1) at seq 2.
#[test]
fn divergence_reports_event_count_mismatch() {
    let reference = {
        let mut trace = ReplayTraceBuilder::new("trace-length");
        for (clock, kind) in [
            (1, ReplayEventKind::Scheduled),
            (2, ReplayEventKind::Completed),
        ] {
            trace.push(draft(clock, "ext.a", "req-1", kind));
        }
        trace.build().expect("expected bundle")
    };

    let truncated = {
        let mut trace = ReplayTraceBuilder::new("trace-length");
        trace.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
        trace.build().expect("observed bundle")
    };

    let comparison = first_divergence(&reference, &truncated).expect("comparison should succeed");
    let mismatch = comparison.expect("divergence expected");

    assert_eq!(mismatch.seq, Some(2));
    match mismatch.reason {
        ReplayDivergenceReason::EventCountMismatch {
            expected: reference_len,
            observed: observed_len,
        } => {
            assert_eq!(reference_len, 2);
            assert_eq!(observed_len, 1);
        }
        other => panic!("unexpected divergence reason: {other:?}"),
    }
}
1194
/// Comparing a bundle against itself must succeed and yield no divergence.
#[test]
fn divergence_returns_none_for_identical_bundles() {
    let bundle = {
        let mut trace = ReplayTraceBuilder::new("trace-identical");
        for (clock, kind) in [
            (1, ReplayEventKind::Scheduled),
            (2, ReplayEventKind::Completed),
        ] {
            trace.push(draft(clock, "ext.a", "req-1", kind));
        }
        trace.build().expect("bundle")
    };

    let outcome = first_divergence(&bundle, &bundle);
    let divergence = outcome.expect("comparison should validate identical");
    assert!(divergence.is_none());
}
1206
/// With `capture_enabled` switched off in the budget, the gate must refuse
/// capture and report `DisabledByConfig`.
#[test]
fn capture_gate_disables_when_config_switch_is_off() {
    // Start from the standard budget and flip only the enable switch.
    let disabled_budget = ReplayCaptureBudget {
        capture_enabled: false,
        ..standard_capture_budget()
    };

    let report = evaluate_replay_capture_gate(
        disabled_budget,
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 1_010,
            trace_bytes: 128,
        },
    );

    assert!(!report.capture_allowed);
    assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
}
1221
/// A capture costing 1_140 µs against a 1_000 µs baseline (140 per mille)
/// must be rejected by the standard budget for overhead reasons.
#[test]
fn capture_gate_disables_when_overhead_exceeds_budget() {
    let slow_capture = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_140,
        trace_bytes: 512,
    };

    let report = evaluate_replay_capture_gate(standard_capture_budget(), slow_capture);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByOverheadBudget
    );
    assert_eq!(report.observed_overhead_per_mille, 140);
}
1239
/// A 9_000-byte trace with only 50 per mille overhead must be rejected by the
/// standard budget for trace-size reasons.
#[test]
fn capture_gate_disables_when_trace_budget_exceeded() {
    let oversized_trace = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_050,
        trace_bytes: 9_000,
    };

    let report = evaluate_replay_capture_gate(standard_capture_budget(), oversized_trace);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByTraceBudget
    );
    assert_eq!(report.observed_overhead_per_mille, 50);
}
1257
/// A zero baseline with a non-zero captured time must disable capture with
/// `DisabledByInvalidBaseline` and report an overhead of `u32::MAX`.
#[test]
fn capture_gate_fails_closed_on_invalid_baseline() {
    let zero_baseline = ReplayCaptureObservation {
        baseline_micros: 0,
        captured_micros: 1,
        trace_bytes: 64,
    };

    let report = evaluate_replay_capture_gate(standard_capture_budget(), zero_baseline);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByInvalidBaseline
    );
    assert_eq!(report.observed_overhead_per_mille, u32::MAX);
}
1275
/// Evaluating the gate twice with the same in-budget inputs must give equal
/// reports, each allowing capture with 80 per mille observed overhead.
#[test]
fn capture_gate_reports_deterministic_within_budget_enablement() {
    let budget = standard_capture_budget();
    let in_budget = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_080,
        trace_bytes: 4_096,
    };

    let [first, second] = [
        evaluate_replay_capture_gate(budget, in_budget),
        evaluate_replay_capture_gate(budget, in_budget),
    ];

    assert_eq!(first, second);
    assert!(first.capture_allowed);
    assert_eq!(first.reason, ReplayCaptureGateReason::Enabled);
    assert_eq!(first.observed_overhead_per_mille, 80);
}
1293
/// When the observed trace ends in `Failed` and the capture gate trips on
/// overhead, the snapshot must list the payload-drift hint followed by the
/// overhead-budget hint, and count three events.
#[test]
fn diagnostic_snapshot_emits_hint_codes_for_gate_and_payload_drift() {
    let reference = standard_bundle();

    let replayed = {
        let mut trace = ReplayTraceBuilder::new("trace-diagnostic");
        for (clock, kind) in [
            (1, ReplayEventKind::Scheduled),
            (2, ReplayEventKind::PolicyDecision),
            (3, ReplayEventKind::Failed),
        ] {
            trace.push(draft(clock, "ext.a", "req-1", kind));
        }
        trace.build().expect("observed bundle")
    };

    let mismatch = first_divergence(&reference, &replayed)
        .expect("comparison should succeed")
        .expect("divergence expected");

    // 150 per mille of overhead, which the test expects to exceed the standard budget.
    let over_budget = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_150,
        trace_bytes: 64,
    };
    let gate_report = evaluate_replay_capture_gate(standard_capture_budget(), over_budget);

    let snapshot = build_replay_diagnostic_snapshot(&reference, gate_report, Some(&mismatch))
        .expect("snapshot should build");

    assert_eq!(snapshot.event_count, 3);
    assert_eq!(
        snapshot.root_cause_hints,
        vec![
            ReplayRootCauseHint::EventPayloadDrift,
            ReplayRootCauseHint::OverheadBudgetExceeded,
        ]
    );
}
1327
/// Changing only the logical clock of the second event must surface as a
/// single `LogicalClockDrift` hint in the diagnostic snapshot.
#[test]
fn diagnostic_snapshot_maps_logical_clock_drift_hint() {
    let reference = standard_bundle();
    let drifted = {
        let mut copy = reference.clone();
        copy.events[1].logical_clock = 77;
        copy
    };

    let mismatch = first_divergence(&reference, &drifted)
        .expect("comparison should succeed")
        .expect("divergence expected");

    let cheap_capture = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_010,
        trace_bytes: 64,
    };
    let gate_report = evaluate_replay_capture_gate(standard_capture_budget(), cheap_capture);

    let snapshot = build_replay_diagnostic_snapshot(&reference, gate_report, Some(&mismatch))
        .expect("snapshot should build");

    assert_eq!(
        snapshot.root_cause_hints,
        vec![ReplayRootCauseHint::LogicalClockDrift]
    );
}
1353
/// Building the diagnostic snapshot twice from the same bundle and gate report
/// must produce equal snapshots.
#[test]
fn diagnostic_snapshot_is_deterministic_for_same_inputs() {
    let bundle = standard_bundle();
    let observation = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_020,
        trace_bytes: 256,
    };
    let gate_report = evaluate_replay_capture_gate(standard_capture_budget(), observation);

    let first = build_replay_diagnostic_snapshot(&bundle, gate_report, None);
    let first = first.expect("first snapshot");
    let second = build_replay_diagnostic_snapshot(&bundle, gate_report, None);
    let second = second.expect("second snapshot");

    assert_eq!(first, second);
}
1372
/// A bundle carrying an unrecognised schema string must make snapshot
/// construction fail with `UnknownSchema`.
#[test]
fn diagnostic_snapshot_rejects_invalid_bundle() {
    let gate_report = evaluate_replay_capture_gate(
        standard_capture_budget(),
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 1_000,
            trace_bytes: 0,
        },
    );

    let bad_schema_bundle = ReplayTraceBundle {
        schema: String::from("invalid.schema"),
        trace_id: String::from("trace-bad"),
        metadata: BTreeMap::new(),
        events: Vec::new(),
    };

    let error = build_replay_diagnostic_snapshot(&bad_schema_bundle, gate_report, None)
        .expect_err("invalid schema should fail");

    assert!(matches!(
        error,
        ReplayTraceValidationError::UnknownSchema(_)
    ));
}
1397
/// A builder with no pushed events still builds a bundle: no events, the v1
/// schema string, and the trace id it was created with.
#[test]
fn builder_empty_events_produces_valid_bundle() {
    let bundle = ReplayTraceBuilder::new("trace-empty")
        .build()
        .expect("empty bundle should be valid");

    assert!(bundle.events.is_empty());
    assert_eq!(bundle.schema, REPLAY_TRACE_SCHEMA_V1);
    assert_eq!(bundle.trace_id, "trace-empty");
}
1408
/// Metadata entries inserted on the builder must be readable from the built
/// bundle under the same keys.
#[test]
fn builder_metadata_preserved_in_output() {
    let entries = [("env", "production"), ("version", "1.2.3")];

    let mut trace = ReplayTraceBuilder::new("trace-meta");
    for (key, value) in entries {
        trace.insert_metadata(key, value);
    }
    trace.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
    let bundle = trace.build().expect("bundle with metadata");

    for (key, value) in entries {
        assert_eq!(bundle.metadata.get(key).map(String::as_str), Some(value));
    }
}
1425
/// Inserting the same metadata key twice keeps the value written last.
#[test]
fn builder_metadata_overwrite_works() {
    let mut trace = ReplayTraceBuilder::new("trace-meta-ow");
    for value in ["old", "new"] {
        trace.insert_metadata("key", value);
    }
    trace.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));

    let bundle = trace.build().expect("metadata overwrite");
    let stored = bundle.metadata.get("key").map(String::as_str);
    assert_eq!(stored, Some("new"));
}
1435
/// Attributes set on a draft must appear unchanged on the corresponding event
/// of the built bundle.
#[test]
fn draft_attributes_carried_through_build() {
    let mut decision = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
    decision.attributes.extend([
        ("policy".to_string(), "fast_lane".to_string()),
        ("latency_ms".to_string(), "12".to_string()),
    ]);

    let mut trace = ReplayTraceBuilder::new("trace-attrs");
    trace.push(decision);
    let bundle = trace.build().expect("bundle with attrs");

    let recorded = &bundle.events[0].attributes;
    assert_eq!(recorded.len(), 2);
    assert_eq!(
        recorded.get("policy").map(String::as_str),
        Some("fast_lane")
    );
}
1455
/// Building with an empty trace id must fail with `EmptyTraceId`.
#[test]
fn validate_rejects_empty_trace_id() {
    let outcome = {
        let mut trace = ReplayTraceBuilder::new("");
        trace.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
        trace.build()
    };

    let err = outcome.expect_err("empty trace_id should fail");
    assert!(matches!(err, ReplayTraceValidationError::EmptyTraceId));
}
1465
/// A trace id consisting only of whitespace is treated like an empty one and
/// must fail with `EmptyTraceId`.
#[test]
fn validate_rejects_whitespace_only_trace_id() {
    let outcome = {
        let mut trace = ReplayTraceBuilder::new(" ");
        trace.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
        trace.build()
    };

    let err = outcome.expect_err("whitespace trace_id should fail");
    assert!(matches!(err, ReplayTraceValidationError::EmptyTraceId));
}
1475
/// An event whose extension id is empty must fail validation with
/// `MissingExtensionId`.
#[test]
fn validate_rejects_empty_extension_id() {
    let outcome = {
        let mut trace = ReplayTraceBuilder::new("trace-val");
        trace.push(draft(1, "", "req-1", ReplayEventKind::Scheduled));
        trace.build()
    };

    let err = outcome.expect_err("empty extension_id should fail");
    assert!(matches!(
        err,
        ReplayTraceValidationError::MissingExtensionId { .. }
    ));
}
1486
/// An event whose request id is empty must fail validation with
/// `MissingRequestId`.
#[test]
fn validate_rejects_empty_request_id() {
    let outcome = {
        let mut trace = ReplayTraceBuilder::new("trace-val");
        trace.push(draft(1, "ext.a", "", ReplayEventKind::Scheduled));
        trace.build()
    };

    let err = outcome.expect_err("empty request_id should fail");
    assert!(matches!(
        err,
        ReplayTraceValidationError::MissingRequestId { .. }
    ));
}
1497
/// Two `Cancelled` events for the same request with nothing in between must be
/// rejected with `DuplicateCancelWithoutRetry`.
#[test]
fn validate_rejects_duplicate_cancel_without_retry() {
    let mut trace = ReplayTraceBuilder::new("trace-dup-cancel");
    for clock in [1, 2] {
        trace.push(draft(clock, "ext.a", "req-1", ReplayEventKind::Cancelled));
    }

    let err = trace.build().expect_err("duplicate cancel should fail");
    assert!(matches!(
        err,
        ReplayTraceValidationError::DuplicateCancelWithoutRetry { .. }
    ));
}
1509
/// A second `Cancelled` is accepted when a `Retried` event separates it from
/// the first one.
#[test]
fn cancel_then_retry_then_cancel_is_valid() {
    let lifecycle = [
        (1, ReplayEventKind::Cancelled),
        (2, ReplayEventKind::Retried),
        (3, ReplayEventKind::Cancelled),
    ];

    let mut trace = ReplayTraceBuilder::new("trace-cancel-retry-cancel");
    for (clock, kind) in lifecycle {
        trace.push(draft(clock, "ext.a", "req-1", kind));
    }

    let bundle = trace.build().expect("cancel-retry-cancel should be valid");
    assert_eq!(bundle.events.len(), 3);
}
1521
/// A second `Cancelled` is accepted when a `Completed` event separates it from
/// the first one.
#[test]
fn completed_clears_pending_cancel() {
    let lifecycle = [
        (1, ReplayEventKind::Cancelled),
        (2, ReplayEventKind::Completed),
        (3, ReplayEventKind::Cancelled),
    ];

    let mut trace = ReplayTraceBuilder::new("trace-complete-clear");
    for (clock, kind) in lifecycle {
        trace.push(draft(clock, "ext.a", "req-1", kind));
    }

    let bundle = trace.build().expect("completed should clear cancel state");
    assert_eq!(bundle.events.len(), 3);
}
1533
/// Walking the event kinds in declaration order, each kind's canonical rank
/// must be strictly lower than that of the kind after it.
#[test]
fn event_kind_canonical_rank_is_monotonic() {
    let ordered = [
        ReplayEventKind::Scheduled,
        ReplayEventKind::QueueAccepted,
        ReplayEventKind::PolicyDecision,
        ReplayEventKind::Cancelled,
        ReplayEventKind::Retried,
        ReplayEventKind::Completed,
        ReplayEventKind::Failed,
    ];

    // Pair every kind with its immediate successor.
    for (lower, higher) in ordered.iter().zip(ordered.iter().skip(1)) {
        assert!(
            lower.canonical_rank() < higher.canonical_rank(),
            "{:?} should have lower rank than {:?}",
            lower,
            higher
        );
    }
}
1556
/// Every event kind must survive a JSON serialize/deserialize round trip.
#[test]
fn event_kind_serde_roundtrip() {
    for kind in [
        ReplayEventKind::Scheduled,
        ReplayEventKind::QueueAccepted,
        ReplayEventKind::PolicyDecision,
        ReplayEventKind::Cancelled,
        ReplayEventKind::Retried,
        ReplayEventKind::Completed,
        ReplayEventKind::Failed,
    ] {
        let encoded = serde_json::to_string(&kind).expect("serialize kind");
        let decoded: ReplayEventKind = serde_json::from_str(&encoded).expect("deserialize kind");
        assert_eq!(kind, decoded);
    }
}
1574
/// Round-trips a hand-built schema-mismatch divergence through JSON.
///
/// The divergence is constructed directly rather than obtained from
/// `first_divergence`. Its observed schema is read from a bundle whose schema
/// field was overwritten, so the mutated bundle is no longer dead setup.
#[test]
fn divergence_detects_schema_mismatch() {
    let mut observed = standard_bundle();
    observed.schema = "pi.ext.replay.trace.v2".to_string();

    let d = super::ReplayDivergence {
        seq: None,
        reason: ReplayDivergenceReason::SchemaMismatch {
            expected: REPLAY_TRACE_SCHEMA_V1.to_string(),
            // Previously a duplicated literal; taking it from the bundle keeps
            // the two values from drifting apart.
            observed: observed.schema.clone(),
        },
    };

    let json = serde_json::to_string(&d).expect("serialize divergence");
    let roundtrip: super::ReplayDivergence =
        serde_json::from_str(&json).expect("deserialize divergence");
    assert_eq!(d, roundtrip);
}
1596
/// Two single-event traces that differ only in the value of the `decision`
/// attribute must diverge at seq 1 on the `attributes` field.
#[test]
fn divergence_detects_attribute_mismatch() {
    // Builds a one-event bundle whose policy decision carries the given value;
    // `label` is the message used if the build fails.
    let bundle_with_decision = |decision: &str, label: &str| {
        let mut trace = ReplayTraceBuilder::new("trace-attrs-cmp");
        let mut event = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
        event
            .attributes
            .insert("decision".to_string(), decision.to_string());
        trace.push(event);
        trace.build().expect(label)
    };

    let reference = bundle_with_decision("fast", "bundle a");
    let replayed = bundle_with_decision("slow", "bundle b");

    let mismatch = first_divergence(&reference, &replayed)
        .expect("comparison should succeed")
        .expect("attribute mismatch expected");

    assert_eq!(mismatch.seq, Some(1));
    match mismatch.reason {
        ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
            assert_eq!(field, "attributes");
        }
        other => panic!("unexpected: {other:?}"),
    }
}
1624
/// When the captured time equals the baseline, capture is allowed and the
/// observed overhead is 0 per mille.
#[test]
fn capture_gate_zero_overhead_when_captured_equals_baseline() {
    let report = evaluate_replay_capture_gate(
        standard_capture_budget(),
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 1_000,
            trace_bytes: 100,
        },
    );

    assert!(report.capture_allowed);
    assert_eq!(report.observed_overhead_per_mille, 0);
}
1639
/// When the captured time is below the baseline, capture is allowed and the
/// observed overhead is reported as 0 per mille.
#[test]
fn capture_gate_zero_overhead_when_captured_less_than_baseline() {
    let report = evaluate_replay_capture_gate(
        standard_capture_budget(),
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 900,
            trace_bytes: 100,
        },
    );

    assert!(report.capture_allowed);
    assert_eq!(report.observed_overhead_per_mille, 0);
}
1652
/// An observed overhead exactly equal to `max_overhead_per_mille` (100) is
/// still within budget, so capture stays allowed.
#[test]
fn capture_gate_exact_boundary_at_max_overhead() {
    let report = evaluate_replay_capture_gate(
        ReplayCaptureBudget {
            capture_enabled: true,
            max_overhead_per_mille: 100,
            max_trace_bytes: 10_000,
        },
        // 1_100 vs 1_000 micros is exactly 100 per mille of overhead.
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 1_100,
            trace_bytes: 100,
        },
    );

    assert!(report.capture_allowed);
    assert_eq!(report.observed_overhead_per_mille, 100);
}
1670
/// A trace of exactly `max_trace_bytes` (500) is allowed, while one byte more
/// is rejected with `DisabledByTraceBudget`.
#[test]
fn capture_gate_exact_boundary_at_max_trace_bytes() {
    let budget = ReplayCaptureBudget {
        capture_enabled: true,
        max_overhead_per_mille: 1_000,
        max_trace_bytes: 500,
    };
    // Evaluates the gate for a trace of the given size; timing is held fixed.
    let gate_for = |trace_bytes| {
        evaluate_replay_capture_gate(
            budget,
            ReplayCaptureObservation {
                baseline_micros: 1_000,
                captured_micros: 1_010,
                trace_bytes,
            },
        )
    };

    let at_limit = gate_for(500);
    assert!(at_limit.capture_allowed);

    let over_limit = gate_for(501);
    assert!(!over_limit.capture_allowed);
    assert_eq!(
        over_limit.reason,
        ReplayCaptureGateReason::DisabledByTraceBudget
    );
}
1700
/// A gate disabled through the budget's enable switch must map to the single
/// hint `PolicyGateDisabled` in the diagnostic snapshot.
#[test]
fn diagnostic_snapshot_maps_config_disabled_hint() {
    let gate_report = evaluate_replay_capture_gate(
        ReplayCaptureBudget {
            capture_enabled: false,
            max_overhead_per_mille: 100,
            max_trace_bytes: 1_000,
        },
        ReplayCaptureObservation {
            baseline_micros: 100,
            captured_micros: 100,
            trace_bytes: 0,
        },
    );

    let bundle = standard_bundle();
    let snapshot =
        build_replay_diagnostic_snapshot(&bundle, gate_report, None).expect("snapshot");

    assert_eq!(
        snapshot.root_cause_hints,
        vec![ReplayRootCauseHint::PolicyGateDisabled]
    );
}
1725
/// A gate tripped by trace size (200 bytes against a 100-byte limit) must map
/// to the single hint `TraceBudgetExceeded` in the diagnostic snapshot.
#[test]
fn diagnostic_snapshot_maps_trace_budget_hint() {
    let gate_report = evaluate_replay_capture_gate(
        ReplayCaptureBudget {
            capture_enabled: true,
            max_overhead_per_mille: 1_000,
            max_trace_bytes: 100,
        },
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 1_010,
            trace_bytes: 200,
        },
    );

    let bundle = standard_bundle();
    let snapshot =
        build_replay_diagnostic_snapshot(&bundle, gate_report, None).expect("snapshot");

    assert_eq!(
        snapshot.root_cause_hints,
        vec![ReplayRootCauseHint::TraceBudgetExceeded]
    );
}
1748
/// A diagnostic snapshot must be equal to itself after a JSON round trip.
#[test]
fn diagnostic_snapshot_serde_roundtrip() {
    let observation = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_010,
        trace_bytes: 64,
    };
    let gate_report = evaluate_replay_capture_gate(standard_capture_budget(), observation);

    let bundle = standard_bundle();
    let snapshot =
        build_replay_diagnostic_snapshot(&bundle, gate_report, None).expect("snapshot");

    let encoded = serde_json::to_string(&snapshot).expect("serialize");
    let decoded: super::ReplayDiagnosticSnapshot =
        serde_json::from_str(&encoded).expect("deserialize");
    assert_eq!(snapshot, decoded);
}
1766
/// Table of `(baseline, captured, expected per-mille overhead)` cases that
/// divide evenly, plus the equal and below-baseline cases that yield 0.
#[test]
fn overhead_per_mille_exact_computation() {
    let cases = [
        (1_000, 1_050, 50),  // 5% overhead
        (1_000, 1_200, 200), // 20% overhead
        (1_000, 1_000, 0),   // captured equals baseline
        (1_000, 500, 0),     // captured below baseline
    ];

    for (baseline, captured, expected) in cases {
        assert_eq!(
            super::compute_overhead_per_mille(baseline, captured),
            expected
        );
    }
}
1780
/// An overhead of one third (4 vs 3) is 333.3… per mille and must be reported
/// as 334, i.e. rounded up rather than truncated.
#[test]
fn overhead_per_mille_rounding_up() {
    let per_mille = super::compute_overhead_per_mille(3, 4);
    assert_eq!(per_mille, 334);
}
1786
/// With a zero baseline, a non-zero captured time yields `u32::MAX`, while a
/// zero captured time yields 0.
#[test]
fn overhead_per_mille_zero_baseline_returns_max() {
    let with_captured_time = super::compute_overhead_per_mille(0, 1);
    let without_captured_time = super::compute_overhead_per_mille(0, 0);

    assert_eq!(with_captured_time, u32::MAX);
    assert_eq!(without_captured_time, 0);
}
1792
1793 fn within_budget_observation() -> ReplayCaptureObservation {
1796 ReplayCaptureObservation {
1797 baseline_micros: 1_000,
1798 captured_micros: 1_050,
1799 trace_bytes: 256,
1800 }
1801 }
1802
1803 fn standard_lane_config() -> super::ReplayLaneConfig {
1804 super::ReplayLaneConfig::new(standard_capture_budget())
1805 }
1806
/// A recorder that never ticked or recorded anything reports zero events and
/// a logical clock of 0. Finishing it yields an empty bundle, an allowed
/// capture gate and a diagnostic event count of 0.
#[test]
fn recorder_empty_produces_valid_bundle() {
    let idle = super::ReplayRecorder::new("trace-empty-rec", standard_lane_config());
    assert_eq!(idle.event_count(), 0);
    assert_eq!(idle.logical_clock(), 0);

    let outcome = idle.finish(within_budget_observation());
    let outcome = outcome.expect("finish");

    assert!(outcome.bundle.events.is_empty());
    assert!(outcome.gate_report.capture_allowed);
    assert_eq!(outcome.diagnostic.event_count, 0);
}
1820
/// Four tick/record pairs must produce four events in recording order, with
/// the logical clock at 4 and sequence numbers counting up from 1.
#[test]
fn recorder_captures_events_in_sequence() {
    let mut rec = super::ReplayRecorder::new("trace-seq-rec", standard_lane_config());
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_queue_accepted("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_policy_decision("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_completed("ext.a", "req-1", BTreeMap::new());

    assert_eq!(rec.event_count(), 4);
    assert_eq!(rec.logical_clock(), 4);

    let outcome = rec.finish(within_budget_observation()).expect("finish");
    let events = &outcome.bundle.events;
    assert_eq!(events.len(), 4);

    let recorded_order = [
        ReplayEventKind::Scheduled,
        ReplayEventKind::QueueAccepted,
        ReplayEventKind::PolicyDecision,
        ReplayEventKind::Completed,
    ];
    for (event, expected_kind) in events.iter().zip(recorded_order) {
        assert_eq!(event.kind, expected_kind);
    }

    // Sequence numbers are 1-based.
    for (expected_seq, event) in (1u64..).zip(events) {
        assert_eq!(event.seq, expected_seq);
    }
}
1853
/// Attributes passed to a `record_*` call must be present on the event in the
/// finished bundle.
#[test]
fn recorder_attributes_flow_through() {
    let mut rec = super::ReplayRecorder::new("trace-attrs-rec", standard_lane_config());
    rec.tick();
    rec.record_policy_decision(
        "ext.a",
        "req-1",
        BTreeMap::from([
            ("lane".to_string(), "fast".to_string()),
            ("capability".to_string(), "tool".to_string()),
        ]),
    );

    let outcome = rec.finish(within_budget_observation()).expect("finish");
    let recorded = &outcome.bundle.events[0].attributes;

    assert_eq!(recorded.get("lane").map(String::as_str), Some("fast"));
    assert_eq!(
        recorded.get("capability").map(String::as_str),
        Some("tool")
    );
}
1876
/// Metadata inserted on the lane config must appear in the metadata of the
/// bundle produced by a recorder created from that config.
#[test]
fn recorder_lane_metadata_propagated() {
    let lane_metadata = [("env", "staging"), ("worker", "w-3")];

    let mut config = standard_lane_config();
    for (key, value) in lane_metadata {
        config.insert_metadata(key, value);
    }

    let mut rec = super::ReplayRecorder::new("trace-meta-rec", config);
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
    let outcome = rec.finish(within_budget_observation()).expect("finish");

    for (key, value) in lane_metadata {
        assert_eq!(
            outcome.bundle.metadata.get(key).map(String::as_str),
            Some(value)
        );
    }
}
1898
/// A scheduled → cancelled → retried → completed run must finish
/// successfully, with `Cancelled` and `Retried` at positions 1 and 2.
#[test]
fn recorder_cancel_retry_lifecycle() {
    let mut rec = super::ReplayRecorder::new("trace-cancel-retry", standard_lane_config());
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_cancelled("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_retried("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_completed("ext.a", "req-1", BTreeMap::new());

    let outcome = rec.finish(within_budget_observation()).expect("finish");
    let events = &outcome.bundle.events;

    assert_eq!(events.len(), 4);
    assert_eq!(events[1].kind, ReplayEventKind::Cancelled);
    assert_eq!(events[2].kind, ReplayEventKind::Retried);
}
1918
/// A `record_failed` call must yield a `Failed` event that carries the
/// attributes it was given.
#[test]
fn recorder_failed_event() {
    let mut rec = super::ReplayRecorder::new("trace-fail", standard_lane_config());
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_failed(
        "ext.a",
        "req-1",
        BTreeMap::from([("error".to_string(), "timeout".to_string())]),
    );

    let outcome = rec.finish(within_budget_observation()).expect("finish");
    let failure = &outcome.bundle.events[1];

    assert_eq!(failure.kind, ReplayEventKind::Failed);
    assert_eq!(
        failure.attributes.get("error").map(String::as_str),
        Some("timeout")
    );
}
1941
/// With a 50 per mille overhead limit and 100 per mille observed, the gate
/// report must show capture disabled by the overhead budget. The finished
/// bundle still contains the recorded event.
#[test]
fn recorder_gate_report_reflects_budget() {
    let tight_budget = ReplayCaptureBudget {
        capture_enabled: true,
        max_overhead_per_mille: 50,
        max_trace_bytes: 10_000,
    };
    let mut config = super::ReplayLaneConfig::new(tight_budget);
    config.insert_metadata("lane", "shadow");

    let mut rec = super::ReplayRecorder::new("trace-gated", config);
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());

    // 1_100 vs 1_000 micros: 100 per mille, twice the configured limit.
    let over_budget = ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_100,
        trace_bytes: 64,
    };
    let outcome = rec.finish(over_budget).expect("finish");

    assert!(!outcome.gate_report.capture_allowed);
    assert_eq!(
        outcome.gate_report.reason,
        ReplayCaptureGateReason::DisabledByOverheadBudget
    );
    assert_eq!(outcome.bundle.events.len(), 1);
}
1971
/// The diagnostic attached to a finished recording must carry the trace id,
/// the v1 schema and the event count, with no divergence and no hints.
#[test]
fn recorder_diagnostic_snapshot_populated() {
    let mut rec = super::ReplayRecorder::new("trace-diag", standard_lane_config());
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec.tick();
    rec.record_completed("ext.a", "req-1", BTreeMap::new());

    let outcome = rec.finish(within_budget_observation()).expect("finish");
    let diagnostic = &outcome.diagnostic;

    assert_eq!(diagnostic.trace_id, "trace-diag");
    assert_eq!(diagnostic.schema, REPLAY_TRACE_SCHEMA_V1);
    assert_eq!(diagnostic.event_count, 2);
    assert!(diagnostic.divergence.is_none());
    assert!(diagnostic.root_cause_hints.is_empty());
}
1989
/// Two recorders fed the same events must compare without divergence or
/// hints, with both trace ids reported as `trace-cmp`.
#[test]
fn recorder_finish_and_compare_identical() {
    // Produces a fresh recorder holding the same scheduled → completed run.
    let record_run = || {
        let mut rec = super::ReplayRecorder::new("trace-cmp", standard_lane_config());
        rec.tick();
        rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
        rec.tick();
        rec.record_completed("ext.a", "req-1", BTreeMap::new());
        rec
    };

    let reference = record_run()
        .finish(within_budget_observation())
        .expect("ref")
        .bundle;

    let (result, comparison) = record_run()
        .finish_and_compare(within_budget_observation(), &reference)
        .expect("compare");

    assert!(comparison.divergence.is_none());
    assert!(comparison.root_cause_hints.is_empty());
    assert_eq!(comparison.reference_trace_id, "trace-cmp");
    assert_eq!(comparison.observed_trace_id, "trace-cmp");
    assert!(result.diagnostic.divergence.is_none());
}
2017
/// Records a reference run ending in `Completed` and an observed run ending
/// in `Failed`. `finish_and_compare` must report the divergence at seq 2 on
/// the `kind` field and surface it, with at least one hint, in the diagnostic.
#[test]
fn recorder_finish_and_compare_detects_divergence() {
    let mut rec1 = super::ReplayRecorder::new("trace-div", standard_lane_config());
    rec1.tick();
    rec1.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec1.tick();
    rec1.record_completed("ext.a", "req-1", BTreeMap::new());
    let reference = rec1
        .finish(within_budget_observation())
        .expect("ref")
        .bundle;

    let mut rec2 = super::ReplayRecorder::new("trace-div", standard_lane_config());
    rec2.tick();
    rec2.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec2.tick();
    rec2.record_failed("ext.a", "req-1", BTreeMap::new());

    let (result, comparison) = rec2
        .finish_and_compare(within_budget_observation(), &reference)
        .expect("compare");

    // `expect` both asserts presence and yields the value, replacing the
    // earlier `is_some()` assertion followed by a bare `unwrap()`.
    let div = comparison
        .divergence
        .as_ref()
        .expect("comparison should report a divergence");
    assert_eq!(div.seq, Some(2));
    assert!(matches!(
        div.reason,
        ReplayDivergenceReason::EventFieldMismatch { ref field, .. } if field == "kind"
    ));
    assert!(result.diagnostic.divergence.is_some());
    assert!(!result.diagnostic.root_cause_hints.is_empty());
}
2049
/// Events from two extensions recorded under the same tick keep their
/// recording order: at logical clock 1, `ext.a` comes before `ext.b`.
#[test]
fn recorder_multi_extension_interleaving() {
    let mut rec = super::ReplayRecorder::new("trace-multi", standard_lane_config());
    rec.tick();
    rec.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec.record_scheduled("ext.b", "req-2", BTreeMap::new());
    rec.tick();
    rec.record_policy_decision("ext.a", "req-1", BTreeMap::new());
    rec.record_policy_decision("ext.b", "req-2", BTreeMap::new());
    rec.tick();
    rec.record_completed("ext.a", "req-1", BTreeMap::new());
    rec.record_completed("ext.b", "req-2", BTreeMap::new());

    let outcome = rec.finish(within_budget_observation()).expect("finish");
    let events = &outcome.bundle.events;
    assert_eq!(events.len(), 6);

    // Extension ids of the events recorded during the first tick, in bundle order.
    let first_tick_extensions: Vec<&str> = events
        .iter()
        .filter(|event| event.logical_clock == 1)
        .map(|event| event.extension_id.as_str())
        .collect();
    assert_eq!(first_tick_extensions.len(), 2);
    assert_eq!(first_tick_extensions[0], "ext.a");
    assert_eq!(first_tick_extensions[1], "ext.b");
}
2079
/// Comparing the standard bundle with itself must report no divergence and no
/// hints, in both the comparison result and the diagnostic.
#[test]
fn compare_replay_bundles_no_divergence() {
    let gate_report =
        evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
    let bundle = standard_bundle();

    let outcome = super::compare_replay_bundles(&bundle, &bundle, gate_report);
    let (diagnostic, comparison) = outcome.expect("compare");

    assert!(comparison.divergence.is_none());
    assert!(comparison.root_cause_hints.is_empty());
    assert!(diagnostic.divergence.is_none());
}
2094
/// Comparing the standard bundle with a trace that ends in `Failed` must
/// report a divergence and at least one hint, and mirror the divergence into
/// the diagnostic.
#[test]
fn compare_replay_bundles_with_divergence() {
    let reference = standard_bundle();

    let replayed = {
        let mut trace = ReplayTraceBuilder::new("trace-diagnostic");
        for (clock, kind) in [
            (1, ReplayEventKind::Scheduled),
            (2, ReplayEventKind::PolicyDecision),
            (3, ReplayEventKind::Failed),
        ] {
            trace.push(draft(clock, "ext.a", "req-1", kind));
        }
        trace.build().expect("observed bundle")
    };

    let gate_report =
        evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());

    let outcome = super::compare_replay_bundles(&reference, &replayed, gate_report);
    let (diagnostic, comparison) = outcome.expect("compare");

    assert!(comparison.divergence.is_some());
    assert!(!comparison.root_cause_hints.is_empty());
    assert!(diagnostic.divergence.is_some());
}
2113
/// A lane config with one metadata entry must be equal to itself after a JSON
/// round trip.
#[test]
fn lane_config_serde_roundtrip() {
    let config = {
        let mut lane = super::ReplayLaneConfig::new(standard_capture_budget());
        lane.insert_metadata("env", "prod");
        lane
    };

    let encoded = serde_json::to_string(&config).expect("serialize");
    let decoded: super::ReplayLaneConfig = serde_json::from_str(&encoded).expect("deserialize");
    assert_eq!(config, decoded);
}
2125
/// A lane config without metadata must serialize to JSON that does not
/// mention `laneMetadata` at all.
#[test]
fn lane_config_empty_metadata_omitted_in_json() {
    let bare_config = super::ReplayLaneConfig::new(standard_capture_budget());
    let encoded = serde_json::to_string(&bare_config).expect("serialize");

    let mentions_metadata = encoded.contains("laneMetadata");
    assert!(!mentions_metadata);
}
2132
// Records a scheduled and a completed event (one tick before each), finishes
// the recorder, and checks that the lane result survives a JSON round trip.
#[test]
fn lane_result_serde_roundtrip() {
    let mut recorder = super::ReplayRecorder::new("trace-serde", standard_lane_config());

    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());

    recorder.tick();
    recorder.record_completed("ext.a", "req-1", BTreeMap::new());

    let lane_result = recorder.finish(within_budget_observation()).expect("finish");

    let encoded = serde_json::to_string(&lane_result).expect("serialize");
    let decoded =
        serde_json::from_str::<super::ReplayLaneResult>(&encoded).expect("deserialize");

    assert_eq!(lane_result, decoded);
}
2148
// A comparison result with no divergence and no hints must survive a JSON
// serialize/deserialize cycle unchanged.
#[test]
fn comparison_result_serde_roundtrip() {
    let original = super::ReplayComparisonResult {
        reference_trace_id: String::from("ref-1"),
        observed_trace_id: String::from("obs-1"),
        divergence: None,
        root_cause_hints: Vec::new(),
    };

    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded =
        serde_json::from_str::<super::ReplayComparisonResult>(&encoded).expect("deserialize");

    assert_eq!(original, decoded);
}
2162
// Three consecutive `tick()` calls on a fresh recorder must return 1, 2, 3.
#[test]
fn recorder_tick_is_monotonic() {
    let mut recorder = super::ReplayRecorder::new("trace-tick", standard_lane_config());

    // Array elements are evaluated left to right, so this records the ticks
    // in call order before any assertion runs.
    let ticks = [recorder.tick(), recorder.tick(), recorder.tick()];

    assert_eq!(ticks[0], 1);
    assert_eq!(ticks[1], 2);
    assert_eq!(ticks[2], 3);
}
2173
2174 mod proptest_extension_replay {
2177 use super::*;
2178 use proptest::prelude::*;
2179
2180 fn arb_event_kind() -> impl Strategy<Value = ReplayEventKind> {
2181 prop::sample::select(vec![
2182 ReplayEventKind::Scheduled,
2183 ReplayEventKind::QueueAccepted,
2184 ReplayEventKind::PolicyDecision,
2185 ReplayEventKind::Completed,
2186 ReplayEventKind::Failed,
2187 ])
2188 }
2189
2190 fn arb_ext_id() -> impl Strategy<Value = String> {
2191 "ext\\.[a-z]{1,5}"
2192 }
2193
2194 fn arb_req_id() -> impl Strategy<Value = String> {
2195 "req-[0-9]{1,4}"
2196 }
2197
2198 fn arb_simple_draft() -> impl Strategy<Value = ReplayEventDraft> {
2199 (1..100u64, arb_ext_id(), arb_req_id(), arb_event_kind())
2200 .prop_map(|(clock, ext, req, kind)| ReplayEventDraft::new(clock, ext, req, kind))
2201 }
2202
2203 proptest! {
2204 #[test]
2205 fn compute_overhead_per_mille_zero_when_captured_leq_baseline(
2206 baseline in 1..10_000u64,
2207 captured in 0..10_000u64,
2208 ) {
2209 if captured <= baseline {
2210 let result = super::super::compute_overhead_per_mille(baseline, captured);
2211 assert_eq!(
2212 result, 0,
2213 "captured <= baseline should yield 0 overhead"
2214 );
2215 }
2216 }
2217
2218 #[test]
2219 fn compute_overhead_per_mille_zero_baseline_returns_max(
2220 captured in 1..10_000u64,
2221 ) {
2222 let result = super::super::compute_overhead_per_mille(0, captured);
2223 assert_eq!(
2224 result, u32::MAX,
2225 "zero baseline with positive captured should be MAX"
2226 );
2227 }
2228
2229 #[test]
2230 fn compute_overhead_per_mille_is_non_negative(
2231 baseline in 0..10_000u64,
2232 captured in 0..10_000u64,
2233 ) {
2234 let result = super::super::compute_overhead_per_mille(baseline, captured);
2235 let _ = result;
2237 }
2238
2239 #[test]
2240 fn builder_produces_contiguous_sequences(
2241 drafts in prop::collection::vec(arb_simple_draft(), 0..10),
2242 ) {
2243 let mut builder = ReplayTraceBuilder::new("trace-prop");
2244 for d in drafts {
2245 builder.push(d);
2246 }
2247 let bundle = builder.build().expect("build should succeed");
2248 for (idx, event) in bundle.events.iter().enumerate() {
2249 assert_eq!(
2250 event.seq,
2251 (idx + 1) as u64,
2252 "sequence should be 1-based contiguous"
2253 );
2254 }
2255 }
2256
2257 #[test]
2258 fn builder_is_deterministic_regardless_of_push_order(
2259 drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2260 ) {
2261 let mut builder1 = ReplayTraceBuilder::new("trace-det");
2262 for d in &drafts {
2263 builder1.push(d.clone());
2264 }
2265 let bundle1 = builder1.build().expect("build1");
2266
2267 let mut reversed = drafts;
2268 reversed.reverse();
2269 let mut builder2 = ReplayTraceBuilder::new("trace-det");
2270 for d in &reversed {
2271 builder2.push(d.clone());
2272 }
2273 let bundle2 = builder2.build().expect("build2");
2274
2275 assert_eq!(
2276 bundle1, bundle2,
2277 "canonical ordering should be same regardless of push order"
2278 );
2279 }
2280
2281 #[test]
2282 fn identical_bundles_have_no_divergence(
2283 drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2284 ) {
2285 let mut builder = ReplayTraceBuilder::new("trace-id");
2286 for d in &drafts {
2287 builder.push(d.clone());
2288 }
2289 let bundle = builder.build().expect("build");
2290 let divergence = first_divergence(&bundle, &bundle)
2291 .expect("comparison should succeed");
2292 assert!(
2293 divergence.is_none(),
2294 "identical bundles should have no divergence"
2295 );
2296 }
2297
2298 #[test]
2299 fn json_roundtrip_preserves_bundle(
2300 drafts in prop::collection::vec(arb_simple_draft(), 0..6),
2301 ) {
2302 let mut builder = ReplayTraceBuilder::new("trace-rt");
2303 for d in drafts {
2304 builder.push(d);
2305 }
2306 let bundle = builder.build().expect("build");
2307 let json = bundle.encode_json().expect("encode");
2308 let decoded = ReplayTraceBundle::decode_json(&json).expect("decode");
2309 assert_eq!(bundle, decoded, "JSON roundtrip should preserve bundle");
2310 }
2311
2312 #[test]
2313 fn capture_gate_disabled_config_always_rejects(
2314 baseline in 1..10_000u64,
2315 captured in 1..10_000u64,
2316 trace_bytes in 0..10_000u64,
2317 max_overhead in 0..1_000u32,
2318 max_bytes in 0..10_000u64,
2319 ) {
2320 let budget = ReplayCaptureBudget {
2321 capture_enabled: false,
2322 max_overhead_per_mille: max_overhead,
2323 max_trace_bytes: max_bytes,
2324 };
2325 let observation = ReplayCaptureObservation {
2326 baseline_micros: baseline,
2327 captured_micros: captured,
2328 trace_bytes,
2329 };
2330 let report = evaluate_replay_capture_gate(budget, observation);
2331 assert!(
2332 !report.capture_allowed,
2333 "disabled config should always reject"
2334 );
2335 assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
2336 }
2337
2338 #[test]
2339 fn capture_gate_is_deterministic(
2340 baseline in 0..5_000u64,
2341 captured in 0..5_000u64,
2342 trace_bytes in 0..5_000u64,
2343 enabled in any::<bool>(),
2344 max_overhead in 0..500u32,
2345 max_bytes in 0..5_000u64,
2346 ) {
2347 let budget = ReplayCaptureBudget {
2348 capture_enabled: enabled,
2349 max_overhead_per_mille: max_overhead,
2350 max_trace_bytes: max_bytes,
2351 };
2352 let observation = ReplayCaptureObservation {
2353 baseline_micros: baseline,
2354 captured_micros: captured,
2355 trace_bytes,
2356 };
2357 let r1 = evaluate_replay_capture_gate(budget, observation);
2358 let r2 = evaluate_replay_capture_gate(budget, observation);
2359 assert_eq!(r1, r2, "capture gate must be deterministic");
2360 }
2361
2362 #[test]
2363 fn event_kind_canonical_rank_all_distinct(
2364 a_idx in 0..7usize,
2365 b_idx in 0..7usize,
2366 ) {
2367 let kinds = [
2368 ReplayEventKind::Scheduled,
2369 ReplayEventKind::QueueAccepted,
2370 ReplayEventKind::PolicyDecision,
2371 ReplayEventKind::Cancelled,
2372 ReplayEventKind::Retried,
2373 ReplayEventKind::Completed,
2374 ReplayEventKind::Failed,
2375 ];
2376 if a_idx != b_idx {
2377 assert_ne!(
2378 kinds[a_idx].canonical_rank(),
2379 kinds[b_idx].canonical_rank(),
2380 "distinct kinds should have distinct ranks"
2381 );
2382 }
2383 }
2384
2385 #[test]
2386 fn builder_events_sorted_by_logical_clock(
2387 clocks in prop::collection::vec(0..50u64, 1..10),
2388 ) {
2389 let mut builder = ReplayTraceBuilder::new("trace-clock");
2390 for (i, clock) in clocks.iter().enumerate() {
2391 builder.push(ReplayEventDraft::new(
2392 *clock,
2393 format!("ext.{i}"),
2394 format!("req-{i}"),
2395 ReplayEventKind::Scheduled,
2396 ));
2397 }
2398 let bundle = builder.build().expect("build");
2399 for pair in bundle.events.windows(2) {
2400 assert!(
2401 pair[0].logical_clock <= pair[1].logical_clock,
2402 "events should be sorted by logical clock: {} > {}",
2403 pair[0].logical_clock,
2404 pair[1].logical_clock,
2405 );
2406 }
2407 }
2408 }
2409 }
2410}