1use serde::{Deserialize, Serialize};
8use std::cmp::Ordering;
9use std::collections::{BTreeMap, BTreeSet};
10use thiserror::Error;
11
/// Schema identifier stamped on every bundle produced by [`ReplayTraceBuilder`];
/// [`ReplayTraceBundle::validate`] rejects bundles carrying any other value.
pub const REPLAY_TRACE_SCHEMA_V1: &str = "pi.ext.replay.trace.v1";
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
17#[serde(rename_all = "snake_case")]
18pub enum ReplayEventKind {
19 Scheduled,
20 QueueAccepted,
21 PolicyDecision,
22 Cancelled,
23 Retried,
24 Completed,
25 Failed,
26}
27
28impl ReplayEventKind {
29 const fn canonical_rank(self) -> u8 {
30 match self {
31 Self::Scheduled => 0,
32 Self::QueueAccepted => 1,
33 Self::PolicyDecision => 2,
34 Self::Cancelled => 3,
35 Self::Retried => 4,
36 Self::Completed => 5,
37 Self::Failed => 6,
38 }
39 }
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
44#[serde(rename_all = "camelCase")]
45pub struct ReplayTraceEvent {
46 pub seq: u64,
47 pub logical_clock: u64,
48 pub extension_id: String,
49 pub request_id: String,
50 pub kind: ReplayEventKind,
51 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
52 pub attributes: BTreeMap<String, String>,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct ReplayEventDraft {
58 pub logical_clock: u64,
59 pub extension_id: String,
60 pub request_id: String,
61 pub kind: ReplayEventKind,
62 pub attributes: BTreeMap<String, String>,
63}
64
65impl ReplayEventDraft {
66 #[must_use]
67 pub fn new(
68 logical_clock: u64,
69 extension_id: impl Into<String>,
70 request_id: impl Into<String>,
71 kind: ReplayEventKind,
72 ) -> Self {
73 Self {
74 logical_clock,
75 extension_id: extension_id.into(),
76 request_id: request_id.into(),
77 kind,
78 attributes: BTreeMap::new(),
79 }
80 }
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
85#[serde(rename_all = "camelCase")]
86pub struct ReplayTraceBundle {
87 pub schema: String,
88 pub trace_id: String,
89 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
90 pub metadata: BTreeMap<String, String>,
91 pub events: Vec<ReplayTraceEvent>,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
96#[serde(rename_all = "camelCase")]
97pub struct ReplayDivergence {
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub seq: Option<u64>,
100 pub reason: ReplayDivergenceReason,
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ReplayDivergenceReason {
107 SchemaMismatch {
108 expected: String,
109 observed: String,
110 },
111 TraceIdMismatch {
112 expected: String,
113 observed: String,
114 },
115 EventCountMismatch {
116 expected: u64,
117 observed: u64,
118 },
119 EventFieldMismatch {
120 field: String,
121 expected: String,
122 observed: String,
123 },
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
128#[serde(rename_all = "camelCase")]
129pub struct ReplayCaptureBudget {
130 pub capture_enabled: bool,
132 pub max_overhead_per_mille: u32,
134 pub max_trace_bytes: u64,
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
140#[serde(rename_all = "camelCase")]
141pub struct ReplayCaptureObservation {
142 pub baseline_micros: u64,
144 pub captured_micros: u64,
146 pub trace_bytes: u64,
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
152#[serde(rename_all = "snake_case")]
153pub enum ReplayCaptureGateReason {
154 Enabled,
155 DisabledByConfig,
156 DisabledByOverheadBudget,
157 DisabledByTraceBudget,
158 DisabledByInvalidBaseline,
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
163#[serde(rename_all = "camelCase")]
164pub struct ReplayCaptureGateReport {
165 pub capture_allowed: bool,
166 pub reason: ReplayCaptureGateReason,
167 pub observed_overhead_per_mille: u32,
168 pub max_overhead_per_mille: u32,
169 pub observed_trace_bytes: u64,
170 pub max_trace_bytes: u64,
171}
172
173#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
175#[serde(rename_all = "snake_case")]
176pub enum ReplayRootCauseHint {
177 TraceSchemaMismatch,
178 TraceIdMismatch,
179 EventCountDrift,
180 EventPayloadDrift,
181 LogicalClockDrift,
182 PolicyGateDisabled,
183 OverheadBudgetExceeded,
184 TraceBudgetExceeded,
185 InvalidBaselineTelemetry,
186}
187
188#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
190#[serde(rename_all = "camelCase")]
191pub struct ReplayDiagnosticSnapshot {
192 pub trace_id: String,
193 pub schema: String,
194 pub event_count: u64,
195 pub capture_gate: ReplayCaptureGateReport,
196 #[serde(skip_serializing_if = "Option::is_none")]
197 pub divergence: Option<ReplayDivergence>,
198 pub root_cause_hints: Vec<ReplayRootCauseHint>,
199}
200
201#[must_use]
203pub fn evaluate_replay_capture_gate(
204 budget: ReplayCaptureBudget,
205 observation: ReplayCaptureObservation,
206) -> ReplayCaptureGateReport {
207 if !budget.capture_enabled {
208 return ReplayCaptureGateReport {
209 capture_allowed: false,
210 reason: ReplayCaptureGateReason::DisabledByConfig,
211 observed_overhead_per_mille: 0,
212 max_overhead_per_mille: budget.max_overhead_per_mille,
213 observed_trace_bytes: observation.trace_bytes,
214 max_trace_bytes: budget.max_trace_bytes,
215 };
216 }
217
218 let observed_overhead_per_mille =
219 compute_overhead_per_mille(observation.baseline_micros, observation.captured_micros);
220
221 if observed_overhead_per_mille == u32::MAX {
222 return ReplayCaptureGateReport {
223 capture_allowed: false,
224 reason: ReplayCaptureGateReason::DisabledByInvalidBaseline,
225 observed_overhead_per_mille,
226 max_overhead_per_mille: budget.max_overhead_per_mille,
227 observed_trace_bytes: observation.trace_bytes,
228 max_trace_bytes: budget.max_trace_bytes,
229 };
230 }
231
232 if observed_overhead_per_mille > budget.max_overhead_per_mille {
233 return ReplayCaptureGateReport {
234 capture_allowed: false,
235 reason: ReplayCaptureGateReason::DisabledByOverheadBudget,
236 observed_overhead_per_mille,
237 max_overhead_per_mille: budget.max_overhead_per_mille,
238 observed_trace_bytes: observation.trace_bytes,
239 max_trace_bytes: budget.max_trace_bytes,
240 };
241 }
242
243 if observation.trace_bytes > budget.max_trace_bytes {
244 return ReplayCaptureGateReport {
245 capture_allowed: false,
246 reason: ReplayCaptureGateReason::DisabledByTraceBudget,
247 observed_overhead_per_mille,
248 max_overhead_per_mille: budget.max_overhead_per_mille,
249 observed_trace_bytes: observation.trace_bytes,
250 max_trace_bytes: budget.max_trace_bytes,
251 };
252 }
253
254 ReplayCaptureGateReport {
255 capture_allowed: true,
256 reason: ReplayCaptureGateReason::Enabled,
257 observed_overhead_per_mille,
258 max_overhead_per_mille: budget.max_overhead_per_mille,
259 observed_trace_bytes: observation.trace_bytes,
260 max_trace_bytes: budget.max_trace_bytes,
261 }
262}
263
264pub fn build_replay_diagnostic_snapshot(
272 bundle: &ReplayTraceBundle,
273 capture_gate: ReplayCaptureGateReport,
274 divergence: Option<&ReplayDivergence>,
275) -> Result<ReplayDiagnosticSnapshot, ReplayTraceValidationError> {
276 if matches!(
277 divergence,
278 Some(ReplayDivergence {
279 reason: ReplayDivergenceReason::SchemaMismatch { .. },
280 ..
281 })
282 ) {
283 bundle.validate_structure()?;
284 } else {
285 bundle.validate()?;
286 }
287
288 let event_count = u64::try_from(bundle.events.len())
289 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
290 let root_cause_hints = derive_root_cause_hints(capture_gate.reason, divergence);
291
292 Ok(ReplayDiagnosticSnapshot {
293 trace_id: bundle.trace_id.clone(),
294 schema: bundle.schema.clone(),
295 event_count,
296 capture_gate,
297 divergence: divergence.cloned(),
298 root_cause_hints,
299 })
300}
301
/// Returns the slowdown of `captured_micros` relative to `baseline_micros`,
/// in parts per thousand of the baseline, rounded up.
///
/// * A zero baseline yields `u32::MAX`, because no ratio can be formed.
/// * A captured time no greater than the baseline yields 0.
/// * Ratios too large for `u32` saturate at `u32::MAX`.
fn compute_overhead_per_mille(baseline_micros: u64, captured_micros: u64) -> u32 {
    if baseline_micros == 0 {
        return u32::MAX;
    }
    match captured_micros.checked_sub(baseline_micros) {
        None | Some(0) => 0,
        Some(extra_micros) => {
            // Widening to u128 makes `extra * 1000` overflow-free (< 2^74).
            let per_mille =
                (u128::from(extra_micros) * 1_000).div_ceil(u128::from(baseline_micros));
            u32::try_from(per_mille).unwrap_or(u32::MAX)
        }
    }
}
319
320fn derive_root_cause_hints(
321 gate_reason: ReplayCaptureGateReason,
322 divergence: Option<&ReplayDivergence>,
323) -> Vec<ReplayRootCauseHint> {
324 let mut hints = BTreeSet::new();
325
326 match gate_reason {
327 ReplayCaptureGateReason::Enabled => {}
328 ReplayCaptureGateReason::DisabledByConfig => {
329 hints.insert(ReplayRootCauseHint::PolicyGateDisabled);
330 }
331 ReplayCaptureGateReason::DisabledByOverheadBudget => {
332 hints.insert(ReplayRootCauseHint::OverheadBudgetExceeded);
333 }
334 ReplayCaptureGateReason::DisabledByTraceBudget => {
335 hints.insert(ReplayRootCauseHint::TraceBudgetExceeded);
336 }
337 ReplayCaptureGateReason::DisabledByInvalidBaseline => {
338 hints.insert(ReplayRootCauseHint::InvalidBaselineTelemetry);
339 }
340 }
341
342 if let Some(divergence) = divergence {
343 match &divergence.reason {
344 ReplayDivergenceReason::SchemaMismatch { .. } => {
345 hints.insert(ReplayRootCauseHint::TraceSchemaMismatch);
346 }
347 ReplayDivergenceReason::TraceIdMismatch { .. } => {
348 hints.insert(ReplayRootCauseHint::TraceIdMismatch);
349 }
350 ReplayDivergenceReason::EventCountMismatch { .. } => {
351 hints.insert(ReplayRootCauseHint::EventCountDrift);
352 }
353 ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
354 if field == "logical_clock" {
355 hints.insert(ReplayRootCauseHint::LogicalClockDrift);
356 } else {
357 hints.insert(ReplayRootCauseHint::EventPayloadDrift);
358 }
359 }
360 }
361 }
362
363 hints.into_iter().collect()
364}
365
366impl ReplayTraceBundle {
367 pub fn encode_json(&self) -> Result<String, serde_json::Error> {
373 serde_json::to_string(self)
374 }
375
376 pub fn decode_json(input: &str) -> Result<Self, ReplayTraceCodecError> {
382 let bundle: Self = serde_json::from_str(input)?;
383 bundle.validate()?;
384 Ok(bundle)
385 }
386
387 pub fn validate(&self) -> Result<(), ReplayTraceValidationError> {
394 self.validate_schema()?;
395 self.validate_structure()
396 }
397
398 fn validate_schema(&self) -> Result<(), ReplayTraceValidationError> {
399 if self.schema != REPLAY_TRACE_SCHEMA_V1 {
400 return Err(ReplayTraceValidationError::UnknownSchema(
401 self.schema.clone(),
402 ));
403 }
404 Ok(())
405 }
406
407 fn validate_structure(&self) -> Result<(), ReplayTraceValidationError> {
408 if self.trace_id.trim().is_empty() {
409 return Err(ReplayTraceValidationError::EmptyTraceId);
410 }
411
412 for (idx, event) in self.events.iter().enumerate() {
413 let seq_index = idx
414 .checked_add(1)
415 .ok_or(ReplayTraceValidationError::TooManyEvents)?;
416 let expected_seq =
417 u64::try_from(seq_index).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
418 if event.seq != expected_seq {
419 return Err(ReplayTraceValidationError::NonContiguousSequence {
420 expected: expected_seq,
421 observed: event.seq,
422 });
423 }
424
425 if event.extension_id.trim().is_empty() {
426 return Err(ReplayTraceValidationError::MissingExtensionId { seq: event.seq });
427 }
428 if event.request_id.trim().is_empty() {
429 return Err(ReplayTraceValidationError::MissingRequestId { seq: event.seq });
430 }
431 }
432
433 self.validate_retry_ordering()
434 }
435
436 fn validate_retry_ordering(&self) -> Result<(), ReplayTraceValidationError> {
437 let mut pending_cancel: BTreeSet<(String, String)> = BTreeSet::new();
438 for event in &self.events {
439 let key = (event.extension_id.clone(), event.request_id.clone());
440 match event.kind {
441 ReplayEventKind::Cancelled => {
442 if !pending_cancel.insert(key) {
443 return Err(ReplayTraceValidationError::DuplicateCancelWithoutRetry {
444 seq: event.seq,
445 extension_id: event.extension_id.clone(),
446 request_id: event.request_id.clone(),
447 });
448 }
449 }
450 ReplayEventKind::Retried => {
451 if !pending_cancel.remove(&key) {
452 return Err(ReplayTraceValidationError::RetryWithoutCancel {
453 seq: event.seq,
454 extension_id: event.extension_id.clone(),
455 request_id: event.request_id.clone(),
456 });
457 }
458 }
459 ReplayEventKind::Completed | ReplayEventKind::Failed => {
460 pending_cancel.remove(&key);
461 }
462 ReplayEventKind::Scheduled
463 | ReplayEventKind::QueueAccepted
464 | ReplayEventKind::PolicyDecision => {}
465 }
466 }
467 Ok(())
468 }
469}
470
471pub fn first_divergence(
481 expected: &ReplayTraceBundle,
482 observed: &ReplayTraceBundle,
483) -> Result<Option<ReplayDivergence>, ReplayTraceValidationError> {
484 expected.validate_structure()?;
485 observed.validate_structure()?;
486
487 if expected.schema != observed.schema {
488 return Ok(Some(ReplayDivergence {
489 seq: None,
490 reason: ReplayDivergenceReason::SchemaMismatch {
491 expected: expected.schema.clone(),
492 observed: observed.schema.clone(),
493 },
494 }));
495 }
496
497 expected.validate_schema()?;
498 observed.validate_schema()?;
499
500 if expected.trace_id != observed.trace_id {
501 return Ok(Some(ReplayDivergence {
502 seq: None,
503 reason: ReplayDivergenceReason::TraceIdMismatch {
504 expected: expected.trace_id.clone(),
505 observed: observed.trace_id.clone(),
506 },
507 }));
508 }
509
510 let max_shared = expected.events.len().min(observed.events.len());
511 for idx in 0..max_shared {
512 let left = &expected.events[idx];
513 let right = &observed.events[idx];
514 if left.logical_clock != right.logical_clock {
515 return Ok(Some(field_mismatch(
516 left.seq,
517 "logical_clock",
518 left.logical_clock.to_string(),
519 right.logical_clock.to_string(),
520 )));
521 }
522 if left.extension_id != right.extension_id {
523 return Ok(Some(field_mismatch(
524 left.seq,
525 "extension_id",
526 left.extension_id.clone(),
527 right.extension_id.clone(),
528 )));
529 }
530 if left.request_id != right.request_id {
531 return Ok(Some(field_mismatch(
532 left.seq,
533 "request_id",
534 left.request_id.clone(),
535 right.request_id.clone(),
536 )));
537 }
538 if left.kind != right.kind {
539 return Ok(Some(field_mismatch(
540 left.seq,
541 "kind",
542 format!("{:?}", left.kind),
543 format!("{:?}", right.kind),
544 )));
545 }
546 if left.attributes != right.attributes {
547 return Ok(Some(field_mismatch(
548 left.seq,
549 "attributes",
550 format!("{:?}", left.attributes),
551 format!("{:?}", right.attributes),
552 )));
553 }
554 }
555
556 if expected.events.len() != observed.events.len() {
557 let next_seq = max_shared
558 .checked_add(1)
559 .ok_or(ReplayTraceValidationError::TooManyEvents)?;
560 let seq = u64::try_from(next_seq).map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
561 return Ok(Some(ReplayDivergence {
562 seq: Some(seq),
563 reason: ReplayDivergenceReason::EventCountMismatch {
564 expected: u64::try_from(expected.events.len())
565 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
566 observed: u64::try_from(observed.events.len())
567 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?,
568 },
569 }));
570 }
571
572 Ok(None)
573}
574
575fn field_mismatch(seq: u64, field: &str, expected: String, observed: String) -> ReplayDivergence {
576 ReplayDivergence {
577 seq: Some(seq),
578 reason: ReplayDivergenceReason::EventFieldMismatch {
579 field: field.to_string(),
580 expected,
581 observed,
582 },
583 }
584}
585
586#[derive(Debug, Clone, Default)]
588pub struct ReplayTraceBuilder {
589 trace_id: String,
590 metadata: BTreeMap<String, String>,
591 drafts: Vec<ReplayEventDraft>,
592}
593
594impl ReplayTraceBuilder {
595 #[must_use]
596 pub fn new(trace_id: impl Into<String>) -> Self {
597 Self {
598 trace_id: trace_id.into(),
599 metadata: BTreeMap::new(),
600 drafts: Vec::new(),
601 }
602 }
603
604 pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
605 self.metadata.insert(key.into(), value.into());
606 }
607
608 pub fn push(&mut self, draft: ReplayEventDraft) {
609 self.drafts.push(draft);
610 }
611
612 pub fn build(self) -> Result<ReplayTraceBundle, ReplayTraceValidationError> {
618 let mut indexed = self
619 .drafts
620 .into_iter()
621 .enumerate()
622 .map(|(insertion_index, draft)| IndexedDraft {
623 insertion_index,
624 draft,
625 })
626 .collect::<Vec<_>>();
627 indexed.sort_by(compare_indexed_drafts);
628
629 let events = indexed
630 .into_iter()
631 .enumerate()
632 .map(|(idx, entry)| {
633 let seq_index = idx
634 .checked_add(1)
635 .ok_or(ReplayTraceValidationError::TooManyEvents)?;
636 let seq = u64::try_from(seq_index)
637 .map_err(|_| ReplayTraceValidationError::TooManyEvents)?;
638 Ok(ReplayTraceEvent {
639 seq,
640 logical_clock: entry.draft.logical_clock,
641 extension_id: entry.draft.extension_id,
642 request_id: entry.draft.request_id,
643 kind: entry.draft.kind,
644 attributes: entry.draft.attributes,
645 })
646 })
647 .collect::<Result<Vec<_>, ReplayTraceValidationError>>()?;
648
649 let bundle = ReplayTraceBundle {
650 schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
651 trace_id: self.trace_id,
652 metadata: self.metadata,
653 events,
654 };
655 bundle.validate()?;
656 Ok(bundle)
657 }
658}
659
660#[derive(Debug, Clone)]
661struct IndexedDraft {
662 insertion_index: usize,
663 draft: ReplayEventDraft,
664}
665
666fn compare_indexed_drafts(left: &IndexedDraft, right: &IndexedDraft) -> Ordering {
667 left.draft
668 .logical_clock
669 .cmp(&right.draft.logical_clock)
670 .then_with(|| left.draft.extension_id.cmp(&right.draft.extension_id))
671 .then_with(|| left.draft.request_id.cmp(&right.draft.request_id))
672 .then_with(|| {
673 left.draft
674 .kind
675 .canonical_rank()
676 .cmp(&right.draft.kind.canonical_rank())
677 })
678 .then_with(|| left.insertion_index.cmp(&right.insertion_index))
679}
680
681#[derive(Debug, Clone, PartialEq, Eq, Error)]
683pub enum ReplayTraceValidationError {
684 #[error("unknown replay trace schema: {0}")]
685 UnknownSchema(String),
686 #[error("trace id must not be empty")]
687 EmptyTraceId,
688 #[error("replay bundle contains too many events to index")]
689 TooManyEvents,
690 #[error("non-contiguous sequence: expected {expected}, observed {observed}")]
691 NonContiguousSequence { expected: u64, observed: u64 },
692 #[error("event seq {seq} missing extension id")]
693 MissingExtensionId { seq: u64 },
694 #[error("event seq {seq} missing request id")]
695 MissingRequestId { seq: u64 },
696 #[error("retry without prior cancel at seq {seq} for {extension_id}/{request_id}")]
697 RetryWithoutCancel {
698 seq: u64,
699 extension_id: String,
700 request_id: String,
701 },
702 #[error("duplicate cancel without retry at seq {seq} for {extension_id}/{request_id}")]
703 DuplicateCancelWithoutRetry {
704 seq: u64,
705 extension_id: String,
706 request_id: String,
707 },
708}
709
710#[derive(Debug, Error)]
712pub enum ReplayTraceCodecError {
713 #[error("failed to parse replay trace JSON: {0}")]
714 Deserialize(#[from] serde_json::Error),
715 #[error("invalid replay trace bundle: {0}")]
716 Validation(#[from] ReplayTraceValidationError),
717}
718
719#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
721#[serde(rename_all = "camelCase")]
722pub struct ReplayLaneConfig {
723 pub budget: ReplayCaptureBudget,
725 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
727 pub lane_metadata: BTreeMap<String, String>,
728}
729
730impl ReplayLaneConfig {
731 #[must_use]
732 pub const fn new(budget: ReplayCaptureBudget) -> Self {
733 Self {
734 budget,
735 lane_metadata: BTreeMap::new(),
736 }
737 }
738
739 pub fn insert_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
741 self.lane_metadata.insert(key.into(), value.into());
742 }
743}
744
745#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
747#[serde(rename_all = "camelCase")]
748pub struct ReplayLaneResult {
749 pub bundle: ReplayTraceBundle,
751 pub gate_report: ReplayCaptureGateReport,
753 pub diagnostic: ReplayDiagnosticSnapshot,
755}
756
757#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
759#[serde(rename_all = "camelCase")]
760pub struct ReplayComparisonResult {
761 pub reference_trace_id: String,
763 pub observed_trace_id: String,
765 #[serde(skip_serializing_if = "Option::is_none")]
767 pub divergence: Option<ReplayDivergence>,
768 pub root_cause_hints: Vec<ReplayRootCauseHint>,
770}
771
772#[derive(Debug)]
779pub struct ReplayRecorder {
780 config: ReplayLaneConfig,
781 builder: ReplayTraceBuilder,
782 logical_clock: u64,
783 event_count: u64,
784}
785
786impl ReplayRecorder {
787 #[must_use]
789 pub fn new(trace_id: impl Into<String>, config: ReplayLaneConfig) -> Self {
790 let mut builder = ReplayTraceBuilder::new(trace_id);
791 for (key, value) in &config.lane_metadata {
792 builder.insert_metadata(key.clone(), value.clone());
793 }
794 Self {
795 config,
796 builder,
797 logical_clock: 0,
798 event_count: 0,
799 }
800 }
801
802 #[must_use]
804 pub const fn logical_clock(&self) -> u64 {
805 self.logical_clock
806 }
807
808 #[must_use]
810 pub const fn event_count(&self) -> u64 {
811 self.event_count
812 }
813
814 pub const fn tick(&mut self) -> u64 {
816 self.logical_clock = self.logical_clock.saturating_add(1);
817 self.logical_clock
818 }
819
820 pub fn record(
822 &mut self,
823 extension_id: impl Into<String>,
824 request_id: impl Into<String>,
825 kind: ReplayEventKind,
826 attributes: BTreeMap<String, String>,
827 ) {
828 let mut draft = ReplayEventDraft::new(self.logical_clock, extension_id, request_id, kind);
829 draft.attributes = attributes;
830 self.builder.push(draft);
831 self.event_count = self.event_count.saturating_add(1);
832 }
833
834 pub fn record_scheduled(
836 &mut self,
837 extension_id: impl Into<String>,
838 request_id: impl Into<String>,
839 attributes: BTreeMap<String, String>,
840 ) {
841 self.record(
842 extension_id,
843 request_id,
844 ReplayEventKind::Scheduled,
845 attributes,
846 );
847 }
848
849 pub fn record_queue_accepted(
851 &mut self,
852 extension_id: impl Into<String>,
853 request_id: impl Into<String>,
854 attributes: BTreeMap<String, String>,
855 ) {
856 self.record(
857 extension_id,
858 request_id,
859 ReplayEventKind::QueueAccepted,
860 attributes,
861 );
862 }
863
864 pub fn record_policy_decision(
866 &mut self,
867 extension_id: impl Into<String>,
868 request_id: impl Into<String>,
869 attributes: BTreeMap<String, String>,
870 ) {
871 self.record(
872 extension_id,
873 request_id,
874 ReplayEventKind::PolicyDecision,
875 attributes,
876 );
877 }
878
879 pub fn record_cancelled(
881 &mut self,
882 extension_id: impl Into<String>,
883 request_id: impl Into<String>,
884 attributes: BTreeMap<String, String>,
885 ) {
886 self.record(
887 extension_id,
888 request_id,
889 ReplayEventKind::Cancelled,
890 attributes,
891 );
892 }
893
894 pub fn record_retried(
896 &mut self,
897 extension_id: impl Into<String>,
898 request_id: impl Into<String>,
899 attributes: BTreeMap<String, String>,
900 ) {
901 self.record(
902 extension_id,
903 request_id,
904 ReplayEventKind::Retried,
905 attributes,
906 );
907 }
908
909 pub fn record_completed(
911 &mut self,
912 extension_id: impl Into<String>,
913 request_id: impl Into<String>,
914 attributes: BTreeMap<String, String>,
915 ) {
916 self.record(
917 extension_id,
918 request_id,
919 ReplayEventKind::Completed,
920 attributes,
921 );
922 }
923
924 pub fn record_failed(
926 &mut self,
927 extension_id: impl Into<String>,
928 request_id: impl Into<String>,
929 attributes: BTreeMap<String, String>,
930 ) {
931 self.record(
932 extension_id,
933 request_id,
934 ReplayEventKind::Failed,
935 attributes,
936 );
937 }
938
939 pub fn finish(
948 self,
949 observation: ReplayCaptureObservation,
950 ) -> Result<ReplayLaneResult, ReplayTraceValidationError> {
951 let bundle = self.builder.build()?;
952 let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
953 let diagnostic = build_replay_diagnostic_snapshot(&bundle, gate_report, None)?;
954
955 Ok(ReplayLaneResult {
956 bundle,
957 gate_report,
958 diagnostic,
959 })
960 }
961
962 pub fn finish_and_compare(
971 self,
972 observation: ReplayCaptureObservation,
973 reference: &ReplayTraceBundle,
974 ) -> Result<(ReplayLaneResult, ReplayComparisonResult), ReplayTraceValidationError> {
975 let bundle = self.builder.build()?;
976 let gate_report = evaluate_replay_capture_gate(self.config.budget, observation);
977 let divergence_opt = first_divergence(reference, &bundle)?;
978 let diagnostic =
979 build_replay_diagnostic_snapshot(&bundle, gate_report, divergence_opt.as_ref())?;
980
981 let comparison = ReplayComparisonResult {
982 reference_trace_id: reference.trace_id.clone(),
983 observed_trace_id: bundle.trace_id.clone(),
984 divergence: divergence_opt,
985 root_cause_hints: diagnostic.root_cause_hints.clone(),
986 };
987
988 let result = ReplayLaneResult {
989 bundle,
990 gate_report,
991 diagnostic,
992 };
993
994 Ok((result, comparison))
995 }
996}
997
998pub fn compare_replay_bundles(
1005 reference: &ReplayTraceBundle,
1006 observed: &ReplayTraceBundle,
1007 gate_report: ReplayCaptureGateReport,
1008) -> Result<(ReplayDiagnosticSnapshot, ReplayComparisonResult), ReplayTraceValidationError> {
1009 let divergence_opt = first_divergence(reference, observed)?;
1010 let diagnostic =
1011 build_replay_diagnostic_snapshot(observed, gate_report, divergence_opt.as_ref())?;
1012
1013 let comparison = ReplayComparisonResult {
1014 reference_trace_id: reference.trace_id.clone(),
1015 observed_trace_id: observed.trace_id.clone(),
1016 divergence: divergence_opt,
1017 root_cause_hints: diagnostic.root_cause_hints.clone(),
1018 };
1019
1020 Ok((diagnostic, comparison))
1021}
1022
1023#[cfg(test)]
1024mod tests {
1025 use super::{
1026 REPLAY_TRACE_SCHEMA_V1, ReplayCaptureBudget, ReplayCaptureGateReason,
1027 ReplayCaptureObservation, ReplayDivergenceReason, ReplayEventDraft, ReplayEventKind,
1028 ReplayRootCauseHint, ReplayTraceBuilder, ReplayTraceBundle, ReplayTraceCodecError,
1029 ReplayTraceValidationError, build_replay_diagnostic_snapshot, evaluate_replay_capture_gate,
1030 first_divergence,
1031 };
1032 use std::collections::BTreeMap;
1033
1034 fn draft(
1035 logical_clock: u64,
1036 extension_id: &str,
1037 request_id: &str,
1038 kind: ReplayEventKind,
1039 ) -> ReplayEventDraft {
1040 ReplayEventDraft::new(
1041 logical_clock,
1042 extension_id.to_string(),
1043 request_id.to_string(),
1044 kind,
1045 )
1046 }
1047
1048 const fn standard_capture_budget() -> ReplayCaptureBudget {
1049 ReplayCaptureBudget {
1050 capture_enabled: true,
1051 max_overhead_per_mille: 120,
1052 max_trace_bytes: 8_192,
1053 }
1054 }
1055
1056 fn standard_bundle() -> ReplayTraceBundle {
1057 let mut builder = ReplayTraceBuilder::new("trace-diagnostic");
1058 builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
1059 builder.push(draft(2, "ext.a", "req-1", ReplayEventKind::PolicyDecision));
1060 builder.push(draft(3, "ext.a", "req-1", ReplayEventKind::Completed));
1061 builder.build().expect("bundle should build")
1062 }
1063
1064 #[test]
1065 fn deterministic_build_is_order_stable_across_input_permutations() {
1066 let mut left = ReplayTraceBuilder::new("trace-a");
1067 left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1068 left.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1069 left.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1070 left.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1071
1072 let mut right = ReplayTraceBuilder::new("trace-a");
1073 right.push(draft(11, "ext.beta", "req-2", ReplayEventKind::Scheduled));
1074 right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Cancelled));
1075 right.push(draft(11, "ext.alpha", "req-1", ReplayEventKind::Scheduled));
1076 right.push(draft(10, "ext.alpha", "req-1", ReplayEventKind::Retried));
1077
1078 let left_bundle = left.build().expect("left bundle should build");
1079 let right_bundle = right.build().expect("right bundle should build");
1080
1081 assert_eq!(left_bundle, right_bundle);
1082 assert_eq!(left_bundle.events[0].kind, ReplayEventKind::Cancelled);
1083 assert_eq!(left_bundle.events[1].kind, ReplayEventKind::Retried);
1084 }
1085
1086 #[test]
1087 fn json_roundtrip_preserves_replay_bundle() {
1088 let mut builder = ReplayTraceBuilder::new("trace-roundtrip");
1089 builder.insert_metadata("lane", "shadow");
1090 let mut event = draft(20, "ext.gamma", "req-7", ReplayEventKind::PolicyDecision);
1091 event
1092 .attributes
1093 .insert("decision".to_string(), "fast_lane".to_string());
1094 builder.push(draft(19, "ext.gamma", "req-7", ReplayEventKind::Scheduled));
1095 builder.push(event);
1096 builder.push(draft(21, "ext.gamma", "req-7", ReplayEventKind::Completed));
1097
1098 let bundle = builder.build().expect("bundle should build");
1099 let encoded = bundle.encode_json().expect("bundle should encode");
1100 let decoded = ReplayTraceBundle::decode_json(&encoded).expect("bundle should decode");
1101 assert_eq!(decoded, bundle);
1102 }
1103
1104 #[test]
1105 fn decode_rejects_retry_without_prior_cancel() {
1106 let bundle = ReplayTraceBundle {
1107 schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1108 trace_id: "trace-invalid".to_string(),
1109 metadata: BTreeMap::new(),
1110 events: vec![super::ReplayTraceEvent {
1111 seq: 1,
1112 logical_clock: 1,
1113 extension_id: "ext.a".to_string(),
1114 request_id: "req".to_string(),
1115 kind: ReplayEventKind::Retried,
1116 attributes: BTreeMap::new(),
1117 }],
1118 };
1119 let encoded = bundle
1120 .encode_json()
1121 .expect("invalid bundle should serialize");
1122
1123 let error = ReplayTraceBundle::decode_json(&encoded).expect_err("retry without cancel");
1124 match error {
1125 ReplayTraceCodecError::Validation(ReplayTraceValidationError::RetryWithoutCancel {
1126 ..
1127 }) => {}
1128 other => unreachable!("expected RetryWithoutCancel, got: {other:?}"),
1129 }
1130 }
1131
1132 #[test]
1133 fn decode_rejects_non_contiguous_sequence() {
1134 let bundle = ReplayTraceBundle {
1135 schema: REPLAY_TRACE_SCHEMA_V1.to_string(),
1136 trace_id: "trace-seq".to_string(),
1137 metadata: BTreeMap::new(),
1138 events: vec![
1139 super::ReplayTraceEvent {
1140 seq: 1,
1141 logical_clock: 1,
1142 extension_id: "ext.a".to_string(),
1143 request_id: "req-1".to_string(),
1144 kind: ReplayEventKind::Scheduled,
1145 attributes: BTreeMap::new(),
1146 },
1147 super::ReplayTraceEvent {
1148 seq: 3,
1149 logical_clock: 2,
1150 extension_id: "ext.a".to_string(),
1151 request_id: "req-1".to_string(),
1152 kind: ReplayEventKind::Completed,
1153 attributes: BTreeMap::new(),
1154 },
1155 ],
1156 };
1157 let encoded = bundle
1158 .encode_json()
1159 .expect("invalid bundle should serialize");
1160
1161 let error = ReplayTraceBundle::decode_json(&encoded).expect_err("non-contiguous seq");
1162 match error {
1163 ReplayTraceCodecError::Validation(
1164 ReplayTraceValidationError::NonContiguousSequence { expected, observed },
1165 ) => {
1166 assert_eq!(expected, 2);
1167 assert_eq!(observed, 3);
1168 }
1169 other => unreachable!("expected NonContiguousSequence, got: {other:?}"),
1170 }
1171 }
1172
/// Two traces that agree on the first event but end with a different kind at
/// seq 2 must diverge on the `kind` field at that sequence number.
#[test]
fn divergence_reports_kind_mismatch_with_seq() {
    // Both traces share the `Scheduled` prefix; only the terminal kind varies.
    let trace_ending_with = |terminal: ReplayEventKind| {
        let mut builder = ReplayTraceBuilder::new("trace-divergence");
        builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
        builder.push(draft(2, "ext.a", "req-1", terminal));
        builder
    };
    let expected = trace_ending_with(ReplayEventKind::Completed)
        .build()
        .expect("expected bundle");
    let observed = trace_ending_with(ReplayEventKind::Failed)
        .build()
        .expect("observed bundle");

    let divergence = first_divergence(&expected, &observed)
        .expect("comparison should succeed")
        .expect("divergence expected");
    assert_eq!(divergence.seq, Some(2));
    match divergence.reason {
        ReplayDivergenceReason::EventFieldMismatch { field, .. } => assert_eq!(field, "kind"),
        other => unreachable!("expected EventFieldMismatch, got: {other:?}"),
    }
}
1196
/// A truncated observed trace (one event instead of two) diverges at the first
/// missing sequence number with an `EventCountMismatch`.
#[test]
fn divergence_reports_event_count_mismatch() {
    let mut reference_builder = ReplayTraceBuilder::new("trace-length");
    for (clock, kind) in [
        (1, ReplayEventKind::Scheduled),
        (2, ReplayEventKind::Completed),
    ] {
        reference_builder.push(draft(clock, "ext.a", "req-1", kind));
    }
    let expected = reference_builder.build().expect("expected bundle");

    // The observed run stops after scheduling.
    let mut truncated_builder = ReplayTraceBuilder::new("trace-length");
    truncated_builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
    let observed = truncated_builder.build().expect("observed bundle");

    let divergence = first_divergence(&expected, &observed)
        .expect("comparison should succeed")
        .expect("divergence expected");
    assert_eq!(divergence.seq, Some(2));
    match divergence.reason {
        ReplayDivergenceReason::EventCountMismatch { expected, observed } => {
            assert_eq!(expected, 2);
            assert_eq!(observed, 1);
        }
        other => unreachable!("expected EventCountMismatch, got: {other:?}"),
    }
}
1220
/// Comparing a bundle against itself validates successfully and yields no divergence.
#[test]
fn divergence_returns_none_for_identical_bundles() {
    let mut builder = ReplayTraceBuilder::new("trace-identical");
    for (clock, kind) in [
        (1, ReplayEventKind::Scheduled),
        (2, ReplayEventKind::Completed),
    ] {
        builder.push(draft(clock, "ext.a", "req-1", kind));
    }
    let bundle = builder.build().expect("bundle");

    let outcome = first_divergence(&bundle, &bundle);
    let divergence = outcome.expect("comparison should validate identical");
    assert!(divergence.is_none());
}
1232
/// With the capture switch off, the gate stays closed even for a cheap capture.
#[test]
fn capture_gate_disables_when_config_switch_is_off() {
    let budget = {
        let mut switched_off = standard_capture_budget();
        switched_off.capture_enabled = false;
        switched_off
    };
    let cheap_capture = ReplayCaptureObservation {
        trace_bytes: 128,
        baseline_micros: 1_000,
        captured_micros: 1_010,
    };

    let report = evaluate_replay_capture_gate(budget, cheap_capture);
    assert!(!report.capture_allowed);
    assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
}
1247
/// A 140 per mille overhead exceeds the standard budget. The gate closes with
/// the overhead reason and still reports the measured value.
#[test]
fn capture_gate_disables_when_overhead_exceeds_budget() {
    let costly_capture = ReplayCaptureObservation {
        trace_bytes: 512,
        baseline_micros: 1_000,
        captured_micros: 1_140,
    };
    let report = evaluate_replay_capture_gate(standard_capture_budget(), costly_capture);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByOverheadBudget
    );
    assert_eq!(report.observed_overhead_per_mille, 140);
}
1265
/// A 9 000-byte trace exceeds the standard trace budget. The gate closes with
/// the trace reason while still reporting the measured 50 per mille overhead.
#[test]
fn capture_gate_disables_when_trace_budget_exceeded() {
    let oversized_trace = ReplayCaptureObservation {
        trace_bytes: 9_000,
        baseline_micros: 1_000,
        captured_micros: 1_050,
    };
    let report = evaluate_replay_capture_gate(standard_capture_budget(), oversized_trace);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByTraceBudget
    );
    assert_eq!(report.observed_overhead_per_mille, 50);
}
1283
/// A zero baseline makes the overhead ratio meaningless. The gate fails closed
/// and reports the saturated `u32::MAX` overhead.
#[test]
fn capture_gate_fails_closed_on_invalid_baseline() {
    let zero_baseline = ReplayCaptureObservation {
        trace_bytes: 64,
        captured_micros: 1,
        baseline_micros: 0,
    };
    let report = evaluate_replay_capture_gate(standard_capture_budget(), zero_baseline);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByInvalidBaseline
    );
    assert_eq!(report.observed_overhead_per_mille, u32::MAX);
}
1301
/// Even a 0 µs capture against a 0 µs baseline is rejected as an invalid
/// baseline rather than being treated as zero overhead.
#[test]
fn capture_gate_fails_closed_when_zero_baseline_reports_zero_capture_cost() {
    let all_zero_timing = ReplayCaptureObservation {
        trace_bytes: 64,
        captured_micros: 0,
        baseline_micros: 0,
    };
    let report = evaluate_replay_capture_gate(standard_capture_budget(), all_zero_timing);

    assert!(!report.capture_allowed);
    assert_eq!(
        report.reason,
        ReplayCaptureGateReason::DisabledByInvalidBaseline
    );
    assert_eq!(report.observed_overhead_per_mille, u32::MAX);
}
1319
/// Evaluating the same in-budget observation twice gives identical reports,
/// both enabling capture at 80 per mille.
#[test]
fn capture_gate_reports_deterministic_within_budget_enablement() {
    let budget = standard_capture_budget();
    let observation = ReplayCaptureObservation {
        trace_bytes: 4_096,
        baseline_micros: 1_000,
        captured_micros: 1_080,
    };

    // Budget and observation are `Copy`, so each evaluation gets its own copy.
    let [first, second] = [observation; 2].map(|obs| evaluate_replay_capture_gate(budget, obs));

    assert_eq!(first, second);
    assert!(first.capture_allowed);
    assert_eq!(first.reason, ReplayCaptureGateReason::Enabled);
    assert_eq!(first.observed_overhead_per_mille, 80);
}
1337
/// A payload divergence from the standard bundle, combined with an over-budget
/// capture gate, surfaces both hints, payload drift first.
#[test]
fn diagnostic_snapshot_emits_hint_codes_for_gate_and_payload_drift() {
    let expected = standard_bundle();

    let mut drifted = ReplayTraceBuilder::new("trace-diagnostic");
    for (clock, kind) in [
        (1, ReplayEventKind::Scheduled),
        (2, ReplayEventKind::PolicyDecision),
        (3, ReplayEventKind::Failed),
    ] {
        drifted.push(draft(clock, "ext.a", "req-1", kind));
    }
    let observed = drifted.build().expect("observed bundle");

    let divergence = first_divergence(&expected, &observed)
        .expect("comparison should succeed")
        .expect("divergence expected");

    // 150 per mille of overhead: beyond the standard budget.
    let over_budget = ReplayCaptureObservation {
        trace_bytes: 64,
        baseline_micros: 1_000,
        captured_micros: 1_150,
    };
    let capture_gate = evaluate_replay_capture_gate(standard_capture_budget(), over_budget);

    let snapshot = build_replay_diagnostic_snapshot(&expected, capture_gate, Some(&divergence))
        .expect("snapshot should build");
    assert_eq!(snapshot.event_count, 3);
    let expected_hints = vec![
        ReplayRootCauseHint::EventPayloadDrift,
        ReplayRootCauseHint::OverheadBudgetExceeded,
    ];
    assert_eq!(snapshot.root_cause_hints, expected_hints);
}
1371
/// Shifting only the second event's logical clock is attributed to clock drift
/// alone when the capture gate is within budget.
#[test]
fn diagnostic_snapshot_maps_logical_clock_drift_hint() {
    let expected = standard_bundle();
    let observed = {
        let mut shifted = expected.clone();
        shifted.events[1].logical_clock = 77;
        shifted
    };

    let divergence = first_divergence(&expected, &observed)
        .expect("comparison should succeed")
        .expect("divergence expected");
    let cheap_capture = ReplayCaptureObservation {
        trace_bytes: 64,
        baseline_micros: 1_000,
        captured_micros: 1_010,
    };
    let capture_gate = evaluate_replay_capture_gate(standard_capture_budget(), cheap_capture);

    let hints = build_replay_diagnostic_snapshot(&expected, capture_gate, Some(&divergence))
        .expect("snapshot should build")
        .root_cause_hints;
    assert_eq!(hints, vec![ReplayRootCauseHint::LogicalClockDrift]);
}
1397
/// Building a snapshot twice from the same bundle and gate gives equal results.
#[test]
fn diagnostic_snapshot_is_deterministic_for_same_inputs() {
    let bundle = standard_bundle();
    let observation = ReplayCaptureObservation {
        trace_bytes: 256,
        baseline_micros: 1_000,
        captured_micros: 1_020,
    };
    let capture_gate = evaluate_replay_capture_gate(standard_capture_budget(), observation);

    // Each build gets its own panic label so a failure identifies which run broke.
    let [first, second] = ["first snapshot", "second snapshot"].map(|label| {
        build_replay_diagnostic_snapshot(&bundle, capture_gate, None).expect(label)
    });
    assert_eq!(first, second);
}
1416
/// Building a diagnostic snapshot validates the bundle first. An unknown schema
/// is rejected with `UnknownSchema`, even when the capture gate itself is healthy.
///
/// The assertion also pins the offending schema string carried by the error,
/// matching how `divergence_rejects_same_unknown_schema` checks the same variant.
#[test]
fn diagnostic_snapshot_rejects_invalid_bundle() {
    let invalid = ReplayTraceBundle {
        schema: "invalid.schema".to_string(),
        trace_id: "trace-bad".to_string(),
        metadata: BTreeMap::new(),
        events: Vec::new(),
    };
    let capture_gate = evaluate_replay_capture_gate(
        standard_capture_budget(),
        ReplayCaptureObservation {
            baseline_micros: 1_000,
            captured_micros: 1_000,
            trace_bytes: 0,
        },
    );

    let error = build_replay_diagnostic_snapshot(&invalid, capture_gate, None)
        .expect_err("invalid schema should fail");
    // Check the payload too, not just the variant, so a different schema error
    // (e.g. one naming the wrong bundle) cannot satisfy the test.
    assert!(matches!(
        error,
        ReplayTraceValidationError::UnknownSchema(schema) if schema == "invalid.schema"
    ));
}
1441
/// A builder with no events still produces a valid, empty v1 bundle that keeps
/// its trace id.
#[test]
fn builder_empty_events_produces_valid_bundle() {
    let bundle = ReplayTraceBuilder::new("trace-empty")
        .build()
        .expect("empty bundle should be valid");

    assert!(bundle.events.is_empty());
    assert_eq!(bundle.schema, REPLAY_TRACE_SCHEMA_V1);
    assert_eq!(bundle.trace_id, "trace-empty");
}
1452
/// Metadata entries inserted on the builder appear unchanged on the built bundle.
#[test]
fn builder_metadata_preserved_in_output() {
    let entries = [("env", "production"), ("version", "1.2.3")];

    let mut builder = ReplayTraceBuilder::new("trace-meta");
    for (key, value) in entries {
        builder.insert_metadata(key, value);
    }
    builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));
    let bundle = builder.build().expect("bundle with metadata");

    for (key, value) in entries {
        assert_eq!(bundle.metadata.get(key).map(String::as_str), Some(value));
    }
}
1469
/// Re-inserting a metadata key replaces the earlier value, so the last write wins.
#[test]
fn builder_metadata_overwrite_works() {
    let mut builder = ReplayTraceBuilder::new("trace-meta-ow");
    for value in ["old", "new"] {
        builder.insert_metadata("key", value);
    }
    builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));

    let bundle = builder.build().expect("metadata overwrite");
    assert_eq!(bundle.metadata.get("key").map(String::as_str), Some("new"));
}
1479
/// Attributes attached to a draft are copied onto the built event.
#[test]
fn draft_attributes_carried_through_build() {
    let mut decision = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
    decision.attributes.extend([
        ("policy".to_string(), "fast_lane".to_string()),
        ("latency_ms".to_string(), "12".to_string()),
    ]);

    let mut builder = ReplayTraceBuilder::new("trace-attrs");
    builder.push(decision);
    let bundle = builder.build().expect("bundle with attrs");

    let built = &bundle.events[0];
    assert_eq!(built.attributes.len(), 2);
    assert_eq!(
        built.attributes.get("policy").map(String::as_str),
        Some("fast_lane")
    );
}
1499
/// An empty trace id fails validation even when the events are well formed.
#[test]
fn validate_rejects_empty_trace_id() {
    let mut builder = ReplayTraceBuilder::new("");
    builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));

    let outcome = builder.build();
    assert!(matches!(
        outcome.expect_err("empty trace_id should fail"),
        ReplayTraceValidationError::EmptyTraceId
    ));
}
1509
/// A trace id made only of whitespace is treated the same as an empty one.
#[test]
fn validate_rejects_whitespace_only_trace_id() {
    let mut builder = ReplayTraceBuilder::new(" ");
    builder.push(draft(1, "ext.a", "req-1", ReplayEventKind::Scheduled));

    let outcome = builder.build();
    assert!(matches!(
        outcome.expect_err("whitespace trace_id should fail"),
        ReplayTraceValidationError::EmptyTraceId
    ));
}
1519
/// An event with an empty extension id fails validation with `MissingExtensionId`.
#[test]
fn validate_rejects_empty_extension_id() {
    let anonymous = draft(1, "", "req-1", ReplayEventKind::Scheduled);
    let mut builder = ReplayTraceBuilder::new("trace-val");
    builder.push(anonymous);

    let outcome = builder.build();
    assert!(matches!(
        outcome.expect_err("empty extension_id should fail"),
        ReplayTraceValidationError::MissingExtensionId { .. }
    ));
}
1530
/// An event with an empty request id fails validation with `MissingRequestId`.
#[test]
fn validate_rejects_empty_request_id() {
    let unidentified = draft(1, "ext.a", "", ReplayEventKind::Scheduled);
    let mut builder = ReplayTraceBuilder::new("trace-val");
    builder.push(unidentified);

    let outcome = builder.build();
    assert!(matches!(
        outcome.expect_err("empty request_id should fail"),
        ReplayTraceValidationError::MissingRequestId { .. }
    ));
}
1541
/// Cancelling the same request twice without a retry in between is rejected.
#[test]
fn validate_rejects_duplicate_cancel_without_retry() {
    let mut builder = ReplayTraceBuilder::new("trace-dup-cancel");
    for clock in 1..=2 {
        builder.push(draft(clock, "ext.a", "req-1", ReplayEventKind::Cancelled));
    }

    let outcome = builder.build();
    assert!(matches!(
        outcome.expect_err("duplicate cancel should fail"),
        ReplayTraceValidationError::DuplicateCancelWithoutRetry { .. }
    ));
}
1553
/// A retry between two cancellations clears the pending cancel, so the second
/// cancel is accepted.
#[test]
fn cancel_then_retry_then_cancel_is_valid() {
    let lifecycle = [
        ReplayEventKind::Cancelled,
        ReplayEventKind::Retried,
        ReplayEventKind::Cancelled,
    ];
    let mut builder = ReplayTraceBuilder::new("trace-cancel-retry-cancel");
    for (clock, kind) in (1..).zip(lifecycle) {
        builder.push(draft(clock, "ext.a", "req-1", kind));
    }

    let bundle = builder
        .build()
        .expect("cancel-retry-cancel should be valid");
    assert_eq!(bundle.events.len(), 3);
}
1565
/// A `Completed` event also clears the pending-cancel state, so a later cancel
/// is accepted.
#[test]
fn completed_clears_pending_cancel() {
    let lifecycle = [
        ReplayEventKind::Cancelled,
        ReplayEventKind::Completed,
        ReplayEventKind::Cancelled,
    ];
    let mut builder = ReplayTraceBuilder::new("trace-complete-clear");
    for (clock, kind) in (1..).zip(lifecycle) {
        builder.push(draft(clock, "ext.a", "req-1", kind));
    }

    let bundle = builder
        .build()
        .expect("completed should clear cancel state");
    assert_eq!(bundle.events.len(), 3);
}
1577
/// `canonical_rank` strictly increases across the variants in declaration order.
#[test]
fn event_kind_canonical_rank_is_monotonic() {
    let ordered = [
        ReplayEventKind::Scheduled,
        ReplayEventKind::QueueAccepted,
        ReplayEventKind::PolicyDecision,
        ReplayEventKind::Cancelled,
        ReplayEventKind::Retried,
        ReplayEventKind::Completed,
        ReplayEventKind::Failed,
    ];
    // Pair each variant with its successor.
    for (lower, higher) in ordered.iter().zip(ordered.iter().skip(1)) {
        assert!(
            lower.canonical_rank() < higher.canonical_rank(),
            "{:?} should have lower rank than {:?}",
            lower,
            higher
        );
    }
}
1600
/// Every event kind survives a JSON serialize/deserialize round trip.
#[test]
fn event_kind_serde_roundtrip() {
    let roundtrip = |kind: ReplayEventKind| -> ReplayEventKind {
        let encoded = serde_json::to_string(&kind).expect("serialize kind");
        serde_json::from_str(&encoded).expect("deserialize kind")
    };

    for kind in [
        ReplayEventKind::Scheduled,
        ReplayEventKind::QueueAccepted,
        ReplayEventKind::PolicyDecision,
        ReplayEventKind::Cancelled,
        ReplayEventKind::Retried,
        ReplayEventKind::Completed,
        ReplayEventKind::Failed,
    ] {
        assert_eq!(kind, roundtrip(kind));
    }
}
1618
/// If the two sides use different schemas, a `SchemaMismatch` divergence is
/// reported that is not tied to any sequence number.
#[test]
fn divergence_detects_schema_mismatch() {
    let reference = standard_bundle();
    let mut observed = standard_bundle();
    observed.schema = "pi.ext.replay.trace.v2".to_string();

    let divergence = first_divergence(&reference, &observed)
        .expect("comparison should succeed")
        .expect("schema mismatch expected");

    assert_eq!(divergence.seq, None);
    match divergence.reason {
        ReplayDivergenceReason::SchemaMismatch { expected, observed } => {
            assert_eq!(expected, REPLAY_TRACE_SCHEMA_V1);
            assert_eq!(observed, "pi.ext.replay.trace.v2");
        }
        other => unreachable!("expected SchemaMismatch, got: {other:?}"),
    }
}
1639
/// Two bundles that share an unsupported schema are not "equal". The comparison
/// fails validation with `UnknownSchema` instead.
#[test]
fn divergence_rejects_same_unknown_schema() {
    let unsupported = {
        let mut bundle = standard_bundle();
        bundle.schema = "pi.ext.replay.trace.v2".to_string();
        bundle
    };
    let twin = unsupported.clone();

    let error = first_divergence(&unsupported, &twin).expect_err("unsupported schema");
    assert!(matches!(
        error,
        ReplayTraceValidationError::UnknownSchema(schema) if schema == "pi.ext.replay.trace.v2"
    ));
}
1652
/// Events that differ only in an attribute value diverge on the `attributes` field.
#[test]
fn divergence_detects_attribute_mismatch() {
    // Single-event trace whose policy decision carries the given `decision` value.
    let trace_deciding = |decision: &str| {
        let mut event = draft(1, "ext.a", "req-1", ReplayEventKind::PolicyDecision);
        event
            .attributes
            .insert("decision".to_string(), decision.to_string());
        let mut builder = ReplayTraceBuilder::new("trace-attrs-cmp");
        builder.push(event);
        builder
    };
    let expected = trace_deciding("fast").build().expect("bundle a");
    let observed = trace_deciding("slow").build().expect("bundle b");

    let divergence = first_divergence(&expected, &observed)
        .expect("comparison should succeed")
        .expect("attribute mismatch expected");
    assert_eq!(divergence.seq, Some(1));
    match divergence.reason {
        ReplayDivergenceReason::EventFieldMismatch { field, .. } => {
            assert_eq!(field, "attributes");
        }
        other => unreachable!("expected EventFieldMismatch for attributes, got: {other:?}"),
    }
}
1680
/// A capture that costs exactly the baseline has zero overhead and is allowed.
#[test]
fn capture_gate_zero_overhead_when_captured_equals_baseline() {
    let free_capture = ReplayCaptureObservation {
        trace_bytes: 100,
        captured_micros: 1_000,
        baseline_micros: 1_000,
    };
    let report = evaluate_replay_capture_gate(standard_capture_budget(), free_capture);

    assert!(report.capture_allowed);
    assert_eq!(report.observed_overhead_per_mille, 0);
}
1695
/// A capture faster than the baseline is clamped to zero overhead instead of
/// going negative, and is allowed.
#[test]
fn capture_gate_zero_overhead_when_captured_less_than_baseline() {
    let faster_capture = ReplayCaptureObservation {
        trace_bytes: 100,
        captured_micros: 900,
        baseline_micros: 1_000,
    };
    let report = evaluate_replay_capture_gate(standard_capture_budget(), faster_capture);

    assert!(report.capture_allowed);
    assert_eq!(report.observed_overhead_per_mille, 0);
}
1708
/// Overhead equal to `max_overhead_per_mille` is still within budget, because
/// the limit is inclusive.
#[test]
fn capture_gate_exact_boundary_at_max_overhead() {
    let budget = ReplayCaptureBudget {
        max_trace_bytes: 10_000,
        max_overhead_per_mille: 100,
        capture_enabled: true,
    };
    // 100 µs over a 1 000 µs baseline is exactly 100 per mille.
    let at_limit = ReplayCaptureObservation {
        trace_bytes: 100,
        captured_micros: 1_100,
        baseline_micros: 1_000,
    };

    let report = evaluate_replay_capture_gate(budget, at_limit);
    assert!(report.capture_allowed);
    assert_eq!(report.observed_overhead_per_mille, 100);
}
1726
/// A trace of exactly `max_trace_bytes` is allowed; one byte more disables capture.
#[test]
fn capture_gate_exact_boundary_at_max_trace_bytes() {
    let budget = ReplayCaptureBudget {
        max_trace_bytes: 500,
        max_overhead_per_mille: 1_000,
        capture_enabled: true,
    };
    // Same 10 per mille timing for both probes; only the trace size changes.
    let probe = |trace_bytes| ReplayCaptureObservation {
        baseline_micros: 1_000,
        captured_micros: 1_010,
        trace_bytes,
    };

    let at_limit = evaluate_replay_capture_gate(budget, probe(500));
    assert!(at_limit.capture_allowed);

    let over_limit = evaluate_replay_capture_gate(budget, probe(501));
    assert!(!over_limit.capture_allowed);
    assert_eq!(
        over_limit.reason,
        ReplayCaptureGateReason::DisabledByTraceBudget
    );
}
1756
/// A gate disabled by configuration maps to the single `PolicyGateDisabled`
/// hint when no divergence is supplied.
#[test]
fn diagnostic_snapshot_maps_config_disabled_hint() {
    let bundle = standard_bundle();
    let switched_off = ReplayCaptureBudget {
        max_trace_bytes: 1_000,
        max_overhead_per_mille: 100,
        capture_enabled: false,
    };
    let idle = ReplayCaptureObservation {
        trace_bytes: 0,
        captured_micros: 100,
        baseline_micros: 100,
    };
    let gate = evaluate_replay_capture_gate(switched_off, idle);

    let hints = build_replay_diagnostic_snapshot(&bundle, gate, None)
        .expect("snapshot")
        .root_cause_hints;
    assert_eq!(hints, vec![ReplayRootCauseHint::PolicyGateDisabled]);
}
1781
/// A gate closed by the trace-size budget maps to the single
/// `TraceBudgetExceeded` hint.
#[test]
fn diagnostic_snapshot_maps_trace_budget_hint() {
    let bundle = standard_bundle();
    let tiny_trace_budget = ReplayCaptureBudget {
        max_trace_bytes: 100,
        max_overhead_per_mille: 1_000,
        capture_enabled: true,
    };
    let oversized = ReplayCaptureObservation {
        trace_bytes: 200,
        captured_micros: 1_010,
        baseline_micros: 1_000,
    };
    let gate = evaluate_replay_capture_gate(tiny_trace_budget, oversized);

    let hints = build_replay_diagnostic_snapshot(&bundle, gate, None)
        .expect("snapshot")
        .root_cause_hints;
    assert_eq!(hints, vec![ReplayRootCauseHint::TraceBudgetExceeded]);
}
1804
/// A diagnostic snapshot survives a JSON serialize/deserialize round trip unchanged.
#[test]
fn diagnostic_snapshot_serde_roundtrip() {
    let bundle = standard_bundle();
    let cheap_capture = ReplayCaptureObservation {
        trace_bytes: 64,
        captured_micros: 1_010,
        baseline_micros: 1_000,
    };
    let gate = evaluate_replay_capture_gate(standard_capture_budget(), cheap_capture);
    let original = build_replay_diagnostic_snapshot(&bundle, gate, None).expect("snapshot");

    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded: super::ReplayDiagnosticSnapshot =
        serde_json::from_str(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
1822
/// Exact per-mille overhead for whole-number cases. A capture that is not slower
/// than its baseline reports zero.
#[test]
fn overhead_per_mille_exact_computation() {
    // (baseline µs, captured µs, expected per-mille overhead)
    let cases = [
        (1_000, 1_050, 50),
        (1_000, 1_200, 200),
        (1_000, 1_000, 0),
        (1_000, 500, 0),
    ];
    for (baseline, captured, expected) in cases {
        assert_eq!(super::compute_overhead_per_mille(baseline, captured), expected);
    }
}
1836
/// A fractional per-mille overhead is rounded up, never truncated.
#[test]
fn overhead_per_mille_rounding_up() {
    // 1 µs over a 3 µs baseline is 333.3… per mille, which rounds up to 334.
    let per_mille = super::compute_overhead_per_mille(3, 4);
    assert_eq!(per_mille, 334);
}
1842
/// A zero baseline saturates the overhead to `u32::MAX`, whatever was captured.
#[test]
fn overhead_per_mille_zero_baseline_returns_max() {
    for captured in [1, 0] {
        assert_eq!(super::compute_overhead_per_mille(0, captured), u32::MAX);
    }
}
1848
1849 fn within_budget_observation() -> ReplayCaptureObservation {
1852 ReplayCaptureObservation {
1853 baseline_micros: 1_000,
1854 captured_micros: 1_050,
1855 trace_bytes: 256,
1856 }
1857 }
1858
1859 fn standard_lane_config() -> super::ReplayLaneConfig {
1860 super::ReplayLaneConfig::new(standard_capture_budget())
1861 }
1862
/// A fresh recorder starts at clock 0 with no events. Finishing it produces an
/// empty, gate-approved bundle.
#[test]
fn recorder_empty_produces_valid_bundle() {
    let recorder = super::ReplayRecorder::new("trace-empty-rec", standard_lane_config());
    assert_eq!(recorder.event_count(), 0);
    assert_eq!(recorder.logical_clock(), 0);

    let finished = recorder
        .finish(within_budget_observation())
        .expect("finish");
    assert!(finished.bundle.events.is_empty());
    assert!(finished.gate_report.capture_allowed);
    assert_eq!(finished.diagnostic.event_count, 0);
}
1876
/// Each `tick` advances the logical clock. Recorded events keep their call
/// order and receive contiguous 1-based sequence numbers.
#[test]
fn recorder_captures_events_in_sequence() {
    let mut recorder = super::ReplayRecorder::new("trace-seq-rec", standard_lane_config());
    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_queue_accepted("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_policy_decision("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_completed("ext.a", "req-1", BTreeMap::new());

    assert_eq!(recorder.event_count(), 4);
    assert_eq!(recorder.logical_clock(), 4);

    let result = recorder
        .finish(within_budget_observation())
        .expect("finish");
    let events = &result.bundle.events;
    assert_eq!(events.len(), 4);

    let expected_kinds = [
        ReplayEventKind::Scheduled,
        ReplayEventKind::QueueAccepted,
        ReplayEventKind::PolicyDecision,
        ReplayEventKind::Completed,
    ];
    for (event, kind) in events.iter().zip(expected_kinds) {
        assert_eq!(event.kind, kind);
    }
    // Sequence numbers start at 1 and have no gaps.
    for (event, seq) in events.iter().zip(1_u64..) {
        assert_eq!(event.seq, seq);
    }
}
1909
/// Attributes passed to a `record_*` call appear on the recorded event.
#[test]
fn recorder_attributes_flow_through() {
    let mut recorder = super::ReplayRecorder::new("trace-attrs-rec", standard_lane_config());
    recorder.tick();
    let attrs = BTreeMap::from([
        ("lane".to_string(), "fast".to_string()),
        ("capability".to_string(), "tool".to_string()),
    ]);
    recorder.record_policy_decision("ext.a", "req-1", attrs);

    let result = recorder
        .finish(within_budget_observation())
        .expect("finish");
    let recorded = &result.bundle.events[0].attributes;
    assert_eq!(recorded.get("lane").map(String::as_str), Some("fast"));
    assert_eq!(recorded.get("capability").map(String::as_str), Some("tool"));
}
1932
/// Metadata set on the lane configuration is copied onto the finished bundle.
#[test]
fn recorder_lane_metadata_propagated() {
    let lane_metadata = [("env", "staging"), ("worker", "w-3")];
    let mut config = standard_lane_config();
    for (key, value) in lane_metadata {
        config.insert_metadata(key, value);
    }

    let mut recorder = super::ReplayRecorder::new("trace-meta-rec", config);
    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());

    let result = recorder
        .finish(within_budget_observation())
        .expect("finish");
    for (key, value) in lane_metadata {
        assert_eq!(
            result.bundle.metadata.get(key).map(String::as_str),
            Some(value)
        );
    }
}
1954
/// A scheduled → cancelled → retried → completed lifecycle records cleanly,
/// with cancel and retry at positions 1 and 2.
#[test]
fn recorder_cancel_retry_lifecycle() {
    let mut recorder = super::ReplayRecorder::new("trace-cancel-retry", standard_lane_config());
    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_cancelled("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_retried("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_completed("ext.a", "req-1", BTreeMap::new());

    let events = recorder
        .finish(within_budget_observation())
        .expect("finish")
        .bundle
        .events;
    assert_eq!(events.len(), 4);
    assert_eq!(events[1].kind, ReplayEventKind::Cancelled);
    assert_eq!(events[2].kind, ReplayEventKind::Retried);
}
1974
/// A failure recorded with an error attribute keeps both its kind and the attribute.
#[test]
fn recorder_failed_event() {
    let mut recorder = super::ReplayRecorder::new("trace-fail", standard_lane_config());
    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    let failure_attrs = BTreeMap::from([("error".to_string(), "timeout".to_string())]);
    recorder.record_failed("ext.a", "req-1", failure_attrs);

    let result = recorder
        .finish(within_budget_observation())
        .expect("finish");
    let failed = &result.bundle.events[1];
    assert_eq!(failed.kind, ReplayEventKind::Failed);
    assert_eq!(
        failed.attributes.get("error").map(String::as_str),
        Some("timeout")
    );
}
1997
/// Under a 50 per mille budget, a 100 per mille observation closes the gate,
/// but `finish` still returns the recorded bundle.
#[test]
fn recorder_gate_report_reflects_budget() {
    let tight_budget = ReplayCaptureBudget {
        max_trace_bytes: 10_000,
        max_overhead_per_mille: 50,
        capture_enabled: true,
    };
    let mut config = super::ReplayLaneConfig::new(tight_budget);
    config.insert_metadata("lane", "shadow");

    let mut recorder = super::ReplayRecorder::new("trace-gated", config);
    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());

    let costly = ReplayCaptureObservation {
        trace_bytes: 64,
        captured_micros: 1_100,
        baseline_micros: 1_000,
    };
    let result = recorder.finish(costly).expect("finish");

    let gate = &result.gate_report;
    assert!(!gate.capture_allowed);
    assert_eq!(
        gate.reason,
        ReplayCaptureGateReason::DisabledByOverheadBudget
    );
    // A closed gate does not drop the events captured so far.
    assert_eq!(result.bundle.events.len(), 1);
}
2027
/// Finishing a recorder fills the diagnostic with the trace id, schema and
/// event count. Without a reference there is no divergence and no hint.
#[test]
fn recorder_diagnostic_snapshot_populated() {
    let mut recorder = super::ReplayRecorder::new("trace-diag", standard_lane_config());
    recorder.tick();
    recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
    recorder.tick();
    recorder.record_completed("ext.a", "req-1", BTreeMap::new());

    let result = recorder
        .finish(within_budget_observation())
        .expect("finish");
    let diagnostic = &result.diagnostic;
    assert_eq!(diagnostic.trace_id, "trace-diag");
    assert_eq!(diagnostic.schema, REPLAY_TRACE_SCHEMA_V1);
    assert_eq!(diagnostic.event_count, 2);
    assert!(diagnostic.divergence.is_none());
    assert!(diagnostic.root_cause_hints.is_empty());
}
2045
/// Two recorders that capture the same lifecycle compare cleanly through
/// `finish_and_compare`: no divergence, no hint, and both trace ids echoed.
#[test]
fn recorder_finish_and_compare_identical() {
    let mut recorders = ["trace-cmp", "trace-cmp"]
        .map(|trace_id| super::ReplayRecorder::new(trace_id, standard_lane_config()));
    for recorder in &mut recorders {
        recorder.tick();
        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
        recorder.tick();
        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
    }
    let [reference_recorder, observed_recorder] = recorders;

    let reference = reference_recorder
        .finish(within_budget_observation())
        .expect("ref")
        .bundle;
    let (result, comparison) = observed_recorder
        .finish_and_compare(within_budget_observation(), &reference)
        .expect("compare");

    assert!(comparison.divergence.is_none());
    assert!(comparison.root_cause_hints.is_empty());
    assert_eq!(comparison.reference_trace_id, "trace-cmp");
    assert_eq!(comparison.observed_trace_id, "trace-cmp");
    assert!(result.diagnostic.divergence.is_none());
}
2073
/// `finish_and_compare` against a reference whose terminal event differs
/// (`Completed` vs `Failed` at seq 2) reports a `kind` field divergence. The
/// finished diagnostic mirrors it and carries at least one root-cause hint.
#[test]
fn recorder_finish_and_compare_detects_divergence() {
    let mut rec1 = super::ReplayRecorder::new("trace-div", standard_lane_config());
    rec1.tick();
    rec1.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec1.tick();
    rec1.record_completed("ext.a", "req-1", BTreeMap::new());
    let reference = rec1
        .finish(within_budget_observation())
        .expect("ref")
        .bundle;

    let mut rec2 = super::ReplayRecorder::new("trace-div", standard_lane_config());
    rec2.tick();
    rec2.record_scheduled("ext.a", "req-1", BTreeMap::new());
    rec2.tick();
    rec2.record_failed("ext.a", "req-1", BTreeMap::new());

    let (result, comparison) = rec2
        .finish_and_compare(within_budget_observation(), &reference)
        .expect("compare");

    // One `expect` both checks presence and names the failure if it is absent.
    let div = comparison
        .divergence
        .as_ref()
        .expect("divergence expected");
    assert_eq!(div.seq, Some(2));
    // Follow the file's match/`unreachable!` convention so that an unexpected
    // reason is printed instead of a bare `false` from `matches!`.
    match &div.reason {
        ReplayDivergenceReason::EventFieldMismatch { field, .. } => assert_eq!(field, "kind"),
        other => unreachable!("expected EventFieldMismatch, got: {other:?}"),
    }
    assert!(result.diagnostic.divergence.is_some());
    assert!(!result.diagnostic.root_cause_hints.is_empty());
}
2105
/// Several extensions recording in the same tick share that logical clock
/// value and keep their recording order.
#[test]
fn recorder_multi_extension_interleaving() {
    let lanes = [("ext.a", "req-1"), ("ext.b", "req-2")];
    let mut recorder = super::ReplayRecorder::new("trace-multi", standard_lane_config());

    recorder.tick();
    for (extension, request) in lanes {
        recorder.record_scheduled(extension, request, BTreeMap::new());
    }
    recorder.tick();
    for (extension, request) in lanes {
        recorder.record_policy_decision(extension, request, BTreeMap::new());
    }
    recorder.tick();
    for (extension, request) in lanes {
        recorder.record_completed(extension, request, BTreeMap::new());
    }

    let result = recorder
        .finish(within_budget_observation())
        .expect("finish");
    assert_eq!(result.bundle.events.len(), 6);

    // Both events from the first tick carry clock 1, in the order they were recorded.
    let first_tick_extensions: Vec<&str> = result
        .bundle
        .events
        .iter()
        .filter(|event| event.logical_clock == 1)
        .map(|event| event.extension_id.as_str())
        .collect();
    assert_eq!(first_tick_extensions, ["ext.a", "ext.b"]);
}
2135
/// Comparing the standard bundle with itself through `compare_replay_bundles`
/// reports no divergence and no hint.
#[test]
fn compare_replay_bundles_no_divergence() {
    let gate =
        evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
    let bundle = standard_bundle();

    let (diagnostic, comparison) =
        super::compare_replay_bundles(&bundle, &bundle, gate).expect("compare");
    assert!(comparison.divergence.is_none());
    assert!(comparison.root_cause_hints.is_empty());
    assert!(diagnostic.divergence.is_none());
}
2150
/// An observed trace that ends in `Failed` (where the reference differs) is
/// reported as divergent. Hints are expected on the comparison, and the
/// diagnostic must carry the divergence.
#[test]
fn compare_replay_bundles_with_divergence() {
    let reference = standard_bundle();

    let observed = {
        let mut builder = ReplayTraceBuilder::new("trace-diagnostic");
        for &(clock, kind) in &[
            (1, ReplayEventKind::Scheduled),
            (2, ReplayEventKind::PolicyDecision),
            (3, ReplayEventKind::Failed),
        ] {
            builder.push(draft(clock, "ext.a", "req-1", kind));
        }
        builder.build().expect("observed bundle")
    };

    let gate_report =
        evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());

    let (diagnostic, comparison) =
        super::compare_replay_bundles(&reference, &observed, gate_report).expect("compare");

    assert!(comparison.divergence.is_some());
    assert!(!comparison.root_cause_hints.is_empty());
    assert!(diagnostic.divergence.is_some());
}
2169
/// A bundle whose schema string differs from the reference is reported as a
/// `SchemaMismatch` divergence. The comparison and the diagnostic must both
/// carry exactly one `TraceSchemaMismatch` hint.
#[test]
fn compare_replay_bundles_reports_schema_mismatch_with_hint() {
    const OBSERVED_SCHEMA: &str = "pi.ext.replay.trace.v2";

    let reference = standard_bundle();
    let mut observed_bundle = standard_bundle();
    observed_bundle.schema = OBSERVED_SCHEMA.to_string();

    let gate_report =
        evaluate_replay_capture_gate(standard_capture_budget(), within_budget_observation());
    let (diagnostic, comparison) =
        super::compare_replay_bundles(&reference, &observed_bundle, gate_report)
            .expect("compare");

    let expected_hints = vec![ReplayRootCauseHint::TraceSchemaMismatch];
    assert_eq!(comparison.root_cause_hints, expected_hints);
    assert_eq!(diagnostic.root_cause_hints, expected_hints);

    let reason = comparison
        .divergence
        .expect("schema mismatch divergence")
        .reason;
    match reason {
        ReplayDivergenceReason::SchemaMismatch { expected, observed } => {
            assert_eq!(expected, REPLAY_TRACE_SCHEMA_V1);
            assert_eq!(observed, OBSERVED_SCHEMA);
        }
        other => unreachable!("expected SchemaMismatch, got: {other:?}"),
    }
}
2201
/// A lane config with metadata survives a JSON encode/decode cycle unchanged.
#[test]
fn lane_config_serde_roundtrip() {
    let config = {
        let mut cfg = super::ReplayLaneConfig::new(standard_capture_budget());
        cfg.insert_metadata("env", "prod");
        cfg
    };

    let encoded = serde_json::to_string(&config).expect("serialize");
    let decoded: super::ReplayLaneConfig = serde_json::from_str(&encoded).expect("deserialize");

    assert_eq!(decoded, config);
}
2213
/// A lane config without metadata serializes without a `laneMetadata` key.
#[test]
fn lane_config_empty_metadata_omitted_in_json() {
    let encoded = serde_json::to_string(&super::ReplayLaneConfig::new(standard_capture_budget()))
        .expect("serialize");
    let has_metadata_key = encoded.contains("laneMetadata");
    assert!(!has_metadata_key);
}
2220
/// A finished lane result (one scheduled and one completed event) survives a
/// JSON encode/decode cycle unchanged.
#[test]
fn lane_result_serde_roundtrip() {
    let result = {
        let mut recorder = super::ReplayRecorder::new("trace-serde", standard_lane_config());
        recorder.tick();
        recorder.record_scheduled("ext.a", "req-1", BTreeMap::new());
        recorder.tick();
        recorder.record_completed("ext.a", "req-1", BTreeMap::new());
        recorder.finish(within_budget_observation()).expect("finish")
    };

    let encoded = serde_json::to_string(&result).expect("serialize");
    let decoded: super::ReplayLaneResult = serde_json::from_str(&encoded).expect("deserialize");

    assert_eq!(decoded, result);
}
2236
/// A divergence-free comparison result survives a JSON encode/decode cycle
/// unchanged.
#[test]
fn comparison_result_serde_roundtrip() {
    let original = super::ReplayComparisonResult {
        reference_trace_id: String::from("ref-1"),
        observed_trace_id: String::from("obs-1"),
        divergence: None,
        root_cause_hints: Vec::new(),
    };

    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded: super::ReplayComparisonResult =
        serde_json::from_str(&encoded).expect("deserialize");

    assert_eq!(decoded, original);
}
2250
/// The first three ticks of a fresh recorder return 1, 2 and 3 in order.
#[test]
fn recorder_tick_is_monotonic() {
    let mut recorder = super::ReplayRecorder::new("trace-tick", standard_lane_config());
    let ticks: Vec<_> = (0..3).map(|_| recorder.tick()).collect();
    assert_eq!(ticks, [1, 2, 3]);
}
2261
2262 mod proptest_extension_replay {
2265 use super::*;
2266 use proptest::prelude::*;
2267
2268 fn arb_event_kind() -> impl Strategy<Value = ReplayEventKind> {
2269 prop::sample::select(vec![
2270 ReplayEventKind::Scheduled,
2271 ReplayEventKind::QueueAccepted,
2272 ReplayEventKind::PolicyDecision,
2273 ReplayEventKind::Completed,
2274 ReplayEventKind::Failed,
2275 ])
2276 }
2277
2278 fn arb_ext_id() -> impl Strategy<Value = String> {
2279 "ext\\.[a-z]{1,5}"
2280 }
2281
2282 fn arb_req_id() -> impl Strategy<Value = String> {
2283 "req-[0-9]{1,4}"
2284 }
2285
2286 fn arb_simple_draft() -> impl Strategy<Value = ReplayEventDraft> {
2287 (1..100u64, arb_ext_id(), arb_req_id(), arb_event_kind())
2288 .prop_map(|(clock, ext, req, kind)| ReplayEventDraft::new(clock, ext, req, kind))
2289 }
2290
2291 proptest! {
2292 #[test]
2293 fn compute_overhead_per_mille_zero_when_captured_leq_baseline(
2294 baseline in 1..10_000u64,
2295 captured in 0..10_000u64,
2296 ) {
2297 if captured <= baseline {
2298 let result = super::super::compute_overhead_per_mille(baseline, captured);
2299 assert_eq!(
2300 result, 0,
2301 "captured <= baseline should yield 0 overhead"
2302 );
2303 }
2304 }
2305
2306 #[test]
2307 fn compute_overhead_per_mille_zero_baseline_returns_max(
2308 captured in 0..10_000u64,
2309 ) {
2310 let result = super::super::compute_overhead_per_mille(0, captured);
2311 assert_eq!(
2312 result, u32::MAX,
2313 "zero baseline should always be treated as invalid telemetry"
2314 );
2315 }
2316
2317 #[test]
2318 fn compute_overhead_per_mille_is_non_negative(
2319 baseline in 0..10_000u64,
2320 captured in 0..10_000u64,
2321 ) {
2322 let result = super::super::compute_overhead_per_mille(baseline, captured);
2323 let _ = result;
2325 }
2326
2327 #[test]
2328 fn builder_produces_contiguous_sequences(
2329 drafts in prop::collection::vec(arb_simple_draft(), 0..10),
2330 ) {
2331 let mut builder = ReplayTraceBuilder::new("trace-prop");
2332 for d in drafts {
2333 builder.push(d);
2334 }
2335 let bundle = builder.build().expect("build should succeed");
2336 for (idx, event) in bundle.events.iter().enumerate() {
2337 assert_eq!(
2338 event.seq,
2339 (idx + 1) as u64,
2340 "sequence should be 1-based contiguous"
2341 );
2342 }
2343 }
2344
2345 #[test]
2346 fn builder_is_deterministic_regardless_of_push_order(
2347 drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2348 ) {
2349 let mut builder1 = ReplayTraceBuilder::new("trace-det");
2350 for d in &drafts {
2351 builder1.push(d.clone());
2352 }
2353 let bundle1 = builder1.build().expect("build1");
2354
2355 let mut reversed = drafts;
2356 reversed.reverse();
2357 let mut builder2 = ReplayTraceBuilder::new("trace-det");
2358 for d in &reversed {
2359 builder2.push(d.clone());
2360 }
2361 let bundle2 = builder2.build().expect("build2");
2362
2363 assert_eq!(
2364 bundle1, bundle2,
2365 "canonical ordering should be same regardless of push order"
2366 );
2367 }
2368
2369 #[test]
2370 fn identical_bundles_have_no_divergence(
2371 drafts in prop::collection::vec(arb_simple_draft(), 0..8),
2372 ) {
2373 let mut builder = ReplayTraceBuilder::new("trace-id");
2374 for d in &drafts {
2375 builder.push(d.clone());
2376 }
2377 let bundle = builder.build().expect("build");
2378 let divergence = first_divergence(&bundle, &bundle)
2379 .expect("comparison should succeed");
2380 assert!(
2381 divergence.is_none(),
2382 "identical bundles should have no divergence"
2383 );
2384 }
2385
2386 #[test]
2387 fn json_roundtrip_preserves_bundle(
2388 drafts in prop::collection::vec(arb_simple_draft(), 0..6),
2389 ) {
2390 let mut builder = ReplayTraceBuilder::new("trace-rt");
2391 for d in drafts {
2392 builder.push(d);
2393 }
2394 let bundle = builder.build().expect("build");
2395 let json = bundle.encode_json().expect("encode");
2396 let decoded = ReplayTraceBundle::decode_json(&json).expect("decode");
2397 assert_eq!(bundle, decoded, "JSON roundtrip should preserve bundle");
2398 }
2399
2400 #[test]
2401 fn capture_gate_disabled_config_always_rejects(
2402 baseline in 1..10_000u64,
2403 captured in 1..10_000u64,
2404 trace_bytes in 0..10_000u64,
2405 max_overhead in 0..1_000u32,
2406 max_bytes in 0..10_000u64,
2407 ) {
2408 let budget = ReplayCaptureBudget {
2409 capture_enabled: false,
2410 max_overhead_per_mille: max_overhead,
2411 max_trace_bytes: max_bytes,
2412 };
2413 let observation = ReplayCaptureObservation {
2414 baseline_micros: baseline,
2415 captured_micros: captured,
2416 trace_bytes,
2417 };
2418 let report = evaluate_replay_capture_gate(budget, observation);
2419 assert!(
2420 !report.capture_allowed,
2421 "disabled config should always reject"
2422 );
2423 assert_eq!(report.reason, ReplayCaptureGateReason::DisabledByConfig);
2424 }
2425
2426 #[test]
2427 fn capture_gate_is_deterministic(
2428 baseline in 0..5_000u64,
2429 captured in 0..5_000u64,
2430 trace_bytes in 0..5_000u64,
2431 enabled in any::<bool>(),
2432 max_overhead in 0..500u32,
2433 max_bytes in 0..5_000u64,
2434 ) {
2435 let budget = ReplayCaptureBudget {
2436 capture_enabled: enabled,
2437 max_overhead_per_mille: max_overhead,
2438 max_trace_bytes: max_bytes,
2439 };
2440 let observation = ReplayCaptureObservation {
2441 baseline_micros: baseline,
2442 captured_micros: captured,
2443 trace_bytes,
2444 };
2445 let r1 = evaluate_replay_capture_gate(budget, observation);
2446 let r2 = evaluate_replay_capture_gate(budget, observation);
2447 assert_eq!(r1, r2, "capture gate must be deterministic");
2448 }
2449
2450 #[test]
2451 fn event_kind_canonical_rank_all_distinct(
2452 a_idx in 0..7usize,
2453 b_idx in 0..7usize,
2454 ) {
2455 let kinds = [
2456 ReplayEventKind::Scheduled,
2457 ReplayEventKind::QueueAccepted,
2458 ReplayEventKind::PolicyDecision,
2459 ReplayEventKind::Cancelled,
2460 ReplayEventKind::Retried,
2461 ReplayEventKind::Completed,
2462 ReplayEventKind::Failed,
2463 ];
2464 if a_idx != b_idx {
2465 assert_ne!(
2466 kinds[a_idx].canonical_rank(),
2467 kinds[b_idx].canonical_rank(),
2468 "distinct kinds should have distinct ranks"
2469 );
2470 }
2471 }
2472
2473 #[test]
2474 fn builder_events_sorted_by_logical_clock(
2475 clocks in prop::collection::vec(0..50u64, 1..10),
2476 ) {
2477 let mut builder = ReplayTraceBuilder::new("trace-clock");
2478 for (i, clock) in clocks.iter().enumerate() {
2479 builder.push(ReplayEventDraft::new(
2480 *clock,
2481 format!("ext.{i}"),
2482 format!("req-{i}"),
2483 ReplayEventKind::Scheduled,
2484 ));
2485 }
2486 let bundle = builder.build().expect("build");
2487 for pair in bundle.events.windows(2) {
2488 assert!(
2489 pair[0].logical_clock <= pair[1].logical_clock,
2490 "events should be sorted by logical clock: {} > {}",
2491 pair[0].logical_clock,
2492 pair[1].logical_clock,
2493 );
2494 }
2495 }
2496 }
2497 }
2498}