1use std::time::Instant;
8
9use async_trait::async_trait;
10
11use crate::messages::{
12 ChangePointDetectionInput, ChangePointDetectionOutput, TimeSeriesAnomalyDetectionInput,
13 TimeSeriesAnomalyDetectionOutput,
14};
15use crate::types::{
16 AnomalyMethod, ChangePointMethod, ChangePointResult, TimeSeries, TimeSeriesAnomalyResult,
17};
18use rustkernel_core::{
19 domain::Domain,
20 error::Result,
21 kernel::KernelMetadata,
22 traits::{BatchKernel, GpuKernel},
23};
24
25#[derive(Debug, Clone)]
34pub struct ChangePointDetection {
35 metadata: KernelMetadata,
36}
37
38impl Default for ChangePointDetection {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl ChangePointDetection {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 metadata: KernelMetadata::batch(
50 "temporal/changepoint-detection",
51 Domain::TemporalAnalysis,
52 )
53 .with_description("PELT/Binary segmentation change point detection")
54 .with_throughput(20_000)
55 .with_latency_us(50.0),
56 }
57 }
58
59 pub fn compute(
67 series: &TimeSeries,
68 method: ChangePointMethod,
69 penalty: f64,
70 min_segment: usize,
71 ) -> ChangePointResult {
72 if series.len() < 2 * min_segment {
73 return ChangePointResult {
74 change_points: Vec::new(),
75 confidence: Vec::new(),
76 segment_means: vec![series.mean()],
77 segment_variances: vec![series.variance()],
78 cost: 0.0,
79 };
80 }
81
82 match method {
83 ChangePointMethod::PELT => Self::pelt(series, penalty, min_segment),
84 ChangePointMethod::BinarySegmentation => {
85 Self::binary_segmentation(series, penalty, min_segment)
86 }
87 ChangePointMethod::CUSUM => Self::cusum(series, penalty, min_segment),
88 }
89 }
90
91 fn pelt(series: &TimeSeries, penalty: f64, min_segment: usize) -> ChangePointResult {
93 let n = series.len();
94 let values = &series.values;
95
96 let mut sum = vec![0.0; n + 1];
98 let mut sum_sq = vec![0.0; n + 1];
99
100 for (i, &v) in values.iter().enumerate() {
101 sum[i + 1] = sum[i] + v;
102 sum_sq[i + 1] = sum_sq[i] + v * v;
103 }
104
105 let segment_cost = |start: usize, end: usize| -> f64 {
107 let len = (end - start) as f64;
108 if len < 1.0 {
109 return 0.0;
110 }
111 let s = sum[end] - sum[start];
112 let s2 = sum_sq[end] - sum_sq[start];
113 s2 - (s * s) / len
114 };
115
116 let mut f = vec![f64::INFINITY; n + 1]; let mut cp = vec![0usize; n + 1]; f[0] = -penalty;
120
121 for t in min_segment..=n {
122 let mut best_cost = f64::INFINITY;
123 let mut best_cp = 0;
124
125 for s in 0..=(t - min_segment) {
126 let cost = f[s] + segment_cost(s, t) + penalty;
127 if cost < best_cost {
128 best_cost = cost;
129 best_cp = s;
130 }
131 }
132
133 f[t] = best_cost;
134 cp[t] = best_cp;
135 }
136
137 let mut change_points = Vec::new();
139 let mut idx = n;
140 while cp[idx] > 0 {
141 change_points.push(cp[idx]);
142 idx = cp[idx];
143 }
144 change_points.reverse();
145
146 let confidence = Self::compute_confidence(&change_points, &f, penalty);
148
149 let mut segments = vec![0];
151 segments.extend(&change_points);
152 segments.push(n);
153
154 let segment_means: Vec<f64> = segments
155 .windows(2)
156 .map(|w| {
157 let start = w[0];
158 let end = w[1];
159 (sum[end] - sum[start]) / (end - start) as f64
160 })
161 .collect();
162
163 let segment_variances: Vec<f64> = segments
164 .windows(2)
165 .map(|w| {
166 let start = w[0];
167 let end = w[1];
168 let len = (end - start) as f64;
169 if len <= 1.0 {
170 return 0.0;
171 }
172 let mean = (sum[end] - sum[start]) / len;
173 values[start..end]
174 .iter()
175 .map(|&x| (x - mean).powi(2))
176 .sum::<f64>()
177 / (len - 1.0)
178 })
179 .collect();
180
181 ChangePointResult {
182 change_points,
183 confidence,
184 segment_means,
185 segment_variances,
186 cost: f[n],
187 }
188 }
189
190 fn binary_segmentation(
192 series: &TimeSeries,
193 penalty: f64,
194 min_segment: usize,
195 ) -> ChangePointResult {
196 let values = &series.values;
197 let n = values.len();
198
199 let mut change_points = Vec::new();
200 Self::binary_segment_recursive(values, 0, n, penalty, min_segment, &mut change_points);
201 change_points.sort();
202
203 let mut segments = vec![0];
205 segments.extend(&change_points);
206 segments.push(n);
207
208 let segment_means: Vec<f64> = segments
209 .windows(2)
210 .map(|w| {
211 let seg = &values[w[0]..w[1]];
212 seg.iter().sum::<f64>() / seg.len() as f64
213 })
214 .collect();
215
216 let segment_variances: Vec<f64> = segments
217 .windows(2)
218 .map(|w| {
219 let seg = &values[w[0]..w[1]];
220 let mean = seg.iter().sum::<f64>() / seg.len() as f64;
221 if seg.len() <= 1 {
222 0.0
223 } else {
224 seg.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (seg.len() - 1) as f64
225 }
226 })
227 .collect();
228
229 let confidence = vec![0.5; change_points.len()];
231
232 ChangePointResult {
233 change_points,
234 confidence,
235 segment_means,
236 segment_variances,
237 cost: 0.0,
238 }
239 }
240
241 fn binary_segment_recursive(
243 values: &[f64],
244 start: usize,
245 end: usize,
246 penalty: f64,
247 min_segment: usize,
248 change_points: &mut Vec<usize>,
249 ) {
250 if end - start < 2 * min_segment {
251 return;
252 }
253
254 let mut best_gain = 0.0;
256 let mut best_split = 0;
257
258 let segment = &values[start..end];
259 let n = segment.len();
260
261 let total_mean: f64 = segment.iter().sum::<f64>() / n as f64;
263 let total_cost: f64 = segment.iter().map(|x| (x - total_mean).powi(2)).sum();
264
265 for split in min_segment..(n - min_segment + 1) {
266 let left = &segment[..split];
267 let right = &segment[split..];
268
269 let left_mean: f64 = left.iter().sum::<f64>() / left.len() as f64;
270 let right_mean: f64 = right.iter().sum::<f64>() / right.len() as f64;
271
272 let left_cost: f64 = left.iter().map(|x| (x - left_mean).powi(2)).sum();
273 let right_cost: f64 = right.iter().map(|x| (x - right_mean).powi(2)).sum();
274
275 let gain = total_cost - left_cost - right_cost;
276
277 if gain > best_gain {
278 best_gain = gain;
279 best_split = split;
280 }
281 }
282
283 if best_gain > penalty {
285 let cp = start + best_split;
286 change_points.push(cp);
287
288 Self::binary_segment_recursive(values, start, cp, penalty, min_segment, change_points);
289 Self::binary_segment_recursive(values, cp, end, penalty, min_segment, change_points);
290 }
291 }
292
293 fn cusum(series: &TimeSeries, threshold: f64, min_segment: usize) -> ChangePointResult {
295 let values = &series.values;
296 let n = values.len();
297 let mean = series.mean();
298
299 let mut cusum = vec![0.0; n];
301 let mut cum = 0.0;
302 for (i, &v) in values.iter().enumerate() {
303 cum += v - mean;
304 cusum[i] = cum;
305 }
306
307 let mut change_points = Vec::new();
309 let mut confidence = Vec::new();
310 let mut last_cp = 0;
311
312 for i in min_segment..(n - min_segment) {
313 let max_cusum = cusum[i].abs();
314
315 if max_cusum > threshold && i - last_cp >= min_segment {
316 let is_peak = cusum[i - 1].abs() < max_cusum && cusum[i + 1].abs() < max_cusum;
318
319 if is_peak {
320 change_points.push(i);
321 confidence.push((max_cusum / threshold).min(1.0));
322 last_cp = i;
323 }
324 }
325 }
326
327 let mut segments = vec![0];
329 segments.extend(&change_points);
330 segments.push(n);
331
332 let segment_means: Vec<f64> = segments
333 .windows(2)
334 .map(|w| {
335 let seg = &values[w[0]..w[1]];
336 seg.iter().sum::<f64>() / seg.len() as f64
337 })
338 .collect();
339
340 let segment_variances: Vec<f64> = segments
341 .windows(2)
342 .map(|w| {
343 let seg = &values[w[0]..w[1]];
344 let mean = seg.iter().sum::<f64>() / seg.len() as f64;
345 if seg.len() <= 1 {
346 0.0
347 } else {
348 seg.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (seg.len() - 1) as f64
349 }
350 })
351 .collect();
352
353 ChangePointResult {
354 change_points,
355 confidence,
356 segment_means,
357 segment_variances,
358 cost: 0.0,
359 }
360 }
361
362 fn compute_confidence(change_points: &[usize], costs: &[f64], penalty: f64) -> Vec<f64> {
364 change_points
365 .iter()
366 .map(|&cp| {
367 if cp < costs.len() {
368 let cost_reduction = if cp > 0 {
370 (costs[cp - 1] - costs[cp]).max(0.0)
371 } else {
372 penalty
373 };
374 (cost_reduction / penalty).min(1.0).max(0.0)
375 } else {
376 0.5
377 }
378 })
379 .collect()
380 }
381}
382
383impl GpuKernel for ChangePointDetection {
384 fn metadata(&self) -> &KernelMetadata {
385 &self.metadata
386 }
387}
388
389#[async_trait]
390impl BatchKernel<ChangePointDetectionInput, ChangePointDetectionOutput> for ChangePointDetection {
391 async fn execute(
392 &self,
393 input: ChangePointDetectionInput,
394 ) -> Result<ChangePointDetectionOutput> {
395 let start = Instant::now();
396 let result = Self::compute(
397 &input.series,
398 input.method,
399 input.penalty,
400 input.min_segment,
401 );
402 Ok(ChangePointDetectionOutput {
403 result,
404 compute_time_us: start.elapsed().as_micros() as u64,
405 })
406 }
407}
408
409#[derive(Debug, Clone)]
417pub struct TimeSeriesAnomalyDetection {
418 metadata: KernelMetadata,
419}
420
421impl Default for TimeSeriesAnomalyDetection {
422 fn default() -> Self {
423 Self::new()
424 }
425}
426
427impl TimeSeriesAnomalyDetection {
428 #[must_use]
430 pub fn new() -> Self {
431 Self {
432 metadata: KernelMetadata::ring("temporal/anomaly-detection", Domain::TemporalAnalysis)
433 .with_description("Statistical threshold anomaly detection")
434 .with_throughput(50_000)
435 .with_latency_us(20.0),
436 }
437 }
438
439 pub fn compute(
447 series: &TimeSeries,
448 method: AnomalyMethod,
449 threshold: f64,
450 window: Option<usize>,
451 ) -> TimeSeriesAnomalyResult {
452 if series.is_empty() {
453 return TimeSeriesAnomalyResult {
454 scores: Vec::new(),
455 anomaly_indices: Vec::new(),
456 expected: Vec::new(),
457 threshold,
458 };
459 }
460
461 match method {
462 AnomalyMethod::ZScore => Self::zscore_detection(series, threshold, window),
463 AnomalyMethod::IQR => Self::iqr_detection(series, threshold),
464 AnomalyMethod::MovingAverageDeviation => {
465 Self::moving_average_detection(series, threshold, window.unwrap_or(10))
466 }
467 AnomalyMethod::SeasonalESD => {
468 Self::seasonal_esd_detection(series, threshold, window.unwrap_or(12))
469 }
470 }
471 }
472
473 fn zscore_detection(
475 series: &TimeSeries,
476 threshold: f64,
477 window: Option<usize>,
478 ) -> TimeSeriesAnomalyResult {
479 let n = series.len();
480
481 let (scores, expected) = if let Some(w) = window {
482 Self::rolling_zscore(&series.values, w)
484 } else {
485 let mean = series.mean();
487 let std = series.std_dev();
488
489 let scores: Vec<f64> = series
490 .values
491 .iter()
492 .map(|&v| {
493 if std > 1e-10 {
494 (v - mean).abs() / std
495 } else {
496 0.0
497 }
498 })
499 .collect();
500
501 let expected = vec![mean; n];
502 (scores, expected)
503 };
504
505 let anomaly_indices: Vec<usize> = scores
506 .iter()
507 .enumerate()
508 .filter(|&(_, s)| *s > threshold)
509 .map(|(i, _)| i)
510 .collect();
511
512 TimeSeriesAnomalyResult {
513 scores,
514 anomaly_indices,
515 expected,
516 threshold,
517 }
518 }
519
520 fn rolling_zscore(values: &[f64], window: usize) -> (Vec<f64>, Vec<f64>) {
522 let n = values.len();
523 let w = window.min(n);
524
525 let mut scores = vec![0.0; n];
526 let mut expected = vec![0.0; n];
527
528 for i in 0..n {
529 let start = i.saturating_sub(w);
530 let window_vals: Vec<f64> = values[start..=i.min(n - 1)].to_vec();
531
532 let mean: f64 = window_vals.iter().sum::<f64>() / window_vals.len() as f64;
533 let var: f64 = window_vals.iter().map(|x| (x - mean).powi(2)).sum::<f64>()
534 / window_vals.len() as f64;
535 let std = var.sqrt();
536
537 expected[i] = mean;
538 scores[i] = if std > 1e-10 {
539 (values[i] - mean).abs() / std
540 } else {
541 0.0
542 };
543 }
544
545 (scores, expected)
546 }
547
548 fn iqr_detection(series: &TimeSeries, multiplier: f64) -> TimeSeriesAnomalyResult {
550 let mut sorted = series.values.clone();
551 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
552
553 let n = sorted.len();
554 let q1 = sorted[n / 4];
555 let q3 = sorted[3 * n / 4];
556 let iqr = q3 - q1;
557
558 let lower = q1 - multiplier * iqr;
559 let upper = q3 + multiplier * iqr;
560 let median = sorted[n / 2];
561
562 let scores: Vec<f64> = series
563 .values
564 .iter()
565 .map(|&v| {
566 if v < lower {
567 (lower - v) / iqr
568 } else if v > upper {
569 (v - upper) / iqr
570 } else {
571 0.0
572 }
573 })
574 .collect();
575
576 let anomaly_indices: Vec<usize> = series
577 .values
578 .iter()
579 .enumerate()
580 .filter(|&(_, v)| *v < lower || *v > upper)
581 .map(|(i, _)| i)
582 .collect();
583
584 TimeSeriesAnomalyResult {
585 scores,
586 anomaly_indices,
587 expected: vec![median; n],
588 threshold: multiplier,
589 }
590 }
591
592 fn moving_average_detection(
594 series: &TimeSeries,
595 threshold: f64,
596 window: usize,
597 ) -> TimeSeriesAnomalyResult {
598 let n = series.len();
599 let w = window.min(n);
600
601 let mut expected = vec![0.0; n];
602 let mut scores = vec![0.0; n];
603
604 for i in 0..n {
606 let start = i.saturating_sub(w / 2);
607 let end = (i + w / 2 + 1).min(n);
608 expected[i] = series.values[start..end].iter().sum::<f64>() / (end - start) as f64;
609 }
610
611 let deviations: Vec<f64> = series
613 .values
614 .iter()
615 .zip(expected.iter())
616 .map(|(v, e)| (v - e).abs())
617 .collect();
618
619 let mut sorted_deviations = deviations.clone();
621 sorted_deviations.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
622 let mad = sorted_deviations[n / 2];
623
624 for (i, &dev) in deviations.iter().enumerate() {
625 scores[i] = if mad > 1e-10 { dev / mad } else { 0.0 };
626 }
627
628 let anomaly_indices: Vec<usize> = scores
629 .iter()
630 .enumerate()
631 .filter(|&(_, s)| *s > threshold)
632 .map(|(i, _)| i)
633 .collect();
634
635 TimeSeriesAnomalyResult {
636 scores,
637 anomaly_indices,
638 expected,
639 threshold,
640 }
641 }
642
643 fn seasonal_esd_detection(
646 series: &TimeSeries,
647 threshold: f64,
648 period: usize,
649 ) -> TimeSeriesAnomalyResult {
650 let n = series.len();
651
652 let mut seasonal = vec![0.0; period];
654 let mut counts = vec![0usize; period];
655
656 for (i, &v) in series.values.iter().enumerate() {
657 seasonal[i % period] += v;
658 counts[i % period] += 1;
659 }
660
661 for i in 0..period {
662 if counts[i] > 0 {
663 seasonal[i] /= counts[i] as f64;
664 }
665 }
666
667 let deseasonalized: Vec<f64> = series
669 .values
670 .iter()
671 .enumerate()
672 .map(|(i, &v)| v - seasonal[i % period])
673 .collect();
674
675 let mean: f64 = deseasonalized.iter().sum::<f64>() / n as f64;
677 let std: f64 = (deseasonalized
678 .iter()
679 .map(|x| (x - mean).powi(2))
680 .sum::<f64>()
681 / n as f64)
682 .sqrt();
683
684 let scores: Vec<f64> = deseasonalized
685 .iter()
686 .map(|&v| {
687 if std > 1e-10 {
688 (v - mean).abs() / std
689 } else {
690 0.0
691 }
692 })
693 .collect();
694
695 let expected: Vec<f64> = (0..n).map(|i| mean + seasonal[i % period]).collect();
697
698 let anomaly_indices: Vec<usize> = scores
699 .iter()
700 .enumerate()
701 .filter(|&(_, s)| *s > threshold)
702 .map(|(i, _)| i)
703 .collect();
704
705 TimeSeriesAnomalyResult {
706 scores,
707 anomaly_indices,
708 expected,
709 threshold,
710 }
711 }
712}
713
714impl GpuKernel for TimeSeriesAnomalyDetection {
715 fn metadata(&self) -> &KernelMetadata {
716 &self.metadata
717 }
718}
719
720#[async_trait]
721impl BatchKernel<TimeSeriesAnomalyDetectionInput, TimeSeriesAnomalyDetectionOutput>
722 for TimeSeriesAnomalyDetection
723{
724 async fn execute(
725 &self,
726 input: TimeSeriesAnomalyDetectionInput,
727 ) -> Result<TimeSeriesAnomalyDetectionOutput> {
728 let start = Instant::now();
729 let result = Self::compute(&input.series, input.method, input.threshold, input.window);
730 Ok(TimeSeriesAnomalyDetectionOutput {
731 result,
732 compute_time_us: start.elapsed().as_micros() as u64,
733 })
734 }
735}
736
737#[cfg(test)]
738mod tests {
739 use super::*;
740
741 fn create_step_series() -> TimeSeries {
742 let mut values = vec![10.0; 50];
744 values.extend(vec![20.0; 50]);
745 for (i, v) in values.iter_mut().enumerate() {
747 *v += (i as f64 * 0.1).sin();
748 }
749 TimeSeries::new(values)
750 }
751
752 fn create_anomaly_series() -> TimeSeries {
753 let mut values: Vec<f64> = (0..100).map(|i| 10.0 + (i as f64 * 0.1).sin()).collect();
754 values[25] = 50.0;
756 values[75] = -30.0;
757 TimeSeries::new(values)
758 }
759
760 #[test]
761 fn test_changepoint_metadata() {
762 let kernel = ChangePointDetection::new();
763 assert_eq!(kernel.metadata().id, "temporal/changepoint-detection");
764 assert_eq!(kernel.metadata().domain, Domain::TemporalAnalysis);
765 }
766
767 #[test]
768 fn test_pelt_detection() {
769 let series = create_step_series();
770 let result = ChangePointDetection::compute(&series, ChangePointMethod::PELT, 100.0, 10);
771
772 assert!(!result.change_points.is_empty());
774
775 let near_50 = result
777 .change_points
778 .iter()
779 .any(|&cp| (cp as i32 - 50).abs() < 10);
780 assert!(
781 near_50,
782 "Expected change point near 50, got {:?}",
783 result.change_points
784 );
785
786 assert_eq!(result.segment_means.len(), result.change_points.len() + 1);
788 }
789
790 #[test]
791 fn test_binary_segmentation() {
792 let series = create_step_series();
793 let result =
794 ChangePointDetection::compute(&series, ChangePointMethod::BinarySegmentation, 50.0, 10);
795
796 assert!(!result.change_points.is_empty());
798 }
799
800 #[test]
801 fn test_cusum_detection() {
802 let series = create_step_series();
803 let result = ChangePointDetection::compute(&series, ChangePointMethod::CUSUM, 20.0, 10);
804
805 assert!(result.segment_means.len() >= 1);
808 }
809
810 #[test]
811 fn test_anomaly_metadata() {
812 let kernel = TimeSeriesAnomalyDetection::new();
813 assert_eq!(kernel.metadata().id, "temporal/anomaly-detection");
814 }
815
816 #[test]
817 fn test_zscore_anomaly() {
818 let series = create_anomaly_series();
819 let result = TimeSeriesAnomalyDetection::compute(&series, AnomalyMethod::ZScore, 3.0, None);
820
821 assert!(!result.anomaly_indices.is_empty());
823 assert!(result.anomaly_indices.contains(&25) || result.anomaly_indices.contains(&75));
824 }
825
826 #[test]
827 fn test_iqr_anomaly() {
828 let series = create_anomaly_series();
829 let result = TimeSeriesAnomalyDetection::compute(&series, AnomalyMethod::IQR, 1.5, None);
830
831 assert!(!result.anomaly_indices.is_empty());
833 }
834
835 #[test]
836 fn test_moving_average_anomaly() {
837 let series = create_anomaly_series();
838 let result = TimeSeriesAnomalyDetection::compute(
839 &series,
840 AnomalyMethod::MovingAverageDeviation,
841 5.0,
842 Some(10),
843 );
844
845 assert!(result.expected.len() == series.len());
847 }
848
849 #[test]
850 fn test_seasonal_esd() {
851 let values: Vec<f64> = (0..120)
853 .map(|i| {
854 let seasonal = 10.0 * ((2.0 * std::f64::consts::PI * i as f64 / 12.0).sin());
855 let trend = 100.0;
856 if i == 60 {
857 200.0 } else {
859 trend + seasonal
860 }
861 })
862 .collect();
863 let series = TimeSeries::new(values);
864
865 let result =
866 TimeSeriesAnomalyDetection::compute(&series, AnomalyMethod::SeasonalESD, 3.0, Some(12));
867
868 assert!(result.anomaly_indices.contains(&60));
870 }
871
872 #[test]
873 fn test_empty_series() {
874 let empty = TimeSeries::new(Vec::new());
875
876 let cp_result = ChangePointDetection::compute(&empty, ChangePointMethod::PELT, 10.0, 5);
877 assert!(cp_result.change_points.is_empty());
878
879 let anomaly_result =
880 TimeSeriesAnomalyDetection::compute(&empty, AnomalyMethod::ZScore, 3.0, None);
881 assert!(anomaly_result.anomaly_indices.is_empty());
882 }
883}