// do_memory_mcp/patterns/statistical/analysis/bocpd.rs
//! # Bayesian Online Change Point Detection (BOCPD)
//!
//! Implementation of BOCPD algorithm for detecting changepoints in time-series data.
use anyhow::Result;
use std::collections::VecDeque;

use super::types::{BOCPDConfig, BOCPDResult, BOCPDState};
/// Simple BOCPD (Bayesian Online Change Point Detection) detector.
///
/// Pairs an immutable configuration with the mutable filtering state
/// (log-space run-length posterior plus a bounded data buffer) that is
/// updated once per observation.
#[derive(Debug)]
pub struct SimpleBOCPD {
    // Detector parameters: hazard rate, alert threshold, buffer sizing,
    // and the number of run-length hypotheses tracked.
    pub(crate) config: BOCPDConfig,
    // Mutable state: log-posterior over run lengths, recent observations,
    // and a count of points processed so far.
    pub(crate) state: BOCPDState,
}
16
17/// Simple BOCPD Implementation
18impl SimpleBOCPD {
19    /// Create a new simple BOCPD detector
20    pub fn new(config: BOCPDConfig) -> Self {
21        let state = BOCPDState {
22            log_posterior: vec![0.0; config.max_run_length_hypotheses],
23            hazard_rate: config.hazard_rate,
24            data_buffer: VecDeque::with_capacity(config.buffer_size),
25            max_buffer_size: config.buffer_size,
26            processed_points: 0,
27        };
28
29        Self { config, state }
30    }
31
32    /// Detect changepoints using BOCD
33    pub fn detect_changepoints(&mut self, data: &[f64]) -> Result<Vec<BOCPDResult>> {
34        let mut results = Vec::new();
35
36        for (i, &value) in data.iter().enumerate() {
37            self.update_state(value)?;
38
39            // Check for changepoint
40            if let Some(prob) = self.extract_changepoint()? {
41                if prob > self.config.alert_threshold {
42                    results.push(BOCPDResult {
43                        changepoint_index: Some(i),
44                        changepoint_probability: prob,
45                        map_run_length: self.compute_map_run_length(),
46                        run_length_distribution: self.normalize_distribution(),
47                        confidence: prob.min(1.0),
48                    });
49                }
50            }
51        }
52
53        Ok(results)
54    }
55
56    /// Update BOCPD state with new observation
57    pub(crate) fn update_state(&mut self, observation: f64) -> Result<()> {
58        // Add to buffer
59        self.state.data_buffer.push_back(observation);
60        if self.state.data_buffer.len() > self.state.max_buffer_size {
61            self.state.data_buffer.pop_front();
62        }
63
64        self.state.processed_points += 1;
65
66        // Update posterior if we have enough data
67        if self.state.data_buffer.len() >= 2 {
68            self.update_posterior(observation)?;
69        }
70
71        Ok(())
72    }
73
74    /// Update posterior distribution
75    #[allow(clippy::needless_range_loop)]
76    fn update_posterior(&mut self, observation: f64) -> Result<()> {
77        let max_r = self.state.log_posterior.len() - 1;
78        let mut new_posterior = vec![f64::NEG_INFINITY; self.state.log_posterior.len()];
79
80        let hazard_prob = (self.state.hazard_rate / (1.0 + self.state.hazard_rate)).ln();
81        let survival_prob = (1.0 / (1.0 + self.state.hazard_rate)).ln();
82
83        // Pre-compute likelihood once for this observation
84        let log_likelihood = self.compute_likelihood(observation)?;
85
86        // For r=0 (changepoint case): use cached max instead of iterating all prev_r
87        let max_prev_log_post = self
88            .state
89            .log_posterior
90            .iter()
91            .filter(|&&x| x.is_finite())
92            .fold(f64::NEG_INFINITY, |a, &b| a.max(b));
93
94        if max_prev_log_post.is_finite() {
95            let finite_count = self
96                .state
97                .log_posterior
98                .iter()
99                .filter(|&&x| x.is_finite())
100                .count();
101            new_posterior[0] =
102                (finite_count as f64).ln() + max_prev_log_post + hazard_prob + log_likelihood;
103        }
104
105        // For r>0 (continuity case): shift previous posterior
106        for r in 1..=max_r {
107            if self.state.log_posterior[r - 1].is_finite() {
108                new_posterior[r] = self.state.log_posterior[r - 1] + survival_prob + log_likelihood;
109            }
110        }
111
112        // Normalize
113        let log_normalizer = log_sum_exp(&new_posterior);
114        if log_normalizer.is_finite() {
115            for val in &mut new_posterior {
116                if val.is_finite() {
117                    *val -= log_normalizer;
118                }
119            }
120        }
121
122        self.state.log_posterior = new_posterior;
123        Ok(())
124    }
125
126    /// Compute likelihood of observation
127    fn compute_likelihood(&self, observation: f64) -> Result<f64> {
128        if self.state.data_buffer.len() < 2 {
129            return Ok(0.0);
130        }
131
132        let recent_data: Vec<f64> = self
133            .state
134            .data_buffer
135            .iter()
136            .rev()
137            .skip(1)
138            .take(5)
139            .cloned()
140            .collect();
141
142        if recent_data.is_empty() {
143            return Ok(0.0);
144        }
145
146        let mean = recent_data.iter().sum::<f64>() / recent_data.len() as f64;
147        let variance =
148            recent_data.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / recent_data.len() as f64;
149        let std_dev = variance.sqrt().max(0.01);
150
151        let z_score = (observation - mean) / std_dev;
152        let log_likelihood = -0.5 * (z_score.powi(2) + (2.0 * std_dev * std_dev * std_dev).ln());
153
154        Ok(log_likelihood)
155    }
156
157    /// Extract changepoint if detected
158    fn extract_changepoint(&self) -> Result<Option<f64>> {
159        if self.state.processed_points < 10 {
160            return Ok(None);
161        }
162
163        // Low-variance guard
164        let recent: Vec<f64> = self
165            .state
166            .data_buffer
167            .iter()
168            .rev()
169            .take(20)
170            .copied()
171            .collect();
172        if recent.len() >= 5 {
173            let mean = recent.iter().sum::<f64>() / recent.len() as f64;
174            let var = recent
175                .iter()
176                .map(|&x| {
177                    let d = x - mean;
178                    d * d
179                })
180                .sum::<f64>()
181                / recent.len() as f64;
182            if var < 1e-6 {
183                return Ok(Some(0.0));
184            }
185        }
186
187        let changepoint_prob = self.state.log_posterior[0].exp();
188        Ok(Some(changepoint_prob))
189    }
190
191    /// Compute MAP run length
192    fn compute_map_run_length(&self) -> usize {
193        let mut max_log_prob = f64::NEG_INFINITY;
194        let mut map_index = 0;
195
196        for (i, &log_prob) in self.state.log_posterior.iter().enumerate() {
197            if log_prob > max_log_prob {
198                max_log_prob = log_prob;
199                map_index = i;
200            }
201        }
202
203        map_index
204    }
205
206    /// Normalize distribution
207    pub(crate) fn normalize_distribution(&self) -> Vec<f64> {
208        let log_normalizer = log_sum_exp(&self.state.log_posterior);
209        self.state
210            .log_posterior
211            .iter()
212            .map(|&x| (x - log_normalizer).exp())
213            .collect()
214    }
215}
216
/// Numerically stable log(Σ exp(xᵢ)) over a slice of log-space values.
///
/// Returns `f64::NEG_INFINITY` for an empty slice (the log of an empty sum),
/// and returns the maximum unchanged when every shifted term underflows to zero.
pub fn log_sum_exp(values: &[f64]) -> f64 {
    // The running maximum doubles as the shift that keeps exp() in range.
    let max_val = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);

    // Covers both the empty slice and the all-negative-infinity case.
    if max_val == f64::NEG_INFINITY {
        return f64::NEG_INFINITY;
    }

    // Non-finite entries contribute nothing to the shifted sum.
    let shifted_sum: f64 = values
        .iter()
        .copied()
        .filter(|x| x.is_finite())
        .map(|x| (x - max_val).exp())
        .sum();

    if shifted_sum > 0.0 {
        max_val + shifted_sum.ln()
    } else {
        max_val
    }
}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// The log-sum-exp of several finite values is finite and must exceed
    /// the largest input.
    #[test]
    fn test_log_sum_exp() {
        let result = log_sum_exp(&[1.0, 2.0, 3.0]);
        assert!(result.is_finite());
        assert!(result > 3.0); // strictly above the max value
    }

    /// An empty slice is an empty sum, whose logarithm is negative infinity.
    #[test]
    fn test_log_sum_exp_empty() {
        let empty: Vec<f64> = Vec::new();
        assert_eq!(log_sum_exp(&empty), f64::NEG_INFINITY);
    }
}