1#![forbid(unsafe_code)]
2
3use std::fmt;
177use std::time::Instant;
178
/// Tuning parameters for the BOCPD event-rate regime detector.
///
/// This type is plain data. `BocpdDetector::new` sanitizes out-of-range
/// values (floors, clamps and threshold ordering) before using them.
#[derive(Debug, Clone)]
pub struct BocpdConfig {
    /// Mean gap between events, in milliseconds, assumed for the steady
    /// regime. The detector uses `1 / mu_steady_ms` as an exponential rate.
    pub mu_steady_ms: f64,

    /// Mean gap between events, in milliseconds, assumed for the burst
    /// regime. The detector uses `1 / mu_burst_ms` as an exponential rate.
    pub mu_burst_ms: f64,

    /// Changepoint hazard parameter; the per-event hazard is
    /// `1 / hazard_lambda`.
    pub hazard_lambda: f64,

    /// Largest tracked run length. The run-length posterior holds
    /// `max_run_length + 1` entries.
    pub max_run_length: usize,

    /// Burst probabilities strictly below this value map to the steady regime.
    pub steady_threshold: f64,

    /// Burst probabilities strictly above this value map to the burst regime.
    pub burst_threshold: f64,

    /// Burst probability the detector starts from (and returns to on reset).
    pub burst_prior: f64,

    /// Lower clamp applied to every observed gap, in milliseconds.
    pub min_observation_ms: f64,

    /// Upper clamp applied to every observed gap, in milliseconds.
    pub max_observation_ms: f64,

    /// When `false`, the detector's JSONL accessors return `None`.
    pub enable_logging: bool,
}

impl Default for BocpdConfig {
    /// Baseline configuration that the named presets start from.
    fn default() -> Self {
        Self {
            // Regime models and changepoint prior.
            mu_steady_ms: 200.0,
            mu_burst_ms: 20.0,
            hazard_lambda: 50.0,
            max_run_length: 100,
            // Classification thresholds and starting belief.
            steady_threshold: 0.3,
            burst_threshold: 0.7,
            burst_prior: 0.2,
            // Observation clamp, in milliseconds.
            min_observation_ms: 1.0,
            max_observation_ms: 10000.0,
            // Diagnostics stay off unless requested.
            enable_logging: false,
        }
    }
}

impl BocpdConfig {
    /// Preset with shorter regime means, a smaller `hazard_lambda` and lower
    /// thresholds than the default. All other fields keep their defaults.
    #[must_use]
    pub fn responsive() -> Self {
        Self {
            hazard_lambda: 30.0,
            mu_burst_ms: 15.0,
            mu_steady_ms: 150.0,
            burst_threshold: 0.6,
            steady_threshold: 0.25,
            ..Self::default()
        }
    }

    /// Preset with longer regime means, a larger `hazard_lambda`, higher
    /// thresholds and a higher burst prior than the default. All other
    /// fields keep their defaults.
    #[must_use]
    pub fn aggressive_coalesce() -> Self {
        Self {
            hazard_lambda: 80.0,
            mu_burst_ms: 25.0,
            mu_steady_ms: 250.0,
            burst_prior: 0.3,
            burst_threshold: 0.8,
            steady_threshold: 0.4,
            ..Self::default()
        }
    }

    /// Returns this configuration with `enable_logging` set to `enabled`.
    #[must_use]
    pub fn with_logging(self, enabled: bool) -> Self {
        Self {
            enable_logging: enabled,
            ..self
        }
    }
}
290
/// Regime classification reported by the detector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BocpdRegime {
    /// Burst probability is below the steady threshold. This is the default.
    #[default]
    Steady,
    /// Burst probability is above the burst threshold.
    Burst,
    /// Burst probability lies between the two thresholds (inclusive).
    Transitional,
}

impl BocpdRegime {
    /// Lower-case label for this regime, as used in log output.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Transitional => "transitional",
            Self::Burst => "burst",
            Self::Steady => "steady",
        }
    }
}

impl fmt::Display for BocpdRegime {
    /// Writes the same label as [`BocpdRegime::as_str`]; width and padding
    /// flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
324
325#[derive(Debug, Clone)]
333pub struct BocpdEvidence {
334 pub p_burst: f64,
336
337 pub log_bayes_factor: f64,
339
340 pub observation_ms: f64,
342
343 pub regime: BocpdRegime,
345
346 pub likelihood_steady: f64,
348
349 pub likelihood_burst: f64,
351
352 pub expected_run_length: f64,
354
355 pub run_length_variance: f64,
357
358 pub run_length_mode: usize,
360
361 pub run_length_p95: usize,
363
364 pub run_length_tail_mass: f64,
366
367 pub recommended_delay_ms: Option<u64>,
369
370 pub hard_deadline_forced: Option<bool>,
372
373 pub observation_count: u64,
375
376 pub timestamp: Instant,
378}
379
380impl BocpdEvidence {
381 #[must_use]
383 pub fn to_jsonl(&self) -> String {
384 const SCHEMA_VERSION: &str = "bocpd-v1";
385 let delay_ms = self
386 .recommended_delay_ms
387 .map(|v| v.to_string())
388 .unwrap_or_else(|| "null".to_string());
389 let forced = self
390 .hard_deadline_forced
391 .map(|v| v.to_string())
392 .unwrap_or_else(|| "null".to_string());
393 format!(
394 r#"{{"schema_version":"{}","event":"bocpd","p_burst":{:.4},"log_bf":{:.3},"obs_ms":{:.1},"regime":"{}","ll_steady":{:.6},"ll_burst":{:.6},"runlen_mean":{:.1},"runlen_var":{:.3},"runlen_mode":{},"runlen_p95":{},"runlen_tail":{:.4},"delay_ms":{},"forced_deadline":{},"n_obs":{}}}"#,
395 SCHEMA_VERSION,
396 self.p_burst,
397 self.log_bayes_factor,
398 self.observation_ms,
399 self.regime.as_str(),
400 self.likelihood_steady,
401 self.likelihood_burst,
402 self.expected_run_length,
403 self.run_length_variance,
404 self.run_length_mode,
405 self.run_length_p95,
406 self.run_length_tail_mass,
407 delay_ms,
408 forced,
409 self.observation_count,
410 )
411 }
412}
413
414impl fmt::Display for BocpdEvidence {
415 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416 writeln!(f, "BOCPD Evidence:")?;
417 writeln!(
418 f,
419 " Regime: {} (P(burst) = {:.3})",
420 self.regime, self.p_burst
421 )?;
422 writeln!(
423 f,
424 " Log BF: {:+.3} (positive favors burst)",
425 self.log_bayes_factor
426 )?;
427 writeln!(f, " Observation: {:.1} ms", self.observation_ms)?;
428 writeln!(
429 f,
430 " Likelihoods: steady={:.6}, burst={:.6}",
431 self.likelihood_steady, self.likelihood_burst
432 )?;
433 writeln!(f, " E[run-length]: {:.1}", self.expected_run_length)?;
434 write!(f, " Observations: {}", self.observation_count)
435 }
436}
437
/// Summary statistics of the run-length posterior, computed in one pass and
/// copied into the evidence record.
#[derive(Clone, Copy, Debug)]
struct RunLengthSummary {
    /// Posterior mean run length.
    mean: f64,
    /// Posterior variance around `mean`.
    variance: f64,
    /// Run length with the highest posterior probability.
    mode: usize,
    /// First run length at which the cumulative posterior reaches 0.95.
    p95: usize,
    /// Posterior mass in the final (maximum run length) bucket.
    tail_mass: f64,
}
446
447#[derive(Debug, Clone)]
456pub struct BocpdDetector {
457 config: BocpdConfig,
459
460 run_length_posterior: Vec<f64>,
463
464 p_burst: f64,
466
467 last_event_time: Option<Instant>,
469
470 observation_count: u64,
472
473 last_evidence: Option<BocpdEvidence>,
475
476 lambda_steady: f64, lambda_burst: f64, hazard: f64, }
481
482impl BocpdDetector {
483 pub fn new(config: BocpdConfig) -> Self {
485 let mut config = config;
486 config.max_run_length = config.max_run_length.max(1);
487 config.mu_steady_ms = config.mu_steady_ms.max(1.0);
488 config.mu_burst_ms = config.mu_burst_ms.max(1.0);
489 config.hazard_lambda = config.hazard_lambda.max(1.0);
490 config.min_observation_ms = config.min_observation_ms.max(0.1);
491 config.max_observation_ms = config.max_observation_ms.max(config.min_observation_ms);
492 config.steady_threshold = config.steady_threshold.clamp(0.0, 1.0);
493 config.burst_threshold = config.burst_threshold.clamp(0.0, 1.0);
494 if config.burst_threshold < config.steady_threshold {
495 std::mem::swap(&mut config.steady_threshold, &mut config.burst_threshold);
496 }
497 config.burst_prior = config.burst_prior.clamp(0.001, 0.999);
498
499 let k = config.max_run_length;
500
501 let initial_prob = 1.0 / (k + 1) as f64;
503 let run_length_posterior = vec![initial_prob; k + 1];
504
505 let lambda_steady = 1.0 / config.mu_steady_ms;
507 let lambda_burst = 1.0 / config.mu_burst_ms;
508 let hazard = 1.0 / config.hazard_lambda;
509
510 Self {
511 p_burst: config.burst_prior,
512 run_length_posterior,
513 last_event_time: None,
514 observation_count: 0,
515 last_evidence: None,
516 lambda_steady,
517 lambda_burst,
518 hazard,
519 config,
520 }
521 }
522
523 pub fn with_defaults() -> Self {
525 Self::new(BocpdConfig::default())
526 }
527
528 #[inline]
530 pub fn p_burst(&self) -> f64 {
531 self.p_burst
532 }
533
534 #[inline]
539 pub fn run_length_posterior(&self) -> &[f64] {
540 &self.run_length_posterior
541 }
542
543 #[inline]
545 pub fn regime(&self) -> BocpdRegime {
546 if self.p_burst < self.config.steady_threshold {
547 BocpdRegime::Steady
548 } else if self.p_burst > self.config.burst_threshold {
549 BocpdRegime::Burst
550 } else {
551 BocpdRegime::Transitional
552 }
553 }
554
555 pub fn expected_run_length(&self) -> f64 {
557 self.run_length_posterior
558 .iter()
559 .enumerate()
560 .map(|(r, p)| r as f64 * p)
561 .sum()
562 }
563
564 fn run_length_summary(&self) -> RunLengthSummary {
565 let mean = self.expected_run_length();
566 let mut variance = 0.0;
567 let mut mode = 0;
568 let mut mode_p = -1.0;
569 let mut cumulative = 0.0;
570 let mut p95 = self.config.max_run_length;
571
572 for (r, p) in self.run_length_posterior.iter().enumerate() {
573 if *p > mode_p {
574 mode_p = *p;
575 mode = r;
576 }
577 let diff = r as f64 - mean;
578 variance += p * diff * diff;
579 if cumulative < 0.95 {
580 cumulative += p;
581 if cumulative >= 0.95 {
582 p95 = r;
583 }
584 }
585 }
586
587 RunLengthSummary {
588 mean,
589 variance,
590 mode,
591 p95,
592 tail_mass: self.run_length_posterior[self.config.max_run_length],
593 }
594 }
595
596 pub fn last_evidence(&self) -> Option<&BocpdEvidence> {
598 self.last_evidence.as_ref()
599 }
600
601 pub fn set_decision_context(
606 &mut self,
607 steady_delay_ms: u64,
608 burst_delay_ms: u64,
609 hard_deadline_forced: bool,
610 ) {
611 let recommended_delay = self.recommended_delay(steady_delay_ms, burst_delay_ms);
612 if let Some(ref mut evidence) = self.last_evidence {
613 evidence.recommended_delay_ms = Some(recommended_delay);
614 evidence.hard_deadline_forced = Some(hard_deadline_forced);
615 }
616 }
617
618 #[must_use]
620 pub fn evidence_jsonl(&self) -> Option<String> {
621 if !self.config.enable_logging {
622 return None;
623 }
624 self.last_evidence.as_ref().map(BocpdEvidence::to_jsonl)
625 }
626
627 #[must_use]
629 pub fn decision_log_jsonl(
630 &self,
631 steady_delay_ms: u64,
632 burst_delay_ms: u64,
633 hard_deadline_forced: bool,
634 ) -> Option<String> {
635 if !self.config.enable_logging {
636 return None;
637 }
638 let mut evidence = self.last_evidence.clone()?;
639 evidence.recommended_delay_ms =
640 Some(self.recommended_delay(steady_delay_ms, burst_delay_ms));
641 evidence.hard_deadline_forced = Some(hard_deadline_forced);
642 Some(evidence.to_jsonl())
643 }
644
645 #[inline]
647 pub fn observation_count(&self) -> u64 {
648 self.observation_count
649 }
650
651 pub fn config(&self) -> &BocpdConfig {
653 &self.config
654 }
655
656 pub fn observe_event(&mut self, now: Instant) -> BocpdRegime {
660 let observation_ms = self
662 .last_event_time
663 .map(|last| now.duration_since(last).as_secs_f64() * 1000.0)
664 .unwrap_or(self.config.mu_steady_ms); let x = observation_ms
668 .max(self.config.min_observation_ms)
669 .min(self.config.max_observation_ms);
670
671 self.update_posterior(x, now);
673
674 self.last_event_time = Some(now);
676
677 self.regime()
678 }
679
680 fn update_posterior(&mut self, x: f64, now: Instant) {
682 self.observation_count += 1;
683
684 let ll_steady = self.exponential_pdf(x, self.lambda_steady);
686 let ll_burst = self.exponential_pdf(x, self.lambda_burst);
687
688 let log_bf = if ll_steady > 0.0 && ll_burst > 0.0 {
690 (ll_burst / ll_steady).log10()
691 } else {
692 0.0
693 };
694
695 let k = self.config.max_run_length;
697 let mut new_posterior = vec![0.0; k + 1];
698
699 for r in 0..k {
701 let growth_prob = self.run_length_posterior[r] * (1.0 - self.hazard);
702 new_posterior[r + 1] += growth_prob * self.predictive_likelihood(r, x);
703 }
704
705 new_posterior[k] +=
707 self.run_length_posterior[k] * (1.0 - self.hazard) * self.predictive_likelihood(k, x);
708
709 let cp_prob: f64 = self
711 .run_length_posterior
712 .iter()
713 .enumerate()
714 .map(|(r, &p)| p * self.hazard * self.predictive_likelihood(r, x))
715 .sum();
716 new_posterior[0] = cp_prob;
717
718 let total: f64 = new_posterior.iter().sum();
720 if total > 0.0 {
721 for p in &mut new_posterior {
722 *p /= total;
723 }
724 } else {
725 let uniform = 1.0 / (k + 1) as f64;
727 new_posterior.fill(uniform);
728 }
729
730 self.run_length_posterior = new_posterior;
731
732 let prior_odds = self.p_burst / (1.0 - self.p_burst).max(1e-10);
735 let likelihood_ratio = ll_burst / ll_steady.max(1e-10);
736 let posterior_odds = prior_odds * likelihood_ratio;
737 self.p_burst = (posterior_odds / (1.0 + posterior_odds)).clamp(0.001, 0.999);
738
739 let summary = self.run_length_summary();
741 self.last_evidence = Some(BocpdEvidence {
742 p_burst: self.p_burst,
743 log_bayes_factor: log_bf,
744 observation_ms: x,
745 regime: self.regime(),
746 likelihood_steady: ll_steady,
747 likelihood_burst: ll_burst,
748 expected_run_length: summary.mean,
749 run_length_variance: summary.variance,
750 run_length_mode: summary.mode,
751 run_length_p95: summary.p95,
752 run_length_tail_mass: summary.tail_mass,
753 recommended_delay_ms: None,
754 hard_deadline_forced: None,
755 observation_count: self.observation_count,
756 timestamp: now,
757 });
758 }
759
760 #[inline]
762 fn exponential_pdf(&self, x: f64, lambda: f64) -> f64 {
763 lambda * (-lambda * x).exp()
764 }
765
766 #[inline]
771 fn predictive_likelihood(&self, _r: usize, x: f64) -> f64 {
772 let ll_steady = self.exponential_pdf(x, self.lambda_steady);
775 let ll_burst = self.exponential_pdf(x, self.lambda_burst);
776 self.p_burst * ll_burst + (1.0 - self.p_burst) * ll_steady
777 }
778
779 pub fn reset(&mut self) {
781 let k = self.config.max_run_length;
782 let initial_prob = 1.0 / (k + 1) as f64;
783 self.run_length_posterior = vec![initial_prob; k + 1];
784 self.p_burst = self.config.burst_prior;
785 self.last_event_time = None;
786 self.observation_count = 0;
787 self.last_evidence = None;
788 }
789
790 pub fn recommended_delay(&self, steady_delay_ms: u64, burst_delay_ms: u64) -> u64 {
794 if self.p_burst < self.config.steady_threshold {
795 steady_delay_ms
796 } else if self.p_burst > self.config.burst_threshold {
797 burst_delay_ms
798 } else {
799 let denom = (self.config.burst_threshold - self.config.steady_threshold).max(1e-6);
801 let t = ((self.p_burst - self.config.steady_threshold) / denom).clamp(0.0, 1.0);
802 let delay = steady_delay_ms as f64 * (1.0 - t) + burst_delay_ms as f64 * t;
803 delay.round() as u64
804 }
805 }
806}
807
808impl Default for BocpdDetector {
809 fn default() -> Self {
810 Self::with_defaults()
811 }
812}
813
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// `start` shifted forward by `ms` milliseconds.
    fn at(start: Instant, ms: u64) -> Instant {
        start + Duration::from_millis(ms)
    }

    /// Feeds `count` events spaced `gap_ms` apart, the first one `gap_ms`
    /// after `start`.
    fn feed_evenly(detector: &mut BocpdDetector, start: Instant, gap_ms: u64, count: u64) {
        for n in 1..=count {
            detector.observe_event(at(start, gap_ms * n));
        }
    }

    /// A default detector that has already seen a single event.
    fn detector_with_one_event() -> BocpdDetector {
        let mut detector = BocpdDetector::with_defaults();
        detector.observe_event(Instant::now());
        detector
    }

    #[test]
    fn test_default_config() {
        let config = BocpdConfig::default();
        assert!((config.mu_steady_ms - 200.0).abs() < 0.01);
        assert!((config.mu_burst_ms - 20.0).abs() < 0.01);
        assert_eq!(config.max_run_length, 100);
    }

    #[test]
    fn test_initial_state() {
        let detector = BocpdDetector::with_defaults();
        // Starts at the default burst prior, which is below the steady threshold.
        assert!((detector.p_burst() - 0.2).abs() < 0.01);
        assert_eq!(detector.regime(), BocpdRegime::Steady);
        assert_eq!(detector.observation_count(), 0);
    }

    #[test]
    fn test_steady_detection() {
        let mut detector = BocpdDetector::with_defaults();
        // Ten events, 200 ms apart.
        feed_evenly(&mut detector, Instant::now(), 200, 10);

        assert!(
            detector.p_burst() < 0.5,
            "p_burst={} should be low",
            detector.p_burst()
        );
        assert_eq!(detector.regime(), BocpdRegime::Steady);
    }

    #[test]
    fn test_burst_detection() {
        let mut detector = BocpdDetector::with_defaults();
        // Twenty events, 10 ms apart.
        feed_evenly(&mut detector, Instant::now(), 10, 20);

        assert!(
            detector.p_burst() > 0.5,
            "p_burst={} should be high",
            detector.p_burst()
        );
        assert!(matches!(
            detector.regime(),
            BocpdRegime::Burst | BocpdRegime::Transitional
        ));
    }

    #[test]
    fn test_regime_transition() {
        let mut detector = BocpdDetector::with_defaults();
        let start = Instant::now();

        // Slow phase: five events, 200 ms apart.
        feed_evenly(&mut detector, start, 200, 5);
        let initial_p_burst = detector.p_burst();

        // Fast phase: twenty events, 10 ms apart, starting one second in.
        feed_evenly(&mut detector, at(start, 1000), 10, 20);

        assert!(
            detector.p_burst() > initial_p_burst,
            "p_burst should increase during burst"
        );
    }

    #[test]
    fn test_evidence_stored() {
        let detector = detector_with_one_event();

        let evidence = detector.last_evidence().expect("Evidence should be stored");
        assert_eq!(evidence.observation_count, 1);
        assert!(evidence.log_bayes_factor.is_finite());
    }

    #[test]
    fn test_reset() {
        let mut detector = BocpdDetector::with_defaults();
        feed_evenly(&mut detector, Instant::now(), 10, 10);

        detector.reset();

        assert!((detector.p_burst() - 0.2).abs() < 0.01);
        assert_eq!(detector.observation_count(), 0);
        assert!(detector.last_evidence().is_none());
    }

    #[test]
    fn test_recommended_delay() {
        let mut detector = BocpdDetector::with_defaults();

        // Fresh detector sits in the steady regime.
        assert_eq!(detector.recommended_delay(16, 40), 16);

        // Above the burst threshold.
        detector.p_burst = 0.9;
        assert_eq!(detector.recommended_delay(16, 40), 40);

        // Between the thresholds.
        detector.p_burst = 0.5;
        let delay = detector.recommended_delay(16, 40);
        assert!(
            delay > 16 && delay < 40,
            "delay={} should be interpolated",
            delay
        );
    }

    #[test]
    fn test_deterministic() {
        let mut det1 = BocpdDetector::with_defaults();
        let mut det2 = BocpdDetector::with_defaults();
        let start = Instant::now();

        for n in 1..=10 {
            let t = at(start, 15 * n);
            det1.observe_event(t);
            det2.observe_event(t);
        }

        assert!((det1.p_burst() - det2.p_burst()).abs() < 1e-10);
        assert_eq!(det1.regime(), det2.regime());
    }

    #[test]
    fn test_posterior_normalized() {
        let mut detector = BocpdDetector::with_defaults();
        let start = Instant::now();

        for n in 1..=20 {
            detector.observe_event(at(start, 25 * n));

            let sum: f64 = detector.run_length_posterior.iter().sum();
            assert!(
                (sum - 1.0).abs() < 1e-6,
                "Posterior not normalized: sum={}",
                sum
            );
        }
    }

    #[test]
    fn test_p_burst_bounded() {
        let mut detector = BocpdDetector::with_defaults();
        let start = Instant::now();

        // One event per millisecond.
        for n in 1..=100 {
            detector.observe_event(at(start, n));
            assert!(detector.p_burst() >= 0.0 && detector.p_burst() <= 1.0);
        }
    }

    #[test]
    fn config_sanitization_clamps_thresholds_and_priors() {
        let config = BocpdConfig {
            steady_threshold: 0.9,
            burst_threshold: 0.1,
            burst_prior: 2.0,
            max_run_length: 0,
            mu_steady_ms: 0.0,
            mu_burst_ms: 0.0,
            hazard_lambda: 0.0,
            min_observation_ms: 0.0,
            max_observation_ms: 0.0,
            ..Default::default()
        };

        let detector = BocpdDetector::new(config);
        let cfg = detector.config();

        assert!(
            cfg.steady_threshold <= cfg.burst_threshold,
            "thresholds should be ordered after sanitization"
        );
        assert_eq!(cfg.max_run_length, 1);
        assert!(cfg.mu_steady_ms >= 1.0);
        assert!(cfg.mu_burst_ms >= 1.0);
        assert!(cfg.hazard_lambda >= 1.0);
        assert!(cfg.min_observation_ms >= 0.1);
        assert!(cfg.max_observation_ms >= cfg.min_observation_ms);
        assert!(
            (0.0..=1.0).contains(&detector.p_burst()),
            "p_burst should be clamped into [0,1]"
        );
    }

    #[test]
    fn test_jsonl_output() {
        let mut detector = detector_with_one_event();
        detector.config.enable_logging = true;

        let jsonl = detector
            .decision_log_jsonl(16, 40, false)
            .expect("jsonl should be emitted when enabled");

        for needle in [
            "bocpd-v1",
            "p_burst",
            "regime",
            "runlen_mean",
            "runlen_mode",
            "runlen_p95",
            "delay_ms",
            "forced_deadline",
        ] {
            assert!(jsonl.contains(needle));
        }
    }

    #[test]
    fn evidence_jsonl_respects_config() {
        let mut detector = detector_with_one_event();

        // Logging is off by default.
        assert!(detector.evidence_jsonl().is_none());

        detector.config.enable_logging = true;
        assert!(detector.evidence_jsonl().is_some());
    }

    #[test]
    fn prop_expected_runlen_non_negative() {
        let mut detector = BocpdDetector::with_defaults();
        let start = Instant::now();

        // Irregular offsets; they are intentionally not monotonic.
        for i in 0..50 {
            detector.observe_event(at(start, (i % 30 + 5) * (i + 1)));
            assert!(detector.expected_run_length() >= 0.0);
        }
    }
}