#![forbid(unsafe_code)]
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use web_time::{Duration, Instant};
/// Process-wide tally of regime transitions, bumped each time a detector's
/// regime differs from the one it reported for the previous event.
static BOCPD_CHANGE_POINTS_DETECTED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Returns how many regime transitions have been counted in this process.
///
/// The counter is read with relaxed ordering.
pub fn bocpd_change_points_detected_total() -> u64 {
    AtomicU64::load(&BOCPD_CHANGE_POINTS_DETECTED_TOTAL, Ordering::Relaxed)
}
/// Tuning parameters for the burst/steady change-point detector.
///
/// Values outside their valid range are repaired by `BocpdDetector::new`,
/// so a config can be built freely and handed over as-is.
#[derive(Debug, Clone)]
pub struct BocpdConfig {
    /// Mean inter-arrival time (ms) of the steady model; its reciprocal is the
    /// steady exponential rate. Also used as the observation for the very
    /// first event, when no previous timestamp exists.
    pub mu_steady_ms: f64,
    /// Mean inter-arrival time (ms) of the burst model; its reciprocal is the
    /// burst exponential rate.
    pub mu_burst_ms: f64,
    /// Reciprocal of the per-event hazard (change-point) probability.
    pub hazard_lambda: f64,
    /// Largest tracked run length; the posterior holds `max_run_length + 1` slots.
    pub max_run_length: usize,
    /// Burst probabilities strictly below this value classify as steady.
    pub steady_threshold: f64,
    /// Burst probabilities strictly above this value classify as burst.
    pub burst_threshold: f64,
    /// Initial burst probability; also the value the prior is pulled towards,
    /// weighted by the hazard, on every update.
    pub burst_prior: f64,
    /// Lower clamp (ms) applied to each inter-arrival observation.
    pub min_observation_ms: f64,
    /// Upper clamp (ms) applied to each inter-arrival observation.
    pub max_observation_ms: f64,
    /// When `false`, the detector's JSONL accessors return `None`.
    pub enable_logging: bool,
}

impl Default for BocpdConfig {
    /// Balanced preset: 200 ms steady mean, 20 ms burst mean, logging off.
    fn default() -> Self {
        BocpdConfig {
            // Inter-arrival models.
            mu_steady_ms: 200.0,
            mu_burst_ms: 20.0,
            // Run-length machinery.
            hazard_lambda: 50.0,
            max_run_length: 100,
            // Classification.
            steady_threshold: 0.3,
            burst_threshold: 0.7,
            burst_prior: 0.2,
            // Observation clamp.
            min_observation_ms: 1.0,
            max_observation_ms: 10000.0,
            enable_logging: false,
        }
    }
}

impl BocpdConfig {
    /// Preset with shorter means, a higher hazard and lower thresholds than
    /// the default; every other field keeps its default value.
    #[must_use]
    pub fn responsive() -> Self {
        Self {
            mu_steady_ms: 150.0,
            mu_burst_ms: 15.0,
            hazard_lambda: 30.0,
            steady_threshold: 0.25,
            burst_threshold: 0.6,
            ..Self::default()
        }
    }

    /// Preset with longer means, a lower hazard, higher thresholds and a
    /// larger burst prior than the default; other fields keep their defaults.
    #[must_use]
    pub fn aggressive_coalesce() -> Self {
        Self {
            mu_steady_ms: 250.0,
            mu_burst_ms: 25.0,
            hazard_lambda: 80.0,
            steady_threshold: 0.4,
            burst_threshold: 0.8,
            burst_prior: 0.3,
            ..Self::default()
        }
    }

    /// Builder-style toggle for JSONL evidence output.
    #[must_use]
    pub fn with_logging(self, enabled: bool) -> Self {
        Self {
            enable_logging: enabled,
            ..self
        }
    }
}
/// Traffic regime derived from the detector's burst probability.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BocpdRegime {
    /// Burst probability is below the steady threshold.
    #[default]
    Steady,
    /// Burst probability is above the burst threshold.
    Burst,
    /// Burst probability lies between the two thresholds (inclusive).
    Transitional,
}

impl BocpdRegime {
    /// Stable lowercase label for this regime, as used in logs and JSONL.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            BocpdRegime::Steady => "steady",
            BocpdRegime::Burst => "burst",
            BocpdRegime::Transitional => "transitional",
        }
    }
}

impl fmt::Display for BocpdRegime {
    /// Writes the same label as [`BocpdRegime::as_str`]; width and padding
    /// flags on the outer formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Snapshot of the detector state produced by a single posterior update.
#[derive(Debug, Clone)]
pub struct BocpdEvidence {
    /// Burst probability after the update (already clamped by the detector).
    pub p_burst: f64,
    /// Base-10 log of the burst-vs-steady likelihood ratio for this observation.
    pub log_bayes_factor: f64,
    /// Clamped inter-arrival time (ms) that was fed into the update.
    pub observation_ms: f64,
    /// Regime classification at the time of the update.
    pub regime: BocpdRegime,
    /// Exponential density of the observation under the steady model.
    pub likelihood_steady: f64,
    /// Exponential density of the observation under the burst model.
    pub likelihood_burst: f64,
    /// Mean of the run-length posterior.
    pub expected_run_length: f64,
    /// Variance of the run-length posterior.
    pub run_length_variance: f64,
    /// Run length with the highest posterior mass.
    pub run_length_mode: usize,
    /// Smallest run length at which the cumulative posterior reaches 0.95.
    pub run_length_p95: usize,
    /// Posterior mass sitting in the last (truncation) slot.
    pub run_length_tail_mass: f64,
    /// Delay chosen by the caller's decision step, once attached.
    pub recommended_delay_ms: Option<u64>,
    /// Whether a hard deadline forced the decision, once attached.
    pub hard_deadline_forced: Option<bool>,
    /// Number of observations processed, including this one.
    pub observation_count: u64,
    /// Timestamp of the event that triggered the update.
    pub timestamp: Instant,
}

impl BocpdEvidence {
    /// Serializes the evidence as one JSON object on a single line
    /// (schema `bocpd-v1`).
    ///
    /// Floats are written with fixed precision. A non-finite float (NaN or
    /// ±infinity) is written as `null`, because JSON has no literal for those
    /// values and emitting `NaN`/`inf` would make the line unparseable.
    /// Unset optional fields are also written as `null`. `timestamp` is not
    /// included in the output.
    #[must_use]
    pub fn to_jsonl(&self) -> String {
        const SCHEMA_VERSION: &str = "bocpd-v1";

        /// Fixed-precision rendering of a finite float, `null` otherwise.
        fn json_number(value: f64, precision: usize) -> String {
            if value.is_finite() {
                format!("{value:.precision$}")
            } else {
                "null".to_owned()
            }
        }

        /// Rendering of an optional scalar, `null` when absent.
        fn json_optional<T: ToString>(value: Option<T>) -> String {
            value.map_or_else(|| "null".to_owned(), |v| v.to_string())
        }

        format!(
            r#"{{"schema_version":"{}","event":"bocpd","p_burst":{},"log_bf":{},"obs_ms":{},"regime":"{}","ll_steady":{},"ll_burst":{},"runlen_mean":{},"runlen_var":{},"runlen_mode":{},"runlen_p95":{},"runlen_tail":{},"delay_ms":{},"forced_deadline":{},"n_obs":{}}}"#,
            SCHEMA_VERSION,
            json_number(self.p_burst, 4),
            json_number(self.log_bayes_factor, 3),
            json_number(self.observation_ms, 1),
            self.regime.as_str(),
            json_number(self.likelihood_steady, 6),
            json_number(self.likelihood_burst, 6),
            json_number(self.expected_run_length, 1),
            json_number(self.run_length_variance, 3),
            self.run_length_mode,
            self.run_length_p95,
            json_number(self.run_length_tail_mass, 4),
            json_optional(self.recommended_delay_ms),
            json_optional(self.hard_deadline_forced),
            self.observation_count,
        )
    }
}
impl fmt::Display for BocpdEvidence {
    /// Writes a multi-line, human-readable digest: regime, burst probability,
    /// log Bayes factor, observation, both likelihoods, expected run length
    /// and the observation count. The last line has no trailing newline.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            regime,
            p_burst,
            log_bayes_factor,
            observation_ms,
            likelihood_steady,
            likelihood_burst,
            expected_run_length,
            observation_count,
            ..
        } = self;

        writeln!(f, "BOCPD Evidence:")?;
        writeln!(f, " Regime: {} (P(burst) = {:.3})", regime, p_burst)?;
        writeln!(
            f,
            " Log BF: {:+.3} (positive favors burst)",
            log_bayes_factor
        )?;
        writeln!(f, " Observation: {:.1} ms", observation_ms)?;
        writeln!(
            f,
            " Likelihoods: steady={:.6}, burst={:.6}",
            likelihood_steady, likelihood_burst
        )?;
        writeln!(f, " E[run-length]: {:.1}", expected_run_length)?;
        write!(f, " Observations: {}", observation_count)
    }
}
/// Summary statistics of the run-length posterior, as computed by
/// `BocpdDetector::run_length_summary`.
#[derive(Debug, Clone, Copy)]
struct RunLengthSummary {
/// Posterior mean of the run length.
mean: f64,
/// Posterior variance of the run length around `mean`.
variance: f64,
/// Run length with the largest posterior mass (first one on ties).
mode: usize,
/// Smallest run length at which the cumulative mass reaches 0.95.
p95: usize,
/// Posterior mass held by the final (`max_run_length`) slot.
tail_mass: f64,
}
/// Online burst/steady regime detector driven by event inter-arrival times.
///
/// Tracks a burst probability and a truncated run-length posterior, both
/// updated on every call to `observe_event`.
#[derive(Debug, Clone)]
pub struct BocpdDetector {
/// Sanitized configuration (see `new`).
config: BocpdConfig,
/// Run-length posterior with `max_run_length + 1` entries.
run_length_posterior: Vec<f64>,
/// Current burst probability.
p_burst: f64,
/// Timestamp of the most recent event, `None` before the first event or after `reset`.
last_event_time: Option<Instant>,
/// Number of observations processed since construction or the last `reset`.
observation_count: u64,
/// Evidence recorded by the most recent update, if any.
last_evidence: Option<BocpdEvidence>,
/// Regime reported for the previous event; used to detect transitions.
previous_regime: BocpdRegime,
// Cached reciprocals of `mu_steady_ms`, `mu_burst_ms` and `hazard_lambda`, computed once in `new`.
lambda_steady: f64, lambda_burst: f64, hazard: f64, }
impl BocpdDetector {
/// Builds a detector from `config`, repairing out-of-range settings first.
///
/// Sanitization rules:
/// - `max_run_length` is at least 1; both means and `hazard_lambda` are at least 1.0.
/// - `min_observation_ms` is at least 0.1 and `max_observation_ms` is at least
///   `min_observation_ms`.
/// - Thresholds are clamped to `[0, 1]` (NaN becomes 0.3 / 0.7) and swapped if
///   they arrive in the wrong order.
/// - `burst_prior` is clamped to `[0.001, 0.999]` (NaN becomes 0.1).
///
/// The run-length posterior starts uniform and the burst probability starts
/// at the sanitized `burst_prior`.
pub fn new(config: BocpdConfig) -> Self {
    /// Clamps `value` into `[lo, hi]`, substituting `fallback` when it is NaN.
    fn clamp_or(value: f64, fallback: f64, lo: f64, hi: f64) -> f64 {
        if value.is_nan() {
            fallback
        } else {
            value.clamp(lo, hi)
        }
    }

    let mut cfg = config;

    // `f64::max` returns the non-NaN operand, so NaN inputs collapse to the
    // floor value on each of these lines.
    cfg.max_run_length = cfg.max_run_length.max(1);
    cfg.mu_steady_ms = cfg.mu_steady_ms.max(1.0);
    cfg.mu_burst_ms = cfg.mu_burst_ms.max(1.0);
    cfg.hazard_lambda = cfg.hazard_lambda.max(1.0);
    cfg.min_observation_ms = cfg.min_observation_ms.max(0.1);
    cfg.max_observation_ms = cfg.max_observation_ms.max(cfg.min_observation_ms);

    // Thresholds: clamp individually, then make sure steady <= burst.
    let low = clamp_or(cfg.steady_threshold, 0.3, 0.0, 1.0);
    let high = clamp_or(cfg.burst_threshold, 0.7, 0.0, 1.0);
    let (steady_threshold, burst_threshold) = if high < low {
        (high, low)
    } else {
        (low, high)
    };
    cfg.steady_threshold = steady_threshold;
    cfg.burst_threshold = burst_threshold;

    // NOTE(review): the NaN fallback here is 0.1 while `BocpdConfig::default()`
    // uses 0.2 — confirm whether the difference is intentional.
    cfg.burst_prior = clamp_or(cfg.burst_prior, 0.1, 0.001, 0.999);

    let slots = cfg.max_run_length + 1;
    let p_burst = cfg.burst_prior;
    let lambda_steady = 1.0 / cfg.mu_steady_ms;
    let lambda_burst = 1.0 / cfg.mu_burst_ms;
    let hazard = 1.0 / cfg.hazard_lambda;

    Self {
        config: cfg,
        run_length_posterior: vec![1.0 / slots as f64; slots],
        p_burst,
        last_event_time: None,
        observation_count: 0,
        last_evidence: None,
        previous_regime: BocpdRegime::Steady,
        lambda_steady,
        lambda_burst,
        hazard,
    }
}
/// Builds a detector from `BocpdConfig::default()`.
pub fn with_defaults() -> Self {
    let defaults = BocpdConfig::default();
    Self::new(defaults)
}
/// Current burst probability (starts at the sanitized `burst_prior`; clamped
/// to `[0.001, 0.999]` by each posterior update).
#[inline]
pub fn p_burst(&self) -> f64 {
self.p_burst
}
/// Borrow of the run-length posterior; index `r` holds the mass of run length `r`,
/// and the slice has `max_run_length + 1` entries.
#[inline]
pub fn run_length_posterior(&self) -> &[f64] {
&self.run_length_posterior
}
/// Classifies the current burst probability against the configured thresholds.
///
/// Strictly below `steady_threshold` is steady, strictly above
/// `burst_threshold` is burst, and everything else (including values equal
/// to either threshold) is transitional.
#[inline]
pub fn regime(&self) -> BocpdRegime {
    match self.p_burst {
        p if p < self.config.steady_threshold => BocpdRegime::Steady,
        p if p > self.config.burst_threshold => BocpdRegime::Burst,
        _ => BocpdRegime::Transitional,
    }
}
/// Mean of the run-length posterior: the sum of `r * P(run length = r)`.
pub fn expected_run_length(&self) -> f64 {
    (0_usize..)
        .zip(&self.run_length_posterior)
        .map(|(length, &mass)| length as f64 * mass)
        .sum()
}
/// Computes mean, variance, mode, 95th percentile and tail mass of the
/// run-length posterior in a single pass (plus one pass for the mean).
///
/// The percentile defaults to `max_run_length` if the cumulative mass never
/// reaches 0.95; the mode is the first index holding the largest mass.
fn run_length_summary(&self) -> RunLengthSummary {
    const P95_LEVEL: f64 = 0.95;

    let posterior = &self.run_length_posterior;
    let cap = self.config.max_run_length;
    let mean = self.expected_run_length();

    let mut variance = 0.0;
    // (index, mass) of the best candidate so far; -1.0 loses to any real mass.
    let mut best = (0_usize, -1.0_f64);
    let mut mass_so_far = 0.0;
    let mut p95 = cap;

    for (length, &mass) in posterior.iter().enumerate() {
        if mass > best.1 {
            best = (length, mass);
        }

        let offset = length as f64 - mean;
        variance += mass * offset * offset;

        // Stop accumulating once the percentile has been located.
        if mass_so_far < P95_LEVEL {
            mass_so_far += mass;
            if mass_so_far >= P95_LEVEL {
                p95 = length;
            }
        }
    }

    RunLengthSummary {
        mean,
        variance,
        mode: best.0,
        p95,
        tail_mass: posterior[cap],
    }
}
/// Evidence recorded by the most recent posterior update, or `None` if no
/// event has been observed since construction or the last `reset`.
pub fn last_evidence(&self) -> Option<&BocpdEvidence> {
self.last_evidence.as_ref()
}
pub fn set_decision_context(
&mut self,
steady_delay_ms: u64,
burst_delay_ms: u64,
hard_deadline_forced: bool,
) {
let recommended_delay = self.recommended_delay(steady_delay_ms, burst_delay_ms);
if let Some(ref mut evidence) = self.last_evidence {
evidence.recommended_delay_ms = Some(recommended_delay);
evidence.hard_deadline_forced = Some(hard_deadline_forced);
}
}
#[must_use]
pub fn evidence_jsonl(&self) -> Option<String> {
if !self.config.enable_logging {
return None;
}
self.last_evidence.as_ref().map(BocpdEvidence::to_jsonl)
}
#[must_use]
pub fn decision_log_jsonl(
&self,
steady_delay_ms: u64,
burst_delay_ms: u64,
hard_deadline_forced: bool,
) -> Option<String> {
if !self.config.enable_logging {
return None;
}
let mut evidence = self.last_evidence.clone()?;
evidence.recommended_delay_ms =
Some(self.recommended_delay(steady_delay_ms, burst_delay_ms));
evidence.hard_deadline_forced = Some(hard_deadline_forced);
Some(evidence.to_jsonl())
}
/// Number of events observed since construction or the last `reset`.
#[inline]
pub fn observation_count(&self) -> u64 {
self.observation_count
}
/// The configuration in effect, i.e. the values after sanitization in `new`.
pub fn config(&self) -> &BocpdConfig {
&self.config
}
/// Feeds one event timestamp into the detector and returns the resulting regime.
///
/// The observation is the time in milliseconds since the previous event,
/// clamped to the configured `[min_observation_ms, max_observation_ms]`
/// range. The first event (no previous timestamp) uses `mu_steady_ms`
/// instead, and a timestamp earlier than the previous one counts as zero
/// elapsed time before clamping.
///
/// Side effects: updates the posterior and stored evidence, emits tracing
/// events, and bumps the process-wide change-point counter whenever the
/// regime differs from the one reported for the previous event.
pub fn observe_event(&mut self, now: Instant) -> BocpdRegime {
// Inter-arrival time in ms; falls back to the steady mean for the first event.
let observation_ms = self
.last_event_time
.map(|last| {
now.checked_duration_since(last)
.unwrap_or(Duration::ZERO)
.as_secs_f64()
* 1000.0
})
.unwrap_or(self.config.mu_steady_ms);
// Clamp into the configured observation range.
let x = observation_ms
.max(self.config.min_observation_ms)
.min(self.config.max_observation_ms);
self.update_posterior(x, now);
self.last_event_time = Some(now);
let current_regime = self.regime();
// Diagnostics attached to the tracing span and events below.
let posterior_max = self
.run_length_posterior
.iter()
.copied()
.fold(0.0_f64, f64::max);
// Slot 0 of the posterior is the mass assigned to "a change just happened".
let change_point_probability = self.run_length_posterior[0];
let coalescing_active = matches!(
current_regime,
BocpdRegime::Burst | BocpdRegime::Transitional
);
let _span = tracing::debug_span!(
"bocpd.update",
run_length_posterior_max = %posterior_max,
change_point_probability = %change_point_probability,
coalescing_active = coalescing_active,
resize_count_in_window = self.observation_count,
)
.entered();
tracing::debug!(
target: "ftui.bocpd",
p_burst = %self.p_burst,
observation_ms = %x,
posterior_max = %posterior_max,
change_point_prob = %change_point_probability,
observation_count = self.observation_count,
"posterior update"
);
tracing::debug!(
target: "ftui.bocpd",
bocpd_run_length = %self.expected_run_length(),
"bocpd run length histogram"
);
// A regime change is counted globally, logged, and remembered for next time.
if current_regime != self.previous_regime {
BOCPD_CHANGE_POINTS_DETECTED_TOTAL.fetch_add(1, Ordering::Relaxed);
tracing::info!(
target: "ftui.bocpd",
from_regime = %self.previous_regime.as_str(),
to_regime = %current_regime.as_str(),
p_burst = %self.p_burst,
observation_count = self.observation_count,
"regime transition detected"
);
self.previous_regime = current_regime;
}
current_regime
}
/// Applies one observation `x` (clamped inter-arrival time, ms) to the burst
/// probability and the run-length posterior, then records the evidence.
///
/// `now` is only stored as the evidence timestamp.
fn update_posterior(&mut self, x: f64, now: Instant) {
self.observation_count += 1;
// Exponential densities of the observation under each regime model.
let pred_steady = self.exponential_pdf(x, self.lambda_steady);
let pred_burst = self.exponential_pdf(x, self.lambda_burst);
// Natural-log likelihood ratio (burst over steady), computed in log space.
let log_lr = self.lambda_burst.ln()
- self.lambda_burst * x
- (self.lambda_steady.ln() - self.lambda_steady * x);
// Same ratio expressed in base 10 for reporting.
let log_bf = log_lr * std::f64::consts::LOG10_E;
// Prior for this step: previous belief, pulled towards the configured
// burst prior with weight `hazard`.
let prior_burst =
self.p_burst * (1.0 - self.hazard) + self.config.burst_prior * self.hazard;
// Denominator is floored so the odds stay finite when the prior is ~1.
let prior_odds = prior_burst / (1.0 - prior_burst).max(1e-10);
let likelihood_ratio = log_lr.exp();
let posterior_odds = prior_odds * likelihood_ratio;
let mut p_burst_raw = posterior_odds / (1.0 + posterior_odds);
// inf / inf yields NaN: treat infinite odds as certainty, anything else as 0.5.
if p_burst_raw.is_nan() {
p_burst_raw = if posterior_odds.is_infinite() {
1.0
} else {
0.5
};
}
// Keep the belief away from 0 and 1 so it can always move again.
self.p_burst = p_burst_raw.clamp(0.001, 0.999);
let mixture_likelihood = self.p_burst * pred_burst + (1.0 - self.p_burst) * pred_steady;
let k = self.config.max_run_length;
let mut new_posterior = vec![0.0; k + 1];
// Growth: mass at run length r moves to r + 1 when no change occurs.
for r in 0..k {
let growth_prob = self.run_length_posterior[r] * (1.0 - self.hazard);
new_posterior[r + 1] += growth_prob * mixture_likelihood;
}
// Truncation: mass already in the last slot stays there.
new_posterior[k] += self.run_length_posterior[k] * (1.0 - self.hazard) * mixture_likelihood;
// Change point: every run length contributes `hazard` of its mass to slot 0.
// NOTE(review): `mixture_likelihood` multiplies every entry equally, so it
// appears to cancel in the normalization below unless it is zero — confirm
// whether a per-run-length predictive was intended.
let cp_prob: f64 = self
.run_length_posterior
.iter()
.map(|&p| p * self.hazard * mixture_likelihood)
.sum();
new_posterior[0] = cp_prob;
// Normalize; fall back to a uniform posterior if all mass vanished.
let total: f64 = new_posterior.iter().sum();
if total > 0.0 {
for p in &mut new_posterior {
*p /= total;
}
} else {
let uniform = 1.0 / (k + 1) as f64;
new_posterior.fill(uniform);
}
self.run_length_posterior = new_posterior;
let summary = self.run_length_summary();
// Decision fields start unset; `set_decision_context` fills them in later.
self.last_evidence = Some(BocpdEvidence {
p_burst: self.p_burst,
log_bayes_factor: log_bf,
observation_ms: x,
regime: self.regime(),
likelihood_steady: pred_steady,
likelihood_burst: pred_burst,
expected_run_length: summary.mean,
run_length_variance: summary.variance,
run_length_mode: summary.mode,
run_length_p95: summary.p95,
run_length_tail_mass: summary.tail_mass,
recommended_delay_ms: None,
hard_deadline_forced: None,
observation_count: self.observation_count,
timestamp: now,
});
}
/// Density of an exponential distribution with rate `lambda` at `x`:
/// `lambda * exp(-lambda * x)`. Does not read any detector state.
#[inline]
fn exponential_pdf(&self, x: f64, lambda: f64) -> f64 {
    let exponent = -lambda * x;
    lambda * exponent.exp()
}
/// Returns the detector to its freshly constructed state while keeping the
/// (already sanitized) configuration: uniform run-length posterior, burst
/// probability at `burst_prior`, no last event, no evidence, zero
/// observations, steady previous regime.
pub fn reset(&mut self) {
    let slots = self.config.max_run_length + 1;

    // Beliefs.
    self.p_burst = self.config.burst_prior;
    self.run_length_posterior = vec![1.0 / slots as f64; slots];
    self.previous_regime = BocpdRegime::Steady;

    // History.
    self.observation_count = 0;
    self.last_event_time = None;
    self.last_evidence = None;
}
pub fn recommended_delay(&self, steady_delay_ms: u64, burst_delay_ms: u64) -> u64 {
if self.p_burst < self.config.steady_threshold {
steady_delay_ms
} else if self.p_burst > self.config.burst_threshold {
burst_delay_ms
} else {
let denom = (self.config.burst_threshold - self.config.steady_threshold).max(1e-6);
let t = ((self.p_burst - self.config.steady_threshold) / denom).clamp(0.0, 1.0);
let delay = steady_delay_ms as f64 * (1.0 - t) + burst_delay_ms as f64 * t;
delay.round() as u64
}
}
}
impl Default for BocpdDetector {
fn default() -> Self {
Self::with_defaults()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
/// Spot-checks a few fields of the default configuration.
#[test]
fn test_default_config() {
    let BocpdConfig {
        mu_steady_ms,
        mu_burst_ms,
        max_run_length,
        ..
    } = BocpdConfig::default();
    assert!((mu_steady_ms - 200.0).abs() < 0.01);
    assert!((mu_burst_ms - 20.0).abs() < 0.01);
    assert_eq!(max_run_length, 100);
}
/// A fresh detector starts at the prior, in the steady regime, with no observations.
#[test]
fn test_initial_state() {
    let fresh = BocpdDetector::with_defaults();
    let distance_from_prior = (fresh.p_burst() - 0.2).abs();
    assert!(distance_from_prior < 0.01);
    assert_eq!(fresh.regime(), BocpdRegime::Steady);
    assert_eq!(fresh.observation_count(), 0);
}
/// Events spaced 200 ms apart keep the detector in the steady regime.
#[test]
fn test_steady_detection() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=10 {
        detector.observe_event(origin + Duration::from_millis(200 * step));
    }
    let p = detector.p_burst();
    assert!(p < 0.5, "p_burst={} should be low", p);
    assert_eq!(detector.regime(), BocpdRegime::Steady);
}
/// Events spaced 10 ms apart push the detector out of the steady regime.
#[test]
fn test_burst_detection() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=20 {
        detector.observe_event(origin + Duration::from_millis(10 * step));
    }
    let p = detector.p_burst();
    assert!(p > 0.5, "p_burst={} should be high", p);
    let regime = detector.regime();
    assert!(matches!(
        regime,
        BocpdRegime::Burst | BocpdRegime::Transitional
    ));
}
/// A steady warm-up followed by rapid events raises the burst probability.
#[test]
fn test_regime_transition() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();

    // Steady phase: five events, 200 ms apart.
    for step in 1..=5 {
        detector.observe_event(origin + Duration::from_millis(200 * step));
    }
    let baseline = detector.p_burst();

    // Burst phase: twenty events, 10 ms apart, starting at the 1 s mark.
    let burst_origin = origin + Duration::from_millis(1000);
    for step in 1..=20 {
        detector.observe_event(burst_origin + Duration::from_millis(10 * step));
    }

    assert!(
        detector.p_burst() > baseline,
        "p_burst should increase during burst"
    );
}
/// One observed event leaves evidence behind with a finite Bayes factor.
#[test]
fn test_evidence_stored() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let stored = detector.last_evidence().expect("Evidence should be stored");
    assert_eq!(stored.observation_count, 1);
    assert!(stored.log_bayes_factor.is_finite());
}
/// `reset` discards accumulated state and returns to the prior.
#[test]
fn test_reset() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=10 {
        detector.observe_event(origin + Duration::from_millis(10 * step));
    }

    detector.reset();

    let distance_from_prior = (detector.p_burst() - 0.2).abs();
    assert!(distance_from_prior < 0.01);
    assert_eq!(detector.observation_count(), 0);
    assert!(detector.last_evidence().is_none());
}
/// Delay selection across the steady, burst and transitional regimes.
#[test]
fn test_recommended_delay() {
    let mut detector = BocpdDetector::with_defaults();

    // Fresh detector sits at the prior (steady).
    assert_eq!(detector.recommended_delay(16, 40), 16);

    detector.p_burst = 0.9;
    assert_eq!(detector.recommended_delay(16, 40), 40);

    detector.p_burst = 0.5;
    let delay = detector.recommended_delay(16, 40);
    let strictly_between = delay > 16 && delay < 40;
    assert!(strictly_between, "delay={} should be interpolated", delay);
}
/// Two detectors fed the same timestamps end in the same state.
#[test]
fn test_deterministic() {
    let mut first = BocpdDetector::with_defaults();
    let mut second = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=10 {
        let at = origin + Duration::from_millis(15 * step);
        first.observe_event(at);
        second.observe_event(at);
    }
    let gap = (first.p_burst() - second.p_burst()).abs();
    assert!(gap < 1e-10);
    assert_eq!(first.regime(), second.regime());
}
/// The run-length posterior sums to one after every update.
#[test]
fn test_posterior_normalized() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=20 {
        detector.observe_event(origin + Duration::from_millis(25 * step));
        let sum: f64 = detector.run_length_posterior.iter().sum();
        let error = (sum - 1.0).abs();
        assert!(error < 1e-6, "Posterior not normalized: sum={}", sum);
    }
}
/// The burst probability stays inside [0, 1] under 1 ms spaced events.
#[test]
fn test_p_burst_bounded() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for millis in 1..=100 {
        detector.observe_event(origin + Duration::from_millis(millis));
        let p = detector.p_burst();
        assert!((0.0..=1.0).contains(&p));
    }
}
/// Every out-of-range setting is repaired by `BocpdDetector::new`.
#[test]
fn config_sanitization_clamps_thresholds_and_priors() {
    let broken = BocpdConfig {
        // Inverted thresholds and an impossible prior.
        steady_threshold: 0.9,
        burst_threshold: 0.1,
        burst_prior: 2.0,
        // Everything else zeroed out.
        max_run_length: 0,
        mu_steady_ms: 0.0,
        mu_burst_ms: 0.0,
        hazard_lambda: 0.0,
        min_observation_ms: 0.0,
        max_observation_ms: 0.0,
        ..Default::default()
    };
    let detector = BocpdDetector::new(broken);
    let repaired = detector.config();

    assert!(
        repaired.steady_threshold <= repaired.burst_threshold,
        "thresholds should be ordered after sanitization"
    );
    assert_eq!(repaired.max_run_length, 1);
    assert!(repaired.mu_steady_ms >= 1.0);
    assert!(repaired.mu_burst_ms >= 1.0);
    assert!(repaired.hazard_lambda >= 1.0);
    assert!(repaired.min_observation_ms >= 0.1);
    assert!(repaired.max_observation_ms >= repaired.min_observation_ms);
    assert!(
        (0.0..=1.0).contains(&detector.p_burst()),
        "p_burst should be clamped into [0,1]"
    );
}
/// The decision log carries the schema tag and the expected keys.
#[test]
fn test_jsonl_output() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    detector.config.enable_logging = true;

    let jsonl = detector
        .decision_log_jsonl(16, 40, false)
        .expect("jsonl should be emitted when enabled");

    for needle in [
        "bocpd-v1",
        "p_burst",
        "regime",
        "runlen_mean",
        "runlen_mode",
        "runlen_p95",
        "delay_ms",
        "forced_deadline",
    ] {
        assert!(jsonl.contains(needle));
    }
}
/// `evidence_jsonl` is gated by `enable_logging`.
#[test]
fn evidence_jsonl_respects_config() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());

    let while_disabled = detector.evidence_jsonl();
    assert!(while_disabled.is_none());

    detector.config.enable_logging = true;
    let while_enabled = detector.evidence_jsonl();
    assert!(while_enabled.is_some());
}
/// The expected run length never goes negative, even with irregular timestamps.
#[test]
fn prop_expected_runlen_non_negative() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for i in 0..50 {
        let offset_ms = (i % 30 + 5) * (i + 1);
        detector.observe_event(origin + Duration::from_millis(offset_ms));
        let mean = detector.expected_run_length();
        assert!(mean >= 0.0);
    }
}
/// Pins the values of the `responsive` preset.
#[test]
fn responsive_config_values() {
    let same = |actual: f64, expected: f64| (actual - expected).abs() < f64::EPSILON;
    let preset = BocpdConfig::responsive();
    assert!(same(preset.mu_steady_ms, 150.0));
    assert!(same(preset.mu_burst_ms, 15.0));
    assert!(same(preset.hazard_lambda, 30.0));
    assert!(same(preset.steady_threshold, 0.25));
    assert!(same(preset.burst_threshold, 0.6));
}
/// Pins the values of the `aggressive_coalesce` preset.
#[test]
fn aggressive_coalesce_config_values() {
    let same = |actual: f64, expected: f64| (actual - expected).abs() < f64::EPSILON;
    let preset = BocpdConfig::aggressive_coalesce();
    assert!(same(preset.mu_steady_ms, 250.0));
    assert!(same(preset.mu_burst_ms, 25.0));
    assert!(same(preset.hazard_lambda, 80.0));
    assert!(same(preset.steady_threshold, 0.4));
    assert!(same(preset.burst_threshold, 0.8));
    assert!(same(preset.burst_prior, 0.3));
}
/// `with_logging` toggles the flag in both directions.
#[test]
fn with_logging_builder() {
    let enabled = BocpdConfig::default().with_logging(true);
    assert!(enabled.enable_logging);
    let disabled = enabled.with_logging(false);
    assert!(!disabled.enable_logging);
}
/// Each regime maps to its lowercase label.
#[test]
fn regime_as_str_values() {
    let expected = [
        (BocpdRegime::Steady, "steady"),
        (BocpdRegime::Burst, "burst"),
        (BocpdRegime::Transitional, "transitional"),
    ];
    for (variant, label) in expected {
        assert_eq!(variant.as_str(), label);
    }
}
/// `Display` and `as_str` agree for every variant.
#[test]
fn regime_display_matches_as_str() {
    let every_variant = [
        BocpdRegime::Steady,
        BocpdRegime::Burst,
        BocpdRegime::Transitional,
    ];
    for regime in every_variant {
        let shown = format!("{regime}");
        assert_eq!(shown, regime.as_str());
    }
}
/// The derived default regime is `Steady`.
#[test]
fn regime_default_is_steady() {
    let fallback = BocpdRegime::default();
    assert_eq!(fallback, BocpdRegime::Steady);
}
/// The regime is `Copy`: the source stays usable after each copy.
#[test]
fn regime_copy() {
    let source = BocpdRegime::Burst;
    let first_copy = source;
    assert_eq!(source, first_copy);
    let second_copy = source;
    assert_eq!(source, second_copy);
}
/// Every documented key (and the schema tag) shows up in the JSONL line.
#[test]
fn evidence_to_jsonl_has_all_fields() {
    const EXPECTED: [&str; 16] = [
        "schema_version",
        "bocpd-v1",
        "p_burst",
        "log_bf",
        "obs_ms",
        "regime",
        "ll_steady",
        "ll_burst",
        "runlen_mean",
        "runlen_var",
        "runlen_mode",
        "runlen_p95",
        "runlen_tail",
        "delay_ms",
        "forced_deadline",
        "n_obs",
    ];

    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let jsonl = detector.last_evidence().unwrap().to_jsonl();

    for key in EXPECTED {
        assert!(jsonl.contains(key), "missing field {key} in {jsonl}");
    }
}
/// The human-readable rendering mentions its main sections.
#[test]
fn evidence_display_contains_regime_and_pburst() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let evidence = detector.last_evidence().unwrap();
    let display = format!("{evidence}");
    for heading in [
        "BOCPD Evidence:",
        "Regime:",
        "P(burst)",
        "Log BF:",
        "Observation:",
        "Observations:",
    ] {
        assert!(display.contains(heading));
    }
}
/// Decision fields serialize as `null` until a decision is attached.
#[test]
fn evidence_null_optionals_in_jsonl() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let line = detector.last_evidence().unwrap().to_jsonl();
    assert!(line.contains("\"delay_ms\":null"));
    assert!(line.contains("\"forced_deadline\":null"));
}
/// A fresh detector has no history and sits at the prior.
#[test]
fn initial_detector_state() {
    let fresh = BocpdDetector::with_defaults();
    let distance_from_prior = (fresh.p_burst() - 0.2).abs();
    assert!(distance_from_prior < 0.01);
    assert_eq!(fresh.observation_count(), 0);
    assert!(fresh.last_evidence().is_none());
    assert_eq!(fresh.regime(), BocpdRegime::Steady);
}
/// The initial (uniform) posterior is normalized.
#[test]
fn run_length_posterior_sums_to_one() {
    let fresh = BocpdDetector::with_defaults();
    let total = fresh.run_length_posterior().iter().sum::<f64>();
    assert!((total - 1.0).abs() < 1e-10);
}
/// `config()` exposes the configuration the detector was built with.
#[test]
fn config_accessor_returns_config() {
    let detector = BocpdDetector::new(BocpdConfig::responsive());
    let steady_mean = detector.config().mu_steady_ms;
    assert!((steady_mean - 150.0).abs() < f64::EPSILON);
}
/// With no previous timestamp, the first observation equals the steady mean.
#[test]
fn first_event_uses_steady_default() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let observed = detector.last_evidence().unwrap().observation_ms;
    assert!(
        (observed - 200.0).abs() < 1.0,
        "first observation should be ~mu_steady_ms"
    );
}
/// Events 5 ms apart raise the burst probability above its first-event value.
#[test]
fn rapid_events_increase_pburst() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    detector.observe_event(origin);
    let after_first = detector.p_burst();

    for step in 1..20 {
        detector.observe_event(origin + Duration::from_millis(5 * step));
    }

    assert!(
        detector.p_burst() > after_first,
        "p_burst should increase with rapid events"
    );
}
/// Slow events after a burst bring the burst probability back down.
#[test]
fn slow_events_decrease_pburst() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();

    // Burst: ten events, 5 ms apart.
    for step in 1..=10 {
        detector.observe_event(origin + Duration::from_millis(5 * step));
    }
    let after_burst = detector.p_burst();

    // Slow phase: twenty events, 500 ms apart, starting where the burst ended.
    let slow_origin = origin + Duration::from_millis(50);
    for step in 1..=20 {
        detector.observe_event(slow_origin + Duration::from_millis(500 * step));
    }

    assert!(
        detector.p_burst() < after_burst,
        "p_burst should decrease with slow events"
    );
}
/// After a confirmed burst, steady-paced events lower the burst probability.
#[test]
fn burst_to_steady_recovery() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();

    // Burst: thirty events, 5 ms apart.
    for step in 1..=30 {
        detector.observe_event(origin + Duration::from_millis(5 * step));
    }
    let burst_p = detector.p_burst();
    assert!(burst_p > 0.5, "should be in burst, got p={burst_p}");

    // Recovery: thirty events, 200 ms apart, starting where the burst ended.
    let slow_origin = origin + Duration::from_millis(150);
    for step in 1..=30 {
        detector.observe_event(slow_origin + Duration::from_millis(200 * step));
    }
    let recovered_p = detector.p_burst();
    assert!(
        recovered_p < burst_p,
        "p_burst should decrease during recovery"
    );
}
/// Attaching a decision fills both optional evidence fields.
#[test]
fn set_decision_context_populates_evidence() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    detector.set_decision_context(16, 40, false);

    let stored = detector.last_evidence().unwrap();
    assert!(stored.recommended_delay_ms.is_some());
    assert_eq!(stored.hard_deadline_forced, Some(false));
}
/// The forced-deadline flag is stored as given.
#[test]
fn set_decision_context_forced_deadline() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    detector.set_decision_context(16, 40, true);

    let forced = detector.last_evidence().unwrap().hard_deadline_forced;
    assert_eq!(forced, Some(true));
}
/// No decision log is produced while logging is off.
#[test]
fn decision_log_jsonl_none_when_logging_disabled() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let log_line = detector.decision_log_jsonl(16, 40, false);
    assert!(log_line.is_none());
}
/// With logging on, the decision log carries a concrete delay and the forced flag.
#[test]
fn decision_log_jsonl_has_delay_when_logging_enabled() {
    let logging_config = BocpdConfig::default().with_logging(true);
    let mut detector = BocpdDetector::new(logging_config);
    detector.observe_event(Instant::now());

    let line = detector
        .decision_log_jsonl(16, 40, true)
        .expect("should emit when logging enabled");

    assert!(line.contains("\"delay_ms\":"));
    assert!(!line.contains("\"delay_ms\":null"));
    assert!(line.contains("\"forced_deadline\":true"));
}
/// Between the thresholds the delay lands strictly between the two inputs.
#[test]
fn recommended_delay_interpolation_in_transitional() {
    let mut detector = BocpdDetector::with_defaults();
    detector.p_burst = 0.5;
    let delay = detector.recommended_delay(16, 40);
    let strictly_between = delay > 16 && delay < 40;
    assert!(
        strictly_between,
        "transitional delay={delay} should be interpolated"
    );
}
/// At the prior (below the steady threshold) the steady delay is returned.
#[test]
fn recommended_delay_steady_when_low_pburst() {
    let fresh = BocpdDetector::with_defaults();
    let delay = fresh.recommended_delay(16, 40);
    assert_eq!(delay, 16);
}
/// Above the burst threshold the burst delay is returned.
#[test]
fn recommended_delay_burst_when_high_pburst() {
    let mut detector = BocpdDetector::with_defaults();
    detector.p_burst = 0.9;
    let delay = detector.recommended_delay(16, 40);
    assert_eq!(delay, 40);
}
/// A uniform posterior over 0..=100 has a mean near 50.
#[test]
fn expected_run_length_initial_uniform() {
    let fresh = BocpdDetector::with_defaults();
    let mean = fresh.expected_run_length();
    assert!((mean - 50.0).abs() < 1.0);
}
/// The evidence's observation count equals the number of events fed in.
#[test]
fn evidence_observation_count_matches_events() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=7 {
        detector.observe_event(origin + Duration::from_millis(20 * step));
    }
    let counted = detector.last_evidence().unwrap().observation_count;
    assert_eq!(counted, 7);
}
/// Both model likelihoods are strictly positive for 50 ms spaced events.
#[test]
fn evidence_likelihoods_are_positive() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=5 {
        detector.observe_event(origin + Duration::from_millis(50 * step));
    }
    let stored = detector.last_evidence().unwrap();
    assert!(stored.likelihood_steady > 0.0);
    assert!(stored.likelihood_burst > 0.0);
}
/// Compares the default and responsive presets on the same rapid event stream.
///
/// The assertion only fires when the default detector is still steady; in
/// that case the responsive detector must already have left the steady regime.
#[test]
fn responsive_detects_burst_faster() {
    let origin = Instant::now();
    let mut baseline = BocpdDetector::with_defaults();
    let mut responsive = BocpdDetector::new(BocpdConfig::responsive());

    for step in 1..=15 {
        let at = origin + Duration::from_millis(5 * step);
        baseline.observe_event(at);
        responsive.observe_event(at);
    }

    let baseline_regime = baseline.regime();
    let responsive_regime = responsive.regime();
    if baseline_regime == BocpdRegime::Steady {
        assert_ne!(
            responsive_regime,
            BocpdRegime::Steady,
            "responsive should not be steady when default is"
        );
    }
}
/// `reset` after a burst clears every piece of accumulated state.
#[test]
fn reset_restores_initial_state() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for step in 1..=20 {
        detector.observe_event(origin + Duration::from_millis(5 * step));
    }
    assert!(detector.p_burst() > 0.5);

    detector.reset();

    let distance_from_prior = (detector.p_burst() - 0.2).abs();
    assert!(distance_from_prior < 0.01);
    assert_eq!(detector.observation_count(), 0);
    assert!(detector.last_evidence().is_none());
    assert!(detector.last_event_time.is_none());
}
/// Alternating short and long offsets never break posterior normalization.
#[test]
fn posterior_stays_normalized_under_alternating_traffic() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    for i in 0..100 {
        let gap = match i % 2 {
            0 => 5,
            _ => 300,
        };
        detector.observe_event(origin + Duration::from_millis(gap * (i + 1)));
        let sum: f64 = detector.run_length_posterior().iter().sum();
        let error = (sum - 1.0).abs();
        assert!(
            error < 1e-6,
            "posterior not normalized at step {i}: sum={sum}"
        );
    }
}
/// Second pin of the `responsive` preset values.
#[test]
fn responsive_config_values_dup() {
    let BocpdConfig {
        mu_steady_ms,
        mu_burst_ms,
        hazard_lambda,
        steady_threshold,
        burst_threshold,
        ..
    } = BocpdConfig::responsive();
    assert!((mu_steady_ms - 150.0).abs() < f64::EPSILON);
    assert!((mu_burst_ms - 15.0).abs() < f64::EPSILON);
    assert!((hazard_lambda - 30.0).abs() < f64::EPSILON);
    assert!((steady_threshold - 0.25).abs() < f64::EPSILON);
    assert!((burst_threshold - 0.6).abs() < f64::EPSILON);
}
/// Second pin of the `aggressive_coalesce` preset values.
#[test]
fn aggressive_coalesce_config_values_dup() {
    let BocpdConfig {
        mu_steady_ms,
        mu_burst_ms,
        hazard_lambda,
        steady_threshold,
        burst_threshold,
        burst_prior,
        ..
    } = BocpdConfig::aggressive_coalesce();
    assert!((mu_steady_ms - 250.0).abs() < f64::EPSILON);
    assert!((mu_burst_ms - 25.0).abs() < f64::EPSILON);
    assert!((hazard_lambda - 80.0).abs() < f64::EPSILON);
    assert!((steady_threshold - 0.4).abs() < f64::EPSILON);
    assert!((burst_threshold - 0.8).abs() < f64::EPSILON);
    assert!((burst_prior - 0.3).abs() < f64::EPSILON);
}
/// Second check that `with_logging` toggles the flag both ways.
#[test]
fn with_logging_builder_dup() {
    let switched_on = BocpdConfig::default().with_logging(true);
    assert!(switched_on.enable_logging);
    let switched_off = switched_on.with_logging(false);
    assert!(!switched_off.enable_logging);
}
/// Labels returned by `as_str` for each variant.
#[test]
fn regime_as_str() {
    let steady_label = BocpdRegime::Steady.as_str();
    let burst_label = BocpdRegime::Burst.as_str();
    let transitional_label = BocpdRegime::Transitional.as_str();
    assert_eq!(steady_label, "steady");
    assert_eq!(burst_label, "burst");
    assert_eq!(transitional_label, "transitional");
}
/// `Display` output for each variant.
#[test]
fn regime_display() {
    let shown_steady = format!("{}", BocpdRegime::Steady);
    let shown_burst = format!("{}", BocpdRegime::Burst);
    let shown_transitional = format!("{}", BocpdRegime::Transitional);
    assert_eq!(shown_steady, "steady");
    assert_eq!(shown_burst, "burst");
    assert_eq!(shown_transitional, "transitional");
}
/// Second check that the default regime is `Steady`.
#[test]
fn regime_default_is_steady_dup() {
    let fallback: BocpdRegime = Default::default();
    assert_eq!(fallback, BocpdRegime::Steady);
}
/// `Clone` yields an equal value and distinct variants compare unequal.
#[test]
fn regime_clone_eq() {
    let source = BocpdRegime::Burst;
    let duplicate = source.clone();
    assert_eq!(source, duplicate);
    assert_ne!(BocpdRegime::Steady, BocpdRegime::Burst);
}
/// `Default::default()` yields a fresh steady detector.
#[test]
fn detector_default_impl() {
    let fresh = BocpdDetector::default();
    assert_eq!(fresh.regime(), BocpdRegime::Steady);
    assert_eq!(fresh.observation_count(), 0);
}
/// A custom steady mean survives construction and is visible via `config()`.
#[test]
fn detector_config_accessor() {
    let custom = BocpdConfig {
        mu_steady_ms: 300.0,
        ..Default::default()
    };
    let detector = BocpdDetector::new(custom);
    let steady_mean = detector.config().mu_steady_ms;
    assert!((steady_mean - 300.0).abs() < f64::EPSILON);
}
/// The default posterior has 101 slots and sums to one.
#[test]
fn detector_run_length_posterior_accessor() {
    let fresh = BocpdDetector::with_defaults();
    let slots = fresh.run_length_posterior();
    assert_eq!(slots.len(), 101);
    let total = slots.iter().sum::<f64>();
    assert!((total - 1.0).abs() < 1e-10);
}
/// The uniform initial posterior has an expected run length of 50.
#[test]
fn detector_expected_run_length_initial() {
    let fresh = BocpdDetector::with_defaults();
    let mean = fresh.expected_run_length();
    assert!((mean - 50.0).abs() < 1e-10);
}
/// No evidence exists before the first event.
#[test]
fn detector_last_evidence_initially_none() {
    let fresh = BocpdDetector::with_defaults();
    let stored = fresh.last_evidence();
    assert!(stored.is_none());
}
/// After one (steady) event the attached delay is the steady delay.
#[test]
fn set_decision_context_updates_evidence() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    detector.set_decision_context(16, 40, false);

    let stored = detector.last_evidence().unwrap();
    assert_eq!(stored.recommended_delay_ms, Some(16));
    assert_eq!(stored.hard_deadline_forced, Some(false));
}
/// Attaching a decision before any event does not create evidence.
#[test]
fn set_decision_context_noop_without_evidence() {
    let mut fresh = BocpdDetector::with_defaults();
    fresh.set_decision_context(16, 40, true);
    let stored = fresh.last_evidence();
    assert!(stored.is_none());
}
/// Evidence JSONL is suppressed under the default (logging off) config.
#[test]
fn evidence_jsonl_none_when_disabled() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let line = detector.evidence_jsonl();
    assert!(line.is_none());
}
/// The decision log is suppressed under the default (logging off) config.
#[test]
fn decision_log_jsonl_none_when_disabled() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());
    let line = detector.decision_log_jsonl(16, 40, false);
    assert!(line.is_none());
}
/// Even with logging on, there is nothing to log before the first event.
#[test]
fn decision_log_jsonl_none_without_evidence() {
    let logging_config = BocpdConfig::default().with_logging(true);
    let fresh = BocpdDetector::new(logging_config);
    let line = fresh.decision_log_jsonl(16, 40, false);
    assert!(line.is_none());
}
/// The `Display` rendering of evidence contains every expected section label.
#[test]
fn evidence_display_format() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());

    // `to_string` goes through the same `Display` impl as `format!("{}", ..)`.
    let rendered = detector.last_evidence().unwrap().to_string();

    assert!(rendered.contains("BOCPD Evidence:"));
    assert!(rendered.contains("Regime:"));
    assert!(rendered.contains("P(burst)"));
    assert!(rendered.contains("Log BF:"));
    assert!(rendered.contains("Observation:"));
    assert!(rendered.contains("Likelihoods:"));
    assert!(rendered.contains("E[run-length]:"));
    assert!(rendered.contains("Observations:"));
}
/// With logging enabled, a decision context set after an observation shows
/// up in the JSONL output as `delay_ms` and `forced_deadline`.
#[test]
fn evidence_jsonl_with_decision_context() {
    let logging_config = BocpdConfig::default().with_logging(true);
    let mut detector = BocpdDetector::new(logging_config);
    detector.observe_event(Instant::now());
    detector.set_decision_context(16, 40, true);

    let line = detector.evidence_jsonl().unwrap();
    assert!(line.contains("\"delay_ms\":16"));
    assert!(line.contains("\"forced_deadline\":true"));
}
/// With logging enabled but no decision context set, the JSONL output
/// carries `null` for `delay_ms` and `forced_deadline`.
#[test]
fn evidence_jsonl_null_optional_fields() {
    let logging_config = BocpdConfig::default().with_logging(true);
    let mut detector = BocpdDetector::new(logging_config);
    detector.observe_event(Instant::now());

    let line = detector.evidence_jsonl().unwrap();
    assert!(line.contains("\"delay_ms\":null"));
    assert!(line.contains("\"forced_deadline\":null"));
}
/// `recommended_delay(16, 40)` returns 16 when `p_burst` is exactly 0.3 and
/// 40 when it is exactly 0.7.
///
/// NOTE(review): 0.3 and 0.7 match the default `steady_threshold` and
/// `burst_threshold`, assuming `with_defaults` uses `BocpdConfig::default()`.
#[test]
fn recommended_delay_at_exact_thresholds() {
    let mut detector = BocpdDetector::with_defaults();

    // (forced burst probability, delay expected for it)
    for (probability, expected_delay) in [(0.3, 16), (0.7, 40)] {
        detector.p_burst = probability;
        assert_eq!(detector.recommended_delay(16, 40), expected_delay);
    }
}
/// With `p_burst` at 0.5 (midway between 0.3 and 0.7),
/// `recommended_delay(16, 40)` returns 28 (midway between 16 and 40).
#[test]
fn recommended_delay_midpoint() {
    let mut detector = BocpdDetector::with_defaults();
    detector.p_burst = 0.5;

    assert_eq!(detector.recommended_delay(16, 40), 28);
}
/// `reset` returns the detector to its pre-observation state: the
/// observation count drops to zero, the stored evidence is discarded, and the
/// next event is treated like a first event.
#[test]
fn reset_clears_last_event_time() {
    let mut det = BocpdDetector::with_defaults();
    let start = Instant::now();
    det.observe_event(start);
    det.observe_event(start + Duration::from_millis(10));
    assert_eq!(det.observation_count(), 2);

    det.reset();
    assert_eq!(det.observation_count(), 0);
    assert!(det.last_evidence().is_none());

    let _ = det.observe_event(start + Duration::from_millis(100));
    assert_eq!(det.observation_count(), 1);

    // The count alone cannot tell whether the previous timestamp was dropped.
    // A first event on a fresh detector records `mu_steady_ms` as its
    // observation, so the first event after a reset must do the same; a stale
    // timestamp would presumably yield the elapsed interval instead.
    let ev = det.last_evidence().unwrap();
    let steady_default = det.config().mu_steady_ms;
    assert!(
        (ev.observation_ms - steady_default).abs() < f64::EPSILON,
        "post-reset observation should fall back to mu_steady_ms, got {}",
        ev.observation_ms
    );
}
/// The very first observed event records an observation of 200 ms, which is
/// the default `mu_steady_ms`.
#[test]
fn first_event_uses_steady_default_dup() {
    let mut detector = BocpdDetector::with_defaults();
    let now = Instant::now();
    detector.observe_event(now);

    let evidence = detector.last_evidence().unwrap();
    let deviation = (evidence.observation_ms - 200.0).abs();
    assert!(deviation < f64::EPSILON);
}
/// Two events at the same instant still record an observation no smaller
/// than the configured `min_observation_ms`.
#[test]
fn observation_clamped_to_bounds() {
    let mut detector = BocpdDetector::with_defaults();
    let instant = Instant::now();
    // Same timestamp twice: zero elapsed time between the two events.
    detector.observe_event(instant);
    detector.observe_event(instant);

    let evidence = detector.last_evidence().unwrap();
    let lower_bound = detector.config().min_observation_ms;
    assert!(evidence.observation_ms >= lower_bound);
}
/// `BocpdConfig` clones keep their field values, and its `Debug` output
/// names the type.
#[test]
fn config_clone_debug() {
    let original = BocpdConfig::default();

    let copy = original.clone();
    assert!((copy.mu_steady_ms - 200.0).abs() < f64::EPSILON);

    let debug_text = format!("{original:?}");
    assert!(debug_text.contains("BocpdConfig"));
}
/// A cloned detector reports the same `p_burst` as its source, and the
/// detector's `Debug` output names the type.
#[test]
fn detector_clone_debug() {
    let original = BocpdDetector::with_defaults();

    let copy = original.clone();
    let difference = (copy.p_burst() - original.p_burst()).abs();
    assert!(difference < f64::EPSILON);

    let debug_text = format!("{original:?}");
    assert!(debug_text.contains("BocpdDetector"));
}
/// Evidence can be cloned out of the detector and keeps its
/// `observation_count`.
#[test]
fn evidence_clone() {
    let mut detector = BocpdDetector::with_defaults();
    detector.observe_event(Instant::now());

    let stored = detector.last_evidence().unwrap();
    let owned = stored.clone();
    assert_eq!(owned.observation_count, 1);
}
/// Feeding 200 ms-spaced events followed by 5 ms-spaced events must raise
/// the global change-point counter above at least one earlier reading.
///
/// The counter is a process-wide static, so readings are compared
/// relatively rather than against absolute values.
#[test]
fn change_points_counter_increments_on_regime_transition() {
    let before = bocpd_change_points_detected_total();
    let mut det = BocpdDetector::with_defaults();
    let start = Instant::now();

    // Five events 200 ms apart.
    for step in 1..=5 {
        let at = start + Duration::from_millis(200 * step);
        det.observe_event(at);
    }
    let after_steady = bocpd_change_points_detected_total();

    // Thirty events 5 ms apart, starting after the slow phase.
    let burst_start = start + Duration::from_millis(1100);
    for step in 1..=30 {
        let at = burst_start + Duration::from_millis(5 * step);
        det.observe_event(at);
    }
    let after_burst = bocpd_change_points_detected_total();

    // Exceeding the smaller of the two earlier readings is the same as
    // exceeding at least one of them.
    let lowest_earlier_reading = before.min(after_steady);
    assert!(
        after_burst > lowest_earlier_reading,
        "Expected change-point counter to increment: before={before}, after_steady={after_steady}, after_burst={after_burst}"
    );
}
/// `previous_regime` starts as `Steady` and is still `Steady` after five
/// events spaced 200 ms apart.
#[test]
fn previous_regime_tracks_last_state() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();
    assert_eq!(detector.previous_regime, BocpdRegime::Steady);

    for step in 1..=5 {
        let at = origin + Duration::from_millis(200 * step);
        detector.observe_event(at);
    }
    assert_eq!(detector.previous_regime, BocpdRegime::Steady);
}
/// After thirty events spaced 5 ms apart, `reset` puts `previous_regime`
/// back to `Steady`.
#[test]
fn reset_clears_previous_regime() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();

    for step in 1..=30 {
        let at = origin + Duration::from_millis(5 * step);
        detector.observe_event(at);
    }

    detector.reset();
    assert_eq!(detector.previous_regime, BocpdRegime::Steady);
}
/// The regime returned by `observe_event` always matches what `regime()`
/// reports immediately afterwards.
#[test]
fn observe_event_returns_correct_regime() {
    let mut detector = BocpdDetector::with_defaults();
    let origin = Instant::now();

    for step in 1..=10 {
        let at = origin + Duration::from_millis(200 * step);
        let reported = detector.observe_event(at);
        assert_eq!(reported, detector.regime());
    }
}
}