1use std::collections::BTreeMap;
24use std::time::Instant;
25
26use serde::{Deserialize, Serialize};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33pub enum RefreshPhase {
34 Scan,
36 Persist,
38 LexicalRebuild,
40 Publish,
42 Analytics,
44 Semantic,
46 Recovery,
48}
49
50impl RefreshPhase {
51 pub const ALL: &'static [RefreshPhase] = &[
53 Self::Scan,
54 Self::Persist,
55 Self::LexicalRebuild,
56 Self::Publish,
57 Self::Analytics,
58 Self::Semantic,
59 Self::Recovery,
60 ];
61
62 pub fn as_str(&self) -> &'static str {
63 match self {
64 Self::Scan => "scan",
65 Self::Persist => "persist",
66 Self::LexicalRebuild => "lexical_rebuild",
67 Self::Publish => "publish",
68 Self::Analytics => "analytics",
69 Self::Semantic => "semantic",
70 Self::Recovery => "recovery",
71 }
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct PhaseRecord {
80 pub phase: RefreshPhase,
81 pub duration_ms: u64,
83 pub items_processed: u64,
85 pub items_skipped: u64,
87 pub errors: u64,
89 pub counters: BTreeMap<String, u64>,
91 pub success: bool,
93 pub error_message: Option<String>,
95}
96
97impl PhaseRecord {
98 fn new(phase: RefreshPhase) -> Self {
99 Self {
100 phase,
101 duration_ms: 0,
102 items_processed: 0,
103 items_skipped: 0,
104 errors: 0,
105 counters: BTreeMap::new(),
106 success: true,
107 error_message: None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Default, Serialize, Deserialize)]
116pub struct EquivalenceArtifacts {
117 pub conversation_count: u64,
119 pub message_count: u64,
121 pub lexical_doc_count: u64,
123 pub lexical_fingerprint: Option<String>,
125 pub semantic_manifest_fingerprint: Option<String>,
127 pub search_hit_digest: Option<String>,
129 pub peak_rss_bytes: Option<u64>,
131 pub db_size_bytes: Option<u64>,
133 pub lexical_index_size_bytes: Option<u64>,
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct RefreshLedger {
145 pub version: u32,
147 pub started_at_ms: i64,
149 pub completed_at_ms: i64,
151 pub total_duration_ms: u64,
153 pub full_rebuild: bool,
155 pub corpus_family: String,
157 pub phases: Vec<PhaseRecord>,
159 pub equivalence: EquivalenceArtifacts,
161 pub tags: BTreeMap<String, String>,
163}
164
165#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
171pub struct RefreshReadinessMilestones {
172 pub time_to_lexical_ready_ms: Option<u64>,
173 pub time_to_search_ready_ms: Option<u64>,
174 pub time_to_full_settled_ms: Option<u64>,
175 pub failed_phase: Option<String>,
176 pub search_readiness_state: RefreshSearchReadinessState,
177}
178
179#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
181#[serde(rename_all = "snake_case")]
182pub enum RefreshSearchReadinessState {
183 Published,
186 #[default]
188 WaitingForPublish,
189 BlockedBeforePublish,
191 PublishFailed,
193}
194
195impl Default for RefreshLedger {
196 fn default() -> Self {
197 Self {
198 version: 1,
199 started_at_ms: 0,
200 completed_at_ms: 0,
201 total_duration_ms: 0,
202 full_rebuild: false,
203 corpus_family: "default".to_owned(),
204 phases: Vec::new(),
205 equivalence: EquivalenceArtifacts::default(),
206 tags: BTreeMap::new(),
207 }
208 }
209}
210
211impl RefreshLedger {
212 pub fn start(corpus_family: &str, full_rebuild: bool) -> LedgerBuilder {
214 LedgerBuilder::new(corpus_family, full_rebuild)
215 }
216
217 pub fn phase(&self, phase: RefreshPhase) -> Option<&PhaseRecord> {
219 self.phases.iter().find(|p| p.phase == phase)
220 }
221
222 pub fn total_items_processed(&self) -> u64 {
224 self.phases.iter().map(|p| p.items_processed).sum()
225 }
226
227 pub fn total_errors(&self) -> u64 {
229 self.phases.iter().map(|p| p.errors).sum()
230 }
231
232 pub fn all_phases_succeeded(&self) -> bool {
234 self.phases.iter().all(|p| p.success)
235 }
236
237 pub fn failed_phases(&self) -> Vec<&PhaseRecord> {
239 self.phases.iter().filter(|p| !p.success).collect()
240 }
241
242 pub fn duration_breakdown(&self) -> BTreeMap<String, u64> {
244 self.phases
245 .iter()
246 .map(|p| (p.phase.as_str().to_owned(), p.duration_ms))
247 .collect()
248 }
249
250 pub fn readiness_milestones(&self) -> RefreshReadinessMilestones {
253 RefreshReadinessMilestones {
254 time_to_lexical_ready_ms: self
255 .successful_duration_through(RefreshPhase::LexicalRebuild),
256 time_to_search_ready_ms: self.successful_duration_through(RefreshPhase::Publish),
257 time_to_full_settled_ms: self.full_settlement_duration_ms(),
258 failed_phase: self
259 .failed_phases()
260 .first()
261 .map(|phase| phase.phase.as_str().to_owned()),
262 search_readiness_state: self.search_readiness_state(),
263 }
264 }
265
266 pub fn to_json(&self) -> String {
268 serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_owned())
269 }
270
271 fn successful_duration_through(&self, target: RefreshPhase) -> Option<u64> {
272 let mut elapsed_ms = 0u64;
273 for phase in &self.phases {
274 elapsed_ms = elapsed_ms.saturating_add(phase.duration_ms);
275 if !phase.success {
276 return None;
277 }
278 if phase.phase == target {
279 return Some(elapsed_ms);
280 }
281 }
282 None
283 }
284
285 fn sum_phase_durations(&self) -> u64 {
286 self.phases
287 .iter()
288 .map(|phase| phase.duration_ms)
289 .fold(0u64, u64::saturating_add)
290 }
291
292 fn full_settlement_duration_ms(&self) -> Option<u64> {
293 (self.all_phases_succeeded()
294 && self.search_readiness_state() == RefreshSearchReadinessState::Published)
295 .then(|| {
296 if self.total_duration_ms > 0 {
297 self.total_duration_ms
298 } else {
299 self.sum_phase_durations()
300 }
301 })
302 }
303
304 fn search_readiness_state(&self) -> RefreshSearchReadinessState {
305 let mut published = false;
306
307 for phase in &self.phases {
308 if !phase.success {
309 return if phase.phase == RefreshPhase::Publish {
310 RefreshSearchReadinessState::PublishFailed
311 } else if published {
312 RefreshSearchReadinessState::Published
313 } else {
314 RefreshSearchReadinessState::BlockedBeforePublish
315 };
316 }
317 if phase.phase == RefreshPhase::Publish {
318 published = true;
319 }
320 }
321
322 if published {
323 RefreshSearchReadinessState::Published
324 } else {
325 RefreshSearchReadinessState::WaitingForPublish
326 }
327 }
328}
329
330pub struct LedgerBuilder {
334 ledger: RefreshLedger,
335 start_time: Instant,
336 current_phase: Option<(RefreshPhase, Instant)>,
337 current_record: Option<PhaseRecord>,
338}
339
340impl LedgerBuilder {
341 fn new(corpus_family: &str, full_rebuild: bool) -> Self {
342 let now = std::time::SystemTime::now()
343 .duration_since(std::time::UNIX_EPOCH)
344 .map(|d| d.as_millis() as i64)
345 .unwrap_or(0);
346
347 Self {
348 ledger: RefreshLedger {
349 started_at_ms: now,
350 full_rebuild,
351 corpus_family: corpus_family.to_owned(),
352 ..Default::default()
353 },
354 start_time: Instant::now(),
355 current_phase: None,
356 current_record: None,
357 }
358 }
359
360 pub fn begin_phase(&mut self, phase: RefreshPhase) {
362 self.end_current_phase();
363 self.current_phase = Some((phase, Instant::now()));
364 self.current_record = Some(PhaseRecord::new(phase));
365 }
366
367 pub fn record_items(&mut self, processed: u64, skipped: u64) {
369 if let Some(ref mut record) = self.current_record {
370 record.items_processed += processed;
371 record.items_skipped += skipped;
372 }
373 }
374
375 pub fn record_error(&mut self, message: &str) {
379 if let Some(ref mut record) = self.current_record {
380 record.errors += 1;
381 match &mut record.error_message {
382 Some(existing) => {
383 existing.push_str("; ");
384 existing.push_str(message);
385 }
386 None => record.error_message = Some(message.to_owned()),
387 }
388 }
389 }
390
391 pub fn record_failure(&mut self, message: &str) {
396 if let Some(ref mut record) = self.current_record {
397 record.success = false;
398 record.errors = record.errors.saturating_add(1);
399 record.error_message = Some(message.to_owned());
400 }
401 }
402
403 pub fn set_counter(&mut self, key: &str, value: u64) {
405 if let Some(ref mut record) = self.current_record {
406 record.counters.insert(key.to_owned(), value);
407 }
408 }
409
410 pub fn inc_counter(&mut self, key: &str, delta: u64) {
412 if let Some(ref mut record) = self.current_record {
413 *record.counters.entry(key.to_owned()).or_insert(0) += delta;
414 }
415 }
416
417 pub fn set_equivalence(&mut self, artifacts: EquivalenceArtifacts) {
419 self.ledger.equivalence = artifacts;
420 }
421
422 pub fn tag(&mut self, key: &str, value: &str) {
424 self.ledger.tags.insert(key.to_owned(), value.to_owned());
425 }
426
427 pub fn finish(mut self) -> RefreshLedger {
429 self.end_current_phase();
430 let now = std::time::SystemTime::now()
431 .duration_since(std::time::UNIX_EPOCH)
432 .map(|d| d.as_millis() as i64)
433 .unwrap_or(0);
434 self.ledger.completed_at_ms = now;
435 self.ledger.total_duration_ms = self.start_time.elapsed().as_millis() as u64;
436 self.ledger
437 }
438
439 fn end_current_phase(&mut self) {
440 let Some((_, phase_start)) = self.current_phase.take() else {
443 return;
444 };
445 let Some(mut record) = self.current_record.take() else {
446 return;
447 };
448 record.duration_ms = phase_start.elapsed().as_millis() as u64;
449 self.ledger.phases.push(record);
450 }
451}
452
/// Canonical names of the benchmark corpus families.
pub mod corpus_families {
    /// Family name `"small"`.
    pub const SMALL: &str = "small";
    /// Family name `"medium"`.
    pub const MEDIUM: &str = "medium";
    /// Family name `"large"`.
    pub const LARGE: &str = "large";
    /// Family name `"duplicate_heavy"`.
    pub const DUPLICATE_HEAVY: &str = "duplicate_heavy";
    /// Family name `"pathological"`.
    pub const PATHOLOGICAL: &str = "pathological";
    /// Family name `"mixed_agent"`.
    pub const MIXED_AGENT: &str = "mixed_agent";
    /// Family name `"incremental"`.
    pub const INCREMENTAL: &str = "incremental";
}
472
/// Shape parameters for a benchmark corpus.
// NOTE(review): field meanings below are inferred from the names; the code
// that consumes this configuration is not visible here — confirm.
#[derive(Debug, Clone)]
pub struct BenchmarkCorpusConfig {
    /// Corpus family name (the presets use the `corpus_families` constants).
    pub family: String,
    /// Number of conversations.
    pub num_conversations: usize,
    /// Number of messages per conversation.
    pub messages_per_conversation: usize,
    /// Fraction of duplicated content (the presets use values from 0.0 to 0.5).
    pub duplicate_fraction: f64,
    /// Upper bound on message length.
    pub max_message_length: usize,
    /// Number of distinct agents.
    pub agent_count: usize,
}
486
487impl BenchmarkCorpusConfig {
488 pub fn small() -> Self {
489 Self {
490 family: corpus_families::SMALL.to_owned(),
491 num_conversations: 10,
492 messages_per_conversation: 4,
493 duplicate_fraction: 0.0,
494 max_message_length: 500,
495 agent_count: 3,
496 }
497 }
498
499 pub fn medium() -> Self {
500 Self {
501 family: corpus_families::MEDIUM.to_owned(),
502 num_conversations: 100,
503 messages_per_conversation: 5,
504 duplicate_fraction: 0.05,
505 max_message_length: 2000,
506 agent_count: 5,
507 }
508 }
509
510 pub fn large() -> Self {
511 Self {
512 family: corpus_families::LARGE.to_owned(),
513 num_conversations: 1000,
514 messages_per_conversation: 5,
515 duplicate_fraction: 0.05,
516 max_message_length: 2000,
517 agent_count: 8,
518 }
519 }
520
521 pub fn duplicate_heavy() -> Self {
522 Self {
523 family: corpus_families::DUPLICATE_HEAVY.to_owned(),
524 num_conversations: 50,
525 messages_per_conversation: 6,
526 duplicate_fraction: 0.5,
527 max_message_length: 1000,
528 agent_count: 3,
529 }
530 }
531
532 pub fn pathological() -> Self {
533 Self {
534 family: corpus_families::PATHOLOGICAL.to_owned(),
535 num_conversations: 20,
536 messages_per_conversation: 10,
537 duplicate_fraction: 0.0,
538 max_message_length: 50_000,
539 agent_count: 2,
540 }
541 }
542
543 pub fn mixed_agent() -> Self {
544 Self {
545 family: corpus_families::MIXED_AGENT.to_owned(),
546 num_conversations: 70,
547 messages_per_conversation: 4,
548 duplicate_fraction: 0.0,
549 max_message_length: 1000,
550 agent_count: 14,
551 }
552 }
553
554 pub fn incremental() -> Self {
555 Self {
556 family: corpus_families::INCREMENTAL.to_owned(),
557 num_conversations: 50,
558 messages_per_conversation: 4,
559 duplicate_fraction: 0.0,
560 max_message_length: 1000,
561 agent_count: 3,
562 }
563 }
564}
565
566#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
587pub struct RefreshThroughputProfile {
588 pub phase: RefreshPhase,
589 pub duration_ms: u64,
590 pub items_processed: u64,
591 pub items_per_second: Option<f64>,
595}
596
597#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
604pub struct RefreshPhaseShare {
605 pub phase: RefreshPhase,
606 pub duration_ms: u64,
607 pub share_pct: f64,
609}
610
611#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
620pub struct RefreshLedgerEvidence {
621 pub throughput: Vec<RefreshThroughputProfile>,
624 pub phase_share: Vec<RefreshPhaseShare>,
628 pub dominant_phase: Option<RefreshPhase>,
631 pub aggregate_items_processed: u64,
633 pub aggregate_duration_ms: u64,
637 pub aggregate_items_per_second: Option<f64>,
640}
641
642impl RefreshLedger {
643 pub fn evidence_summary(&self) -> RefreshLedgerEvidence {
647 let total_ms = self.total_duration_ms;
648 let throughput: Vec<RefreshThroughputProfile> = self
649 .phases
650 .iter()
651 .filter(|phase| phase.items_processed > 0)
652 .map(|phase| {
653 let items_per_second =
654 items_per_second_for(phase.duration_ms, phase.items_processed);
655 RefreshThroughputProfile {
656 phase: phase.phase,
657 duration_ms: phase.duration_ms,
658 items_processed: phase.items_processed,
659 items_per_second,
660 }
661 })
662 .collect();
663 let phase_share: Vec<RefreshPhaseShare> = self
664 .phases
665 .iter()
666 .map(|phase| RefreshPhaseShare {
667 phase: phase.phase,
668 duration_ms: phase.duration_ms,
669 share_pct: share_pct_for(phase.duration_ms, total_ms),
670 })
671 .collect();
672 let dominant_phase = self
673 .phases
674 .iter()
675 .max_by_key(|phase| phase.duration_ms)
676 .filter(|phase| phase.duration_ms > 0)
677 .map(|phase| phase.phase);
678 let aggregate_items_processed = self.total_items_processed();
679 let aggregate_items_per_second = items_per_second_for(total_ms, aggregate_items_processed);
680 RefreshLedgerEvidence {
681 throughput,
682 phase_share,
683 dominant_phase,
684 aggregate_items_processed,
685 aggregate_duration_ms: total_ms,
686 aggregate_items_per_second,
687 }
688 }
689}
690
/// Converts an item count and a duration in milliseconds into items per
/// second, rounded to three decimal places.
///
/// Returns `None` when either `duration_ms` or `items` is zero, since no
/// meaningful rate exists in that case.
fn items_per_second_for(duration_ms: u64, items: u64) -> Option<f64> {
    if duration_ms == 0 || items == 0 {
        return None;
    }
    // `duration_ms >= 1` here, so `seconds` is strictly positive and no
    // further zero check is needed before dividing.
    let seconds = duration_ms as f64 / 1000.0;
    let raw = items as f64 / seconds;
    Some((raw * 1000.0).round() / 1000.0)
}
705
/// Expresses `phase_ms` as a percentage of `total_ms`, rounded to two decimal
/// places. Yields `0.0` when either argument is zero.
fn share_pct_for(phase_ms: u64, total_ms: u64) -> f64 {
    match (phase_ms, total_ms) {
        (0, _) | (_, 0) => 0.0,
        (part, whole) => {
            let percent = (part as f64 / whole as f64) * 100.0;
            (percent * 100.0).round() / 100.0
        }
    }
}
716
717#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
737pub struct RefreshPhaseDelta {
738 pub phase: RefreshPhase,
739 pub baseline_duration_ms: u64,
740 pub current_duration_ms: u64,
741 pub duration_delta_pct: Option<f64>,
746 pub baseline_items_processed: u64,
747 pub current_items_processed: u64,
748 pub baseline_items_per_second: Option<f64>,
749 pub current_items_per_second: Option<f64>,
750 pub throughput_delta_pct: Option<f64>,
754}
755
756#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
760pub struct RefreshLedgerEvidenceComparison {
761 pub phase_deltas: Vec<RefreshPhaseDelta>,
765 pub aggregate_duration_delta_pct: Option<f64>,
767 pub aggregate_throughput_delta_pct: Option<f64>,
769 pub dominant_phase_shift: Option<(RefreshPhase, RefreshPhase)>,
774}
775
776impl RefreshLedgerEvidence {
777 pub fn compare_to(&self, baseline: &Self) -> RefreshLedgerEvidenceComparison {
787 use std::collections::{HashMap, HashSet};
798 let mut baseline_share_by_phase: HashMap<RefreshPhase, &RefreshPhaseShare> = HashMap::new();
799 for entry in &baseline.phase_share {
800 baseline_share_by_phase.insert(entry.phase, entry);
801 }
802 let mut current_share_by_phase: HashMap<RefreshPhase, &RefreshPhaseShare> = HashMap::new();
803 for entry in &self.phase_share {
804 current_share_by_phase.insert(entry.phase, entry);
805 }
806 let mut baseline_by_phase: HashMap<RefreshPhase, &RefreshThroughputProfile> =
807 HashMap::new();
808 for entry in &baseline.throughput {
809 baseline_by_phase.insert(entry.phase, entry);
810 }
811 let mut current_by_phase: HashMap<RefreshPhase, &RefreshThroughputProfile> = HashMap::new();
812 for entry in &self.throughput {
813 current_by_phase.insert(entry.phase, entry);
814 }
815 let mut all_phases: HashSet<RefreshPhase> = HashSet::new();
819 all_phases.extend(baseline_share_by_phase.keys().copied());
820 all_phases.extend(current_share_by_phase.keys().copied());
821 all_phases.extend(baseline_by_phase.keys().copied());
822 all_phases.extend(current_by_phase.keys().copied());
823
824 let phase_deltas: Vec<RefreshPhaseDelta> = RefreshPhase::ALL
825 .iter()
826 .copied()
827 .filter(|phase| all_phases.contains(phase))
828 .map(|phase| {
829 let baseline_entry = baseline_by_phase.get(&phase).copied();
830 let current_entry = current_by_phase.get(&phase).copied();
831 let baseline_duration_ms = baseline_share_by_phase
832 .get(&phase)
833 .map(|e| e.duration_ms)
834 .or_else(|| baseline_entry.map(|e| e.duration_ms))
835 .unwrap_or(0);
836 let current_duration_ms = current_share_by_phase
837 .get(&phase)
838 .map(|e| e.duration_ms)
839 .or_else(|| current_entry.map(|e| e.duration_ms))
840 .unwrap_or(0);
841 let baseline_items_processed =
842 baseline_entry.map(|e| e.items_processed).unwrap_or(0);
843 let current_items_processed = current_entry.map(|e| e.items_processed).unwrap_or(0);
844 let baseline_items_per_second = baseline_entry.and_then(|e| e.items_per_second);
845 let current_items_per_second = current_entry.and_then(|e| e.items_per_second);
846
847 RefreshPhaseDelta {
848 phase,
849 baseline_duration_ms,
850 current_duration_ms,
851 duration_delta_pct: pct_delta(
852 baseline_duration_ms as f64,
853 current_duration_ms as f64,
854 ),
855 baseline_items_processed,
856 current_items_processed,
857 baseline_items_per_second,
858 current_items_per_second,
859 throughput_delta_pct: match (
860 baseline_items_per_second,
861 current_items_per_second,
862 ) {
863 (Some(b), Some(c)) => pct_delta(b, c),
864 _ => None,
865 },
866 }
867 })
868 .collect();
869
870 let aggregate_duration_delta_pct = pct_delta(
871 baseline.aggregate_duration_ms as f64,
872 self.aggregate_duration_ms as f64,
873 );
874 let aggregate_throughput_delta_pct = match (
875 baseline.aggregate_items_per_second,
876 self.aggregate_items_per_second,
877 ) {
878 (Some(b), Some(c)) => pct_delta(b, c),
879 _ => None,
880 };
881
882 let dominant_phase_shift = match (baseline.dominant_phase, self.dominant_phase) {
883 (Some(from), Some(to)) if from != to => Some((from, to)),
884 _ => None,
885 };
886
887 RefreshLedgerEvidenceComparison {
888 phase_deltas,
889 aggregate_duration_delta_pct,
890 aggregate_throughput_delta_pct,
891 dominant_phase_shift,
892 }
893 }
894}
895
/// Percentage change from `baseline` to `current`, rounded to two decimal
/// places.
///
/// Returns `None` when either input is non-finite, when `baseline` is zero
/// (the relative change is undefined), or when the result is not finite.
fn pct_delta(baseline: f64, current: f64) -> Option<f64> {
    let inputs_usable = baseline.is_finite() && current.is_finite() && baseline != 0.0;
    if !inputs_usable {
        return None;
    }
    let change = ((current - baseline) / baseline) * 100.0;
    change
        .is_finite()
        .then(|| (change * 100.0).round() / 100.0)
}
916
917#[derive(Debug, Clone, PartialEq, Serialize)]
926pub struct RegressionVerdictThresholds {
927 pub warning_duration_pct: f64,
931 pub failure_duration_pct: f64,
936}
937
938impl RegressionVerdictThresholds {
939 pub fn defaults() -> Self {
942 Self {
943 warning_duration_pct: 15.0,
944 failure_duration_pct: 30.0,
945 }
946 }
947
948 pub fn try_new(
952 warning_duration_pct: f64,
953 failure_duration_pct: f64,
954 ) -> Result<Self, &'static str> {
955 if !warning_duration_pct.is_finite() || !failure_duration_pct.is_finite() {
956 return Err("regression thresholds must be finite f64s");
957 }
958 if warning_duration_pct < 0.0 || failure_duration_pct < 0.0 {
959 return Err("regression thresholds must be non-negative percentages");
960 }
961 if warning_duration_pct >= failure_duration_pct {
962 return Err(
963 "warning_duration_pct must be strictly less than failure_duration_pct, \
964 otherwise the warning level is unreachable",
965 );
966 }
967 Ok(Self {
968 warning_duration_pct,
969 failure_duration_pct,
970 })
971 }
972
973 fn is_valid(&self) -> bool {
974 self.warning_duration_pct.is_finite()
975 && self.failure_duration_pct.is_finite()
976 && self.warning_duration_pct >= 0.0
977 && self.failure_duration_pct >= 0.0
978 && self.warning_duration_pct < self.failure_duration_pct
979 }
980}
981
982impl<'de> Deserialize<'de> for RegressionVerdictThresholds {
983 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
984 where
985 D: serde::Deserializer<'de>,
986 {
987 #[derive(Deserialize)]
988 struct RawThresholds {
989 warning_duration_pct: f64,
990 failure_duration_pct: f64,
991 }
992
993 let raw = RawThresholds::deserialize(deserializer)?;
994 Self::try_new(raw.warning_duration_pct, raw.failure_duration_pct)
995 .map_err(serde::de::Error::custom)
996 }
997}
998
999#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1003#[serde(rename_all = "snake_case", tag = "verdict")]
1004pub enum RegressionVerdict {
1005 Clean,
1009 Warning {
1013 duration_delta_pct: f64,
1014 threshold_pct: f64,
1015 },
1016 Failure {
1019 duration_delta_pct: f64,
1020 threshold_pct: f64,
1021 },
1022}
1023
1024impl RegressionVerdict {
1025 pub fn should_fail_build(&self) -> bool {
1029 matches!(self, Self::Failure { .. })
1030 }
1031}
1032
1033impl RefreshLedgerEvidenceComparison {
1034 pub fn regression_verdict(
1052 &self,
1053 thresholds: &RegressionVerdictThresholds,
1054 ) -> RegressionVerdict {
1055 if !thresholds.is_valid() {
1056 return RegressionVerdict::Clean;
1057 }
1058 let Some(duration_pct) = self.aggregate_duration_delta_pct else {
1059 return RegressionVerdict::Clean;
1060 };
1061 if duration_pct < 0.0 {
1068 return RegressionVerdict::Clean;
1069 }
1070 if duration_pct >= thresholds.failure_duration_pct {
1071 return RegressionVerdict::Failure {
1072 duration_delta_pct: duration_pct,
1073 threshold_pct: thresholds.failure_duration_pct,
1074 };
1075 }
1076 if duration_pct >= thresholds.warning_duration_pct {
1077 return RegressionVerdict::Warning {
1078 duration_delta_pct: duration_pct,
1079 threshold_pct: thresholds.warning_duration_pct,
1080 };
1081 }
1082 RegressionVerdict::Clean
1083 }
1084}
1085
1086impl RefreshLedgerEvidenceComparison {
1087 pub fn emit_tracing_summary(&self) {
1117 let dominant_shift_str = self
1118 .dominant_phase_shift
1119 .map(|(from, to)| format!("{}->{}", from.as_str(), to.as_str()))
1120 .unwrap_or_else(|| "none".to_string());
1121 let aggregate_duration_str = self
1122 .aggregate_duration_delta_pct
1123 .map(|pct| format!("{pct:+.2}%"))
1124 .unwrap_or_else(|| "n/a".to_string());
1125 let aggregate_throughput_str = self
1126 .aggregate_throughput_delta_pct
1127 .map(|pct| format!("{pct:+.2}%"))
1128 .unwrap_or_else(|| "n/a".to_string());
1129
1130 const SLOWDOWN_WARN_THRESHOLD_PCT: f64 = 25.0;
1135 const IMPROVEMENT_INFO_THRESHOLD_PCT: f64 = -10.0;
1136 let duration_pct = self.aggregate_duration_delta_pct.unwrap_or(0.0);
1137 let phase_count = self.phase_deltas.len();
1138
1139 let aggregate_throughput_pct = self.aggregate_throughput_delta_pct.unwrap_or(0.0);
1150 macro_rules! emit_tier {
1151 ($macro:ident, $msg:literal) => {
1152 tracing::$macro!(
1153 target: "cass::indexer::lexical_refresh",
1154 aggregate_duration_delta_pct = duration_pct,
1155 aggregate_throughput_delta_pct = aggregate_throughput_pct,
1156 aggregate_duration = %aggregate_duration_str,
1157 aggregate_throughput = %aggregate_throughput_str,
1158 dominant_phase_shift = %dominant_shift_str,
1159 phase_count,
1160 $msg
1161 )
1162 };
1163 }
1164 if duration_pct >= SLOWDOWN_WARN_THRESHOLD_PCT {
1165 emit_tier!(
1166 warn,
1167 "lexical refresh evidence: significant slowdown vs previous publish"
1168 );
1169 } else if duration_pct <= IMPROVEMENT_INFO_THRESHOLD_PCT {
1170 emit_tier!(
1171 info,
1172 "lexical refresh evidence: notable improvement vs previous publish"
1173 );
1174 } else {
1175 emit_tier!(debug, "lexical refresh evidence: cross-run comparison");
1176 }
1177 }
1178}
1179
1180#[cfg(test)]
1183mod tests {
1184 use super::*;
1185
1186 #[test]
1187 fn phase_model_covers_all_phases() {
1188 assert_eq!(RefreshPhase::ALL.len(), 7);
1189 assert_eq!(RefreshPhase::ALL[0], RefreshPhase::Scan);
1190 assert_eq!(RefreshPhase::ALL[6], RefreshPhase::Recovery);
1191 }
1192
1193 #[test]
1194 fn phase_as_str_round_trips() {
1195 for phase in RefreshPhase::ALL {
1196 let s = phase.as_str();
1197 assert!(!s.is_empty(), "phase {phase:?} has empty string");
1198 }
1199 }
1200
1201 #[test]
1202 fn ledger_builder_records_phases() {
1203 let mut builder = RefreshLedger::start("small", false);
1204
1205 builder.begin_phase(RefreshPhase::Scan);
1206 builder.record_items(100, 5);
1207 builder.set_counter("connectors_scanned", 3);
1208
1209 builder.begin_phase(RefreshPhase::Persist);
1210 builder.record_items(95, 0);
1211 builder.set_counter("bytes_written", 50_000);
1212
1213 builder.begin_phase(RefreshPhase::LexicalRebuild);
1214 builder.record_items(450, 0);
1215
1216 builder.begin_phase(RefreshPhase::Publish);
1217 builder.record_items(1, 0);
1218
1219 let ledger = builder.finish();
1220
1221 assert_eq!(ledger.phases.len(), 4);
1222 assert_eq!(ledger.corpus_family, "small");
1223 assert!(!ledger.full_rebuild);
1224
1225 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1226 assert_eq!(scan.items_processed, 100);
1227 assert_eq!(scan.items_skipped, 5);
1228 assert_eq!(*scan.counters.get("connectors_scanned").unwrap(), 3);
1229
1230 let persist = ledger.phase(RefreshPhase::Persist).unwrap();
1231 assert_eq!(persist.items_processed, 95);
1232 assert_eq!(*persist.counters.get("bytes_written").unwrap(), 50_000);
1233
1234 assert!(ledger.all_phases_succeeded());
1235 assert_eq!(ledger.total_items_processed(), 100 + 95 + 450 + 1);
1236 assert!(ledger.completed_at_ms >= ledger.started_at_ms);
1237 let max_phase_duration = ledger
1238 .phases
1239 .iter()
1240 .map(|phase| phase.duration_ms)
1241 .max()
1242 .unwrap_or(0);
1243 assert!(ledger.total_duration_ms >= max_phase_duration);
1244 }
1245
1246 #[test]
1247 fn ledger_builder_records_failures() {
1248 let mut builder = RefreshLedger::start("small", false);
1249
1250 builder.begin_phase(RefreshPhase::Scan);
1251 builder.record_items(50, 0);
1252
1253 builder.begin_phase(RefreshPhase::Persist);
1254 builder.record_failure("database locked");
1255
1256 let ledger = builder.finish();
1257
1258 assert!(!ledger.all_phases_succeeded());
1259 assert_eq!(ledger.failed_phases().len(), 1);
1260 assert_eq!(ledger.failed_phases()[0].phase, RefreshPhase::Persist);
1261 assert_eq!(
1262 ledger.failed_phases()[0].error_message.as_deref(),
1263 Some("database locked")
1264 );
1265 assert_eq!(ledger.failed_phases()[0].errors, 1);
1266 assert_eq!(ledger.total_errors(), 1);
1267 }
1268
1269 #[test]
1270 fn ledger_builder_records_errors_without_failure() {
1271 let mut builder = RefreshLedger::start("medium", false);
1272
1273 builder.begin_phase(RefreshPhase::Scan);
1274 builder.record_items(90, 0);
1275 builder.record_error("connector timeout");
1276 builder.record_error("permission denied");
1277
1278 let ledger = builder.finish();
1279
1280 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1281 assert!(scan.success); assert_eq!(scan.errors, 2);
1283 let msg = scan.error_message.as_deref().unwrap();
1285 assert!(
1286 msg.contains("connector timeout"),
1287 "missing first error: {msg}"
1288 );
1289 assert!(
1290 msg.contains("permission denied"),
1291 "missing second error: {msg}"
1292 );
1293 }
1294
1295 #[test]
1296 fn ledger_equivalence_artifacts() {
1297 let mut builder = RefreshLedger::start("small", true);
1298
1299 builder.begin_phase(RefreshPhase::Scan);
1300 builder.record_items(10, 0);
1301
1302 builder.set_equivalence(EquivalenceArtifacts {
1303 conversation_count: 10,
1304 message_count: 40,
1305 lexical_doc_count: 40,
1306 lexical_fingerprint: Some("fp-abc".to_owned()),
1307 semantic_manifest_fingerprint: None,
1308 search_hit_digest: Some("sha256-xyz".to_owned()),
1309 peak_rss_bytes: Some(100_000_000),
1310 db_size_bytes: Some(5_000_000),
1311 lexical_index_size_bytes: Some(2_000_000),
1312 });
1313
1314 let ledger = builder.finish();
1315
1316 assert_eq!(ledger.equivalence.conversation_count, 10);
1317 assert_eq!(ledger.equivalence.message_count, 40);
1318 assert_eq!(
1319 ledger.equivalence.lexical_fingerprint.as_deref(),
1320 Some("fp-abc")
1321 );
1322 assert!(ledger.full_rebuild);
1323 }
1324
1325 #[test]
1326 fn ledger_duration_breakdown() {
1327 let mut builder = RefreshLedger::start("small", false);
1328
1329 builder.begin_phase(RefreshPhase::Scan);
1330 builder.begin_phase(RefreshPhase::LexicalRebuild);
1332
1333 let ledger = builder.finish();
1334
1335 let breakdown = ledger.duration_breakdown();
1336 assert!(breakdown.contains_key("scan"));
1337 assert!(breakdown.contains_key("lexical_rebuild"));
1338 }
1339
1340 #[test]
1341 fn readiness_milestones_measure_lexical_search_and_settled_times() {
1342 let ledger = RefreshLedger {
1343 total_duration_ms: 90,
1344 phases: vec![
1345 phase_record(RefreshPhase::Scan, 10, true),
1346 phase_record(RefreshPhase::Persist, 20, true),
1347 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1348 phase_record(RefreshPhase::Publish, 5, true),
1349 phase_record(RefreshPhase::Analytics, 7, true),
1350 phase_record(RefreshPhase::Semantic, 8, true),
1351 ],
1352 ..Default::default()
1353 };
1354
1355 let milestones = ledger.readiness_milestones();
1356
1357 assert_eq!(milestones.time_to_lexical_ready_ms, Some(60));
1358 assert_eq!(milestones.time_to_search_ready_ms, Some(65));
1359 assert_eq!(milestones.time_to_full_settled_ms, Some(90));
1360 assert_eq!(milestones.failed_phase, None);
1361 assert_eq!(
1362 milestones.search_readiness_state,
1363 RefreshSearchReadinessState::Published
1364 );
1365
1366 let json = serde_json::to_value(&milestones).unwrap();
1367 assert_eq!(json["time_to_lexical_ready_ms"], 60);
1368 assert_eq!(json["time_to_search_ready_ms"], 65);
1369 assert_eq!(json["time_to_full_settled_ms"], 90);
1370 assert_eq!(json["search_readiness_state"], "published");
1371 }
1372
1373 #[test]
1374 fn readiness_milestones_stop_at_first_failed_phase() {
1375 let ledger = RefreshLedger {
1376 total_duration_ms: 75,
1377 phases: vec![
1378 phase_record(RefreshPhase::Scan, 10, true),
1379 phase_record(RefreshPhase::Persist, 20, true),
1380 phase_record(RefreshPhase::LexicalRebuild, 30, false),
1381 phase_record(RefreshPhase::Publish, 5, true),
1382 ],
1383 ..Default::default()
1384 };
1385
1386 let milestones = ledger.readiness_milestones();
1387
1388 assert_eq!(milestones.time_to_lexical_ready_ms, None);
1389 assert_eq!(milestones.time_to_search_ready_ms, None);
1390 assert_eq!(milestones.time_to_full_settled_ms, None);
1391 assert_eq!(milestones.failed_phase.as_deref(), Some("lexical_rebuild"));
1392 assert_eq!(
1393 milestones.search_readiness_state,
1394 RefreshSearchReadinessState::BlockedBeforePublish
1395 );
1396 }
1397
1398 #[test]
1399 fn readiness_milestones_explain_unpublished_and_publish_failed_states() {
1400 let unpublished = RefreshLedger {
1401 phases: vec![
1402 phase_record(RefreshPhase::Scan, 10, true),
1403 phase_record(RefreshPhase::Persist, 20, true),
1404 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1405 ],
1406 ..Default::default()
1407 };
1408
1409 let unpublished_milestones = unpublished.readiness_milestones();
1410
1411 assert_eq!(unpublished_milestones.time_to_lexical_ready_ms, Some(60));
1412 assert_eq!(unpublished_milestones.time_to_search_ready_ms, None);
1413 assert_eq!(unpublished_milestones.time_to_full_settled_ms, None);
1414 assert_eq!(unpublished_milestones.failed_phase, None);
1415 assert_eq!(
1416 unpublished_milestones.search_readiness_state,
1417 RefreshSearchReadinessState::WaitingForPublish
1418 );
1419
1420 let publish_failed = RefreshLedger {
1421 phases: vec![
1422 phase_record(RefreshPhase::Scan, 10, true),
1423 phase_record(RefreshPhase::Persist, 20, true),
1424 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1425 phase_record(RefreshPhase::Publish, 5, false),
1426 ],
1427 ..Default::default()
1428 };
1429
1430 let publish_failed_milestones = publish_failed.readiness_milestones();
1431
1432 assert_eq!(publish_failed_milestones.time_to_lexical_ready_ms, Some(60));
1433 assert_eq!(publish_failed_milestones.time_to_search_ready_ms, None);
1434 assert_eq!(publish_failed_milestones.time_to_full_settled_ms, None);
1435 assert_eq!(
1436 publish_failed_milestones.failed_phase.as_deref(),
1437 Some("publish")
1438 );
1439 assert_eq!(
1440 publish_failed_milestones.search_readiness_state,
1441 RefreshSearchReadinessState::PublishFailed
1442 );
1443
1444 let post_publish_failure = RefreshLedger {
1445 phases: vec![
1446 phase_record(RefreshPhase::Scan, 10, true),
1447 phase_record(RefreshPhase::Persist, 20, true),
1448 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1449 phase_record(RefreshPhase::Publish, 5, true),
1450 phase_record(RefreshPhase::Analytics, 7, false),
1451 ],
1452 ..Default::default()
1453 };
1454
1455 let post_publish_failure_milestones = post_publish_failure.readiness_milestones();
1456
1457 assert_eq!(
1458 post_publish_failure_milestones.time_to_lexical_ready_ms,
1459 Some(60)
1460 );
1461 assert_eq!(
1462 post_publish_failure_milestones.time_to_search_ready_ms,
1463 Some(65)
1464 );
1465 assert_eq!(
1466 post_publish_failure_milestones.time_to_full_settled_ms,
1467 None
1468 );
1469 assert_eq!(
1470 post_publish_failure_milestones.failed_phase.as_deref(),
1471 Some("analytics")
1472 );
1473 assert_eq!(
1474 post_publish_failure_milestones.search_readiness_state,
1475 RefreshSearchReadinessState::Published
1476 );
1477 }
1478
1479 #[test]
1480 fn readiness_milestones_do_not_report_full_settlement_before_publish() {
1481 let empty = RefreshLedger::default().readiness_milestones();
1482
1483 assert_eq!(empty.time_to_lexical_ready_ms, None);
1484 assert_eq!(empty.time_to_search_ready_ms, None);
1485 assert_eq!(empty.time_to_full_settled_ms, None);
1486 assert_eq!(
1487 empty.search_readiness_state,
1488 RefreshSearchReadinessState::WaitingForPublish
1489 );
1490
1491 let partial = RefreshLedger {
1492 total_duration_ms: 42,
1493 phases: vec![
1494 phase_record(RefreshPhase::Scan, 10, true),
1495 phase_record(RefreshPhase::Persist, 20, true),
1496 ],
1497 ..Default::default()
1498 }
1499 .readiness_milestones();
1500
1501 assert_eq!(partial.time_to_lexical_ready_ms, None);
1502 assert_eq!(partial.time_to_search_ready_ms, None);
1503 assert_eq!(partial.time_to_full_settled_ms, None);
1504 assert_eq!(
1505 partial.search_readiness_state,
1506 RefreshSearchReadinessState::WaitingForPublish
1507 );
1508 }
1509
1510 #[test]
1511 fn ledger_tags() {
1512 let mut builder = RefreshLedger::start("medium", false);
1513 builder.tag("run_id", "bench-2026-04-01");
1514 builder.tag("machine", "csd");
1515
1516 let ledger = builder.finish();
1517
1518 assert_eq!(ledger.tags.get("run_id").unwrap(), "bench-2026-04-01");
1519 assert_eq!(ledger.tags.get("machine").unwrap(), "csd");
1520 }
1521
1522 #[test]
1523 fn ledger_json_round_trip() {
1524 let mut builder = RefreshLedger::start("duplicate_heavy", true);
1525 builder.begin_phase(RefreshPhase::Scan);
1526 builder.record_items(50, 10);
1527 builder.set_counter("duplicate_conversations", 25);
1528 builder.begin_phase(RefreshPhase::Persist);
1529 builder.record_items(40, 0);
1530
1531 builder.set_equivalence(EquivalenceArtifacts {
1532 conversation_count: 40,
1533 message_count: 200,
1534 lexical_doc_count: 200,
1535 ..Default::default()
1536 });
1537
1538 let ledger = builder.finish();
1539 let json = ledger.to_json();
1540 let deser: RefreshLedger = serde_json::from_str(&json).unwrap();
1541
1542 assert_eq!(deser.corpus_family, "duplicate_heavy");
1543 assert!(deser.full_rebuild);
1544 assert_eq!(deser.phases.len(), 2);
1545 assert_eq!(deser.equivalence.conversation_count, 40);
1546 assert_eq!(
1547 *deser.phases[0]
1548 .counters
1549 .get("duplicate_conversations")
1550 .unwrap(),
1551 25
1552 );
1553 }
1554
1555 #[test]
1556 fn ledger_inc_counter() {
1557 let mut builder = RefreshLedger::start("small", false);
1558 builder.begin_phase(RefreshPhase::Scan);
1559 builder.inc_counter("files_scanned", 10);
1560 builder.inc_counter("files_scanned", 15);
1561 builder.inc_counter("files_scanned", 5);
1562
1563 let ledger = builder.finish();
1564 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1565 assert_eq!(*scan.counters.get("files_scanned").unwrap(), 30);
1566 }
1567
1568 #[test]
1569 fn benchmark_corpus_configs_have_correct_families() {
1570 assert_eq!(BenchmarkCorpusConfig::small().family, "small");
1571 assert_eq!(BenchmarkCorpusConfig::medium().family, "medium");
1572 assert_eq!(BenchmarkCorpusConfig::large().family, "large");
1573 assert_eq!(
1574 BenchmarkCorpusConfig::duplicate_heavy().family,
1575 "duplicate_heavy"
1576 );
1577 assert_eq!(BenchmarkCorpusConfig::pathological().family, "pathological");
1578 assert_eq!(BenchmarkCorpusConfig::mixed_agent().family, "mixed_agent");
1579 assert_eq!(BenchmarkCorpusConfig::incremental().family, "incremental");
1580 }
1581
1582 #[test]
1583 fn benchmark_corpus_configs_have_reasonable_sizes() {
1584 let configs = [
1585 BenchmarkCorpusConfig::small(),
1586 BenchmarkCorpusConfig::medium(),
1587 BenchmarkCorpusConfig::large(),
1588 BenchmarkCorpusConfig::duplicate_heavy(),
1589 BenchmarkCorpusConfig::pathological(),
1590 BenchmarkCorpusConfig::mixed_agent(),
1591 BenchmarkCorpusConfig::incremental(),
1592 ];
1593 for cfg in &configs {
1594 assert!(
1595 cfg.num_conversations > 0,
1596 "{} has 0 conversations",
1597 cfg.family
1598 );
1599 assert!(
1600 cfg.messages_per_conversation > 0,
1601 "{} has 0 messages",
1602 cfg.family
1603 );
1604 assert!(cfg.agent_count > 0, "{} has 0 agents", cfg.family);
1605 assert!(
1606 cfg.duplicate_fraction >= 0.0 && cfg.duplicate_fraction <= 1.0,
1607 "{} has invalid duplicate fraction",
1608 cfg.family
1609 );
1610 }
1611 }
1612
1613 fn phase_record(phase: RefreshPhase, duration_ms: u64, success: bool) -> PhaseRecord {
1614 PhaseRecord {
1615 phase,
1616 duration_ms,
1617 items_processed: 0,
1618 items_skipped: 0,
1619 errors: u64::from(!success),
1620 counters: BTreeMap::new(),
1621 success,
1622 error_message: (!success).then(|| format!("failed {}", phase.as_str())),
1623 }
1624 }
1625
1626 fn phase_record_with_items(phase: RefreshPhase, duration_ms: u64, items: u64) -> PhaseRecord {
1627 PhaseRecord {
1628 phase,
1629 duration_ms,
1630 items_processed: items,
1631 items_skipped: 0,
1632 errors: 0,
1633 counters: BTreeMap::new(),
1634 success: true,
1635 error_message: None,
1636 }
1637 }
1638
1639 fn ledger_with(phases: Vec<PhaseRecord>) -> RefreshLedger {
1640 let total_duration_ms = phases.iter().map(|p| p.duration_ms).sum();
1641 RefreshLedger {
1642 version: 1,
1643 started_at_ms: 1_700_000_000_000,
1644 completed_at_ms: 1_700_000_000_000 + i64::try_from(total_duration_ms).unwrap_or(0),
1645 total_duration_ms,
1646 full_rebuild: true,
1647 corpus_family: "evidence-test".to_owned(),
1648 phases,
1649 equivalence: EquivalenceArtifacts::default(),
1650 tags: BTreeMap::new(),
1651 }
1652 }
1653
1654 #[test]
1660 fn evidence_summary_reports_per_phase_throughput_with_safe_zero_handling() {
1661 let ledger = ledger_with(vec![
1665 phase_record_with_items(RefreshPhase::Scan, 500, 1000),
1666 phase_record_with_items(RefreshPhase::Persist, 1000, 2000),
1667 phase_record_with_items(RefreshPhase::LexicalRebuild, 100, 0),
1668 phase_record_with_items(RefreshPhase::Recovery, 0, 0),
1669 ]);
1670
1671 let evidence = ledger.evidence_summary();
1672
1673 assert_eq!(
1676 evidence.throughput.len(),
1677 2,
1678 "throughput must skip zero-item phases; got {:?}",
1679 evidence.throughput
1680 );
1681
1682 let scan = evidence
1684 .throughput
1685 .iter()
1686 .find(|t| t.phase == RefreshPhase::Scan)
1687 .expect("scan throughput present");
1688 assert_eq!(scan.items_per_second, Some(2000.0));
1689 assert_eq!(scan.duration_ms, 500);
1690 assert_eq!(scan.items_processed, 1000);
1691
1692 let persist = evidence
1694 .throughput
1695 .iter()
1696 .find(|t| t.phase == RefreshPhase::Persist)
1697 .expect("persist throughput present");
1698 assert_eq!(persist.items_per_second, Some(2000.0));
1699
1700 assert_eq!(evidence.aggregate_items_processed, 3000);
1702 assert_eq!(evidence.aggregate_duration_ms, 1600);
1703 assert_eq!(evidence.aggregate_items_per_second, Some(1875.0));
1704 }
1705
1706 #[test]
1710 fn evidence_summary_handles_empty_and_zero_duration_ledgers() {
1711 let empty = ledger_with(Vec::new());
1713 let empty_evidence = empty.evidence_summary();
1714 assert!(empty_evidence.throughput.is_empty());
1715 assert!(empty_evidence.phase_share.is_empty());
1716 assert_eq!(empty_evidence.dominant_phase, None);
1717 assert_eq!(empty_evidence.aggregate_items_per_second, None);
1718 assert_eq!(empty_evidence.aggregate_duration_ms, 0);
1719
1720 let instant = ledger_with(vec![
1722 phase_record_with_items(RefreshPhase::Scan, 0, 5),
1723 phase_record_with_items(RefreshPhase::Persist, 0, 5),
1724 ]);
1725 let instant_evidence = instant.evidence_summary();
1726 for t in &instant_evidence.throughput {
1728 assert_eq!(t.items_per_second, None, "zero duration must yield None");
1729 }
1730 assert_eq!(instant_evidence.dominant_phase, None);
1732 for share in &instant_evidence.phase_share {
1734 assert_eq!(share.share_pct, 0.0);
1735 assert!(!share.share_pct.is_nan(), "share_pct must never be NaN");
1736 }
1737 }
1738
1739 #[test]
1744 fn evidence_summary_phase_share_sums_to_one_hundred_and_dominant_phase_picks_max() {
1745 let ledger = ledger_with(vec![
1746 phase_record_with_items(RefreshPhase::Scan, 200, 100),
1747 phase_record_with_items(RefreshPhase::Persist, 600, 1500), phase_record_with_items(RefreshPhase::LexicalRebuild, 200, 1500),
1749 ]);
1750 let evidence = ledger.evidence_summary();
1751
1752 let total_share: f64 = evidence.phase_share.iter().map(|s| s.share_pct).sum();
1753 assert!(
1754 (total_share - 100.0).abs() <= 0.05,
1755 "phase shares must sum to ~100.0 (±0.05 for rounding); got {total_share}"
1756 );
1757
1758 let persist_share = evidence
1760 .phase_share
1761 .iter()
1762 .find(|s| s.phase == RefreshPhase::Persist)
1763 .expect("persist share present");
1764 assert_eq!(persist_share.share_pct, 60.0);
1765
1766 assert_eq!(evidence.dominant_phase, Some(RefreshPhase::Persist));
1768 }
1769
1770 #[test]
1775 fn evidence_summary_dominant_phase_tie_break_is_first_in_pipeline_order() {
1776 let ledger = ledger_with(vec![
1777 phase_record_with_items(RefreshPhase::Scan, 500, 1),
1778 phase_record_with_items(RefreshPhase::Persist, 500, 1),
1779 phase_record_with_items(RefreshPhase::LexicalRebuild, 500, 1),
1780 ]);
1781 let evidence = ledger.evidence_summary();
1782 assert_eq!(
1788 evidence.dominant_phase,
1789 Some(RefreshPhase::LexicalRebuild),
1790 "tie-break: max_by_key returns the LAST phase at max duration"
1791 );
1792 }
1793
1794 #[test]
1799 fn evidence_summary_serializes_to_stable_json_field_set() {
1800 let ledger = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 50)]);
1801 let evidence = ledger.evidence_summary();
1802 let json = serde_json::to_string(&evidence).expect("serialize");
1803 for required_field in [
1804 "\"throughput\"",
1805 "\"phase_share\"",
1806 "\"dominant_phase\"",
1807 "\"aggregate_items_processed\"",
1808 "\"aggregate_duration_ms\"",
1809 "\"aggregate_items_per_second\"",
1810 ] {
1811 assert!(
1812 json.contains(required_field),
1813 "evidence JSON missing field {required_field}; got: {json}"
1814 );
1815 }
1816 let parsed: serde_json::Value = serde_json::from_str(&json).expect("parse");
1820 assert_eq!(parsed["aggregate_items_processed"], 50);
1821 assert_eq!(parsed["aggregate_duration_ms"], 100);
1822 assert_eq!(parsed["aggregate_items_per_second"], 500.0);
1823 assert_eq!(parsed["dominant_phase"], "scan");
1824 }
1825
1826 #[test]
1835 fn evidence_compare_to_reports_per_phase_regressions_and_improvements() {
1836 let baseline = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
1838 .evidence_summary();
1839 let current = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 200, 100)])
1842 .evidence_summary();
1843
1844 let cmp = current.compare_to(&baseline);
1845
1846 assert_eq!(cmp.phase_deltas.len(), 1);
1847 let scan = &cmp.phase_deltas[0];
1848 assert_eq!(scan.phase, RefreshPhase::Scan);
1849 assert_eq!(scan.duration_delta_pct, Some(100.0));
1851 assert_eq!(scan.throughput_delta_pct, Some(-50.0));
1853 assert_eq!(cmp.aggregate_duration_delta_pct, Some(100.0));
1855 assert_eq!(cmp.aggregate_throughput_delta_pct, Some(-50.0));
1856 assert_eq!(cmp.dominant_phase_shift, None);
1858
1859 let cmp_improved = baseline.compare_to(¤t);
1861 let scan = &cmp_improved.phase_deltas[0];
1862 assert_eq!(scan.duration_delta_pct, Some(-50.0));
1864 assert_eq!(scan.throughput_delta_pct, Some(100.0));
1866 }
1867
1868 #[test]
1874 fn evidence_compare_to_surfaces_phases_unique_to_one_side() {
1875 let baseline = ledger_with(vec![
1876 phase_record_with_items(RefreshPhase::Scan, 100, 100),
1877 phase_record_with_items(RefreshPhase::Persist, 50, 200),
1878 ])
1879 .evidence_summary();
1880 let current = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
1883 .evidence_summary();
1884
1885 let cmp = current.compare_to(&baseline);
1886
1887 let phases: Vec<RefreshPhase> = cmp.phase_deltas.iter().map(|d| d.phase).collect();
1888 assert!(
1889 phases.contains(&RefreshPhase::Scan),
1890 "Scan ran in both sides; must appear in comparison; got phases {phases:?}"
1891 );
1892 assert!(
1893 phases.contains(&RefreshPhase::Persist),
1894 "Persist is missing from current but ran in baseline — comparison MUST \
1895 surface it so caller can investigate; got phases {phases:?}"
1896 );
1897
1898 let persist = cmp
1902 .phase_deltas
1903 .iter()
1904 .find(|d| d.phase == RefreshPhase::Persist)
1905 .expect("Persist delta present");
1906 assert_eq!(persist.baseline_duration_ms, 50);
1907 assert_eq!(persist.current_duration_ms, 0);
1908 assert_eq!(
1909 persist.duration_delta_pct,
1910 Some(-100.0),
1911 "phase disappearing from current must surface as -100% duration delta; \
1912 got {persist:?}"
1913 );
1914 }
1915
1916 #[test]
1921 fn evidence_compare_to_retains_zero_item_phases_with_duration() {
1922 let baseline = ledger_with(vec![
1923 phase_record_with_items(RefreshPhase::Scan, 100, 100),
1924 phase_record_with_items(RefreshPhase::Publish, 40, 0),
1925 ])
1926 .evidence_summary();
1927 let current = ledger_with(vec![
1928 phase_record_with_items(RefreshPhase::Scan, 100, 100),
1929 phase_record_with_items(RefreshPhase::Publish, 80, 0),
1930 ])
1931 .evidence_summary();
1932
1933 assert!(
1934 baseline
1935 .throughput
1936 .iter()
1937 .all(|entry| entry.phase != RefreshPhase::Publish),
1938 "zero-item Publish must stay out of throughput: {:?}",
1939 baseline.throughput
1940 );
1941
1942 let cmp = current.compare_to(&baseline);
1943 let publish = cmp
1944 .phase_deltas
1945 .iter()
1946 .find(|delta| delta.phase == RefreshPhase::Publish)
1947 .expect("zero-item Publish phase must remain in comparison");
1948
1949 assert_eq!(publish.baseline_duration_ms, 40);
1950 assert_eq!(publish.current_duration_ms, 80);
1951 assert_eq!(publish.duration_delta_pct, Some(100.0));
1952 assert_eq!(publish.baseline_items_processed, 0);
1953 assert_eq!(publish.current_items_processed, 0);
1954 assert_eq!(publish.baseline_items_per_second, None);
1955 assert_eq!(publish.current_items_per_second, None);
1956 assert_eq!(publish.throughput_delta_pct, None);
1957 }
1958
1959 #[test]
1965 fn evidence_compare_to_reports_dominant_phase_shift() {
1966 let baseline = ledger_with(vec![
1968 phase_record_with_items(RefreshPhase::Scan, 800, 100),
1969 phase_record_with_items(RefreshPhase::Persist, 200, 100),
1970 ])
1971 .evidence_summary();
1972 let current = ledger_with(vec![
1974 phase_record_with_items(RefreshPhase::Scan, 200, 100),
1975 phase_record_with_items(RefreshPhase::Persist, 800, 100),
1976 ])
1977 .evidence_summary();
1978 assert_eq!(baseline.dominant_phase, Some(RefreshPhase::Scan));
1982 assert_eq!(current.dominant_phase, Some(RefreshPhase::Persist));
1983
1984 let cmp = current.compare_to(&baseline);
1985
1986 assert_eq!(
1987 cmp.dominant_phase_shift,
1988 Some((RefreshPhase::Scan, RefreshPhase::Persist)),
1989 "dominant phase shifted Scan→Persist; comparison must surface this; got {cmp:?}"
1990 );
1991
1992 let same_dom = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
1994 .evidence_summary();
1995 let cmp_same = same_dom.compare_to(&same_dom);
1996 assert_eq!(cmp_same.dominant_phase_shift, None);
1997 }
1998
1999 #[test]
2005 fn evidence_compare_to_safely_handles_zero_baseline_and_empty_evidence() {
2006 let empty = ledger_with(Vec::new()).evidence_summary();
2007 let normal = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 50)])
2008 .evidence_summary();
2009
2010 let against_empty = normal.compare_to(&empty);
2013 assert!(
2014 against_empty
2015 .phase_deltas
2016 .iter()
2017 .all(|d| d.duration_delta_pct.is_none() || d.baseline_duration_ms == 0),
2018 "phases with zero-baseline duration must report None for duration_delta_pct"
2019 );
2020 assert_eq!(against_empty.aggregate_duration_delta_pct, None);
2021 assert_eq!(against_empty.aggregate_throughput_delta_pct, None);
2022
2023 let against_self = empty.compare_to(&empty);
2025 assert!(against_self.phase_deltas.is_empty());
2026 assert_eq!(against_self.aggregate_duration_delta_pct, None);
2027
2028 let json = serde_json::to_string(&against_empty).expect("serialize");
2031 assert!(
2032 !json.contains("NaN"),
2033 "comparison JSON must not contain NaN; got {json}"
2034 );
2035 assert!(
2036 !json.contains("Infinity"),
2037 "comparison JSON must not contain Infinity"
2038 );
2039 }
2040
2041 #[test]
2049 fn evidence_comparison_emit_tracing_summary_uses_correct_severity_tier() {
2050 use std::sync::{Arc, Mutex};
2051 use tracing::field::{Field, Visit};
2052 use tracing::{Event, Subscriber};
2053 use tracing_subscriber::Registry;
2054 use tracing_subscriber::layer::{Context, Layer, SubscriberExt};
2055
2056 #[derive(Debug, Clone, Default)]
2057 struct CapturedEvent {
2058 level: String,
2059 message: String,
2060 }
2061
2062 #[derive(Clone, Default)]
2063 struct LevelCollector {
2064 events: Arc<Mutex<Vec<CapturedEvent>>>,
2065 }
2066
2067 impl<S: Subscriber> Layer<S> for LevelCollector {
2068 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
2069 if event.metadata().target() != "cass::indexer::lexical_refresh" {
2070 return;
2071 }
2072 let mut visitor = MessageVisitor::default();
2073 event.record(&mut visitor);
2074 self.events
2075 .lock()
2076 .expect("collector lock")
2077 .push(CapturedEvent {
2078 level: event.metadata().level().to_string(),
2079 message: visitor.message,
2080 });
2081 }
2082 }
2083
2084 #[derive(Default)]
2085 struct MessageVisitor {
2086 message: String,
2087 }
2088 impl Visit for MessageVisitor {
2089 fn record_str(&mut self, _field: &Field, _value: &str) {}
2090 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
2091 if field.name() == "message" {
2092 self.message = format!("{:?}", value).trim_matches('"').to_string();
2093 }
2094 }
2095 }
2096
2097 fn comparison_with_duration_pct(pct: f64) -> RefreshLedgerEvidenceComparison {
2101 RefreshLedgerEvidenceComparison {
2102 phase_deltas: Vec::new(),
2103 aggregate_duration_delta_pct: Some(pct),
2104 aggregate_throughput_delta_pct: None,
2105 dominant_phase_shift: None,
2106 }
2107 }
2108
2109 let collector = LevelCollector::default();
2111 let subscriber = Registry::default().with(collector.clone());
2112 tracing::subscriber::with_default(subscriber, || {
2113 comparison_with_duration_pct(50.0).emit_tracing_summary();
2114 });
2115 let evs = collector.events.lock().expect("lock").clone();
2116 assert_eq!(
2117 evs.len(),
2118 1,
2119 "exactly one event per emit_tracing_summary call"
2120 );
2121 assert_eq!(
2122 evs[0].level, "WARN",
2123 "+50% slowdown must be warn; got {evs:?}"
2124 );
2125 assert!(
2126 evs[0].message.contains("significant slowdown"),
2127 "warn message must name the slowdown; got {:?}",
2128 evs[0].message
2129 );
2130
2131 let collector = LevelCollector::default();
2133 let subscriber = Registry::default().with(collector.clone());
2134 tracing::subscriber::with_default(subscriber, || {
2135 comparison_with_duration_pct(-25.0).emit_tracing_summary();
2136 });
2137 let evs = collector.events.lock().expect("lock").clone();
2138 assert_eq!(
2139 evs[0].level, "INFO",
2140 "-25% improvement must be info; got {evs:?}"
2141 );
2142 assert!(
2143 evs[0].message.contains("notable improvement"),
2144 "info message must name the improvement; got {:?}",
2145 evs[0].message
2146 );
2147
2148 let collector = LevelCollector::default();
2150 let subscriber = Registry::default().with(collector.clone());
2151 tracing::subscriber::with_default(subscriber, || {
2152 comparison_with_duration_pct(5.0).emit_tracing_summary();
2153 });
2154 let evs = collector.events.lock().expect("lock").clone();
2155 assert_eq!(
2156 evs[0].level, "DEBUG",
2157 "+5% within steady-state must be debug; got {evs:?}"
2158 );
2159 assert!(
2160 evs[0].message.contains("cross-run comparison"),
2161 "debug message must use the steady-state phrasing; got {:?}",
2162 evs[0].message
2163 );
2164
2165 let collector = LevelCollector::default();
2167 let subscriber = Registry::default().with(collector.clone());
2168 tracing::subscriber::with_default(subscriber, || {
2169 comparison_with_duration_pct(25.0).emit_tracing_summary();
2170 });
2171 let evs = collector.events.lock().expect("lock").clone();
2172 assert_eq!(
2173 evs[0].level, "WARN",
2174 "exactly +25% must be warn (inclusive threshold); got {evs:?}"
2175 );
2176
2177 let collector = LevelCollector::default();
2179 let subscriber = Registry::default().with(collector.clone());
2180 tracing::subscriber::with_default(subscriber, || {
2181 comparison_with_duration_pct(-10.0).emit_tracing_summary();
2182 });
2183 let evs = collector.events.lock().expect("lock").clone();
2184 assert_eq!(
2185 evs[0].level, "INFO",
2186 "exactly -10% must be info (inclusive threshold); got {evs:?}"
2187 );
2188
2189 let collector = LevelCollector::default();
2192 let subscriber = Registry::default().with(collector.clone());
2193 tracing::subscriber::with_default(subscriber, || {
2194 RefreshLedgerEvidenceComparison {
2195 phase_deltas: Vec::new(),
2196 aggregate_duration_delta_pct: None,
2197 aggregate_throughput_delta_pct: None,
2198 dominant_phase_shift: None,
2199 }
2200 .emit_tracing_summary();
2201 });
2202 let evs = collector.events.lock().expect("lock").clone();
2203 assert_eq!(
2204 evs[0].level, "DEBUG",
2205 "None duration delta defaults to steady-state (debug); got {evs:?}"
2206 );
2207 }
2208
2209 #[test]
2217 fn regression_verdict_categorizes_each_band_and_handles_degenerate_cases() {
2218 let thresholds = RegressionVerdictThresholds::defaults();
2219 assert_eq!(thresholds.warning_duration_pct, 15.0);
2220 assert_eq!(thresholds.failure_duration_pct, 30.0);
2221
2222 fn comparison_with_pct(pct: Option<f64>) -> RefreshLedgerEvidenceComparison {
2224 RefreshLedgerEvidenceComparison {
2225 phase_deltas: Vec::new(),
2226 aggregate_duration_delta_pct: pct,
2227 aggregate_throughput_delta_pct: None,
2228 dominant_phase_shift: None,
2229 }
2230 }
2231
2232 let clean = comparison_with_pct(Some(10.0)).regression_verdict(&thresholds);
2235 assert_eq!(clean, RegressionVerdict::Clean);
2236 assert!(!clean.should_fail_build());
2237
2238 let warn_at = comparison_with_pct(Some(15.0)).regression_verdict(&thresholds);
2241 assert!(
2242 matches!(
2243 warn_at,
2244 RegressionVerdict::Warning { duration_delta_pct, threshold_pct }
2245 if (duration_delta_pct - 15.0).abs() < 0.01 && threshold_pct == 15.0
2246 ),
2247 "+15% must trigger warn at the inclusive threshold; got {warn_at:?}"
2248 );
2249 assert!(!warn_at.should_fail_build());
2250
2251 let warn_mid = comparison_with_pct(Some(22.5)).regression_verdict(&thresholds);
2253 assert!(matches!(warn_mid, RegressionVerdict::Warning { .. }));
2254 assert!(!warn_mid.should_fail_build());
2255
2256 let fail_at = comparison_with_pct(Some(30.0)).regression_verdict(&thresholds);
2259 assert!(
2260 matches!(
2261 fail_at,
2262 RegressionVerdict::Failure { duration_delta_pct, threshold_pct }
2263 if (duration_delta_pct - 30.0).abs() < 0.01 && threshold_pct == 30.0
2264 ),
2265 "+30% must trigger failure at the inclusive threshold; got {fail_at:?}"
2266 );
2267 assert!(
2268 fail_at.should_fail_build(),
2269 "Failure verdict MUST cause CI to exit non-zero"
2270 );
2271
2272 let fail_far = comparison_with_pct(Some(150.0)).regression_verdict(&thresholds);
2274 assert!(matches!(fail_far, RegressionVerdict::Failure { .. }));
2275
2276 let improvement = comparison_with_pct(Some(-50.0)).regression_verdict(&thresholds);
2278 assert_eq!(
2279 improvement,
2280 RegressionVerdict::Clean,
2281 "improvements (negative duration delta) MUST NOT trigger regression verdicts; \
2282 got {improvement:?}"
2283 );
2284
2285 let no_data = comparison_with_pct(None).regression_verdict(&thresholds);
2287 assert_eq!(
2288 no_data,
2289 RegressionVerdict::Clean,
2290 "missing comparison data MUST NOT cause a CI failure (no signal to gate on)"
2291 );
2292
2293 let invalid_negative = RegressionVerdictThresholds {
2294 warning_duration_pct: -20.0,
2295 failure_duration_pct: -10.0,
2296 };
2297 let steady_state = comparison_with_pct(Some(0.0)).regression_verdict(&invalid_negative);
2298 assert_eq!(
2299 steady_state,
2300 RegressionVerdict::Clean,
2301 "invalid negative thresholds must fail open instead of turning a 0% \
2302 steady-state comparison into a CI failure"
2303 );
2304 }
2305
2306 #[test]
2313 fn regression_verdict_thresholds_try_new_rejects_inconsistent_configurations() {
2314 assert!(RegressionVerdictThresholds::try_new(10.0, 20.0).is_ok());
2316
2317 let err = RegressionVerdictThresholds::try_new(20.0, 10.0)
2319 .expect_err("warning > failure must be rejected");
2320 assert!(
2321 err.contains("strictly less"),
2322 "rejection message must explain the constraint; got {err:?}"
2323 );
2324
2325 let err_eq = RegressionVerdictThresholds::try_new(15.0, 15.0)
2327 .expect_err("warning == failure must be rejected");
2328 assert!(err_eq.contains("strictly less"));
2329
2330 let negative_warning = RegressionVerdictThresholds::try_new(-20.0, 10.0)
2333 .expect_err("negative warning threshold must be rejected");
2334 assert!(negative_warning.contains("non-negative"));
2335 let negative_failure = RegressionVerdictThresholds::try_new(10.0, -20.0)
2336 .expect_err("negative failure threshold must be rejected");
2337 assert!(negative_failure.contains("non-negative"));
2338 let invalid_json = r#"{"warning_duration_pct":-30.0,"failure_duration_pct":-10.0}"#;
2339 let deser = serde_json::from_str::<RegressionVerdictThresholds>(invalid_json)
2340 .expect_err("serde-loaded negative thresholds must be rejected too");
2341 assert!(
2342 deser.to_string().contains("non-negative"),
2343 "serde validation error must explain the threshold polarity; got {deser}"
2344 );
2345
2346 assert!(RegressionVerdictThresholds::try_new(f64::NAN, 30.0).is_err());
2349 assert!(RegressionVerdictThresholds::try_new(15.0, f64::INFINITY).is_err());
2350 }
2351
2352 #[test]
2362 fn regression_verdict_zero_change_under_valid_custom_thresholds_is_clean() {
2363 fn zero_delta_comparison() -> RefreshLedgerEvidenceComparison {
2364 RefreshLedgerEvidenceComparison {
2365 phase_deltas: Vec::new(),
2366 aggregate_duration_delta_pct: Some(0.0),
2367 aggregate_throughput_delta_pct: None,
2368 dominant_phase_shift: None,
2369 }
2370 }
2371
2372 let strict = RegressionVerdictThresholds::try_new(5.0, 20.0)
2375 .expect("valid strict thresholds must construct");
2376 let steady_state = zero_delta_comparison().regression_verdict(&strict);
2377 assert_eq!(
2378 steady_state,
2379 RegressionVerdict::Clean,
2380 "0% steady-state delta must be Clean under any valid \
2381 threshold pair — tight CI profiles must not flag no-op runs"
2382 );
2383
2384 let loose = RegressionVerdictThresholds::try_new(50.0, 200.0)
2388 .expect("valid loose thresholds must construct");
2389 let steady_state_loose = zero_delta_comparison().regression_verdict(&loose);
2390 assert_eq!(
2391 steady_state_loose,
2392 RegressionVerdict::Clean,
2393 "0% steady-state delta must be Clean under loose thresholds too"
2394 );
2395 }
2396
2397 #[test]
2403 fn regression_verdict_serializes_with_snake_case_verdict_tag() {
2404 let clean_json = serde_json::to_string(&RegressionVerdict::Clean).expect("serialize");
2405 assert!(
2406 clean_json.contains("\"verdict\":\"clean\""),
2407 "Clean must serialize with snake_case `verdict` tag; got {clean_json}"
2408 );
2409
2410 let warning_json = serde_json::to_string(&RegressionVerdict::Warning {
2411 duration_delta_pct: 18.5,
2412 threshold_pct: 15.0,
2413 })
2414 .expect("serialize");
2415 assert!(warning_json.contains("\"verdict\":\"warning\""));
2416 assert!(warning_json.contains("\"duration_delta_pct\":18.5"));
2417 assert!(warning_json.contains("\"threshold_pct\":15"));
2418
2419 let failure_json = serde_json::to_string(&RegressionVerdict::Failure {
2420 duration_delta_pct: 42.0,
2421 threshold_pct: 30.0,
2422 })
2423 .expect("serialize");
2424 assert!(failure_json.contains("\"verdict\":\"failure\""));
2425 }
2426}