// do_memory_mcp/patterns/statistical/analysis/bocpd.rs
use anyhow::Result;
use std::collections::VecDeque;

use super::types::{BOCPDConfig, BOCPDResult, BOCPDState};
/// Simplified Bayesian Online Changepoint Detection (BOCPD) detector.
///
/// Maintains a run-length posterior in log space and a bounded sliding
/// buffer of recent observations used to fit the empirical likelihood.
#[derive(Debug)]
pub struct SimpleBOCPD {
    // Static tuning parameters (hazard rate, alert threshold, buffer sizes).
    pub(crate) config: BOCPDConfig,
    // Mutable per-stream state: log posterior, data buffer, point counter.
    pub(crate) state: BOCPDState,
}
16
17impl SimpleBOCPD {
19 pub fn new(config: BOCPDConfig) -> Self {
21 let state = BOCPDState {
22 log_posterior: vec![0.0; config.max_run_length_hypotheses],
23 hazard_rate: config.hazard_rate,
24 data_buffer: VecDeque::with_capacity(config.buffer_size),
25 max_buffer_size: config.buffer_size,
26 processed_points: 0,
27 };
28
29 Self { config, state }
30 }
31
32 pub fn detect_changepoints(&mut self, data: &[f64]) -> Result<Vec<BOCPDResult>> {
34 let mut results = Vec::new();
35
36 for (i, &value) in data.iter().enumerate() {
37 self.update_state(value)?;
38
39 if let Some(prob) = self.extract_changepoint()? {
41 if prob > self.config.alert_threshold {
42 results.push(BOCPDResult {
43 changepoint_index: Some(i),
44 changepoint_probability: prob,
45 map_run_length: self.compute_map_run_length(),
46 run_length_distribution: self.normalize_distribution(),
47 confidence: prob.min(1.0),
48 });
49 }
50 }
51 }
52
53 Ok(results)
54 }
55
56 pub(crate) fn update_state(&mut self, observation: f64) -> Result<()> {
58 self.state.data_buffer.push_back(observation);
60 if self.state.data_buffer.len() > self.state.max_buffer_size {
61 self.state.data_buffer.pop_front();
62 }
63
64 self.state.processed_points += 1;
65
66 if self.state.data_buffer.len() >= 2 {
68 self.update_posterior(observation)?;
69 }
70
71 Ok(())
72 }
73
74 #[allow(clippy::needless_range_loop)]
76 fn update_posterior(&mut self, observation: f64) -> Result<()> {
77 let max_r = self.state.log_posterior.len() - 1;
78 let mut new_posterior = vec![f64::NEG_INFINITY; self.state.log_posterior.len()];
79
80 let hazard_prob = (self.state.hazard_rate / (1.0 + self.state.hazard_rate)).ln();
81 let survival_prob = (1.0 / (1.0 + self.state.hazard_rate)).ln();
82
83 let log_likelihood = self.compute_likelihood(observation)?;
85
86 let max_prev_log_post = self
88 .state
89 .log_posterior
90 .iter()
91 .filter(|&&x| x.is_finite())
92 .fold(f64::NEG_INFINITY, |a, &b| a.max(b));
93
94 if max_prev_log_post.is_finite() {
95 let finite_count = self
96 .state
97 .log_posterior
98 .iter()
99 .filter(|&&x| x.is_finite())
100 .count();
101 new_posterior[0] =
102 (finite_count as f64).ln() + max_prev_log_post + hazard_prob + log_likelihood;
103 }
104
105 for r in 1..=max_r {
107 if self.state.log_posterior[r - 1].is_finite() {
108 new_posterior[r] = self.state.log_posterior[r - 1] + survival_prob + log_likelihood;
109 }
110 }
111
112 let log_normalizer = log_sum_exp(&new_posterior);
114 if log_normalizer.is_finite() {
115 for val in &mut new_posterior {
116 if val.is_finite() {
117 *val -= log_normalizer;
118 }
119 }
120 }
121
122 self.state.log_posterior = new_posterior;
123 Ok(())
124 }
125
126 fn compute_likelihood(&self, observation: f64) -> Result<f64> {
128 if self.state.data_buffer.len() < 2 {
129 return Ok(0.0);
130 }
131
132 let recent_data: Vec<f64> = self
133 .state
134 .data_buffer
135 .iter()
136 .rev()
137 .skip(1)
138 .take(5)
139 .cloned()
140 .collect();
141
142 if recent_data.is_empty() {
143 return Ok(0.0);
144 }
145
146 let mean = recent_data.iter().sum::<f64>() / recent_data.len() as f64;
147 let variance =
148 recent_data.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / recent_data.len() as f64;
149 let std_dev = variance.sqrt().max(0.01);
150
151 let z_score = (observation - mean) / std_dev;
152 let log_likelihood = -0.5 * (z_score.powi(2) + (2.0 * std_dev * std_dev * std_dev).ln());
153
154 Ok(log_likelihood)
155 }
156
157 fn extract_changepoint(&self) -> Result<Option<f64>> {
159 if self.state.processed_points < 10 {
160 return Ok(None);
161 }
162
163 let recent: Vec<f64> = self
165 .state
166 .data_buffer
167 .iter()
168 .rev()
169 .take(20)
170 .copied()
171 .collect();
172 if recent.len() >= 5 {
173 let mean = recent.iter().sum::<f64>() / recent.len() as f64;
174 let var = recent
175 .iter()
176 .map(|&x| {
177 let d = x - mean;
178 d * d
179 })
180 .sum::<f64>()
181 / recent.len() as f64;
182 if var < 1e-6 {
183 return Ok(Some(0.0));
184 }
185 }
186
187 let changepoint_prob = self.state.log_posterior[0].exp();
188 Ok(Some(changepoint_prob))
189 }
190
191 fn compute_map_run_length(&self) -> usize {
193 let mut max_log_prob = f64::NEG_INFINITY;
194 let mut map_index = 0;
195
196 for (i, &log_prob) in self.state.log_posterior.iter().enumerate() {
197 if log_prob > max_log_prob {
198 max_log_prob = log_prob;
199 map_index = i;
200 }
201 }
202
203 map_index
204 }
205
206 pub(crate) fn normalize_distribution(&self) -> Vec<f64> {
208 let log_normalizer = log_sum_exp(&self.state.log_posterior);
209 self.state
210 .log_posterior
211 .iter()
212 .map(|&x| (x - log_normalizer).exp())
213 .collect()
214 }
215}
216
/// Numerically stable `ln(Σ exp(xᵢ))` over a slice.
///
/// Empty input and all-`-inf` input both return `f64::NEG_INFINITY`
/// (log of zero total mass). Non-finite entries are excluded from the
/// shifted sum so a lone `+inf` maximum propagates without producing NaN.
pub fn log_sum_exp(values: &[f64]) -> f64 {
    if values.is_empty() {
        return f64::NEG_INFINITY;
    }

    // Shift by the maximum so every exponent is <= 0 (no overflow).
    let max_val = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);

    // Everything is -inf: total mass is zero.
    if max_val == f64::NEG_INFINITY {
        return max_val;
    }

    let shifted_total: f64 = values
        .iter()
        .filter(|v| v.is_finite())
        .map(|v| (v - max_val).exp())
        .sum();

    if shifted_total == 0.0 {
        max_val
    } else {
        max_val + shifted_total.ln()
    }
}
241
#[cfg(test)]
mod tests {
    use super::*;

    // The log-sum of finite values must be finite and strictly dominate
    // the largest element.
    #[test]
    fn test_log_sum_exp() {
        let result = log_sum_exp(&[1.0, 2.0, 3.0]);
        assert!(result.is_finite());
        assert!(result > 3.0);
    }

    // An empty slice carries zero probability mass: log(0) = -inf.
    #[test]
    fn test_log_sum_exp_empty() {
        let empty: Vec<f64> = Vec::new();
        assert_eq!(log_sum_exp(&empty), f64::NEG_INFINITY);
    }
}