1use std::collections::BTreeMap;
24use std::time::Instant;
25
26use serde::{Deserialize, Serialize};
27
/// One named stage of a refresh run; each [`PhaseRecord`] is keyed by one of these.
///
/// Serde renames the variants to `snake_case`, which yields the same labels
/// that `RefreshPhase::as_str` returns (for example `lexical_rebuild`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RefreshPhase {
    /// Serialized as `scan`.
    Scan,
    /// Serialized as `persist`.
    Persist,
    /// Serialized as `lexical_rebuild`; the "lexical ready" milestone is
    /// measured through this phase.
    LexicalRebuild,
    /// Serialized as `publish`; a successful record for this phase is what the
    /// readiness logic treats as "published".
    Publish,
    /// Serialized as `analytics`.
    Analytics,
    /// Serialized as `semantic`.
    Semantic,
    /// Serialized as `recovery`.
    Recovery,
}
49
50impl RefreshPhase {
51 pub const ALL: &'static [RefreshPhase] = &[
53 Self::Scan,
54 Self::Persist,
55 Self::LexicalRebuild,
56 Self::Publish,
57 Self::Analytics,
58 Self::Semantic,
59 Self::Recovery,
60 ];
61
62 pub fn as_str(&self) -> &'static str {
63 match self {
64 Self::Scan => "scan",
65 Self::Persist => "persist",
66 Self::LexicalRebuild => "lexical_rebuild",
67 Self::Publish => "publish",
68 Self::Analytics => "analytics",
69 Self::Semantic => "semantic",
70 Self::Recovery => "recovery",
71 }
72 }
73}
74
/// Outcome and tallies for a single phase of a refresh run.
///
/// Records are created and filled in by [`LedgerBuilder`] and end up in
/// `RefreshLedger::phases`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseRecord {
    /// Phase this record describes.
    pub phase: RefreshPhase,
    /// Elapsed time of the phase in milliseconds; written when the builder
    /// closes the phase.
    pub duration_ms: u64,
    /// Items processed, accumulated (saturating) by `LedgerBuilder::record_items`.
    pub items_processed: u64,
    /// Items skipped, accumulated (saturating) by `LedgerBuilder::record_items`.
    pub items_skipped: u64,
    /// Number of errors recorded; incremented by both `record_error` and
    /// `record_failure`.
    pub errors: u64,
    /// Free-form named counters written by `set_counter` / `inc_counter`.
    pub counters: BTreeMap<String, u64>,
    /// `true` until `LedgerBuilder::record_failure` marks the phase as failed.
    pub success: bool,
    /// Error text for the phase: `record_error` appends messages joined by
    /// `"; "`, while `record_failure` replaces the text with its own message.
    pub error_message: Option<String>,
}
96
97impl PhaseRecord {
98 fn new(phase: RefreshPhase) -> Self {
99 Self {
100 phase,
101 duration_ms: 0,
102 items_processed: 0,
103 items_skipped: 0,
104 errors: 0,
105 counters: BTreeMap::new(),
106 success: true,
107 error_message: None,
108 }
109 }
110}
111
/// Counts, fingerprints and sizes attached to a ledger through
/// `LedgerBuilder::set_equivalence`.
///
/// Every field defaults to zero / `None`. The code visible in this file only
/// stores these values; it does not compute them.
// NOTE(review): the type name suggests these are compared across runs to
// check that two refreshes produced equivalent output — confirm with callers.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EquivalenceArtifacts {
    /// Number of conversations (presumably in the refreshed corpus — confirm).
    pub conversation_count: u64,
    /// Number of messages (presumably in the refreshed corpus — confirm).
    pub message_count: u64,
    /// Number of documents in the lexical index (per the field name — confirm).
    pub lexical_doc_count: u64,
    /// Opaque fingerprint string for the lexical index, when available.
    pub lexical_fingerprint: Option<String>,
    /// Opaque fingerprint string for the semantic manifest, when available.
    pub semantic_manifest_fingerprint: Option<String>,
    /// Opaque digest string of search hits, when available.
    pub search_hit_digest: Option<String>,
    /// Peak resident set size in bytes, when measured.
    pub peak_rss_bytes: Option<u64>,
    /// Database size in bytes, when measured.
    pub db_size_bytes: Option<u64>,
    /// Lexical index size in bytes, when measured.
    pub lexical_index_size_bytes: Option<u64>,
}
136
/// Complete record of one refresh run: timing, per-phase results,
/// equivalence artifacts and free-form tags.
///
/// Usually produced by `RefreshLedger::start(..)` followed by
/// `LedgerBuilder::finish()`; `Default` yields an empty version-1 ledger.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefreshLedger {
    /// Ledger format version; `Default` sets it to `1`.
    pub version: u32,
    /// Wall-clock start as milliseconds since the Unix epoch, captured when
    /// the builder is created (`0` if the system clock is before the epoch).
    pub started_at_ms: i64,
    /// Wall-clock completion as milliseconds since the Unix epoch, captured in
    /// `LedgerBuilder::finish`.
    pub completed_at_ms: i64,
    /// Elapsed milliseconds between builder creation and `finish`, measured
    /// with a monotonic `Instant`.
    pub total_duration_ms: u64,
    /// Flag passed to `RefreshLedger::start`.
    pub full_rebuild: bool,
    /// Corpus family label passed to `RefreshLedger::start` (`"default"` when
    /// the ledger comes from `Default`).
    pub corpus_family: String,
    /// Phase records in the order the builder closed them.
    pub phases: Vec<PhaseRecord>,
    /// Artifacts supplied via `LedgerBuilder::set_equivalence`.
    pub equivalence: EquivalenceArtifacts,
    /// Free-form key/value tags supplied via `LedgerBuilder::tag`.
    pub tags: BTreeMap<String, String>,
}
164
/// Readiness timings derived from a ledger by `RefreshLedger::readiness_milestones`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RefreshReadinessMilestones {
    /// Cumulative phase duration up to and including `LexicalRebuild`;
    /// `None` if that phase was not recorded or a phase at or before it failed.
    pub time_to_lexical_ready_ms: Option<u64>,
    /// Cumulative phase duration up to and including `Publish`;
    /// `None` if that phase was not recorded or a phase at or before it failed.
    pub time_to_search_ready_ms: Option<u64>,
    /// Total run duration, present only when every phase succeeded and the
    /// readiness state is `Published`.
    pub time_to_full_settled_ms: Option<u64>,
    /// Label of the first failed phase in ledger order, if any.
    pub failed_phase: Option<String>,
    /// Publication state derived from the phase records.
    pub search_readiness_state: RefreshSearchReadinessState,
}
178
/// Publication state derived from a ledger's phase records; serialized in
/// `snake_case`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RefreshSearchReadinessState {
    /// A `Publish` phase was recorded as successful before any failure
    /// (a later phase may still have failed).
    Published,
    /// Default state: no phase failed and no `Publish` phase has been recorded.
    #[default]
    WaitingForPublish,
    /// A phase other than `Publish` failed before any successful `Publish`.
    BlockedBeforePublish,
    /// The first failed phase is the `Publish` phase itself.
    PublishFailed,
}
194
195impl Default for RefreshLedger {
196 fn default() -> Self {
197 Self {
198 version: 1,
199 started_at_ms: 0,
200 completed_at_ms: 0,
201 total_duration_ms: 0,
202 full_rebuild: false,
203 corpus_family: "default".to_owned(),
204 phases: Vec::new(),
205 equivalence: EquivalenceArtifacts::default(),
206 tags: BTreeMap::new(),
207 }
208 }
209}
210
211impl RefreshLedger {
212 pub fn start(corpus_family: &str, full_rebuild: bool) -> LedgerBuilder {
214 LedgerBuilder::new(corpus_family, full_rebuild)
215 }
216
217 pub fn phase(&self, phase: RefreshPhase) -> Option<&PhaseRecord> {
219 self.phases.iter().find(|p| p.phase == phase)
220 }
221
222 pub fn total_items_processed(&self) -> u64 {
224 self.phases
225 .iter()
226 .map(|p| p.items_processed)
227 .fold(0u64, u64::saturating_add)
228 }
229
230 pub fn total_errors(&self) -> u64 {
232 self.phases
233 .iter()
234 .map(|p| p.errors)
235 .fold(0u64, u64::saturating_add)
236 }
237
238 pub fn all_phases_succeeded(&self) -> bool {
240 self.phases.iter().all(|p| p.success)
241 }
242
243 pub fn failed_phases(&self) -> Vec<&PhaseRecord> {
245 self.phases.iter().filter(|p| !p.success).collect()
246 }
247
248 pub fn duration_breakdown(&self) -> BTreeMap<String, u64> {
250 self.phases
251 .iter()
252 .map(|p| (p.phase.as_str().to_owned(), p.duration_ms))
253 .collect()
254 }
255
256 pub fn readiness_milestones(&self) -> RefreshReadinessMilestones {
259 RefreshReadinessMilestones {
260 time_to_lexical_ready_ms: self
261 .successful_duration_through(RefreshPhase::LexicalRebuild),
262 time_to_search_ready_ms: self.successful_duration_through(RefreshPhase::Publish),
263 time_to_full_settled_ms: self.full_settlement_duration_ms(),
264 failed_phase: self
265 .failed_phases()
266 .first()
267 .map(|phase| phase.phase.as_str().to_owned()),
268 search_readiness_state: self.search_readiness_state(),
269 }
270 }
271
272 pub fn to_json(&self) -> String {
274 serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_owned())
275 }
276
277 fn successful_duration_through(&self, target: RefreshPhase) -> Option<u64> {
278 let mut elapsed_ms = 0u64;
279 for phase in &self.phases {
280 elapsed_ms = elapsed_ms.saturating_add(phase.duration_ms);
281 if !phase.success {
282 return None;
283 }
284 if phase.phase == target {
285 return Some(elapsed_ms);
286 }
287 }
288 None
289 }
290
291 fn sum_phase_durations(&self) -> u64 {
292 self.phases
293 .iter()
294 .map(|phase| phase.duration_ms)
295 .fold(0u64, u64::saturating_add)
296 }
297
298 fn full_settlement_duration_ms(&self) -> Option<u64> {
299 (self.all_phases_succeeded()
300 && self.search_readiness_state() == RefreshSearchReadinessState::Published)
301 .then(|| {
302 if self.total_duration_ms > 0 {
303 self.total_duration_ms
304 } else {
305 self.sum_phase_durations()
306 }
307 })
308 }
309
310 fn search_readiness_state(&self) -> RefreshSearchReadinessState {
311 let mut published = false;
312
313 for phase in &self.phases {
314 if !phase.success {
315 return if phase.phase == RefreshPhase::Publish {
316 RefreshSearchReadinessState::PublishFailed
317 } else if published {
318 RefreshSearchReadinessState::Published
319 } else {
320 RefreshSearchReadinessState::BlockedBeforePublish
321 };
322 }
323 if phase.phase == RefreshPhase::Publish {
324 published = true;
325 }
326 }
327
328 if published {
329 RefreshSearchReadinessState::Published
330 } else {
331 RefreshSearchReadinessState::WaitingForPublish
332 }
333 }
334}
335
/// Incrementally assembles a [`RefreshLedger`]; obtained from
/// `RefreshLedger::start` and consumed by `finish`.
pub struct LedgerBuilder {
    /// Ledger under construction; closed phase records are pushed onto
    /// `ledger.phases`.
    ledger: RefreshLedger,
    /// Monotonic instant captured at construction, used for `total_duration_ms`.
    start_time: Instant,
    /// Phase currently open and the instant it began, if any.
    current_phase: Option<(RefreshPhase, Instant)>,
    /// Record being filled for the open phase, if any; set and cleared
    /// together with `current_phase`.
    current_record: Option<PhaseRecord>,
}
345
346impl LedgerBuilder {
347 fn new(corpus_family: &str, full_rebuild: bool) -> Self {
348 let now = std::time::SystemTime::now()
349 .duration_since(std::time::UNIX_EPOCH)
350 .map(|d| d.as_millis() as i64)
351 .unwrap_or(0);
352
353 Self {
354 ledger: RefreshLedger {
355 started_at_ms: now,
356 full_rebuild,
357 corpus_family: corpus_family.to_owned(),
358 ..Default::default()
359 },
360 start_time: Instant::now(),
361 current_phase: None,
362 current_record: None,
363 }
364 }
365
366 pub fn begin_phase(&mut self, phase: RefreshPhase) {
368 self.end_current_phase();
369 self.current_phase = Some((phase, Instant::now()));
370 self.current_record = Some(PhaseRecord::new(phase));
371 }
372
373 pub fn record_items(&mut self, processed: u64, skipped: u64) {
375 if let Some(ref mut record) = self.current_record {
376 record.items_processed = record.items_processed.saturating_add(processed);
377 record.items_skipped = record.items_skipped.saturating_add(skipped);
378 }
379 }
380
381 pub fn record_error(&mut self, message: &str) {
385 if let Some(ref mut record) = self.current_record {
386 record.errors = record.errors.saturating_add(1);
387 match &mut record.error_message {
388 Some(existing) => {
389 existing.push_str("; ");
390 existing.push_str(message);
391 }
392 None => record.error_message = Some(message.to_owned()),
393 }
394 }
395 }
396
397 pub fn record_failure(&mut self, message: &str) {
402 if let Some(ref mut record) = self.current_record {
403 record.success = false;
404 record.errors = record.errors.saturating_add(1);
405 record.error_message = Some(message.to_owned());
406 }
407 }
408
409 pub fn set_counter(&mut self, key: &str, value: u64) {
411 if let Some(ref mut record) = self.current_record {
412 record.counters.insert(key.to_owned(), value);
413 }
414 }
415
416 pub fn inc_counter(&mut self, key: &str, delta: u64) {
418 if let Some(ref mut record) = self.current_record {
419 let entry = record.counters.entry(key.to_owned()).or_insert(0);
420 *entry = entry.saturating_add(delta);
421 }
422 }
423
424 pub fn set_equivalence(&mut self, artifacts: EquivalenceArtifacts) {
426 self.ledger.equivalence = artifacts;
427 }
428
429 pub fn tag(&mut self, key: &str, value: &str) {
431 self.ledger.tags.insert(key.to_owned(), value.to_owned());
432 }
433
434 pub fn finish(mut self) -> RefreshLedger {
436 self.end_current_phase();
437 let now = std::time::SystemTime::now()
438 .duration_since(std::time::UNIX_EPOCH)
439 .map(|d| d.as_millis() as i64)
440 .unwrap_or(0);
441 self.ledger.completed_at_ms = now;
442 self.ledger.total_duration_ms = self.start_time.elapsed().as_millis() as u64;
443 self.ledger
444 }
445
446 fn end_current_phase(&mut self) {
447 let Some((_, phase_start)) = self.current_phase.take() else {
450 return;
451 };
452 let Some(mut record) = self.current_record.take() else {
453 return;
454 };
455 record.duration_ms = phase_start.elapsed().as_millis() as u64;
456 self.ledger.phases.push(record);
457 }
458}
459
/// String identifiers for the benchmark corpus families.
///
/// Each constant is the `family` value of the `BenchmarkCorpusConfig` preset
/// with the matching name.
pub mod corpus_families {
    /// Family label used by `BenchmarkCorpusConfig::small`.
    pub const SMALL: &str = "small";
    /// Family label used by `BenchmarkCorpusConfig::medium`.
    pub const MEDIUM: &str = "medium";
    /// Family label used by `BenchmarkCorpusConfig::large`.
    pub const LARGE: &str = "large";
    /// Family label used by `BenchmarkCorpusConfig::duplicate_heavy`.
    pub const DUPLICATE_HEAVY: &str = "duplicate_heavy";
    /// Family label used by `BenchmarkCorpusConfig::pathological`.
    pub const PATHOLOGICAL: &str = "pathological";
    /// Family label used by `BenchmarkCorpusConfig::mixed_agent`.
    pub const MIXED_AGENT: &str = "mixed_agent";
    /// Family label used by `BenchmarkCorpusConfig::incremental`.
    pub const INCREMENTAL: &str = "incremental";
}
479
/// Parameters describing a benchmark corpus; presets are provided by the
/// associated constructors (`small`, `medium`, `large`, ...).
// NOTE(review): the generator that consumes these values is not visible here,
// so the field notes below follow the field names — confirm against it.
#[derive(Debug, Clone)]
pub struct BenchmarkCorpusConfig {
    /// Corpus family label (one of the `corpus_families` constants in the presets).
    pub family: String,
    /// Number of conversations in the corpus.
    pub num_conversations: usize,
    /// Number of messages per conversation.
    pub messages_per_conversation: usize,
    /// Fraction of duplicated content; the presets use values from `0.0` to
    /// `0.5` (presumably a 0..=1 ratio — confirm).
    pub duplicate_fraction: f64,
    /// Upper bound on message length (unit not shown in this file — confirm).
    pub max_message_length: usize,
    /// Number of distinct agents.
    pub agent_count: usize,
}
493
494impl BenchmarkCorpusConfig {
495 pub fn small() -> Self {
496 Self {
497 family: corpus_families::SMALL.to_owned(),
498 num_conversations: 10,
499 messages_per_conversation: 4,
500 duplicate_fraction: 0.0,
501 max_message_length: 500,
502 agent_count: 3,
503 }
504 }
505
506 pub fn medium() -> Self {
507 Self {
508 family: corpus_families::MEDIUM.to_owned(),
509 num_conversations: 100,
510 messages_per_conversation: 5,
511 duplicate_fraction: 0.05,
512 max_message_length: 2000,
513 agent_count: 5,
514 }
515 }
516
517 pub fn large() -> Self {
518 Self {
519 family: corpus_families::LARGE.to_owned(),
520 num_conversations: 1000,
521 messages_per_conversation: 5,
522 duplicate_fraction: 0.05,
523 max_message_length: 2000,
524 agent_count: 8,
525 }
526 }
527
528 pub fn duplicate_heavy() -> Self {
529 Self {
530 family: corpus_families::DUPLICATE_HEAVY.to_owned(),
531 num_conversations: 50,
532 messages_per_conversation: 6,
533 duplicate_fraction: 0.5,
534 max_message_length: 1000,
535 agent_count: 3,
536 }
537 }
538
539 pub fn pathological() -> Self {
540 Self {
541 family: corpus_families::PATHOLOGICAL.to_owned(),
542 num_conversations: 20,
543 messages_per_conversation: 10,
544 duplicate_fraction: 0.0,
545 max_message_length: 50_000,
546 agent_count: 2,
547 }
548 }
549
550 pub fn mixed_agent() -> Self {
551 Self {
552 family: corpus_families::MIXED_AGENT.to_owned(),
553 num_conversations: 70,
554 messages_per_conversation: 4,
555 duplicate_fraction: 0.0,
556 max_message_length: 1000,
557 agent_count: 14,
558 }
559 }
560
561 pub fn incremental() -> Self {
562 Self {
563 family: corpus_families::INCREMENTAL.to_owned(),
564 num_conversations: 50,
565 messages_per_conversation: 4,
566 duplicate_fraction: 0.0,
567 max_message_length: 1000,
568 agent_count: 3,
569 }
570 }
571}
572
/// Throughput of one phase, as produced by `RefreshLedger::evidence_summary`
/// for phases that processed at least one item.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshThroughputProfile {
    /// Phase being described.
    pub phase: RefreshPhase,
    /// Phase duration in milliseconds.
    pub duration_ms: u64,
    /// Items processed during the phase.
    pub items_processed: u64,
    /// Items per second rounded to three decimals; `None` when the duration
    /// is zero.
    pub items_per_second: Option<f64>,
}
603
/// Share of the run's total duration spent in one phase, as produced by
/// `RefreshLedger::evidence_summary`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshPhaseShare {
    /// Phase being described.
    pub phase: RefreshPhase,
    /// Phase duration in milliseconds.
    pub duration_ms: u64,
    /// Phase duration as a percentage of the ledger's `total_duration_ms`,
    /// rounded to two decimals; `0.0` when either value is zero.
    pub share_pct: f64,
}
617
/// Derived throughput and time-share figures for one ledger, built by
/// `RefreshLedger::evidence_summary`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshLedgerEvidence {
    /// One entry per phase that processed at least one item, in ledger order.
    pub throughput: Vec<RefreshThroughputProfile>,
    /// One entry per recorded phase, in ledger order.
    pub phase_share: Vec<RefreshPhaseShare>,
    /// Phase with the longest duration; `None` when there are no phases or
    /// the longest duration is zero.
    pub dominant_phase: Option<RefreshPhase>,
    /// Saturating sum of items processed across all phases.
    pub aggregate_items_processed: u64,
    /// The ledger's `total_duration_ms`.
    pub aggregate_duration_ms: u64,
    /// Aggregate items per second over `aggregate_duration_ms`; `None` when
    /// either the duration or the item count is zero.
    pub aggregate_items_per_second: Option<f64>,
}
648
649impl RefreshLedger {
650 pub fn evidence_summary(&self) -> RefreshLedgerEvidence {
654 let total_ms = self.total_duration_ms;
655 let throughput: Vec<RefreshThroughputProfile> = self
656 .phases
657 .iter()
658 .filter(|phase| phase.items_processed > 0)
659 .map(|phase| {
660 let items_per_second =
661 items_per_second_for(phase.duration_ms, phase.items_processed);
662 RefreshThroughputProfile {
663 phase: phase.phase,
664 duration_ms: phase.duration_ms,
665 items_processed: phase.items_processed,
666 items_per_second,
667 }
668 })
669 .collect();
670 let phase_share: Vec<RefreshPhaseShare> = self
671 .phases
672 .iter()
673 .map(|phase| RefreshPhaseShare {
674 phase: phase.phase,
675 duration_ms: phase.duration_ms,
676 share_pct: share_pct_for(phase.duration_ms, total_ms),
677 })
678 .collect();
679 let dominant_phase = self
680 .phases
681 .iter()
682 .max_by_key(|phase| phase.duration_ms)
683 .filter(|phase| phase.duration_ms > 0)
684 .map(|phase| phase.phase);
685 let aggregate_items_processed = self.total_items_processed();
686 let aggregate_items_per_second = items_per_second_for(total_ms, aggregate_items_processed);
687 RefreshLedgerEvidence {
688 throughput,
689 phase_share,
690 dominant_phase,
691 aggregate_items_processed,
692 aggregate_duration_ms: total_ms,
693 aggregate_items_per_second,
694 }
695 }
696}
697
/// Items processed per second, rounded to three decimal places.
///
/// Returns `None` when `duration_ms` or `items` is zero, since no meaningful
/// rate exists in either case.
fn items_per_second_for(duration_ms: u64, items: u64) -> Option<f64> {
    if duration_ms == 0 || items == 0 {
        return None;
    }
    // `duration_ms` is at least 1 here, so `seconds` is strictly positive and
    // the division below needs no further guard.
    let seconds = duration_ms as f64 / 1000.0;
    let raw = items as f64 / seconds;
    Some((raw * 1000.0).round() / 1000.0)
}
712
/// Percentage of `total_ms` taken by `phase_ms`, rounded to two decimals.
///
/// Yields `0.0` when either argument is zero.
fn share_pct_for(phase_ms: u64, total_ms: u64) -> f64 {
    match (phase_ms, total_ms) {
        (0, _) | (_, 0) => 0.0,
        (part, whole) => {
            let pct = (part as f64 / whole as f64) * 100.0;
            (pct * 100.0).round() / 100.0
        }
    }
}
723
/// Per-phase difference between a baseline run and the current run, as
/// produced by `RefreshLedgerEvidence::compare_to`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshPhaseDelta {
    /// Phase being compared.
    pub phase: RefreshPhase,
    /// Baseline duration in milliseconds (`0` when the baseline lacks the phase).
    pub baseline_duration_ms: u64,
    /// Current duration in milliseconds (`0` when the current run lacks the phase).
    pub current_duration_ms: u64,
    /// Percentage change in duration relative to the baseline; `None` when
    /// the baseline duration is zero.
    pub duration_delta_pct: Option<f64>,
    /// Baseline items processed (`0` when no baseline throughput entry exists).
    pub baseline_items_processed: u64,
    /// Current items processed (`0` when no current throughput entry exists).
    pub current_items_processed: u64,
    /// Baseline items per second, when available.
    pub baseline_items_per_second: Option<f64>,
    /// Current items per second, when available.
    pub current_items_per_second: Option<f64>,
    /// Percentage change in throughput relative to the baseline; `None`
    /// unless both rates are available and the baseline rate is non-zero.
    pub throughput_delta_pct: Option<f64>,
}
762
/// Result of comparing the current run's evidence against a baseline, built
/// by `RefreshLedgerEvidence::compare_to`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshLedgerEvidenceComparison {
    /// Deltas for every phase present on either side, in `RefreshPhase::ALL` order.
    pub phase_deltas: Vec<RefreshPhaseDelta>,
    /// Percentage change in aggregate duration; `None` when the baseline
    /// duration is zero.
    pub aggregate_duration_delta_pct: Option<f64>,
    /// Percentage change in aggregate throughput; `None` unless both sides
    /// have an aggregate rate and the baseline rate is non-zero.
    pub aggregate_throughput_delta_pct: Option<f64>,
    /// `(baseline, current)` dominant phases, present only when both sides
    /// have a dominant phase and they differ.
    pub dominant_phase_shift: Option<(RefreshPhase, RefreshPhase)>,
}
782
783impl RefreshLedgerEvidence {
784 pub fn compare_to(&self, baseline: &Self) -> RefreshLedgerEvidenceComparison {
794 use std::collections::{HashMap, HashSet};
805 let mut baseline_share_by_phase: HashMap<RefreshPhase, &RefreshPhaseShare> = HashMap::new();
806 for entry in &baseline.phase_share {
807 baseline_share_by_phase.insert(entry.phase, entry);
808 }
809 let mut current_share_by_phase: HashMap<RefreshPhase, &RefreshPhaseShare> = HashMap::new();
810 for entry in &self.phase_share {
811 current_share_by_phase.insert(entry.phase, entry);
812 }
813 let mut baseline_by_phase: HashMap<RefreshPhase, &RefreshThroughputProfile> =
814 HashMap::new();
815 for entry in &baseline.throughput {
816 baseline_by_phase.insert(entry.phase, entry);
817 }
818 let mut current_by_phase: HashMap<RefreshPhase, &RefreshThroughputProfile> = HashMap::new();
819 for entry in &self.throughput {
820 current_by_phase.insert(entry.phase, entry);
821 }
822 let mut all_phases: HashSet<RefreshPhase> = HashSet::new();
826 all_phases.extend(baseline_share_by_phase.keys().copied());
827 all_phases.extend(current_share_by_phase.keys().copied());
828 all_phases.extend(baseline_by_phase.keys().copied());
829 all_phases.extend(current_by_phase.keys().copied());
830
831 let phase_deltas: Vec<RefreshPhaseDelta> = RefreshPhase::ALL
832 .iter()
833 .copied()
834 .filter(|phase| all_phases.contains(phase))
835 .map(|phase| {
836 let baseline_entry = baseline_by_phase.get(&phase).copied();
837 let current_entry = current_by_phase.get(&phase).copied();
838 let baseline_duration_ms = baseline_share_by_phase
839 .get(&phase)
840 .map(|e| e.duration_ms)
841 .or_else(|| baseline_entry.map(|e| e.duration_ms))
842 .unwrap_or(0);
843 let current_duration_ms = current_share_by_phase
844 .get(&phase)
845 .map(|e| e.duration_ms)
846 .or_else(|| current_entry.map(|e| e.duration_ms))
847 .unwrap_or(0);
848 let baseline_items_processed =
849 baseline_entry.map(|e| e.items_processed).unwrap_or(0);
850 let current_items_processed = current_entry.map(|e| e.items_processed).unwrap_or(0);
851 let baseline_items_per_second = baseline_entry.and_then(|e| e.items_per_second);
852 let current_items_per_second = current_entry.and_then(|e| e.items_per_second);
853
854 RefreshPhaseDelta {
855 phase,
856 baseline_duration_ms,
857 current_duration_ms,
858 duration_delta_pct: pct_delta(
859 baseline_duration_ms as f64,
860 current_duration_ms as f64,
861 ),
862 baseline_items_processed,
863 current_items_processed,
864 baseline_items_per_second,
865 current_items_per_second,
866 throughput_delta_pct: match (
867 baseline_items_per_second,
868 current_items_per_second,
869 ) {
870 (Some(b), Some(c)) => pct_delta(b, c),
871 _ => None,
872 },
873 }
874 })
875 .collect();
876
877 let aggregate_duration_delta_pct = pct_delta(
878 baseline.aggregate_duration_ms as f64,
879 self.aggregate_duration_ms as f64,
880 );
881 let aggregate_throughput_delta_pct = match (
882 baseline.aggregate_items_per_second,
883 self.aggregate_items_per_second,
884 ) {
885 (Some(b), Some(c)) => pct_delta(b, c),
886 _ => None,
887 };
888
889 let dominant_phase_shift = match (baseline.dominant_phase, self.dominant_phase) {
890 (Some(from), Some(to)) if from != to => Some((from, to)),
891 _ => None,
892 };
893
894 RefreshLedgerEvidenceComparison {
895 phase_deltas,
896 aggregate_duration_delta_pct,
897 aggregate_throughput_delta_pct,
898 dominant_phase_shift,
899 }
900 }
901}
902
/// Percentage change from `baseline` to `current`, rounded to two decimals.
///
/// Returns `None` when either input is not finite, when `baseline` is zero,
/// or when the computed change is not finite.
fn pct_delta(baseline: f64, current: f64) -> Option<f64> {
    let inputs_usable = baseline.is_finite() && current.is_finite() && baseline != 0.0;
    if !inputs_usable {
        return None;
    }
    let change = ((current - baseline) / baseline) * 100.0;
    // The quotient can still overflow for extreme but finite inputs.
    change
        .is_finite()
        .then(|| (change * 100.0).round() / 100.0)
}
923
/// Duration-slowdown thresholds (in percent) used by
/// `RefreshLedgerEvidenceComparison::regression_verdict`.
///
/// Only `Serialize` is derived; deserialization is implemented by hand so
/// that incoming values pass through `try_new` validation.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct RegressionVerdictThresholds {
    /// Slowdown percentage at or above which the verdict is a warning.
    pub warning_duration_pct: f64,
    /// Slowdown percentage at or above which the verdict is a failure.
    pub failure_duration_pct: f64,
}
944
945impl RegressionVerdictThresholds {
946 pub fn defaults() -> Self {
949 Self {
950 warning_duration_pct: 15.0,
951 failure_duration_pct: 30.0,
952 }
953 }
954
955 pub fn try_new(
959 warning_duration_pct: f64,
960 failure_duration_pct: f64,
961 ) -> Result<Self, &'static str> {
962 if !warning_duration_pct.is_finite() || !failure_duration_pct.is_finite() {
963 return Err("regression thresholds must be finite f64s");
964 }
965 if warning_duration_pct < 0.0 || failure_duration_pct < 0.0 {
966 return Err("regression thresholds must be non-negative percentages");
967 }
968 if warning_duration_pct >= failure_duration_pct {
969 return Err(
970 "warning_duration_pct must be strictly less than failure_duration_pct, \
971 otherwise the warning level is unreachable",
972 );
973 }
974 Ok(Self {
975 warning_duration_pct,
976 failure_duration_pct,
977 })
978 }
979
980 fn is_valid(&self) -> bool {
981 self.warning_duration_pct.is_finite()
982 && self.failure_duration_pct.is_finite()
983 && self.warning_duration_pct >= 0.0
984 && self.failure_duration_pct >= 0.0
985 && self.warning_duration_pct < self.failure_duration_pct
986 }
987}
988
989impl<'de> Deserialize<'de> for RegressionVerdictThresholds {
990 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
991 where
992 D: serde::Deserializer<'de>,
993 {
994 #[derive(Deserialize)]
995 struct RawThresholds {
996 warning_duration_pct: f64,
997 failure_duration_pct: f64,
998 }
999
1000 let raw = RawThresholds::deserialize(deserializer)?;
1001 Self::try_new(raw.warning_duration_pct, raw.failure_duration_pct)
1002 .map_err(serde::de::Error::custom)
1003 }
1004}
1005
/// Outcome of checking a comparison against [`RegressionVerdictThresholds`].
///
/// Serialized as an internally tagged enum: the `verdict` field holds the
/// `snake_case` variant name.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "verdict")]
pub enum RegressionVerdict {
    /// No slowdown at or above the warning threshold (also returned when no
    /// verdict can be computed).
    Clean,
    /// Slowdown reached the warning threshold but not the failure threshold.
    Warning {
        /// Observed aggregate duration change, in percent.
        duration_delta_pct: f64,
        /// Warning threshold that was reached, in percent.
        threshold_pct: f64,
    },
    /// Slowdown reached the failure threshold.
    Failure {
        /// Observed aggregate duration change, in percent.
        duration_delta_pct: f64,
        /// Failure threshold that was reached, in percent.
        threshold_pct: f64,
    },
}
1030
1031impl RegressionVerdict {
1032 pub fn should_fail_build(&self) -> bool {
1036 matches!(self, Self::Failure { .. })
1037 }
1038}
1039
1040impl RefreshLedgerEvidenceComparison {
1041 pub fn regression_verdict(
1059 &self,
1060 thresholds: &RegressionVerdictThresholds,
1061 ) -> RegressionVerdict {
1062 if !thresholds.is_valid() {
1063 return RegressionVerdict::Clean;
1064 }
1065 let Some(duration_pct) = self.aggregate_duration_delta_pct else {
1066 return RegressionVerdict::Clean;
1067 };
1068 if duration_pct < 0.0 {
1075 return RegressionVerdict::Clean;
1076 }
1077 if duration_pct >= thresholds.failure_duration_pct {
1078 return RegressionVerdict::Failure {
1079 duration_delta_pct: duration_pct,
1080 threshold_pct: thresholds.failure_duration_pct,
1081 };
1082 }
1083 if duration_pct >= thresholds.warning_duration_pct {
1084 return RegressionVerdict::Warning {
1085 duration_delta_pct: duration_pct,
1086 threshold_pct: thresholds.warning_duration_pct,
1087 };
1088 }
1089 RegressionVerdict::Clean
1090 }
1091}
1092
1093impl RefreshLedgerEvidenceComparison {
1094 pub fn emit_tracing_summary(&self) {
1124 let dominant_shift_str = self
1125 .dominant_phase_shift
1126 .map(|(from, to)| format!("{}->{}", from.as_str(), to.as_str()))
1127 .unwrap_or_else(|| "none".to_string());
1128 let aggregate_duration_str = self
1129 .aggregate_duration_delta_pct
1130 .map(|pct| format!("{pct:+.2}%"))
1131 .unwrap_or_else(|| "n/a".to_string());
1132 let aggregate_throughput_str = self
1133 .aggregate_throughput_delta_pct
1134 .map(|pct| format!("{pct:+.2}%"))
1135 .unwrap_or_else(|| "n/a".to_string());
1136
1137 const SLOWDOWN_WARN_THRESHOLD_PCT: f64 = 25.0;
1142 const IMPROVEMENT_INFO_THRESHOLD_PCT: f64 = -10.0;
1143 let duration_pct = self.aggregate_duration_delta_pct.unwrap_or(0.0);
1144 let phase_count = self.phase_deltas.len();
1145
1146 let aggregate_throughput_pct = self.aggregate_throughput_delta_pct.unwrap_or(0.0);
1157 macro_rules! emit_tier {
1158 ($macro:ident, $msg:literal) => {
1159 tracing::$macro!(
1160 target: "cass::indexer::lexical_refresh",
1161 aggregate_duration_delta_pct = duration_pct,
1162 aggregate_throughput_delta_pct = aggregate_throughput_pct,
1163 aggregate_duration = %aggregate_duration_str,
1164 aggregate_throughput = %aggregate_throughput_str,
1165 dominant_phase_shift = %dominant_shift_str,
1166 phase_count,
1167 $msg
1168 )
1169 };
1170 }
1171 if duration_pct >= SLOWDOWN_WARN_THRESHOLD_PCT {
1172 emit_tier!(
1173 warn,
1174 "lexical refresh evidence: significant slowdown vs previous publish"
1175 );
1176 } else if duration_pct <= IMPROVEMENT_INFO_THRESHOLD_PCT {
1177 emit_tier!(
1178 info,
1179 "lexical refresh evidence: notable improvement vs previous publish"
1180 );
1181 } else {
1182 emit_tier!(debug, "lexical refresh evidence: cross-run comparison");
1183 }
1184 }
1185}
1186
1187#[cfg(test)]
1190mod tests {
1191 use super::*;
1192
1193 #[test]
1194 fn phase_model_covers_all_phases() {
1195 assert_eq!(RefreshPhase::ALL.len(), 7);
1196 assert_eq!(RefreshPhase::ALL[0], RefreshPhase::Scan);
1197 assert_eq!(RefreshPhase::ALL[6], RefreshPhase::Recovery);
1198 }
1199
1200 #[test]
1201 fn phase_as_str_round_trips() {
1202 for phase in RefreshPhase::ALL {
1203 let s = phase.as_str();
1204 assert!(!s.is_empty(), "phase {phase:?} has empty string");
1205 }
1206 }
1207
1208 #[test]
1209 fn ledger_builder_records_phases() {
1210 let mut builder = RefreshLedger::start("small", false);
1211
1212 builder.begin_phase(RefreshPhase::Scan);
1213 builder.record_items(100, 5);
1214 builder.set_counter("connectors_scanned", 3);
1215
1216 builder.begin_phase(RefreshPhase::Persist);
1217 builder.record_items(95, 0);
1218 builder.set_counter("bytes_written", 50_000);
1219
1220 builder.begin_phase(RefreshPhase::LexicalRebuild);
1221 builder.record_items(450, 0);
1222
1223 builder.begin_phase(RefreshPhase::Publish);
1224 builder.record_items(1, 0);
1225
1226 let ledger = builder.finish();
1227
1228 assert_eq!(ledger.phases.len(), 4);
1229 assert_eq!(ledger.corpus_family, "small");
1230 assert!(!ledger.full_rebuild);
1231
1232 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1233 assert_eq!(scan.items_processed, 100);
1234 assert_eq!(scan.items_skipped, 5);
1235 assert_eq!(*scan.counters.get("connectors_scanned").unwrap(), 3);
1236
1237 let persist = ledger.phase(RefreshPhase::Persist).unwrap();
1238 assert_eq!(persist.items_processed, 95);
1239 assert_eq!(*persist.counters.get("bytes_written").unwrap(), 50_000);
1240
1241 assert!(ledger.all_phases_succeeded());
1242 assert_eq!(ledger.total_items_processed(), 100 + 95 + 450 + 1);
1243 assert!(ledger.completed_at_ms >= ledger.started_at_ms);
1244 let max_phase_duration = ledger
1245 .phases
1246 .iter()
1247 .map(|phase| phase.duration_ms)
1248 .max()
1249 .unwrap_or(0);
1250 assert!(ledger.total_duration_ms >= max_phase_duration);
1251 }
1252
1253 #[test]
1254 fn ledger_builder_saturates_counter_arithmetic() {
1255 let mut builder = RefreshLedger::start("pathological", true);
1256
1257 builder.begin_phase(RefreshPhase::Scan);
1258 builder.record_items(u64::MAX, u64::MAX);
1259 builder.record_items(1, 1);
1260 builder.inc_counter("bytes_scanned", u64::MAX);
1261 builder.inc_counter("bytes_scanned", 1);
1262
1263 let ledger = builder.finish();
1264 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1265 assert_eq!(scan.items_processed, u64::MAX);
1266 assert_eq!(scan.items_skipped, u64::MAX);
1267 assert_eq!(scan.counters.get("bytes_scanned"), Some(&u64::MAX));
1268 assert_eq!(ledger.total_items_processed(), u64::MAX);
1269 }
1270
1271 #[test]
1272 fn ledger_builder_records_failures() {
1273 let mut builder = RefreshLedger::start("small", false);
1274
1275 builder.begin_phase(RefreshPhase::Scan);
1276 builder.record_items(50, 0);
1277
1278 builder.begin_phase(RefreshPhase::Persist);
1279 builder.record_failure("database locked");
1280
1281 let ledger = builder.finish();
1282
1283 assert!(!ledger.all_phases_succeeded());
1284 assert_eq!(ledger.failed_phases().len(), 1);
1285 assert_eq!(ledger.failed_phases()[0].phase, RefreshPhase::Persist);
1286 assert_eq!(
1287 ledger.failed_phases()[0].error_message.as_deref(),
1288 Some("database locked")
1289 );
1290 assert_eq!(ledger.failed_phases()[0].errors, 1);
1291 assert_eq!(ledger.total_errors(), 1);
1292 }
1293
1294 #[test]
1295 fn ledger_builder_records_errors_without_failure() {
1296 let mut builder = RefreshLedger::start("medium", false);
1297
1298 builder.begin_phase(RefreshPhase::Scan);
1299 builder.record_items(90, 0);
1300 builder.record_error("connector timeout");
1301 builder.record_error("permission denied");
1302
1303 let ledger = builder.finish();
1304
1305 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1306 assert!(scan.success); assert_eq!(scan.errors, 2);
1308 let msg = scan.error_message.as_deref().unwrap();
1310 assert!(
1311 msg.contains("connector timeout"),
1312 "missing first error: {msg}"
1313 );
1314 assert!(
1315 msg.contains("permission denied"),
1316 "missing second error: {msg}"
1317 );
1318 }
1319
1320 #[test]
1321 fn ledger_equivalence_artifacts() {
1322 let mut builder = RefreshLedger::start("small", true);
1323
1324 builder.begin_phase(RefreshPhase::Scan);
1325 builder.record_items(10, 0);
1326
1327 builder.set_equivalence(EquivalenceArtifacts {
1328 conversation_count: 10,
1329 message_count: 40,
1330 lexical_doc_count: 40,
1331 lexical_fingerprint: Some("fp-abc".to_owned()),
1332 semantic_manifest_fingerprint: None,
1333 search_hit_digest: Some("sha256-xyz".to_owned()),
1334 peak_rss_bytes: Some(100_000_000),
1335 db_size_bytes: Some(5_000_000),
1336 lexical_index_size_bytes: Some(2_000_000),
1337 });
1338
1339 let ledger = builder.finish();
1340
1341 assert_eq!(ledger.equivalence.conversation_count, 10);
1342 assert_eq!(ledger.equivalence.message_count, 40);
1343 assert_eq!(
1344 ledger.equivalence.lexical_fingerprint.as_deref(),
1345 Some("fp-abc")
1346 );
1347 assert!(ledger.full_rebuild);
1348 }
1349
1350 #[test]
1351 fn ledger_duration_breakdown() {
1352 let mut builder = RefreshLedger::start("small", false);
1353
1354 builder.begin_phase(RefreshPhase::Scan);
1355 builder.begin_phase(RefreshPhase::LexicalRebuild);
1357
1358 let ledger = builder.finish();
1359
1360 let breakdown = ledger.duration_breakdown();
1361 assert!(breakdown.contains_key("scan"));
1362 assert!(breakdown.contains_key("lexical_rebuild"));
1363 }
1364
1365 #[test]
1366 fn readiness_milestones_measure_lexical_search_and_settled_times() {
1367 let ledger = RefreshLedger {
1368 total_duration_ms: 90,
1369 phases: vec![
1370 phase_record(RefreshPhase::Scan, 10, true),
1371 phase_record(RefreshPhase::Persist, 20, true),
1372 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1373 phase_record(RefreshPhase::Publish, 5, true),
1374 phase_record(RefreshPhase::Analytics, 7, true),
1375 phase_record(RefreshPhase::Semantic, 8, true),
1376 ],
1377 ..Default::default()
1378 };
1379
1380 let milestones = ledger.readiness_milestones();
1381
1382 assert_eq!(milestones.time_to_lexical_ready_ms, Some(60));
1383 assert_eq!(milestones.time_to_search_ready_ms, Some(65));
1384 assert_eq!(milestones.time_to_full_settled_ms, Some(90));
1385 assert_eq!(milestones.failed_phase, None);
1386 assert_eq!(
1387 milestones.search_readiness_state,
1388 RefreshSearchReadinessState::Published
1389 );
1390
1391 let json = serde_json::to_value(&milestones).unwrap();
1392 assert_eq!(json["time_to_lexical_ready_ms"], 60);
1393 assert_eq!(json["time_to_search_ready_ms"], 65);
1394 assert_eq!(json["time_to_full_settled_ms"], 90);
1395 assert_eq!(json["search_readiness_state"], "published");
1396 }
1397
1398 #[test]
1399 fn readiness_milestones_stop_at_first_failed_phase() {
1400 let ledger = RefreshLedger {
1401 total_duration_ms: 75,
1402 phases: vec![
1403 phase_record(RefreshPhase::Scan, 10, true),
1404 phase_record(RefreshPhase::Persist, 20, true),
1405 phase_record(RefreshPhase::LexicalRebuild, 30, false),
1406 phase_record(RefreshPhase::Publish, 5, true),
1407 ],
1408 ..Default::default()
1409 };
1410
1411 let milestones = ledger.readiness_milestones();
1412
1413 assert_eq!(milestones.time_to_lexical_ready_ms, None);
1414 assert_eq!(milestones.time_to_search_ready_ms, None);
1415 assert_eq!(milestones.time_to_full_settled_ms, None);
1416 assert_eq!(milestones.failed_phase.as_deref(), Some("lexical_rebuild"));
1417 assert_eq!(
1418 milestones.search_readiness_state,
1419 RefreshSearchReadinessState::BlockedBeforePublish
1420 );
1421 }
1422
1423 #[test]
1424 fn readiness_milestones_explain_unpublished_and_publish_failed_states() {
1425 let unpublished = RefreshLedger {
1426 phases: vec![
1427 phase_record(RefreshPhase::Scan, 10, true),
1428 phase_record(RefreshPhase::Persist, 20, true),
1429 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1430 ],
1431 ..Default::default()
1432 };
1433
1434 let unpublished_milestones = unpublished.readiness_milestones();
1435
1436 assert_eq!(unpublished_milestones.time_to_lexical_ready_ms, Some(60));
1437 assert_eq!(unpublished_milestones.time_to_search_ready_ms, None);
1438 assert_eq!(unpublished_milestones.time_to_full_settled_ms, None);
1439 assert_eq!(unpublished_milestones.failed_phase, None);
1440 assert_eq!(
1441 unpublished_milestones.search_readiness_state,
1442 RefreshSearchReadinessState::WaitingForPublish
1443 );
1444
1445 let publish_failed = RefreshLedger {
1446 phases: vec![
1447 phase_record(RefreshPhase::Scan, 10, true),
1448 phase_record(RefreshPhase::Persist, 20, true),
1449 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1450 phase_record(RefreshPhase::Publish, 5, false),
1451 ],
1452 ..Default::default()
1453 };
1454
1455 let publish_failed_milestones = publish_failed.readiness_milestones();
1456
1457 assert_eq!(publish_failed_milestones.time_to_lexical_ready_ms, Some(60));
1458 assert_eq!(publish_failed_milestones.time_to_search_ready_ms, None);
1459 assert_eq!(publish_failed_milestones.time_to_full_settled_ms, None);
1460 assert_eq!(
1461 publish_failed_milestones.failed_phase.as_deref(),
1462 Some("publish")
1463 );
1464 assert_eq!(
1465 publish_failed_milestones.search_readiness_state,
1466 RefreshSearchReadinessState::PublishFailed
1467 );
1468
1469 let post_publish_failure = RefreshLedger {
1470 phases: vec![
1471 phase_record(RefreshPhase::Scan, 10, true),
1472 phase_record(RefreshPhase::Persist, 20, true),
1473 phase_record(RefreshPhase::LexicalRebuild, 30, true),
1474 phase_record(RefreshPhase::Publish, 5, true),
1475 phase_record(RefreshPhase::Analytics, 7, false),
1476 ],
1477 ..Default::default()
1478 };
1479
1480 let post_publish_failure_milestones = post_publish_failure.readiness_milestones();
1481
1482 assert_eq!(
1483 post_publish_failure_milestones.time_to_lexical_ready_ms,
1484 Some(60)
1485 );
1486 assert_eq!(
1487 post_publish_failure_milestones.time_to_search_ready_ms,
1488 Some(65)
1489 );
1490 assert_eq!(
1491 post_publish_failure_milestones.time_to_full_settled_ms,
1492 None
1493 );
1494 assert_eq!(
1495 post_publish_failure_milestones.failed_phase.as_deref(),
1496 Some("analytics")
1497 );
1498 assert_eq!(
1499 post_publish_failure_milestones.search_readiness_state,
1500 RefreshSearchReadinessState::Published
1501 );
1502 }
1503
1504 #[test]
1505 fn readiness_milestones_do_not_report_full_settlement_before_publish() {
1506 let empty = RefreshLedger::default().readiness_milestones();
1507
1508 assert_eq!(empty.time_to_lexical_ready_ms, None);
1509 assert_eq!(empty.time_to_search_ready_ms, None);
1510 assert_eq!(empty.time_to_full_settled_ms, None);
1511 assert_eq!(
1512 empty.search_readiness_state,
1513 RefreshSearchReadinessState::WaitingForPublish
1514 );
1515
1516 let partial = RefreshLedger {
1517 total_duration_ms: 42,
1518 phases: vec![
1519 phase_record(RefreshPhase::Scan, 10, true),
1520 phase_record(RefreshPhase::Persist, 20, true),
1521 ],
1522 ..Default::default()
1523 }
1524 .readiness_milestones();
1525
1526 assert_eq!(partial.time_to_lexical_ready_ms, None);
1527 assert_eq!(partial.time_to_search_ready_ms, None);
1528 assert_eq!(partial.time_to_full_settled_ms, None);
1529 assert_eq!(
1530 partial.search_readiness_state,
1531 RefreshSearchReadinessState::WaitingForPublish
1532 );
1533 }
1534
1535 #[test]
1536 fn ledger_tags() {
1537 let mut builder = RefreshLedger::start("medium", false);
1538 builder.tag("run_id", "bench-2026-04-01");
1539 builder.tag("machine", "csd");
1540
1541 let ledger = builder.finish();
1542
1543 assert_eq!(ledger.tags.get("run_id").unwrap(), "bench-2026-04-01");
1544 assert_eq!(ledger.tags.get("machine").unwrap(), "csd");
1545 }
1546
1547 #[test]
1548 fn ledger_json_round_trip() {
1549 let mut builder = RefreshLedger::start("duplicate_heavy", true);
1550 builder.begin_phase(RefreshPhase::Scan);
1551 builder.record_items(50, 10);
1552 builder.set_counter("duplicate_conversations", 25);
1553 builder.begin_phase(RefreshPhase::Persist);
1554 builder.record_items(40, 0);
1555
1556 builder.set_equivalence(EquivalenceArtifacts {
1557 conversation_count: 40,
1558 message_count: 200,
1559 lexical_doc_count: 200,
1560 ..Default::default()
1561 });
1562
1563 let ledger = builder.finish();
1564 let json = ledger.to_json();
1565 let deser: RefreshLedger = serde_json::from_str(&json).unwrap();
1566
1567 assert_eq!(deser.corpus_family, "duplicate_heavy");
1568 assert!(deser.full_rebuild);
1569 assert_eq!(deser.phases.len(), 2);
1570 assert_eq!(deser.equivalence.conversation_count, 40);
1571 assert_eq!(
1572 *deser.phases[0]
1573 .counters
1574 .get("duplicate_conversations")
1575 .unwrap(),
1576 25
1577 );
1578 }
1579
1580 #[test]
1581 fn ledger_inc_counter() {
1582 let mut builder = RefreshLedger::start("small", false);
1583 builder.begin_phase(RefreshPhase::Scan);
1584 builder.inc_counter("files_scanned", 10);
1585 builder.inc_counter("files_scanned", 15);
1586 builder.inc_counter("files_scanned", 5);
1587
1588 let ledger = builder.finish();
1589 let scan = ledger.phase(RefreshPhase::Scan).unwrap();
1590 assert_eq!(*scan.counters.get("files_scanned").unwrap(), 30);
1591 }
1592
1593 #[test]
1594 fn benchmark_corpus_configs_have_correct_families() {
1595 assert_eq!(BenchmarkCorpusConfig::small().family, "small");
1596 assert_eq!(BenchmarkCorpusConfig::medium().family, "medium");
1597 assert_eq!(BenchmarkCorpusConfig::large().family, "large");
1598 assert_eq!(
1599 BenchmarkCorpusConfig::duplicate_heavy().family,
1600 "duplicate_heavy"
1601 );
1602 assert_eq!(BenchmarkCorpusConfig::pathological().family, "pathological");
1603 assert_eq!(BenchmarkCorpusConfig::mixed_agent().family, "mixed_agent");
1604 assert_eq!(BenchmarkCorpusConfig::incremental().family, "incremental");
1605 }
1606
1607 #[test]
1608 fn benchmark_corpus_configs_have_reasonable_sizes() {
1609 let configs = [
1610 BenchmarkCorpusConfig::small(),
1611 BenchmarkCorpusConfig::medium(),
1612 BenchmarkCorpusConfig::large(),
1613 BenchmarkCorpusConfig::duplicate_heavy(),
1614 BenchmarkCorpusConfig::pathological(),
1615 BenchmarkCorpusConfig::mixed_agent(),
1616 BenchmarkCorpusConfig::incremental(),
1617 ];
1618 for cfg in &configs {
1619 assert!(
1620 cfg.num_conversations > 0,
1621 "{} has 0 conversations",
1622 cfg.family
1623 );
1624 assert!(
1625 cfg.messages_per_conversation > 0,
1626 "{} has 0 messages",
1627 cfg.family
1628 );
1629 assert!(cfg.agent_count > 0, "{} has 0 agents", cfg.family);
1630 assert!(
1631 cfg.duplicate_fraction >= 0.0 && cfg.duplicate_fraction <= 1.0,
1632 "{} has invalid duplicate fraction",
1633 cfg.family
1634 );
1635 }
1636 }
1637
1638 fn phase_record(phase: RefreshPhase, duration_ms: u64, success: bool) -> PhaseRecord {
1639 PhaseRecord {
1640 phase,
1641 duration_ms,
1642 items_processed: 0,
1643 items_skipped: 0,
1644 errors: u64::from(!success),
1645 counters: BTreeMap::new(),
1646 success,
1647 error_message: (!success).then(|| format!("failed {}", phase.as_str())),
1648 }
1649 }
1650
1651 fn phase_record_with_items(phase: RefreshPhase, duration_ms: u64, items: u64) -> PhaseRecord {
1652 PhaseRecord {
1653 phase,
1654 duration_ms,
1655 items_processed: items,
1656 items_skipped: 0,
1657 errors: 0,
1658 counters: BTreeMap::new(),
1659 success: true,
1660 error_message: None,
1661 }
1662 }
1663
1664 fn ledger_with(phases: Vec<PhaseRecord>) -> RefreshLedger {
1665 let total_duration_ms = phases.iter().map(|p| p.duration_ms).sum();
1666 RefreshLedger {
1667 version: 1,
1668 started_at_ms: 1_700_000_000_000,
1669 completed_at_ms: 1_700_000_000_000 + i64::try_from(total_duration_ms).unwrap_or(0),
1670 total_duration_ms,
1671 full_rebuild: true,
1672 corpus_family: "evidence-test".to_owned(),
1673 phases,
1674 equivalence: EquivalenceArtifacts::default(),
1675 tags: BTreeMap::new(),
1676 }
1677 }
1678
1679 #[test]
1685 fn evidence_summary_reports_per_phase_throughput_with_safe_zero_handling() {
1686 let ledger = ledger_with(vec![
1690 phase_record_with_items(RefreshPhase::Scan, 500, 1000),
1691 phase_record_with_items(RefreshPhase::Persist, 1000, 2000),
1692 phase_record_with_items(RefreshPhase::LexicalRebuild, 100, 0),
1693 phase_record_with_items(RefreshPhase::Recovery, 0, 0),
1694 ]);
1695
1696 let evidence = ledger.evidence_summary();
1697
1698 assert_eq!(
1701 evidence.throughput.len(),
1702 2,
1703 "throughput must skip zero-item phases; got {:?}",
1704 evidence.throughput
1705 );
1706
1707 let scan = evidence
1709 .throughput
1710 .iter()
1711 .find(|t| t.phase == RefreshPhase::Scan)
1712 .expect("scan throughput present");
1713 assert_eq!(scan.items_per_second, Some(2000.0));
1714 assert_eq!(scan.duration_ms, 500);
1715 assert_eq!(scan.items_processed, 1000);
1716
1717 let persist = evidence
1719 .throughput
1720 .iter()
1721 .find(|t| t.phase == RefreshPhase::Persist)
1722 .expect("persist throughput present");
1723 assert_eq!(persist.items_per_second, Some(2000.0));
1724
1725 assert_eq!(evidence.aggregate_items_processed, 3000);
1727 assert_eq!(evidence.aggregate_duration_ms, 1600);
1728 assert_eq!(evidence.aggregate_items_per_second, Some(1875.0));
1729 }
1730
1731 #[test]
1735 fn evidence_summary_handles_empty_and_zero_duration_ledgers() {
1736 let empty = ledger_with(Vec::new());
1738 let empty_evidence = empty.evidence_summary();
1739 assert!(empty_evidence.throughput.is_empty());
1740 assert!(empty_evidence.phase_share.is_empty());
1741 assert_eq!(empty_evidence.dominant_phase, None);
1742 assert_eq!(empty_evidence.aggregate_items_per_second, None);
1743 assert_eq!(empty_evidence.aggregate_duration_ms, 0);
1744
1745 let instant = ledger_with(vec![
1747 phase_record_with_items(RefreshPhase::Scan, 0, 5),
1748 phase_record_with_items(RefreshPhase::Persist, 0, 5),
1749 ]);
1750 let instant_evidence = instant.evidence_summary();
1751 for t in &instant_evidence.throughput {
1753 assert_eq!(t.items_per_second, None, "zero duration must yield None");
1754 }
1755 assert_eq!(instant_evidence.dominant_phase, None);
1757 for share in &instant_evidence.phase_share {
1759 assert_eq!(share.share_pct, 0.0);
1760 assert!(!share.share_pct.is_nan(), "share_pct must never be NaN");
1761 }
1762 }
1763
1764 #[test]
1769 fn evidence_summary_phase_share_sums_to_one_hundred_and_dominant_phase_picks_max() {
1770 let ledger = ledger_with(vec![
1771 phase_record_with_items(RefreshPhase::Scan, 200, 100),
1772 phase_record_with_items(RefreshPhase::Persist, 600, 1500), phase_record_with_items(RefreshPhase::LexicalRebuild, 200, 1500),
1774 ]);
1775 let evidence = ledger.evidence_summary();
1776
1777 let total_share: f64 = evidence.phase_share.iter().map(|s| s.share_pct).sum();
1778 assert!(
1779 (total_share - 100.0).abs() <= 0.05,
1780 "phase shares must sum to ~100.0 (±0.05 for rounding); got {total_share}"
1781 );
1782
1783 let persist_share = evidence
1785 .phase_share
1786 .iter()
1787 .find(|s| s.phase == RefreshPhase::Persist)
1788 .expect("persist share present");
1789 assert_eq!(persist_share.share_pct, 60.0);
1790
1791 assert_eq!(evidence.dominant_phase, Some(RefreshPhase::Persist));
1793 }
1794
1795 #[test]
1800 fn evidence_summary_dominant_phase_tie_break_is_first_in_pipeline_order() {
1801 let ledger = ledger_with(vec![
1802 phase_record_with_items(RefreshPhase::Scan, 500, 1),
1803 phase_record_with_items(RefreshPhase::Persist, 500, 1),
1804 phase_record_with_items(RefreshPhase::LexicalRebuild, 500, 1),
1805 ]);
1806 let evidence = ledger.evidence_summary();
1807 assert_eq!(
1813 evidence.dominant_phase,
1814 Some(RefreshPhase::LexicalRebuild),
1815 "tie-break: max_by_key returns the LAST phase at max duration"
1816 );
1817 }
1818
1819 #[test]
1824 fn evidence_summary_serializes_to_stable_json_field_set() {
1825 let ledger = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 50)]);
1826 let evidence = ledger.evidence_summary();
1827 let json = serde_json::to_string(&evidence).expect("serialize");
1828 for required_field in [
1829 "\"throughput\"",
1830 "\"phase_share\"",
1831 "\"dominant_phase\"",
1832 "\"aggregate_items_processed\"",
1833 "\"aggregate_duration_ms\"",
1834 "\"aggregate_items_per_second\"",
1835 ] {
1836 assert!(
1837 json.contains(required_field),
1838 "evidence JSON missing field {required_field}; got: {json}"
1839 );
1840 }
1841 let parsed: serde_json::Value = serde_json::from_str(&json).expect("parse");
1845 assert_eq!(parsed["aggregate_items_processed"], 50);
1846 assert_eq!(parsed["aggregate_duration_ms"], 100);
1847 assert_eq!(parsed["aggregate_items_per_second"], 500.0);
1848 assert_eq!(parsed["dominant_phase"], "scan");
1849 }
1850
1851 #[test]
1860 fn evidence_compare_to_reports_per_phase_regressions_and_improvements() {
1861 let baseline = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
1863 .evidence_summary();
1864 let current = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 200, 100)])
1867 .evidence_summary();
1868
1869 let cmp = current.compare_to(&baseline);
1870
1871 assert_eq!(cmp.phase_deltas.len(), 1);
1872 let scan = &cmp.phase_deltas[0];
1873 assert_eq!(scan.phase, RefreshPhase::Scan);
1874 assert_eq!(scan.duration_delta_pct, Some(100.0));
1876 assert_eq!(scan.throughput_delta_pct, Some(-50.0));
1878 assert_eq!(cmp.aggregate_duration_delta_pct, Some(100.0));
1880 assert_eq!(cmp.aggregate_throughput_delta_pct, Some(-50.0));
1881 assert_eq!(cmp.dominant_phase_shift, None);
1883
1884 let cmp_improved = baseline.compare_to(¤t);
1886 let scan = &cmp_improved.phase_deltas[0];
1887 assert_eq!(scan.duration_delta_pct, Some(-50.0));
1889 assert_eq!(scan.throughput_delta_pct, Some(100.0));
1891 }
1892
1893 #[test]
1899 fn evidence_compare_to_surfaces_phases_unique_to_one_side() {
1900 let baseline = ledger_with(vec![
1901 phase_record_with_items(RefreshPhase::Scan, 100, 100),
1902 phase_record_with_items(RefreshPhase::Persist, 50, 200),
1903 ])
1904 .evidence_summary();
1905 let current = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
1908 .evidence_summary();
1909
1910 let cmp = current.compare_to(&baseline);
1911
1912 let phases: Vec<RefreshPhase> = cmp.phase_deltas.iter().map(|d| d.phase).collect();
1913 assert!(
1914 phases.contains(&RefreshPhase::Scan),
1915 "Scan ran in both sides; must appear in comparison; got phases {phases:?}"
1916 );
1917 assert!(
1918 phases.contains(&RefreshPhase::Persist),
1919 "Persist is missing from current but ran in baseline — comparison MUST \
1920 surface it so caller can investigate; got phases {phases:?}"
1921 );
1922
1923 let persist = cmp
1927 .phase_deltas
1928 .iter()
1929 .find(|d| d.phase == RefreshPhase::Persist)
1930 .expect("Persist delta present");
1931 assert_eq!(persist.baseline_duration_ms, 50);
1932 assert_eq!(persist.current_duration_ms, 0);
1933 assert_eq!(
1934 persist.duration_delta_pct,
1935 Some(-100.0),
1936 "phase disappearing from current must surface as -100% duration delta; \
1937 got {persist:?}"
1938 );
1939 }
1940
1941 #[test]
1946 fn evidence_compare_to_retains_zero_item_phases_with_duration() {
1947 let baseline = ledger_with(vec![
1948 phase_record_with_items(RefreshPhase::Scan, 100, 100),
1949 phase_record_with_items(RefreshPhase::Publish, 40, 0),
1950 ])
1951 .evidence_summary();
1952 let current = ledger_with(vec![
1953 phase_record_with_items(RefreshPhase::Scan, 100, 100),
1954 phase_record_with_items(RefreshPhase::Publish, 80, 0),
1955 ])
1956 .evidence_summary();
1957
1958 assert!(
1959 baseline
1960 .throughput
1961 .iter()
1962 .all(|entry| entry.phase != RefreshPhase::Publish),
1963 "zero-item Publish must stay out of throughput: {:?}",
1964 baseline.throughput
1965 );
1966
1967 let cmp = current.compare_to(&baseline);
1968 let publish = cmp
1969 .phase_deltas
1970 .iter()
1971 .find(|delta| delta.phase == RefreshPhase::Publish)
1972 .expect("zero-item Publish phase must remain in comparison");
1973
1974 assert_eq!(publish.baseline_duration_ms, 40);
1975 assert_eq!(publish.current_duration_ms, 80);
1976 assert_eq!(publish.duration_delta_pct, Some(100.0));
1977 assert_eq!(publish.baseline_items_processed, 0);
1978 assert_eq!(publish.current_items_processed, 0);
1979 assert_eq!(publish.baseline_items_per_second, None);
1980 assert_eq!(publish.current_items_per_second, None);
1981 assert_eq!(publish.throughput_delta_pct, None);
1982 }
1983
1984 #[test]
1990 fn evidence_compare_to_reports_dominant_phase_shift() {
1991 let baseline = ledger_with(vec![
1993 phase_record_with_items(RefreshPhase::Scan, 800, 100),
1994 phase_record_with_items(RefreshPhase::Persist, 200, 100),
1995 ])
1996 .evidence_summary();
1997 let current = ledger_with(vec![
1999 phase_record_with_items(RefreshPhase::Scan, 200, 100),
2000 phase_record_with_items(RefreshPhase::Persist, 800, 100),
2001 ])
2002 .evidence_summary();
2003 assert_eq!(baseline.dominant_phase, Some(RefreshPhase::Scan));
2007 assert_eq!(current.dominant_phase, Some(RefreshPhase::Persist));
2008
2009 let cmp = current.compare_to(&baseline);
2010
2011 assert_eq!(
2012 cmp.dominant_phase_shift,
2013 Some((RefreshPhase::Scan, RefreshPhase::Persist)),
2014 "dominant phase shifted Scan→Persist; comparison must surface this; got {cmp:?}"
2015 );
2016
2017 let same_dom = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
2019 .evidence_summary();
2020 let cmp_same = same_dom.compare_to(&same_dom);
2021 assert_eq!(cmp_same.dominant_phase_shift, None);
2022 }
2023
2024 #[test]
2030 fn evidence_compare_to_safely_handles_zero_baseline_and_empty_evidence() {
2031 let empty = ledger_with(Vec::new()).evidence_summary();
2032 let normal = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 50)])
2033 .evidence_summary();
2034
2035 let against_empty = normal.compare_to(&empty);
2038 assert!(
2039 against_empty
2040 .phase_deltas
2041 .iter()
2042 .all(|d| d.duration_delta_pct.is_none() || d.baseline_duration_ms == 0),
2043 "phases with zero-baseline duration must report None for duration_delta_pct"
2044 );
2045 assert_eq!(against_empty.aggregate_duration_delta_pct, None);
2046 assert_eq!(against_empty.aggregate_throughput_delta_pct, None);
2047
2048 let against_self = empty.compare_to(&empty);
2050 assert!(against_self.phase_deltas.is_empty());
2051 assert_eq!(against_self.aggregate_duration_delta_pct, None);
2052
2053 let json = serde_json::to_string(&against_empty).expect("serialize");
2056 assert!(
2057 !json.contains("NaN"),
2058 "comparison JSON must not contain NaN; got {json}"
2059 );
2060 assert!(
2061 !json.contains("Infinity"),
2062 "comparison JSON must not contain Infinity"
2063 );
2064 }
2065
2066 #[test]
2074 fn evidence_comparison_emit_tracing_summary_uses_correct_severity_tier() {
2075 use std::sync::{Arc, Mutex};
2076 use tracing::field::{Field, Visit};
2077 use tracing::{Event, Subscriber};
2078 use tracing_subscriber::Registry;
2079 use tracing_subscriber::layer::{Context, Layer, SubscriberExt};
2080
2081 #[derive(Debug, Clone, Default)]
2082 struct CapturedEvent {
2083 level: String,
2084 message: String,
2085 }
2086
2087 #[derive(Clone, Default)]
2088 struct LevelCollector {
2089 events: Arc<Mutex<Vec<CapturedEvent>>>,
2090 }
2091
2092 impl<S: Subscriber> Layer<S> for LevelCollector {
2093 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
2094 if event.metadata().target() != "cass::indexer::lexical_refresh" {
2095 return;
2096 }
2097 let mut visitor = MessageVisitor::default();
2098 event.record(&mut visitor);
2099 self.events
2100 .lock()
2101 .expect("collector lock")
2102 .push(CapturedEvent {
2103 level: event.metadata().level().to_string(),
2104 message: visitor.message,
2105 });
2106 }
2107 }
2108
2109 #[derive(Default)]
2110 struct MessageVisitor {
2111 message: String,
2112 }
2113 impl Visit for MessageVisitor {
2114 fn record_str(&mut self, _field: &Field, _value: &str) {}
2115 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
2116 if field.name() == "message" {
2117 self.message = format!("{:?}", value).trim_matches('"').to_string();
2118 }
2119 }
2120 }
2121
2122 fn comparison_with_duration_pct(pct: f64) -> RefreshLedgerEvidenceComparison {
2126 RefreshLedgerEvidenceComparison {
2127 phase_deltas: Vec::new(),
2128 aggregate_duration_delta_pct: Some(pct),
2129 aggregate_throughput_delta_pct: None,
2130 dominant_phase_shift: None,
2131 }
2132 }
2133
2134 let collector = LevelCollector::default();
2136 let subscriber = Registry::default().with(collector.clone());
2137 tracing::subscriber::with_default(subscriber, || {
2138 comparison_with_duration_pct(50.0).emit_tracing_summary();
2139 });
2140 let evs = collector.events.lock().expect("lock").clone();
2141 assert_eq!(
2142 evs.len(),
2143 1,
2144 "exactly one event per emit_tracing_summary call"
2145 );
2146 assert_eq!(
2147 evs[0].level, "WARN",
2148 "+50% slowdown must be warn; got {evs:?}"
2149 );
2150 assert!(
2151 evs[0].message.contains("significant slowdown"),
2152 "warn message must name the slowdown; got {:?}",
2153 evs[0].message
2154 );
2155
2156 let collector = LevelCollector::default();
2158 let subscriber = Registry::default().with(collector.clone());
2159 tracing::subscriber::with_default(subscriber, || {
2160 comparison_with_duration_pct(-25.0).emit_tracing_summary();
2161 });
2162 let evs = collector.events.lock().expect("lock").clone();
2163 assert_eq!(
2164 evs[0].level, "INFO",
2165 "-25% improvement must be info; got {evs:?}"
2166 );
2167 assert!(
2168 evs[0].message.contains("notable improvement"),
2169 "info message must name the improvement; got {:?}",
2170 evs[0].message
2171 );
2172
2173 let collector = LevelCollector::default();
2175 let subscriber = Registry::default().with(collector.clone());
2176 tracing::subscriber::with_default(subscriber, || {
2177 comparison_with_duration_pct(5.0).emit_tracing_summary();
2178 });
2179 let evs = collector.events.lock().expect("lock").clone();
2180 assert_eq!(
2181 evs[0].level, "DEBUG",
2182 "+5% within steady-state must be debug; got {evs:?}"
2183 );
2184 assert!(
2185 evs[0].message.contains("cross-run comparison"),
2186 "debug message must use the steady-state phrasing; got {:?}",
2187 evs[0].message
2188 );
2189
2190 let collector = LevelCollector::default();
2192 let subscriber = Registry::default().with(collector.clone());
2193 tracing::subscriber::with_default(subscriber, || {
2194 comparison_with_duration_pct(25.0).emit_tracing_summary();
2195 });
2196 let evs = collector.events.lock().expect("lock").clone();
2197 assert_eq!(
2198 evs[0].level, "WARN",
2199 "exactly +25% must be warn (inclusive threshold); got {evs:?}"
2200 );
2201
2202 let collector = LevelCollector::default();
2204 let subscriber = Registry::default().with(collector.clone());
2205 tracing::subscriber::with_default(subscriber, || {
2206 comparison_with_duration_pct(-10.0).emit_tracing_summary();
2207 });
2208 let evs = collector.events.lock().expect("lock").clone();
2209 assert_eq!(
2210 evs[0].level, "INFO",
2211 "exactly -10% must be info (inclusive threshold); got {evs:?}"
2212 );
2213
2214 let collector = LevelCollector::default();
2217 let subscriber = Registry::default().with(collector.clone());
2218 tracing::subscriber::with_default(subscriber, || {
2219 RefreshLedgerEvidenceComparison {
2220 phase_deltas: Vec::new(),
2221 aggregate_duration_delta_pct: None,
2222 aggregate_throughput_delta_pct: None,
2223 dominant_phase_shift: None,
2224 }
2225 .emit_tracing_summary();
2226 });
2227 let evs = collector.events.lock().expect("lock").clone();
2228 assert_eq!(
2229 evs[0].level, "DEBUG",
2230 "None duration delta defaults to steady-state (debug); got {evs:?}"
2231 );
2232 }
2233
2234 #[test]
2242 fn regression_verdict_categorizes_each_band_and_handles_degenerate_cases() {
2243 let thresholds = RegressionVerdictThresholds::defaults();
2244 assert_eq!(thresholds.warning_duration_pct, 15.0);
2245 assert_eq!(thresholds.failure_duration_pct, 30.0);
2246
2247 fn comparison_with_pct(pct: Option<f64>) -> RefreshLedgerEvidenceComparison {
2249 RefreshLedgerEvidenceComparison {
2250 phase_deltas: Vec::new(),
2251 aggregate_duration_delta_pct: pct,
2252 aggregate_throughput_delta_pct: None,
2253 dominant_phase_shift: None,
2254 }
2255 }
2256
2257 let clean = comparison_with_pct(Some(10.0)).regression_verdict(&thresholds);
2260 assert_eq!(clean, RegressionVerdict::Clean);
2261 assert!(!clean.should_fail_build());
2262
2263 let warn_at = comparison_with_pct(Some(15.0)).regression_verdict(&thresholds);
2266 assert!(
2267 matches!(
2268 warn_at,
2269 RegressionVerdict::Warning { duration_delta_pct, threshold_pct }
2270 if (duration_delta_pct - 15.0).abs() < 0.01 && threshold_pct == 15.0
2271 ),
2272 "+15% must trigger warn at the inclusive threshold; got {warn_at:?}"
2273 );
2274 assert!(!warn_at.should_fail_build());
2275
2276 let warn_mid = comparison_with_pct(Some(22.5)).regression_verdict(&thresholds);
2278 assert!(matches!(warn_mid, RegressionVerdict::Warning { .. }));
2279 assert!(!warn_mid.should_fail_build());
2280
2281 let fail_at = comparison_with_pct(Some(30.0)).regression_verdict(&thresholds);
2284 assert!(
2285 matches!(
2286 fail_at,
2287 RegressionVerdict::Failure { duration_delta_pct, threshold_pct }
2288 if (duration_delta_pct - 30.0).abs() < 0.01 && threshold_pct == 30.0
2289 ),
2290 "+30% must trigger failure at the inclusive threshold; got {fail_at:?}"
2291 );
2292 assert!(
2293 fail_at.should_fail_build(),
2294 "Failure verdict MUST cause CI to exit non-zero"
2295 );
2296
2297 let fail_far = comparison_with_pct(Some(150.0)).regression_verdict(&thresholds);
2299 assert!(matches!(fail_far, RegressionVerdict::Failure { .. }));
2300
2301 let improvement = comparison_with_pct(Some(-50.0)).regression_verdict(&thresholds);
2303 assert_eq!(
2304 improvement,
2305 RegressionVerdict::Clean,
2306 "improvements (negative duration delta) MUST NOT trigger regression verdicts; \
2307 got {improvement:?}"
2308 );
2309
2310 let no_data = comparison_with_pct(None).regression_verdict(&thresholds);
2312 assert_eq!(
2313 no_data,
2314 RegressionVerdict::Clean,
2315 "missing comparison data MUST NOT cause a CI failure (no signal to gate on)"
2316 );
2317
2318 let invalid_negative = RegressionVerdictThresholds {
2319 warning_duration_pct: -20.0,
2320 failure_duration_pct: -10.0,
2321 };
2322 let steady_state = comparison_with_pct(Some(0.0)).regression_verdict(&invalid_negative);
2323 assert_eq!(
2324 steady_state,
2325 RegressionVerdict::Clean,
2326 "invalid negative thresholds must fail open instead of turning a 0% \
2327 steady-state comparison into a CI failure"
2328 );
2329 }
2330
2331 #[test]
2338 fn regression_verdict_thresholds_try_new_rejects_inconsistent_configurations() {
2339 assert!(RegressionVerdictThresholds::try_new(10.0, 20.0).is_ok());
2341
2342 let err = RegressionVerdictThresholds::try_new(20.0, 10.0)
2344 .expect_err("warning > failure must be rejected");
2345 assert!(
2346 err.contains("strictly less"),
2347 "rejection message must explain the constraint; got {err:?}"
2348 );
2349
2350 let err_eq = RegressionVerdictThresholds::try_new(15.0, 15.0)
2352 .expect_err("warning == failure must be rejected");
2353 assert!(err_eq.contains("strictly less"));
2354
2355 let negative_warning = RegressionVerdictThresholds::try_new(-20.0, 10.0)
2358 .expect_err("negative warning threshold must be rejected");
2359 assert!(negative_warning.contains("non-negative"));
2360 let negative_failure = RegressionVerdictThresholds::try_new(10.0, -20.0)
2361 .expect_err("negative failure threshold must be rejected");
2362 assert!(negative_failure.contains("non-negative"));
2363 let invalid_json = r#"{"warning_duration_pct":-30.0,"failure_duration_pct":-10.0}"#;
2364 let deser = serde_json::from_str::<RegressionVerdictThresholds>(invalid_json)
2365 .expect_err("serde-loaded negative thresholds must be rejected too");
2366 assert!(
2367 deser.to_string().contains("non-negative"),
2368 "serde validation error must explain the threshold polarity; got {deser}"
2369 );
2370
2371 assert!(RegressionVerdictThresholds::try_new(f64::NAN, 30.0).is_err());
2374 assert!(RegressionVerdictThresholds::try_new(15.0, f64::INFINITY).is_err());
2375 }
2376
2377 #[test]
2387 fn regression_verdict_zero_change_under_valid_custom_thresholds_is_clean() {
2388 fn zero_delta_comparison() -> RefreshLedgerEvidenceComparison {
2389 RefreshLedgerEvidenceComparison {
2390 phase_deltas: Vec::new(),
2391 aggregate_duration_delta_pct: Some(0.0),
2392 aggregate_throughput_delta_pct: None,
2393 dominant_phase_shift: None,
2394 }
2395 }
2396
2397 let strict = RegressionVerdictThresholds::try_new(5.0, 20.0)
2400 .expect("valid strict thresholds must construct");
2401 let steady_state = zero_delta_comparison().regression_verdict(&strict);
2402 assert_eq!(
2403 steady_state,
2404 RegressionVerdict::Clean,
2405 "0% steady-state delta must be Clean under any valid \
2406 threshold pair — tight CI profiles must not flag no-op runs"
2407 );
2408
2409 let loose = RegressionVerdictThresholds::try_new(50.0, 200.0)
2413 .expect("valid loose thresholds must construct");
2414 let steady_state_loose = zero_delta_comparison().regression_verdict(&loose);
2415 assert_eq!(
2416 steady_state_loose,
2417 RegressionVerdict::Clean,
2418 "0% steady-state delta must be Clean under loose thresholds too"
2419 );
2420 }
2421
2422 #[test]
2428 fn regression_verdict_serializes_with_snake_case_verdict_tag() {
2429 let clean_json = serde_json::to_string(&RegressionVerdict::Clean).expect("serialize");
2430 assert!(
2431 clean_json.contains("\"verdict\":\"clean\""),
2432 "Clean must serialize with snake_case `verdict` tag; got {clean_json}"
2433 );
2434
2435 let warning_json = serde_json::to_string(&RegressionVerdict::Warning {
2436 duration_delta_pct: 18.5,
2437 threshold_pct: 15.0,
2438 })
2439 .expect("serialize");
2440 assert!(warning_json.contains("\"verdict\":\"warning\""));
2441 assert!(warning_json.contains("\"duration_delta_pct\":18.5"));
2442 assert!(warning_json.contains("\"threshold_pct\":15"));
2443
2444 let failure_json = serde_json::to_string(&RegressionVerdict::Failure {
2445 duration_delta_pct: 42.0,
2446 threshold_pct: 30.0,
2447 })
2448 .expect("serialize");
2449 assert!(failure_json.contains("\"verdict\":\"failure\""));
2450 }
2451}