use std::collections::BTreeMap;
use std::time::Instant;
use serde::{Deserialize, Serialize};
/// One stage of an index refresh run, as recorded in a [`RefreshLedger`].
///
/// Serialized in snake_case (e.g. `LexicalRebuild` <-> `"lexical_rebuild"`), which is the
/// same spelling returned by [`RefreshPhase::as_str`]. The declaration order below matches
/// the order of `RefreshPhase::ALL`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RefreshPhase {
Scan,
Persist,
LexicalRebuild,
Publish,
Analytics,
Semantic,
Recovery,
}
impl RefreshPhase {
pub const ALL: &'static [RefreshPhase] = &[
Self::Scan,
Self::Persist,
Self::LexicalRebuild,
Self::Publish,
Self::Analytics,
Self::Semantic,
Self::Recovery,
];
pub fn as_str(&self) -> &'static str {
match self {
Self::Scan => "scan",
Self::Persist => "persist",
Self::LexicalRebuild => "lexical_rebuild",
Self::Publish => "publish",
Self::Analytics => "analytics",
Self::Semantic => "semantic",
Self::Recovery => "recovery",
}
}
}
/// Measurements for a single phase of a refresh run.
///
/// Records are created by `LedgerBuilder::begin_phase` and appended to
/// `RefreshLedger::phases` in the order the phases ran.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseRecord {
/// Which phase this record describes.
pub phase: RefreshPhase,
/// Wall time of the phase in milliseconds, measured with `Instant` by the builder.
pub duration_ms: u64,
/// Items processed, accumulated through `LedgerBuilder::record_items`.
pub items_processed: u64,
/// Items skipped, accumulated through `LedgerBuilder::record_items`.
pub items_skipped: u64,
/// Errors counted by `record_error` (non-fatal) and `record_failure` (fatal).
pub errors: u64,
/// Free-form named counters (`set_counter` / `inc_counter`), sorted by key.
pub counters: BTreeMap<String, u64>,
/// `true` unless `record_failure` was called during the phase.
pub success: bool,
/// `"; "`-joined messages from `record_error`, replaced wholesale by `record_failure`.
pub error_message: Option<String>,
}
impl PhaseRecord {
fn new(phase: RefreshPhase) -> Self {
Self {
phase,
duration_ms: 0,
items_processed: 0,
items_skipped: 0,
errors: 0,
counters: BTreeMap::new(),
success: true,
error_message: None,
}
}
}
/// Output artifacts attached to a ledger via `LedgerBuilder::set_equivalence`.
///
/// NOTE(review): judging by the name these are meant for checking that two refresh runs
/// produced equivalent results; nothing in this file compares them — confirm with callers.
/// `Option` fields may simply not have been captured for a given run.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EquivalenceArtifacts {
pub conversation_count: u64,
pub message_count: u64,
pub lexical_doc_count: u64,
/// Opaque fingerprint string for the lexical index (format defined by the producer).
pub lexical_fingerprint: Option<String>,
/// Opaque fingerprint string for the semantic manifest (format defined by the producer).
pub semantic_manifest_fingerprint: Option<String>,
/// Opaque digest of search results (format defined by the producer).
pub search_hit_digest: Option<String>,
pub peak_rss_bytes: Option<u64>,
pub db_size_bytes: Option<u64>,
pub lexical_index_size_bytes: Option<u64>,
}
/// Complete record of one refresh run: timing, per-phase measurements, output artifacts and
/// free-form tags. Usually produced by `RefreshLedger::start(..)` followed by
/// `LedgerBuilder::finish()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefreshLedger {
/// Ledger format version (`Default` sets 1).
pub version: u32,
/// Wall-clock start, in milliseconds since the Unix epoch (0 if the clock was unavailable).
pub started_at_ms: i64,
/// Wall-clock completion, in milliseconds since the Unix epoch.
pub completed_at_ms: i64,
/// Monotonic (`Instant`) duration from builder creation to `finish`, in milliseconds.
/// May be 0 for hand-built ledgers; readiness code then falls back to summing phases.
pub total_duration_ms: u64,
pub full_rebuild: bool,
/// Corpus family label (`Default` uses `"default"`).
pub corpus_family: String,
/// Phase records in the order the phases ran; a phase may appear more than once.
pub phases: Vec<PhaseRecord>,
pub equivalence: EquivalenceArtifacts,
/// Free-form key/value annotations added via `LedgerBuilder::tag`.
pub tags: BTreeMap<String, String>,
}
/// Readiness timings derived from a ledger by `RefreshLedger::readiness_milestones`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RefreshReadinessMilestones {
/// Cumulative phase time through the first `LexicalRebuild` record, if every phase up to
/// and including it succeeded.
pub time_to_lexical_ready_ms: Option<u64>,
/// Cumulative phase time through the first `Publish` record, if every phase up to and
/// including it succeeded.
pub time_to_search_ready_ms: Option<u64>,
/// Total run time, set only when every phase succeeded and search state is `Published`.
pub time_to_full_settled_ms: Option<u64>,
/// Snake_case label of the first failed phase, if any.
pub failed_phase: Option<String>,
pub search_readiness_state: RefreshSearchReadinessState,
}
/// Whether search results from a refresh run are available, as classified by scanning the
/// ledger's phases in order. Serialized in snake_case.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RefreshSearchReadinessState {
/// A `Publish` phase succeeded; failures in later phases do not change this.
Published,
/// No `Publish` phase has run yet and nothing has failed (also the `Default`).
#[default]
WaitingForPublish,
/// A non-`Publish` phase failed before any successful publish.
BlockedBeforePublish,
/// The first failed phase is a `Publish` phase.
PublishFailed,
}
impl Default for RefreshLedger {
fn default() -> Self {
Self {
version: 1,
started_at_ms: 0,
completed_at_ms: 0,
total_duration_ms: 0,
full_rebuild: false,
corpus_family: "default".to_owned(),
phases: Vec::new(),
equivalence: EquivalenceArtifacts::default(),
tags: BTreeMap::new(),
}
}
}
impl RefreshLedger {
    /// Starts recording a new refresh run; complete it with [`LedgerBuilder::finish`].
    pub fn start(corpus_family: &str, full_rebuild: bool) -> LedgerBuilder {
        LedgerBuilder::new(corpus_family, full_rebuild)
    }

    /// Returns the first record for `phase`, or `None` if that phase never ran.
    pub fn phase(&self, phase: RefreshPhase) -> Option<&PhaseRecord> {
        self.phases.iter().find(|p| p.phase == phase)
    }

    /// Total `items_processed` across all phases, saturating at `u64::MAX`.
    ///
    /// Saturating (like the phase-duration sums below) rather than `Iterator::sum`, which
    /// panics on overflow in debug builds.
    pub fn total_items_processed(&self) -> u64 {
        self.phases
            .iter()
            .map(|p| p.items_processed)
            .fold(0u64, u64::saturating_add)
    }

    /// Total error count across all phases, saturating at `u64::MAX`.
    pub fn total_errors(&self) -> u64 {
        self.phases
            .iter()
            .map(|p| p.errors)
            .fold(0u64, u64::saturating_add)
    }

    /// `true` when no phase is marked failed (vacuously true for an empty ledger).
    pub fn all_phases_succeeded(&self) -> bool {
        self.phases.iter().all(|p| p.success)
    }

    /// All failed phase records, in the order they ran.
    pub fn failed_phases(&self) -> Vec<&PhaseRecord> {
        self.phases.iter().filter(|p| !p.success).collect()
    }

    /// Wall time per phase, keyed by the phase's [`RefreshPhase::as_str`] label.
    ///
    /// A phase recorded more than once reports the (saturating) sum of all its durations.
    /// Previously only the last record's duration survived the map collection.
    pub fn duration_breakdown(&self) -> BTreeMap<String, u64> {
        let mut breakdown: BTreeMap<String, u64> = BTreeMap::new();
        for p in &self.phases {
            let slot = breakdown.entry(p.phase.as_str().to_owned()).or_insert(0);
            *slot = slot.saturating_add(p.duration_ms);
        }
        breakdown
    }

    /// Derives lexical-ready, search-ready and fully-settled timings from the phases.
    pub fn readiness_milestones(&self) -> RefreshReadinessMilestones {
        RefreshReadinessMilestones {
            time_to_lexical_ready_ms: self
                .successful_duration_through(RefreshPhase::LexicalRebuild),
            time_to_search_ready_ms: self.successful_duration_through(RefreshPhase::Publish),
            time_to_full_settled_ms: self.full_settlement_duration_ms(),
            // First failure in run order; no need to collect every failure into a Vec.
            failed_phase: self
                .phases
                .iter()
                .find(|p| !p.success)
                .map(|p| p.phase.as_str().to_owned()),
            search_readiness_state: self.search_readiness_state(),
        }
    }

    /// Pretty-printed JSON for this ledger; falls back to `"{}"` if serialization fails.
    pub fn to_json(&self) -> String {
        serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_owned())
    }

    /// Cumulative (saturating) duration of all phases up to and including the first
    /// `target` record.
    ///
    /// Returns `None` if a failed phase is reached first (including a failed `target`), or
    /// if `target` never ran.
    fn successful_duration_through(&self, target: RefreshPhase) -> Option<u64> {
        let mut elapsed_ms = 0u64;
        for phase in &self.phases {
            elapsed_ms = elapsed_ms.saturating_add(phase.duration_ms);
            if !phase.success {
                return None;
            }
            if phase.phase == target {
                return Some(elapsed_ms);
            }
        }
        None
    }

    /// Saturating sum of every phase's duration.
    fn sum_phase_durations(&self) -> u64 {
        self.phases
            .iter()
            .map(|phase| phase.duration_ms)
            .fold(0u64, u64::saturating_add)
    }

    /// Total run time, reported only when every phase succeeded and search is `Published`.
    ///
    /// Prefers the measured `total_duration_ms`. When that is 0 (e.g. a ledger assembled
    /// without a builder), it falls back to the sum of phase durations.
    fn full_settlement_duration_ms(&self) -> Option<u64> {
        (self.all_phases_succeeded()
            && self.search_readiness_state() == RefreshSearchReadinessState::Published)
            .then(|| {
                if self.total_duration_ms > 0 {
                    self.total_duration_ms
                } else {
                    self.sum_phase_durations()
                }
            })
    }

    /// Classifies search availability by scanning phases in run order.
    ///
    /// Classification:
    /// - If the first failed phase is a `Publish` phase, the state is `PublishFailed`.
    /// - If a phase fails after a successful publish, the state is still `Published`.
    /// - If a phase fails before any publish, the state is `BlockedBeforePublish`.
    /// - With no failures, the state depends on whether a `Publish` phase ran.
    fn search_readiness_state(&self) -> RefreshSearchReadinessState {
        let mut published = false;
        for phase in &self.phases {
            if !phase.success {
                return if phase.phase == RefreshPhase::Publish {
                    RefreshSearchReadinessState::PublishFailed
                } else if published {
                    RefreshSearchReadinessState::Published
                } else {
                    RefreshSearchReadinessState::BlockedBeforePublish
                };
            }
            if phase.phase == RefreshPhase::Publish {
                published = true;
            }
        }
        if published {
            RefreshSearchReadinessState::Published
        } else {
            RefreshSearchReadinessState::WaitingForPublish
        }
    }
}
/// Incrementally records a refresh run; obtain one from `RefreshLedger::start`.
///
/// Only one phase is active at a time. Beginning a new phase (or calling `finish`) closes
/// the active one and appends its record to the ledger.
pub struct LedgerBuilder {
/// Ledger being assembled; returned by `finish`.
ledger: RefreshLedger,
/// Monotonic start of the run, used for `total_duration_ms`.
start_time: Instant,
/// Active phase and the instant it began. Set and cleared together with `current_record`.
current_phase: Option<(RefreshPhase, Instant)>,
/// Record accumulating measurements for the active phase.
current_record: Option<PhaseRecord>,
}
impl LedgerBuilder {
    /// Creates a builder for a run over `corpus_family`.
    ///
    /// Stamps the wall-clock start time and starts the monotonic timer behind
    /// `total_duration_ms`. Use `RefreshLedger::start` rather than calling this directly.
    fn new(corpus_family: &str, full_rebuild: bool) -> Self {
        Self {
            ledger: RefreshLedger {
                started_at_ms: Self::unix_now_ms(),
                full_rebuild,
                corpus_family: corpus_family.to_owned(),
                ..Default::default()
            },
            start_time: Instant::now(),
            current_phase: None,
            current_record: None,
        }
    }

    /// Closes the active phase (if any) and opens a fresh record for `phase`.
    pub fn begin_phase(&mut self, phase: RefreshPhase) {
        self.end_current_phase();
        self.current_phase = Some((phase, Instant::now()));
        self.current_record = Some(PhaseRecord::new(phase));
    }

    /// Adds to the active phase's processed/skipped counts, saturating at `u64::MAX`.
    /// Does nothing when no phase is active.
    pub fn record_items(&mut self, processed: u64, skipped: u64) {
        if let Some(record) = self.current_record.as_mut() {
            record.items_processed = record.items_processed.saturating_add(processed);
            record.items_skipped = record.items_skipped.saturating_add(skipped);
        }
    }

    /// Counts a non-fatal error in the active phase and appends `message` to its error
    /// message (joined with `"; "`). The phase stays successful. No-op without a phase.
    pub fn record_error(&mut self, message: &str) {
        if let Some(record) = self.current_record.as_mut() {
            record.errors = record.errors.saturating_add(1);
            match &mut record.error_message {
                Some(existing) => {
                    existing.push_str("; ");
                    existing.push_str(message);
                }
                None => record.error_message = Some(message.to_owned()),
            }
        }
    }

    /// Marks the active phase failed and counts one error.
    ///
    /// `message` replaces any message accumulated by `record_error`. No-op without a phase.
    pub fn record_failure(&mut self, message: &str) {
        if let Some(record) = self.current_record.as_mut() {
            record.success = false;
            record.errors = record.errors.saturating_add(1);
            record.error_message = Some(message.to_owned());
        }
    }

    /// Sets (overwrites) a named counter on the active phase. No-op without a phase.
    pub fn set_counter(&mut self, key: &str, value: u64) {
        if let Some(record) = self.current_record.as_mut() {
            record.counters.insert(key.to_owned(), value);
        }
    }

    /// Adds `delta` to a named counter on the active phase (starting from 0), saturating at
    /// `u64::MAX`. No-op without a phase.
    pub fn inc_counter(&mut self, key: &str, delta: u64) {
        if let Some(record) = self.current_record.as_mut() {
            let slot = record.counters.entry(key.to_owned()).or_insert(0);
            *slot = slot.saturating_add(delta);
        }
    }

    /// Replaces the ledger's equivalence artifacts.
    pub fn set_equivalence(&mut self, artifacts: EquivalenceArtifacts) {
        self.ledger.equivalence = artifacts;
    }

    /// Adds or overwrites a free-form tag on the ledger.
    pub fn tag(&mut self, key: &str, value: &str) {
        self.ledger.tags.insert(key.to_owned(), value.to_owned());
    }

    /// Closes the active phase, stamps completion time and total duration, and returns the
    /// finished ledger.
    pub fn finish(mut self) -> RefreshLedger {
        self.end_current_phase();
        self.ledger.completed_at_ms = Self::unix_now_ms();
        self.ledger.total_duration_ms = Self::elapsed_ms(self.start_time);
        self.ledger
    }

    /// Moves the active phase's record (with its measured duration) into the ledger.
    fn end_current_phase(&mut self) {
        let Some((_, phase_start)) = self.current_phase.take() else {
            return;
        };
        let Some(mut record) = self.current_record.take() else {
            return;
        };
        record.duration_ms = Self::elapsed_ms(phase_start);
        self.ledger.phases.push(record);
    }

    /// Wall-clock milliseconds since the Unix epoch.
    ///
    /// Returns 0 if the system clock reads before the epoch. Saturates to `i64::MAX`
    /// instead of truncating the `u128` millisecond count.
    fn unix_now_ms() -> i64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
            .unwrap_or(0)
    }

    /// Milliseconds elapsed since `start`, saturating to `u64::MAX` instead of truncating.
    fn elapsed_ms(start: Instant) -> u64 {
        u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX)
    }
}
/// Canonical corpus family labels, used as the `family` of the `BenchmarkCorpusConfig`
/// presets of the same names.
pub mod corpus_families {
pub const SMALL: &str = "small";
pub const MEDIUM: &str = "medium";
pub const LARGE: &str = "large";
pub const DUPLICATE_HEAVY: &str = "duplicate_heavy";
pub const PATHOLOGICAL: &str = "pathological";
pub const MIXED_AGENT: &str = "mixed_agent";
pub const INCREMENTAL: &str = "incremental";
}
/// Shape of a synthetic corpus for refresh benchmarks. Named presets are provided as
/// associated constructors (`small()`, `medium()`, ...).
#[derive(Debug, Clone)]
pub struct BenchmarkCorpusConfig {
/// Family label; the presets use the matching `corpus_families` constant.
pub family: String,
pub num_conversations: usize,
pub messages_per_conversation: usize,
/// Fraction of duplicated content; presets use values within `0.0..=1.0`.
/// NOTE(review): whether this is per conversation or per message is defined by the
/// corpus generator, not here — confirm.
pub duplicate_fraction: f64,
/// Upper bound on message length (presumably characters or bytes — confirm with the
/// generator).
pub max_message_length: usize,
pub agent_count: usize,
}
impl BenchmarkCorpusConfig {
    /// Builds a preset from positional values.
    ///
    /// Argument order matches the struct's field order, so each constructor below reads
    /// as one row of a table.
    fn preset(
        family: &str,
        num_conversations: usize,
        messages_per_conversation: usize,
        duplicate_fraction: f64,
        max_message_length: usize,
        agent_count: usize,
    ) -> Self {
        Self {
            family: family.to_owned(),
            num_conversations,
            messages_per_conversation,
            duplicate_fraction,
            max_message_length,
            agent_count,
        }
    }

    /// 10 conversations x 4 messages, no duplicates, short messages, 3 agents.
    pub fn small() -> Self {
        Self::preset(corpus_families::SMALL, 10, 4, 0.0, 500, 3)
    }

    /// 100 conversations x 5 messages, 5% duplicates, 5 agents.
    pub fn medium() -> Self {
        Self::preset(corpus_families::MEDIUM, 100, 5, 0.05, 2000, 5)
    }

    /// 1000 conversations x 5 messages, 5% duplicates, 8 agents.
    pub fn large() -> Self {
        Self::preset(corpus_families::LARGE, 1000, 5, 0.05, 2000, 8)
    }

    /// 50 conversations x 6 messages with half of the content duplicated.
    pub fn duplicate_heavy() -> Self {
        Self::preset(corpus_families::DUPLICATE_HEAVY, 50, 6, 0.5, 1000, 3)
    }

    /// Few conversations with very long messages (up to 50 000) and only 2 agents.
    pub fn pathological() -> Self {
        Self::preset(corpus_families::PATHOLOGICAL, 20, 10, 0.0, 50_000, 2)
    }

    /// 70 conversations spread across 14 agents.
    pub fn mixed_agent() -> Self {
        Self::preset(corpus_families::MIXED_AGENT, 70, 4, 0.0, 1000, 14)
    }

    /// 50 conversations x 4 messages, no duplicates, 3 agents.
    pub fn incremental() -> Self {
        Self::preset(corpus_families::INCREMENTAL, 50, 4, 0.0, 1000, 3)
    }
}
/// Throughput of one phase; `evidence_summary` emits these only for phases with
/// `items_processed > 0`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshThroughputProfile {
pub phase: RefreshPhase,
pub duration_ms: u64,
pub items_processed: u64,
/// Items per second rounded to 3 decimals; `None` when the phase took 0 ms.
pub items_per_second: Option<f64>,
}
/// A phase's share of the ledger's `total_duration_ms`; emitted for every recorded phase.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshPhaseShare {
pub phase: RefreshPhase,
pub duration_ms: u64,
/// Percentage of the total, rounded to 2 decimals; 0.0 when either duration is 0.
pub share_pct: f64,
}
/// Comparable performance summary of one ledger, produced by
/// `RefreshLedger::evidence_summary` and compared across runs with `compare_to`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshLedgerEvidence {
/// Per-phase throughput, only for phases that processed at least one item.
pub throughput: Vec<RefreshThroughputProfile>,
/// Per-phase share of total duration, for every recorded phase.
pub phase_share: Vec<RefreshPhaseShare>,
/// Longest-running phase (the last one on ties); `None` if every phase took 0 ms.
pub dominant_phase: Option<RefreshPhase>,
pub aggregate_items_processed: u64,
/// The ledger's `total_duration_ms`.
pub aggregate_duration_ms: u64,
/// Whole-run items per second (3 decimals); `None` if duration or items is 0.
pub aggregate_items_per_second: Option<f64>,
}
impl RefreshLedger {
    /// Condenses this ledger into [`RefreshLedgerEvidence`] for cross-run comparison.
    ///
    /// Every phase gets a share of `total_duration_ms`. Only phases that processed items
    /// get a throughput profile. The dominant phase is the longest-running one; on ties it
    /// is the last such phase. There is no dominant phase when the longest phase took 0 ms.
    pub fn evidence_summary(&self) -> RefreshLedgerEvidence {
        let aggregate_duration_ms = self.total_duration_ms;

        let mut throughput = Vec::new();
        let mut phase_share = Vec::with_capacity(self.phases.len());
        for record in &self.phases {
            phase_share.push(RefreshPhaseShare {
                phase: record.phase,
                duration_ms: record.duration_ms,
                share_pct: share_pct_for(record.duration_ms, aggregate_duration_ms),
            });
            // A phase that processed nothing has no meaningful rate.
            if record.items_processed > 0 {
                throughput.push(RefreshThroughputProfile {
                    phase: record.phase,
                    duration_ms: record.duration_ms,
                    items_processed: record.items_processed,
                    items_per_second: items_per_second_for(
                        record.duration_ms,
                        record.items_processed,
                    ),
                });
            }
        }

        // `max_by_key` yields the last of several equal maxima.
        let dominant_phase = self
            .phases
            .iter()
            .max_by_key(|record| record.duration_ms)
            .and_then(|record| (record.duration_ms > 0).then_some(record.phase));

        let aggregate_items_processed = self.total_items_processed();
        RefreshLedgerEvidence {
            aggregate_items_per_second: items_per_second_for(
                aggregate_duration_ms,
                aggregate_items_processed,
            ),
            throughput,
            phase_share,
            dominant_phase,
            aggregate_items_processed,
            aggregate_duration_ms,
        }
    }
}
/// Throughput in items per second, rounded to three decimal places.
///
/// Returns `None` when `duration_ms` or `items` is zero. A phase that took no measurable
/// time or processed nothing therefore never reports a misleading rate.
fn items_per_second_for(duration_ms: u64, items: u64) -> Option<f64> {
    if duration_ms == 0 || items == 0 {
        return None;
    }
    // `duration_ms >= 1` here, so `seconds >= 0.001` and the division below is finite.
    // The former `seconds <= 0.0` guard was unreachable and has been removed.
    let seconds = duration_ms as f64 / 1000.0;
    let raw = items as f64 / seconds;
    Some((raw * 1000.0).round() / 1000.0)
}
/// Percentage of `total_ms` taken by `phase_ms`, rounded to two decimal places.
///
/// Returns `0.0` when either value is zero, so an unmeasured run never divides by zero.
fn share_pct_for(phase_ms: u64, total_ms: u64) -> f64 {
    match (phase_ms, total_ms) {
        (0, _) | (_, 0) => 0.0,
        (part, whole) => {
            let pct = part as f64 / whole as f64 * 100.0;
            (pct * 100.0).round() / 100.0
        }
    }
}
/// Per-phase change between a baseline run and the current run (see `compare_to`).
///
/// Percentage deltas are `(current - baseline) / baseline * 100`, rounded to 2 decimals.
/// They are `None` when the baseline is zero or a value is non-finite.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshPhaseDelta {
pub phase: RefreshPhase,
/// 0 when the phase is absent from the baseline.
pub baseline_duration_ms: u64,
/// 0 when the phase is absent from the current run.
pub current_duration_ms: u64,
pub duration_delta_pct: Option<f64>,
/// Taken from the throughput profile; 0 when the phase has none.
pub baseline_items_processed: u64,
/// Taken from the throughput profile; 0 when the phase has none.
pub current_items_processed: u64,
pub baseline_items_per_second: Option<f64>,
pub current_items_per_second: Option<f64>,
/// `None` unless both runs report a rate for this phase.
pub throughput_delta_pct: Option<f64>,
}
/// Result of comparing two runs' evidence with `RefreshLedgerEvidence::compare_to`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RefreshLedgerEvidenceComparison {
/// One entry per phase present in either run, in `RefreshPhase::ALL` order.
pub phase_deltas: Vec<RefreshPhaseDelta>,
/// Change in total duration; positive means the current run was slower.
pub aggregate_duration_delta_pct: Option<f64>,
pub aggregate_throughput_delta_pct: Option<f64>,
/// `(baseline, current)` dominant phases, only when both exist and differ.
pub dominant_phase_shift: Option<(RefreshPhase, RefreshPhase)>,
}
impl RefreshLedgerEvidence {
    /// Compares this run's evidence (the *current* run) against `baseline`.
    ///
    /// A [`RefreshPhaseDelta`] is emitted for every phase that appears in either run's
    /// `phase_share` or `throughput` list, in [`RefreshPhase::ALL`] order.
    ///
    /// Percentage deltas come from `pct_delta`, so they are `None` when the baseline value
    /// is zero or non-finite. Throughput deltas also require both runs to report a rate.
    /// `dominant_phase_shift` is `Some((baseline, current))` only when both runs have a
    /// dominant phase and the two differ.
    pub fn compare_to(&self, baseline: &Self) -> RefreshLedgerEvidenceComparison {
        use std::collections::HashMap;

        // Index every list by phase. Collecting into a HashMap keeps the last entry when a
        // phase is listed more than once.
        let baseline_shares: HashMap<RefreshPhase, &RefreshPhaseShare> =
            baseline.phase_share.iter().map(|s| (s.phase, s)).collect();
        let current_shares: HashMap<RefreshPhase, &RefreshPhaseShare> =
            self.phase_share.iter().map(|s| (s.phase, s)).collect();
        let baseline_profiles: HashMap<RefreshPhase, &RefreshThroughputProfile> =
            baseline.throughput.iter().map(|t| (t.phase, t)).collect();
        let current_profiles: HashMap<RefreshPhase, &RefreshThroughputProfile> =
            self.throughput.iter().map(|t| (t.phase, t)).collect();

        let mut phase_deltas = Vec::new();
        for &phase in RefreshPhase::ALL {
            let seen = baseline_shares.contains_key(&phase)
                || current_shares.contains_key(&phase)
                || baseline_profiles.contains_key(&phase)
                || current_profiles.contains_key(&phase);
            if !seen {
                continue;
            }

            let baseline_profile = baseline_profiles.get(&phase).copied();
            let current_profile = current_profiles.get(&phase).copied();

            // Durations come from the share list first, then the throughput list, else 0.
            let baseline_duration_ms = match baseline_shares.get(&phase) {
                Some(share) => share.duration_ms,
                None => baseline_profile.map_or(0, |p| p.duration_ms),
            };
            let current_duration_ms = match current_shares.get(&phase) {
                Some(share) => share.duration_ms,
                None => current_profile.map_or(0, |p| p.duration_ms),
            };
            let baseline_items_per_second = baseline_profile.and_then(|p| p.items_per_second);
            let current_items_per_second = current_profile.and_then(|p| p.items_per_second);

            phase_deltas.push(RefreshPhaseDelta {
                phase,
                baseline_duration_ms,
                current_duration_ms,
                duration_delta_pct: pct_delta(
                    baseline_duration_ms as f64,
                    current_duration_ms as f64,
                ),
                baseline_items_processed: baseline_profile.map_or(0, |p| p.items_processed),
                current_items_processed: current_profile.map_or(0, |p| p.items_processed),
                baseline_items_per_second,
                current_items_per_second,
                throughput_delta_pct: baseline_items_per_second
                    .zip(current_items_per_second)
                    .and_then(|(b, c)| pct_delta(b, c)),
            });
        }

        RefreshLedgerEvidenceComparison {
            phase_deltas,
            aggregate_duration_delta_pct: pct_delta(
                baseline.aggregate_duration_ms as f64,
                self.aggregate_duration_ms as f64,
            ),
            aggregate_throughput_delta_pct: baseline
                .aggregate_items_per_second
                .zip(self.aggregate_items_per_second)
                .and_then(|(b, c)| pct_delta(b, c)),
            dominant_phase_shift: baseline
                .dominant_phase
                .zip(self.dominant_phase)
                .filter(|(from, to)| from != to),
        }
    }
}
/// Percentage change from `baseline` to `current`, rounded to two decimal places.
///
/// Returns `None` in three cases:
/// - either input is NaN or infinite;
/// - `baseline` is zero, so the change is undefined;
/// - the computed ratio overflows to a non-finite value.
fn pct_delta(baseline: f64, current: f64) -> Option<f64> {
    let usable = baseline.is_finite() && current.is_finite() && baseline != 0.0;
    if !usable {
        return None;
    }
    let change = (current - baseline) / baseline * 100.0;
    change
        .is_finite()
        .then(|| (change * 100.0).round() / 100.0)
}
/// Aggregate-duration slowdown percentages at which a regression verdict escalates.
///
/// `Deserialize` is implemented by hand, routing through `try_new`, so loaded configs are
/// validated. The fields are public, so a struct literal can still bypass validation.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct RegressionVerdictThresholds {
/// Slowdown (in percent) at or above which the verdict is `Warning`.
pub warning_duration_pct: f64,
/// Slowdown (in percent) at or above which the verdict is `Failure`.
pub failure_duration_pct: f64,
}
impl RegressionVerdictThresholds {
    /// Default gate: warn at a 15% slowdown, fail at 30%.
    pub fn defaults() -> Self {
        Self {
            warning_duration_pct: 15.0,
            failure_duration_pct: 30.0,
        }
    }

    /// Builds validated thresholds.
    ///
    /// # Errors
    /// Returns a static message if either value is non-finite or negative, or if the
    /// warning threshold is not strictly below the failure threshold.
    pub fn try_new(
        warning_duration_pct: f64,
        failure_duration_pct: f64,
    ) -> Result<Self, &'static str> {
        Self::validate(warning_duration_pct, failure_duration_pct)?;
        Ok(Self {
            warning_duration_pct,
            failure_duration_pct,
        })
    }

    /// `true` when these thresholds satisfy the same rules `try_new` enforces.
    fn is_valid(&self) -> bool {
        Self::validate(self.warning_duration_pct, self.failure_duration_pct).is_ok()
    }

    /// Single source of truth for threshold invariants, shared by `try_new` and `is_valid`
    /// so the two can never drift apart.
    fn validate(
        warning_duration_pct: f64,
        failure_duration_pct: f64,
    ) -> Result<(), &'static str> {
        if !warning_duration_pct.is_finite() || !failure_duration_pct.is_finite() {
            return Err("regression thresholds must be finite f64s");
        }
        if warning_duration_pct < 0.0 || failure_duration_pct < 0.0 {
            return Err("regression thresholds must be non-negative percentages");
        }
        if warning_duration_pct >= failure_duration_pct {
            return Err(
                "warning_duration_pct must be strictly less than failure_duration_pct, \
                 otherwise the warning level is unreachable",
            );
        }
        Ok(())
    }
}
impl<'de> Deserialize<'de> for RegressionVerdictThresholds {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct RawThresholds {
warning_duration_pct: f64,
failure_duration_pct: f64,
}
let raw = RawThresholds::deserialize(deserializer)?;
Self::try_new(raw.warning_duration_pct, raw.failure_duration_pct)
.map_err(serde::de::Error::custom)
}
}
/// Outcome of grading a comparison's aggregate duration change against thresholds.
///
/// Serialized with an internal `"verdict"` tag holding the snake_case variant name, e.g.
/// `{"verdict": "warning", "duration_delta_pct": .., "threshold_pct": ..}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "verdict")]
pub enum RegressionVerdict {
Clean,
/// `duration_delta_pct` reached the warning threshold (`threshold_pct`).
Warning {
duration_delta_pct: f64,
threshold_pct: f64,
},
/// `duration_delta_pct` reached the failure threshold (`threshold_pct`).
Failure {
duration_delta_pct: f64,
threshold_pct: f64,
},
}
impl RegressionVerdict {
    /// Returns `true` only for [`RegressionVerdict::Failure`].
    ///
    /// A `Warning` or `Clean` verdict never fails the build. The match is exhaustive, so
    /// adding a variant forces a decision here.
    pub fn should_fail_build(&self) -> bool {
        match self {
            Self::Failure { .. } => true,
            Self::Clean | Self::Warning { .. } => false,
        }
    }
}
impl RefreshLedgerEvidenceComparison {
    /// Grades the aggregate duration change against `thresholds`.
    ///
    /// Returns `Clean` in four cases:
    /// - the thresholds are invalid;
    /// - no aggregate delta is available;
    /// - the run got faster;
    /// - the slowdown stays below the warning threshold.
    ///
    /// Otherwise the failure threshold is checked before the warning threshold, and both
    /// comparisons are inclusive (`>=`).
    pub fn regression_verdict(
        &self,
        thresholds: &RegressionVerdictThresholds,
    ) -> RegressionVerdict {
        // NOTE(review): invalid thresholds (reachable only through a struct literal, since
        // `try_new` and `Deserialize` validate) yield `Clean` instead of an error.
        if !thresholds.is_valid() {
            return RegressionVerdict::Clean;
        }
        match self.aggregate_duration_delta_pct {
            Some(pct) if pct < 0.0 => RegressionVerdict::Clean,
            Some(pct) if pct >= thresholds.failure_duration_pct => RegressionVerdict::Failure {
                duration_delta_pct: pct,
                threshold_pct: thresholds.failure_duration_pct,
            },
            Some(pct) if pct >= thresholds.warning_duration_pct => RegressionVerdict::Warning {
                duration_delta_pct: pct,
                threshold_pct: thresholds.warning_duration_pct,
            },
            // No delta, a NaN delta, or a slowdown below the warning threshold.
            _ => RegressionVerdict::Clean,
        }
    }
}
impl RefreshLedgerEvidenceComparison {
/// Logs this comparison as one structured event on the `cass::indexer::lexical_refresh`
/// tracing target.
///
/// The level depends on the aggregate duration delta:
/// - `warn` at or above +25%;
/// - `info` at or below -10%;
/// - `debug` otherwise.
///
/// A missing delta counts as 0% for level selection and in the numeric fields. The
/// `aggregate_duration` / `aggregate_throughput` string fields render it as `n/a`.
///
/// NOTE(review): these log tiers are independent of `RegressionVerdictThresholds`
/// (defaults 15% / 30%); confirm the difference is intentional.
pub fn emit_tracing_summary(&self) {
// Human-readable renderings: "from->to" or "none", signed percentages or "n/a".
let dominant_shift_str = self
.dominant_phase_shift
.map(|(from, to)| format!("{}->{}", from.as_str(), to.as_str()))
.unwrap_or_else(|| "none".to_string());
let aggregate_duration_str = self
.aggregate_duration_delta_pct
.map(|pct| format!("{pct:+.2}%"))
.unwrap_or_else(|| "n/a".to_string());
let aggregate_throughput_str = self
.aggregate_throughput_delta_pct
.map(|pct| format!("{pct:+.2}%"))
.unwrap_or_else(|| "n/a".to_string());
const SLOWDOWN_WARN_THRESHOLD_PCT: f64 = 25.0;
const IMPROVEMENT_INFO_THRESHOLD_PCT: f64 = -10.0;
// Numeric fields substitute 0.0 for a missing delta.
let duration_pct = self.aggregate_duration_delta_pct.unwrap_or(0.0);
let phase_count = self.phase_deltas.len();
let aggregate_throughput_pct = self.aggregate_throughput_delta_pct.unwrap_or(0.0);
// Every tier emits the same structured fields; only the level macro and message differ.
macro_rules! emit_tier {
($macro:ident, $msg:literal) => {
tracing::$macro!(
target: "cass::indexer::lexical_refresh",
aggregate_duration_delta_pct = duration_pct,
aggregate_throughput_delta_pct = aggregate_throughput_pct,
aggregate_duration = %aggregate_duration_str,
aggregate_throughput = %aggregate_throughput_str,
dominant_phase_shift = %dominant_shift_str,
phase_count,
$msg
)
};
}
if duration_pct >= SLOWDOWN_WARN_THRESHOLD_PCT {
emit_tier!(
warn,
"lexical refresh evidence: significant slowdown vs previous publish"
);
} else if duration_pct <= IMPROVEMENT_INFO_THRESHOLD_PCT {
emit_tier!(
info,
"lexical refresh evidence: notable improvement vs previous publish"
);
} else {
emit_tier!(debug, "lexical refresh evidence: cross-run comparison");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn phase_model_covers_all_phases() {
assert_eq!(RefreshPhase::ALL.len(), 7);
assert_eq!(RefreshPhase::ALL[0], RefreshPhase::Scan);
assert_eq!(RefreshPhase::ALL[6], RefreshPhase::Recovery);
}
#[test]
fn phase_as_str_round_trips() {
for phase in RefreshPhase::ALL {
let s = phase.as_str();
assert!(!s.is_empty(), "phase {phase:?} has empty string");
}
}
#[test]
fn ledger_builder_records_phases() {
let mut builder = RefreshLedger::start("small", false);
builder.begin_phase(RefreshPhase::Scan);
builder.record_items(100, 5);
builder.set_counter("connectors_scanned", 3);
builder.begin_phase(RefreshPhase::Persist);
builder.record_items(95, 0);
builder.set_counter("bytes_written", 50_000);
builder.begin_phase(RefreshPhase::LexicalRebuild);
builder.record_items(450, 0);
builder.begin_phase(RefreshPhase::Publish);
builder.record_items(1, 0);
let ledger = builder.finish();
assert_eq!(ledger.phases.len(), 4);
assert_eq!(ledger.corpus_family, "small");
assert!(!ledger.full_rebuild);
let scan = ledger.phase(RefreshPhase::Scan).unwrap();
assert_eq!(scan.items_processed, 100);
assert_eq!(scan.items_skipped, 5);
assert_eq!(*scan.counters.get("connectors_scanned").unwrap(), 3);
let persist = ledger.phase(RefreshPhase::Persist).unwrap();
assert_eq!(persist.items_processed, 95);
assert_eq!(*persist.counters.get("bytes_written").unwrap(), 50_000);
assert!(ledger.all_phases_succeeded());
assert_eq!(ledger.total_items_processed(), 100 + 95 + 450 + 1);
assert!(ledger.completed_at_ms >= ledger.started_at_ms);
let max_phase_duration = ledger
.phases
.iter()
.map(|phase| phase.duration_ms)
.max()
.unwrap_or(0);
assert!(ledger.total_duration_ms >= max_phase_duration);
}
#[test]
fn ledger_builder_records_failures() {
let mut builder = RefreshLedger::start("small", false);
builder.begin_phase(RefreshPhase::Scan);
builder.record_items(50, 0);
builder.begin_phase(RefreshPhase::Persist);
builder.record_failure("database locked");
let ledger = builder.finish();
assert!(!ledger.all_phases_succeeded());
assert_eq!(ledger.failed_phases().len(), 1);
assert_eq!(ledger.failed_phases()[0].phase, RefreshPhase::Persist);
assert_eq!(
ledger.failed_phases()[0].error_message.as_deref(),
Some("database locked")
);
assert_eq!(ledger.failed_phases()[0].errors, 1);
assert_eq!(ledger.total_errors(), 1);
}
#[test]
fn ledger_builder_records_errors_without_failure() {
let mut builder = RefreshLedger::start("medium", false);
builder.begin_phase(RefreshPhase::Scan);
builder.record_items(90, 0);
builder.record_error("connector timeout");
builder.record_error("permission denied");
let ledger = builder.finish();
let scan = ledger.phase(RefreshPhase::Scan).unwrap();
assert!(scan.success); assert_eq!(scan.errors, 2);
let msg = scan.error_message.as_deref().unwrap();
assert!(
msg.contains("connector timeout"),
"missing first error: {msg}"
);
assert!(
msg.contains("permission denied"),
"missing second error: {msg}"
);
}
#[test]
fn ledger_equivalence_artifacts() {
let mut builder = RefreshLedger::start("small", true);
builder.begin_phase(RefreshPhase::Scan);
builder.record_items(10, 0);
builder.set_equivalence(EquivalenceArtifacts {
conversation_count: 10,
message_count: 40,
lexical_doc_count: 40,
lexical_fingerprint: Some("fp-abc".to_owned()),
semantic_manifest_fingerprint: None,
search_hit_digest: Some("sha256-xyz".to_owned()),
peak_rss_bytes: Some(100_000_000),
db_size_bytes: Some(5_000_000),
lexical_index_size_bytes: Some(2_000_000),
});
let ledger = builder.finish();
assert_eq!(ledger.equivalence.conversation_count, 10);
assert_eq!(ledger.equivalence.message_count, 40);
assert_eq!(
ledger.equivalence.lexical_fingerprint.as_deref(),
Some("fp-abc")
);
assert!(ledger.full_rebuild);
}
#[test]
fn ledger_duration_breakdown() {
let mut builder = RefreshLedger::start("small", false);
builder.begin_phase(RefreshPhase::Scan);
builder.begin_phase(RefreshPhase::LexicalRebuild);
let ledger = builder.finish();
let breakdown = ledger.duration_breakdown();
assert!(breakdown.contains_key("scan"));
assert!(breakdown.contains_key("lexical_rebuild"));
}
#[test]
fn readiness_milestones_measure_lexical_search_and_settled_times() {
let ledger = RefreshLedger {
total_duration_ms: 90,
phases: vec![
phase_record(RefreshPhase::Scan, 10, true),
phase_record(RefreshPhase::Persist, 20, true),
phase_record(RefreshPhase::LexicalRebuild, 30, true),
phase_record(RefreshPhase::Publish, 5, true),
phase_record(RefreshPhase::Analytics, 7, true),
phase_record(RefreshPhase::Semantic, 8, true),
],
..Default::default()
};
let milestones = ledger.readiness_milestones();
assert_eq!(milestones.time_to_lexical_ready_ms, Some(60));
assert_eq!(milestones.time_to_search_ready_ms, Some(65));
assert_eq!(milestones.time_to_full_settled_ms, Some(90));
assert_eq!(milestones.failed_phase, None);
assert_eq!(
milestones.search_readiness_state,
RefreshSearchReadinessState::Published
);
let json = serde_json::to_value(&milestones).unwrap();
assert_eq!(json["time_to_lexical_ready_ms"], 60);
assert_eq!(json["time_to_search_ready_ms"], 65);
assert_eq!(json["time_to_full_settled_ms"], 90);
assert_eq!(json["search_readiness_state"], "published");
}
#[test]
fn readiness_milestones_stop_at_first_failed_phase() {
let ledger = RefreshLedger {
total_duration_ms: 75,
phases: vec![
phase_record(RefreshPhase::Scan, 10, true),
phase_record(RefreshPhase::Persist, 20, true),
phase_record(RefreshPhase::LexicalRebuild, 30, false),
phase_record(RefreshPhase::Publish, 5, true),
],
..Default::default()
};
let milestones = ledger.readiness_milestones();
assert_eq!(milestones.time_to_lexical_ready_ms, None);
assert_eq!(milestones.time_to_search_ready_ms, None);
assert_eq!(milestones.time_to_full_settled_ms, None);
assert_eq!(milestones.failed_phase.as_deref(), Some("lexical_rebuild"));
assert_eq!(
milestones.search_readiness_state,
RefreshSearchReadinessState::BlockedBeforePublish
);
}
#[test]
fn readiness_milestones_explain_unpublished_and_publish_failed_states() {
let unpublished = RefreshLedger {
phases: vec![
phase_record(RefreshPhase::Scan, 10, true),
phase_record(RefreshPhase::Persist, 20, true),
phase_record(RefreshPhase::LexicalRebuild, 30, true),
],
..Default::default()
};
let unpublished_milestones = unpublished.readiness_milestones();
assert_eq!(unpublished_milestones.time_to_lexical_ready_ms, Some(60));
assert_eq!(unpublished_milestones.time_to_search_ready_ms, None);
assert_eq!(unpublished_milestones.time_to_full_settled_ms, None);
assert_eq!(unpublished_milestones.failed_phase, None);
assert_eq!(
unpublished_milestones.search_readiness_state,
RefreshSearchReadinessState::WaitingForPublish
);
let publish_failed = RefreshLedger {
phases: vec![
phase_record(RefreshPhase::Scan, 10, true),
phase_record(RefreshPhase::Persist, 20, true),
phase_record(RefreshPhase::LexicalRebuild, 30, true),
phase_record(RefreshPhase::Publish, 5, false),
],
..Default::default()
};
let publish_failed_milestones = publish_failed.readiness_milestones();
assert_eq!(publish_failed_milestones.time_to_lexical_ready_ms, Some(60));
assert_eq!(publish_failed_milestones.time_to_search_ready_ms, None);
assert_eq!(publish_failed_milestones.time_to_full_settled_ms, None);
assert_eq!(
publish_failed_milestones.failed_phase.as_deref(),
Some("publish")
);
assert_eq!(
publish_failed_milestones.search_readiness_state,
RefreshSearchReadinessState::PublishFailed
);
let post_publish_failure = RefreshLedger {
phases: vec![
phase_record(RefreshPhase::Scan, 10, true),
phase_record(RefreshPhase::Persist, 20, true),
phase_record(RefreshPhase::LexicalRebuild, 30, true),
phase_record(RefreshPhase::Publish, 5, true),
phase_record(RefreshPhase::Analytics, 7, false),
],
..Default::default()
};
let post_publish_failure_milestones = post_publish_failure.readiness_milestones();
assert_eq!(
post_publish_failure_milestones.time_to_lexical_ready_ms,
Some(60)
);
assert_eq!(
post_publish_failure_milestones.time_to_search_ready_ms,
Some(65)
);
assert_eq!(
post_publish_failure_milestones.time_to_full_settled_ms,
None
);
assert_eq!(
post_publish_failure_milestones.failed_phase.as_deref(),
Some("analytics")
);
assert_eq!(
post_publish_failure_milestones.search_readiness_state,
RefreshSearchReadinessState::Published
);
}
#[test]
fn readiness_milestones_do_not_report_full_settlement_before_publish() {
let empty = RefreshLedger::default().readiness_milestones();
assert_eq!(empty.time_to_lexical_ready_ms, None);
assert_eq!(empty.time_to_search_ready_ms, None);
assert_eq!(empty.time_to_full_settled_ms, None);
assert_eq!(
empty.search_readiness_state,
RefreshSearchReadinessState::WaitingForPublish
);
let partial = RefreshLedger {
total_duration_ms: 42,
phases: vec![
phase_record(RefreshPhase::Scan, 10, true),
phase_record(RefreshPhase::Persist, 20, true),
],
..Default::default()
}
.readiness_milestones();
assert_eq!(partial.time_to_lexical_ready_ms, None);
assert_eq!(partial.time_to_search_ready_ms, None);
assert_eq!(partial.time_to_full_settled_ms, None);
assert_eq!(
partial.search_readiness_state,
RefreshSearchReadinessState::WaitingForPublish
);
}
#[test]
fn ledger_tags() {
let mut builder = RefreshLedger::start("medium", false);
builder.tag("run_id", "bench-2026-04-01");
builder.tag("machine", "csd");
let ledger = builder.finish();
assert_eq!(ledger.tags.get("run_id").unwrap(), "bench-2026-04-01");
assert_eq!(ledger.tags.get("machine").unwrap(), "csd");
}
#[test]
fn ledger_json_round_trip() {
let mut builder = RefreshLedger::start("duplicate_heavy", true);
builder.begin_phase(RefreshPhase::Scan);
builder.record_items(50, 10);
builder.set_counter("duplicate_conversations", 25);
builder.begin_phase(RefreshPhase::Persist);
builder.record_items(40, 0);
builder.set_equivalence(EquivalenceArtifacts {
conversation_count: 40,
message_count: 200,
lexical_doc_count: 200,
..Default::default()
});
let ledger = builder.finish();
let json = ledger.to_json();
let deser: RefreshLedger = serde_json::from_str(&json).unwrap();
assert_eq!(deser.corpus_family, "duplicate_heavy");
assert!(deser.full_rebuild);
assert_eq!(deser.phases.len(), 2);
assert_eq!(deser.equivalence.conversation_count, 40);
assert_eq!(
*deser.phases[0]
.counters
.get("duplicate_conversations")
.unwrap(),
25
);
}
#[test]
fn ledger_inc_counter() {
let mut builder = RefreshLedger::start("small", false);
builder.begin_phase(RefreshPhase::Scan);
builder.inc_counter("files_scanned", 10);
builder.inc_counter("files_scanned", 15);
builder.inc_counter("files_scanned", 5);
let ledger = builder.finish();
let scan = ledger.phase(RefreshPhase::Scan).unwrap();
assert_eq!(*scan.counters.get("files_scanned").unwrap(), 30);
}
#[test]
fn benchmark_corpus_configs_have_correct_families() {
assert_eq!(BenchmarkCorpusConfig::small().family, "small");
assert_eq!(BenchmarkCorpusConfig::medium().family, "medium");
assert_eq!(BenchmarkCorpusConfig::large().family, "large");
assert_eq!(
BenchmarkCorpusConfig::duplicate_heavy().family,
"duplicate_heavy"
);
assert_eq!(BenchmarkCorpusConfig::pathological().family, "pathological");
assert_eq!(BenchmarkCorpusConfig::mixed_agent().family, "mixed_agent");
assert_eq!(BenchmarkCorpusConfig::incremental().family, "incremental");
}
#[test]
fn benchmark_corpus_configs_have_reasonable_sizes() {
    // Every preset must describe a non-empty corpus with at least one agent,
    // and its duplicate fraction must lie in the closed interval [0, 1].
    let presets = [
        BenchmarkCorpusConfig::small(),
        BenchmarkCorpusConfig::medium(),
        BenchmarkCorpusConfig::large(),
        BenchmarkCorpusConfig::duplicate_heavy(),
        BenchmarkCorpusConfig::pathological(),
        BenchmarkCorpusConfig::mixed_agent(),
        BenchmarkCorpusConfig::incremental(),
    ];
    for preset in presets.iter() {
        let family = &preset.family;
        assert!(
            preset.num_conversations > 0,
            "{} has 0 conversations",
            family
        );
        assert!(
            preset.messages_per_conversation > 0,
            "{} has 0 messages",
            family
        );
        assert!(preset.agent_count > 0, "{} has 0 agents", family);
        // `contains` is false for NaN, exactly like the two-sided comparison.
        assert!(
            (0.0..=1.0).contains(&preset.duplicate_fraction),
            "{} has invalid duplicate fraction",
            family
        );
    }
}
/// Builds a [`PhaseRecord`] that processed no items.
///
/// A failed record (`success == false`) carries one error and the message
/// `"failed <phase>"`; a successful record carries zero errors and no message.
fn phase_record(phase: RefreshPhase, duration_ms: u64, success: bool) -> PhaseRecord {
    let (errors, error_message) = if success {
        (0, None)
    } else {
        (1, Some(format!("failed {}", phase.as_str())))
    };
    PhaseRecord {
        phase,
        duration_ms,
        success,
        errors,
        error_message,
        items_processed: 0,
        items_skipped: 0,
        counters: BTreeMap::new(),
    }
}
fn phase_record_with_items(phase: RefreshPhase, duration_ms: u64, items: u64) -> PhaseRecord {
PhaseRecord {
phase,
duration_ms,
items_processed: items,
items_skipped: 0,
errors: 0,
counters: BTreeMap::new(),
success: true,
error_message: None,
}
}
/// Wraps `phases` in a full-rebuild [`RefreshLedger`] for the
/// `"evidence-test"` corpus, anchored at a fixed start timestamp.
///
/// `total_duration_ms` is the sum of the phase durations, and
/// `completed_at_ms` is the start plus that total.
fn ledger_with(phases: Vec<PhaseRecord>) -> RefreshLedger {
    const STARTED_AT_MS: i64 = 1_700_000_000_000;
    let total_duration_ms: u64 = phases.iter().map(|record| record.duration_ms).sum();
    // A total too large for i64 falls back to zero elapsed time.
    let elapsed_ms = i64::try_from(total_duration_ms).unwrap_or(0);
    RefreshLedger {
        version: 1,
        started_at_ms: STARTED_AT_MS,
        completed_at_ms: STARTED_AT_MS + elapsed_ms,
        total_duration_ms,
        full_rebuild: true,
        corpus_family: String::from("evidence-test"),
        phases,
        equivalence: EquivalenceArtifacts::default(),
        tags: BTreeMap::new(),
    }
}
#[test]
fn evidence_summary_reports_per_phase_throughput_with_safe_zero_handling() {
    // Scan: 1000 items in 0.5 s and Persist: 2000 items in 1 s are both 2000/s.
    // The zero-item phases get no throughput entry, but LexicalRebuild's 100 ms
    // still counts toward the aggregate: 3000 items / 1.6 s = 1875/s.
    let evidence = ledger_with(vec![
        phase_record_with_items(RefreshPhase::Scan, 500, 1000),
        phase_record_with_items(RefreshPhase::Persist, 1000, 2000),
        phase_record_with_items(RefreshPhase::LexicalRebuild, 100, 0),
        phase_record_with_items(RefreshPhase::Recovery, 0, 0),
    ])
    .evidence_summary();

    assert_eq!(
        evidence.throughput.len(),
        2,
        "throughput must skip zero-item phases; got {:?}",
        evidence.throughput
    );

    let throughput_of =
        |wanted: RefreshPhase| evidence.throughput.iter().find(|entry| entry.phase == wanted);

    let scan = throughput_of(RefreshPhase::Scan).expect("scan throughput present");
    assert_eq!(scan.items_per_second, Some(2000.0));
    assert_eq!(scan.duration_ms, 500);
    assert_eq!(scan.items_processed, 1000);

    let persist = throughput_of(RefreshPhase::Persist).expect("persist throughput present");
    assert_eq!(persist.items_per_second, Some(2000.0));

    assert_eq!(evidence.aggregate_items_processed, 3000);
    assert_eq!(evidence.aggregate_duration_ms, 1600);
    assert_eq!(evidence.aggregate_items_per_second, Some(1875.0));
}
#[test]
fn evidence_summary_handles_empty_and_zero_duration_ledgers() {
    // An empty ledger yields an all-empty summary with no rates.
    let empty_evidence = ledger_with(Vec::new()).evidence_summary();
    assert!(empty_evidence.throughput.is_empty());
    assert!(empty_evidence.phase_share.is_empty());
    assert_eq!(empty_evidence.dominant_phase, None);
    assert_eq!(empty_evidence.aggregate_items_per_second, None);
    assert_eq!(empty_evidence.aggregate_duration_ms, 0);

    // Phases that did work in zero time must not divide by zero: rates are
    // None, shares are 0.0 (never NaN), and no phase is dominant.
    let instant_evidence = ledger_with(vec![
        phase_record_with_items(RefreshPhase::Scan, 0, 5),
        phase_record_with_items(RefreshPhase::Persist, 0, 5),
    ])
    .evidence_summary();
    instant_evidence.throughput.iter().for_each(|entry| {
        assert_eq!(entry.items_per_second, None, "zero duration must yield None");
    });
    assert_eq!(instant_evidence.dominant_phase, None);
    instant_evidence.phase_share.iter().for_each(|share| {
        assert_eq!(share.share_pct, 0.0);
        assert!(!share.share_pct.is_nan(), "share_pct must never be NaN");
    });
}
#[test]
fn evidence_summary_phase_share_sums_to_one_hundred_and_dominant_phase_picks_max() {
    // 200 + 600 + 200 = 1000 ms in total, so Persist owns exactly 60% of the
    // wall time and is the single longest phase.
    let evidence = ledger_with(vec![
        phase_record_with_items(RefreshPhase::Scan, 200, 100),
        phase_record_with_items(RefreshPhase::Persist, 600, 1500),
        phase_record_with_items(RefreshPhase::LexicalRebuild, 200, 1500),
    ])
    .evidence_summary();

    let total_share = evidence
        .phase_share
        .iter()
        .fold(0.0_f64, |acc, share| acc + share.share_pct);
    assert!(
        (total_share - 100.0).abs() <= 0.05,
        "phase shares must sum to ~100.0 (±0.05 for rounding); got {total_share}"
    );

    let persist_share = evidence
        .phase_share
        .iter()
        .find(|share| share.phase == RefreshPhase::Persist)
        .expect("persist share present");
    assert_eq!(persist_share.share_pct, 60.0);
    assert_eq!(evidence.dominant_phase, Some(RefreshPhase::Persist));
}
/// When several phases tie for the longest duration, the reported dominant
/// phase is the LAST tied phase in ledger order. This mirrors
/// `Iterator::max_by_key`, which returns the last of several maximal elements.
///
/// The test was previously named `..._is_first_in_pipeline_order`, which
/// contradicted the value it asserts; it is renamed so the name describes the
/// pinned behavior.
#[test]
fn evidence_summary_dominant_phase_tie_break_is_last_in_pipeline_order() {
    let ledger = ledger_with(vec![
        phase_record_with_items(RefreshPhase::Scan, 500, 1),
        phase_record_with_items(RefreshPhase::Persist, 500, 1),
        phase_record_with_items(RefreshPhase::LexicalRebuild, 500, 1),
    ]);
    let evidence = ledger.evidence_summary();
    assert_eq!(
        evidence.dominant_phase,
        Some(RefreshPhase::LexicalRebuild),
        "tie-break: max_by_key returns the LAST phase at max duration"
    );
}
#[test]
fn evidence_summary_serializes_to_stable_json_field_set() {
    let evidence =
        ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 50)]).evidence_summary();
    let json = serde_json::to_string(&evidence).expect("serialize");

    // Every top-level key consumers rely on must appear, quoted, in the raw JSON.
    for field in [
        "throughput",
        "phase_share",
        "dominant_phase",
        "aggregate_items_processed",
        "aggregate_duration_ms",
        "aggregate_items_per_second",
    ] {
        let required_field = format!("\"{field}\"");
        assert!(
            json.contains(&required_field),
            "evidence JSON missing field {required_field}; got: {json}"
        );
    }

    // Single 100 ms Scan phase over 50 items → 500 items/s, and Scan dominates.
    let parsed: serde_json::Value = serde_json::from_str(&json).expect("parse");
    assert_eq!(parsed["aggregate_items_processed"], 50);
    assert_eq!(parsed["aggregate_duration_ms"], 100);
    assert_eq!(parsed["aggregate_items_per_second"], 500.0);
    assert_eq!(parsed["dominant_phase"], "scan");
}
#[test]
fn evidence_compare_to_reports_per_phase_regressions_and_improvements() {
    // Same 100 items; Scan doubles from 100 ms (baseline) to 200 ms (current).
    let baseline = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
        .evidence_summary();
    let current = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 200, 100)])
        .evidence_summary();

    // Current vs baseline: +100% duration, -50% throughput, same dominant phase.
    let cmp = current.compare_to(&baseline);
    assert_eq!(cmp.phase_deltas.len(), 1);
    let scan = &cmp.phase_deltas[0];
    assert_eq!(scan.phase, RefreshPhase::Scan);
    assert_eq!(scan.duration_delta_pct, Some(100.0));
    assert_eq!(scan.throughput_delta_pct, Some(-50.0));
    assert_eq!(cmp.aggregate_duration_delta_pct, Some(100.0));
    assert_eq!(cmp.aggregate_throughput_delta_pct, Some(-50.0));
    assert_eq!(cmp.dominant_phase_shift, None);

    // Swapping roles flips the signs: -50% duration, +100% throughput.
    // (This line previously read `¤t`, a mis-encoded `&current` that did not compile.)
    let cmp_improved = baseline.compare_to(&current);
    let scan = &cmp_improved.phase_deltas[0];
    assert_eq!(scan.duration_delta_pct, Some(-50.0));
    assert_eq!(scan.throughput_delta_pct, Some(100.0));
}
#[test]
fn evidence_compare_to_surfaces_phases_unique_to_one_side() {
    // Baseline ran Scan and Persist; the current run skipped Persist entirely.
    let baseline = ledger_with(vec![
        phase_record_with_items(RefreshPhase::Scan, 100, 100),
        phase_record_with_items(RefreshPhase::Persist, 50, 200),
    ])
    .evidence_summary();
    let current =
        ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)]).evidence_summary();

    let cmp = current.compare_to(&baseline);
    let phases = cmp
        .phase_deltas
        .iter()
        .map(|delta| delta.phase)
        .collect::<Vec<RefreshPhase>>();

    assert!(
        phases.contains(&RefreshPhase::Scan),
        "Scan ran in both sides; must appear in comparison; got phases {phases:?}"
    );
    assert!(
        phases.contains(&RefreshPhase::Persist),
        "Persist is missing from current but ran in baseline — comparison MUST \
         surface it so caller can investigate; got phases {phases:?}"
    );

    // The missing phase counts as 0 ms in `current`, i.e. a -100% duration delta.
    let persist = cmp
        .phase_deltas
        .iter()
        .find(|delta| delta.phase == RefreshPhase::Persist)
        .expect("Persist delta present");
    assert_eq!(persist.baseline_duration_ms, 50);
    assert_eq!(persist.current_duration_ms, 0);
    assert_eq!(
        persist.duration_delta_pct,
        Some(-100.0),
        "phase disappearing from current must surface as -100% duration delta; \
         got {persist:?}"
    );
}
#[test]
fn evidence_compare_to_retains_zero_item_phases_with_duration() {
    // Publish processes no items but still takes time. It must be left out of
    // throughput, yet still appear (with its durations) in the comparison.
    let summarize = |publish_ms: u64| {
        ledger_with(vec![
            phase_record_with_items(RefreshPhase::Scan, 100, 100),
            phase_record_with_items(RefreshPhase::Publish, publish_ms, 0),
        ])
        .evidence_summary()
    };
    let baseline = summarize(40);
    let current = summarize(80);

    assert!(
        !baseline
            .throughput
            .iter()
            .any(|entry| entry.phase == RefreshPhase::Publish),
        "zero-item Publish must stay out of throughput: {:?}",
        baseline.throughput
    );

    let cmp = current.compare_to(&baseline);
    let publish = cmp
        .phase_deltas
        .iter()
        .find(|delta| delta.phase == RefreshPhase::Publish)
        .expect("zero-item Publish phase must remain in comparison");

    // Duration doubled (40 → 80 ms) ...
    assert_eq!(publish.baseline_duration_ms, 40);
    assert_eq!(publish.current_duration_ms, 80);
    assert_eq!(publish.duration_delta_pct, Some(100.0));
    // ... but with zero items there is no rate to compare.
    assert_eq!(publish.baseline_items_processed, 0);
    assert_eq!(publish.current_items_processed, 0);
    assert_eq!(publish.baseline_items_per_second, None);
    assert_eq!(publish.current_items_per_second, None);
    assert_eq!(publish.throughput_delta_pct, None);
}
#[test]
fn evidence_compare_to_reports_dominant_phase_shift() {
    // Two-phase run with fixed item counts; only the time split varies.
    let summarize = |scan_ms: u64, persist_ms: u64| {
        ledger_with(vec![
            phase_record_with_items(RefreshPhase::Scan, scan_ms, 100),
            phase_record_with_items(RefreshPhase::Persist, persist_ms, 100),
        ])
        .evidence_summary()
    };
    let baseline = summarize(800, 200);
    let current = summarize(200, 800);
    assert_eq!(baseline.dominant_phase, Some(RefreshPhase::Scan));
    assert_eq!(current.dominant_phase, Some(RefreshPhase::Persist));

    // The shift is reported as (baseline dominant, current dominant).
    let cmp = current.compare_to(&baseline);
    assert_eq!(
        cmp.dominant_phase_shift,
        Some((RefreshPhase::Scan, RefreshPhase::Persist)),
        "dominant phase shifted Scan→Persist; comparison must surface this; got {cmp:?}"
    );

    // Comparing a summary with itself never reports a shift.
    let same_dom = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 100)])
        .evidence_summary();
    assert_eq!(same_dom.compare_to(&same_dom).dominant_phase_shift, None);
}
#[test]
fn evidence_compare_to_safely_handles_zero_baseline_and_empty_evidence() {
    let empty = ledger_with(Vec::new()).evidence_summary();
    let normal = ledger_with(vec![phase_record_with_items(RefreshPhase::Scan, 100, 50)])
        .evidence_summary();

    let against_empty = normal.compare_to(&empty);

    // Scan exists only in `normal`, so its baseline duration is 0. Requiring it
    // to be present keeps the zero-baseline check below from passing vacuously.
    assert!(
        against_empty
            .phase_deltas
            .iter()
            .any(|d| d.phase == RefreshPhase::Scan),
        "phase present only in current must still appear in the comparison"
    );

    // Implication "baseline == 0 ⇒ no percentage", i.e. `baseline != 0 || None`.
    // The previous form (`is_none() || baseline == 0`) was always true here,
    // because every baseline duration is 0 against an empty ledger.
    assert!(
        against_empty
            .phase_deltas
            .iter()
            .all(|d| d.baseline_duration_ms != 0 || d.duration_delta_pct.is_none()),
        "phases with zero-baseline duration must report None for duration_delta_pct"
    );

    // serde_json serializes non-finite floats as `null`, so the JSON string
    // checks below cannot detect NaN/Infinity; check the values directly.
    assert!(
        against_empty.phase_deltas.iter().all(|d| {
            d.duration_delta_pct.map_or(true, |v| v.is_finite())
                && d.throughput_delta_pct.map_or(true, |v| v.is_finite())
        }),
        "per-phase deltas must be finite whenever they are present"
    );
    assert_eq!(against_empty.aggregate_duration_delta_pct, None);
    assert_eq!(against_empty.aggregate_throughput_delta_pct, None);

    // Two empty summaries compare to nothing at all.
    let against_self = empty.compare_to(&empty);
    assert!(against_self.phase_deltas.is_empty());
    assert_eq!(against_self.aggregate_duration_delta_pct, None);

    let json = serde_json::to_string(&against_empty).expect("serialize");
    assert!(
        !json.contains("NaN"),
        "comparison JSON must not contain NaN; got {json}"
    );
    assert!(
        !json.contains("Infinity"),
        "comparison JSON must not contain Infinity"
    );
}
#[test]
fn evidence_comparison_emit_tracing_summary_uses_correct_severity_tier() {
    use std::sync::{Arc, Mutex};
    use tracing::field::{Field, Visit};
    use tracing::{Event, Subscriber};
    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::{Context, Layer, SubscriberExt};

    // One event seen on the lexical-refresh target: its level as text and its
    // `message` field.
    #[derive(Debug, Clone, Default)]
    struct CapturedEvent {
        level: String,
        message: String,
    }

    // Layer that appends every `cass::indexer::lexical_refresh` event to a shared Vec.
    #[derive(Clone, Default)]
    struct LevelCollector {
        events: Arc<Mutex<Vec<CapturedEvent>>>,
    }

    impl<S: Subscriber> Layer<S> for LevelCollector {
        fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
            // Only count events from the target under test.
            if event.metadata().target() != "cass::indexer::lexical_refresh" {
                return;
            }
            let mut visitor = MessageVisitor::default();
            event.record(&mut visitor);
            self.events
                .lock()
                .expect("collector lock")
                .push(CapturedEvent {
                    level: event.metadata().level().to_string(),
                    message: visitor.message,
                });
        }
    }

    // Field visitor that keeps only the Debug-formatted `message` field.
    #[derive(Default)]
    struct MessageVisitor {
        message: String,
    }

    impl Visit for MessageVisitor {
        fn record_str(&mut self, _field: &Field, _value: &str) {}
        fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
            if field.name() == "message" {
                self.message = format!("{:?}", value).trim_matches('"').to_string();
            }
        }
    }

    // Comparison whose only populated signal is the aggregate duration delta.
    fn comparison_with_duration_pct(pct: Option<f64>) -> RefreshLedgerEvidenceComparison {
        RefreshLedgerEvidenceComparison {
            phase_deltas: Vec::new(),
            aggregate_duration_delta_pct: pct,
            aggregate_throughput_delta_pct: None,
            dominant_phase_shift: None,
        }
    }

    // Runs `emit_tracing_summary` under a fresh collector and returns the one
    // event it produced. Fails if the call emitted zero or several events, so the
    // one-event invariant is enforced for every severity case, not just the first.
    fn capture_single_event(comparison: RefreshLedgerEvidenceComparison) -> CapturedEvent {
        let collector = LevelCollector::default();
        let subscriber = Registry::default().with(collector.clone());
        tracing::subscriber::with_default(subscriber, || {
            comparison.emit_tracing_summary();
        });
        let mut evs = collector.events.lock().expect("lock").clone();
        assert_eq!(
            evs.len(),
            1,
            "exactly one event per emit_tracing_summary call; got {evs:?}"
        );
        evs.remove(0)
    }

    let slowdown = capture_single_event(comparison_with_duration_pct(Some(50.0)));
    assert_eq!(
        slowdown.level, "WARN",
        "+50% slowdown must be warn; got {slowdown:?}"
    );
    assert!(
        slowdown.message.contains("significant slowdown"),
        "warn message must name the slowdown; got {:?}",
        slowdown.message
    );

    let improvement = capture_single_event(comparison_with_duration_pct(Some(-25.0)));
    assert_eq!(
        improvement.level, "INFO",
        "-25% improvement must be info; got {improvement:?}"
    );
    assert!(
        improvement.message.contains("notable improvement"),
        "info message must name the improvement; got {:?}",
        improvement.message
    );

    let steady = capture_single_event(comparison_with_duration_pct(Some(5.0)));
    assert_eq!(
        steady.level, "DEBUG",
        "+5% within steady-state must be debug; got {steady:?}"
    );
    assert!(
        steady.message.contains("cross-run comparison"),
        "debug message must use the steady-state phrasing; got {:?}",
        steady.message
    );

    let warn_edge = capture_single_event(comparison_with_duration_pct(Some(25.0)));
    assert_eq!(
        warn_edge.level, "WARN",
        "exactly +25% must be warn (inclusive threshold); got {warn_edge:?}"
    );

    let info_edge = capture_single_event(comparison_with_duration_pct(Some(-10.0)));
    assert_eq!(
        info_edge.level, "INFO",
        "exactly -10% must be info (inclusive threshold); got {info_edge:?}"
    );

    let no_data = capture_single_event(comparison_with_duration_pct(None));
    assert_eq!(
        no_data.level, "DEBUG",
        "None duration delta defaults to steady-state (debug); got {no_data:?}"
    );
}
/// Exercises `regression_verdict` across the bands of the default thresholds.
///
/// The bands are Clean below +15%, Warning from +15% (inclusive), and Failure
/// from +30% (inclusive). The test also covers three degenerate inputs: an
/// improvement, missing comparison data, and invalid negative thresholds.
#[test]
fn regression_verdict_categorizes_each_band_and_handles_degenerate_cases() {
let thresholds = RegressionVerdictThresholds::defaults();
// Pin the default band edges that the cases below rely on.
assert_eq!(thresholds.warning_duration_pct, 15.0);
assert_eq!(thresholds.failure_duration_pct, 30.0);
// Builds a comparison whose only populated field is the aggregate duration delta.
fn comparison_with_pct(pct: Option<f64>) -> RefreshLedgerEvidenceComparison {
RefreshLedgerEvidenceComparison {
phase_deltas: Vec::new(),
aggregate_duration_delta_pct: pct,
aggregate_throughput_delta_pct: None,
dominant_phase_shift: None,
}
}
// Below the warning threshold: Clean.
let clean = comparison_with_pct(Some(10.0)).regression_verdict(&thresholds);
assert_eq!(clean, RegressionVerdict::Clean);
assert!(!clean.should_fail_build());
// Exactly at the warning threshold: inclusive, so Warning carrying the delta and
// the threshold it crossed. A Warning must not fail the build.
let warn_at = comparison_with_pct(Some(15.0)).regression_verdict(&thresholds);
assert!(
matches!(
warn_at,
RegressionVerdict::Warning { duration_delta_pct, threshold_pct }
if (duration_delta_pct - 15.0).abs() < 0.01 && threshold_pct == 15.0
),
"+15% must trigger warn at the inclusive threshold; got {warn_at:?}"
);
assert!(!warn_at.should_fail_build());
// Strictly inside the warning band.
let warn_mid = comparison_with_pct(Some(22.5)).regression_verdict(&thresholds);
assert!(matches!(warn_mid, RegressionVerdict::Warning { .. }));
assert!(!warn_mid.should_fail_build());
// Exactly at the failure threshold: inclusive, so Failure, which fails the build.
let fail_at = comparison_with_pct(Some(30.0)).regression_verdict(&thresholds);
assert!(
matches!(
fail_at,
RegressionVerdict::Failure { duration_delta_pct, threshold_pct }
if (duration_delta_pct - 30.0).abs() < 0.01 && threshold_pct == 30.0
),
"+30% must trigger failure at the inclusive threshold; got {fail_at:?}"
);
assert!(
fail_at.should_fail_build(),
"Failure verdict MUST cause CI to exit non-zero"
);
// Far beyond the failure threshold: still Failure.
let fail_far = comparison_with_pct(Some(150.0)).regression_verdict(&thresholds);
assert!(matches!(fail_far, RegressionVerdict::Failure { .. }));
// Negative delta means the run got faster: Clean.
let improvement = comparison_with_pct(Some(-50.0)).regression_verdict(&thresholds);
assert_eq!(
improvement,
RegressionVerdict::Clean,
"improvements (negative duration delta) MUST NOT trigger regression verdicts; \
got {improvement:?}"
);
// No aggregate delta at all: Clean.
let no_data = comparison_with_pct(None).regression_verdict(&thresholds);
assert_eq!(
no_data,
RegressionVerdict::Clean,
"missing comparison data MUST NOT cause a CI failure (no signal to gate on)"
);
// Built directly from the public fields, so `try_new` validation is bypassed.
// A 0% delta against these invalid thresholds must still come out Clean.
let invalid_negative = RegressionVerdictThresholds {
warning_duration_pct: -20.0,
failure_duration_pct: -10.0,
};
let steady_state = comparison_with_pct(Some(0.0)).regression_verdict(&invalid_negative);
assert_eq!(
steady_state,
RegressionVerdict::Clean,
"invalid negative thresholds must fail open instead of turning a 0% \
steady-state comparison into a CI failure"
);
}
/// `RegressionVerdictThresholds::try_new` accepts only finite, non-negative
/// thresholds where warning is strictly less than failure. JSON
/// deserialization must reject negative thresholds with the same
/// "non-negative" explanation.
#[test]
fn regression_verdict_thresholds_try_new_rejects_inconsistent_configurations() {
assert!(RegressionVerdictThresholds::try_new(10.0, 20.0).is_ok());
// Ordering rule: warning must be strictly below failure.
let err = RegressionVerdictThresholds::try_new(20.0, 10.0)
.expect_err("warning > failure must be rejected");
assert!(
err.contains("strictly less"),
"rejection message must explain the constraint; got {err:?}"
);
// Equal thresholds are rejected too ("strictly", not "or equal").
let err_eq = RegressionVerdictThresholds::try_new(15.0, 15.0)
.expect_err("warning == failure must be rejected");
assert!(err_eq.contains("strictly less"));
// Sign rule: each threshold must be non-negative.
let negative_warning = RegressionVerdictThresholds::try_new(-20.0, 10.0)
.expect_err("negative warning threshold must be rejected");
assert!(negative_warning.contains("non-negative"));
// (10, -20) breaks both rules; the message is expected to mention the sign rule.
let negative_failure = RegressionVerdictThresholds::try_new(10.0, -20.0)
.expect_err("negative failure threshold must be rejected");
assert!(negative_failure.contains("non-negative"));
// The serde path must not be a loophole around the same validation.
let invalid_json = r#"{"warning_duration_pct":-30.0,"failure_duration_pct":-10.0}"#;
let deser = serde_json::from_str::<RegressionVerdictThresholds>(invalid_json)
.expect_err("serde-loaded negative thresholds must be rejected too");
assert!(
deser.to_string().contains("non-negative"),
"serde validation error must explain the threshold polarity; got {deser}"
);
// Non-finite values are rejected regardless of ordering.
assert!(RegressionVerdictThresholds::try_new(f64::NAN, 30.0).is_err());
assert!(RegressionVerdictThresholds::try_new(15.0, f64::INFINITY).is_err());
}
#[test]
fn regression_verdict_zero_change_under_valid_custom_thresholds_is_clean() {
    // A run identical to its baseline (0% aggregate duration delta) is never a
    // regression, however tight or loose the configured bands are.
    // Each case is (warning %, failure %, constructor message, verdict message).
    let cases = [
        (
            5.0,
            20.0,
            "valid strict thresholds must construct",
            "0% steady-state delta must be Clean under any valid \
             threshold pair — tight CI profiles must not flag no-op runs",
        ),
        (
            50.0,
            200.0,
            "valid loose thresholds must construct",
            "0% steady-state delta must be Clean under loose thresholds too",
        ),
    ];
    for (warning_pct, failure_pct, construct_msg, verdict_msg) in cases {
        let thresholds =
            RegressionVerdictThresholds::try_new(warning_pct, failure_pct).expect(construct_msg);
        let zero_delta = RefreshLedgerEvidenceComparison {
            phase_deltas: Vec::new(),
            aggregate_duration_delta_pct: Some(0.0),
            aggregate_throughput_delta_pct: None,
            dominant_phase_shift: None,
        };
        assert_eq!(
            zero_delta.regression_verdict(&thresholds),
            RegressionVerdict::Clean,
            "{}",
            verdict_msg
        );
    }
}
#[test]
fn regression_verdict_serializes_with_snake_case_verdict_tag() {
    // Each verdict serializes with a `verdict` field holding the snake_case
    // variant name; Warning/Failure also carry their numeric payload.
    let to_json =
        |verdict: &RegressionVerdict| serde_json::to_string(verdict).expect("serialize");

    let clean_json = to_json(&RegressionVerdict::Clean);
    assert!(
        clean_json.contains("\"verdict\":\"clean\""),
        "Clean must serialize with snake_case `verdict` tag; got {clean_json}"
    );

    let warning_json = to_json(&RegressionVerdict::Warning {
        duration_delta_pct: 18.5,
        threshold_pct: 15.0,
    });
    assert!(warning_json.contains("\"verdict\":\"warning\""));
    assert!(warning_json.contains("\"duration_delta_pct\":18.5"));
    assert!(warning_json.contains("\"threshold_pct\":15"));

    let failure_json = to_json(&RegressionVerdict::Failure {
        duration_delta_pct: 42.0,
        threshold_pct: 30.0,
    });
    assert!(failure_json.contains("\"verdict\":\"failure\""));
}
}