1use std::time::Instant;
8
9use async_trait::async_trait;
10
11use crate::messages::{
12 ChangePointDetectionInput, ChangePointDetectionOutput, TimeSeriesAnomalyDetectionInput,
13 TimeSeriesAnomalyDetectionOutput,
14};
15use crate::types::{
16 AnomalyMethod, ChangePointMethod, ChangePointResult, TimeSeries, TimeSeriesAnomalyResult,
17};
18use rustkernel_core::{
19 domain::Domain,
20 error::Result,
21 kernel::KernelMetadata,
22 traits::{BatchKernel, GpuKernel},
23};
24
25#[derive(Debug, Clone)]
34pub struct ChangePointDetection {
35 metadata: KernelMetadata,
36}
37
38impl Default for ChangePointDetection {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl ChangePointDetection {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 metadata: KernelMetadata::batch(
50 "temporal/changepoint-detection",
51 Domain::TemporalAnalysis,
52 )
53 .with_description("PELT/Binary segmentation change point detection")
54 .with_throughput(20_000)
55 .with_latency_us(50.0),
56 }
57 }
58
59 pub fn compute(
67 series: &TimeSeries,
68 method: ChangePointMethod,
69 penalty: f64,
70 min_segment: usize,
71 ) -> ChangePointResult {
72 if series.len() < 2 * min_segment {
73 return ChangePointResult {
74 change_points: Vec::new(),
75 confidence: Vec::new(),
76 segment_means: vec![series.mean()],
77 segment_variances: vec![series.variance()],
78 cost: 0.0,
79 };
80 }
81
82 match method {
83 ChangePointMethod::PELT => Self::pelt(series, penalty, min_segment),
84 ChangePointMethod::BinarySegmentation => {
85 Self::binary_segmentation(series, penalty, min_segment)
86 }
87 ChangePointMethod::CUSUM => Self::cusum(series, penalty, min_segment),
88 }
89 }
90
91 #[allow(clippy::needless_range_loop)]
93 fn pelt(series: &TimeSeries, penalty: f64, min_segment: usize) -> ChangePointResult {
94 let n = series.len();
95 let values = &series.values;
96
97 let mut sum = vec![0.0; n + 1];
99 let mut sum_sq = vec![0.0; n + 1];
100
101 for (i, &v) in values.iter().enumerate() {
102 sum[i + 1] = sum[i] + v;
103 sum_sq[i + 1] = sum_sq[i] + v * v;
104 }
105
106 let segment_cost = |start: usize, end: usize| -> f64 {
108 let len = (end - start) as f64;
109 if len < 1.0 {
110 return 0.0;
111 }
112 let s = sum[end] - sum[start];
113 let s2 = sum_sq[end] - sum_sq[start];
114 s2 - (s * s) / len
115 };
116
117 let mut f = vec![f64::INFINITY; n + 1]; let mut cp = vec![0usize; n + 1]; f[0] = -penalty;
121
122 for t in min_segment..=n {
123 let mut best_cost = f64::INFINITY;
124 let mut best_cp = 0;
125
126 for s in 0..=(t - min_segment) {
127 let cost = f[s] + segment_cost(s, t) + penalty;
128 if cost < best_cost {
129 best_cost = cost;
130 best_cp = s;
131 }
132 }
133
134 f[t] = best_cost;
135 cp[t] = best_cp;
136 }
137
138 let mut change_points = Vec::new();
140 let mut idx = n;
141 while cp[idx] > 0 {
142 change_points.push(cp[idx]);
143 idx = cp[idx];
144 }
145 change_points.reverse();
146
147 let confidence = Self::compute_confidence(&change_points, &f, penalty);
149
150 let mut segments = vec![0];
152 segments.extend(&change_points);
153 segments.push(n);
154
155 let segment_means: Vec<f64> = segments
156 .windows(2)
157 .map(|w| {
158 let start = w[0];
159 let end = w[1];
160 (sum[end] - sum[start]) / (end - start) as f64
161 })
162 .collect();
163
164 let segment_variances: Vec<f64> = segments
165 .windows(2)
166 .map(|w| {
167 let start = w[0];
168 let end = w[1];
169 let len = (end - start) as f64;
170 if len <= 1.0 {
171 return 0.0;
172 }
173 let mean = (sum[end] - sum[start]) / len;
174 values[start..end]
175 .iter()
176 .map(|&x| (x - mean).powi(2))
177 .sum::<f64>()
178 / (len - 1.0)
179 })
180 .collect();
181
182 ChangePointResult {
183 change_points,
184 confidence,
185 segment_means,
186 segment_variances,
187 cost: f[n],
188 }
189 }
190
191 fn binary_segmentation(
193 series: &TimeSeries,
194 penalty: f64,
195 min_segment: usize,
196 ) -> ChangePointResult {
197 let values = &series.values;
198 let n = values.len();
199
200 let mut change_points = Vec::new();
201 Self::binary_segment_recursive(values, 0, n, penalty, min_segment, &mut change_points);
202 change_points.sort();
203
204 let mut segments = vec![0];
206 segments.extend(&change_points);
207 segments.push(n);
208
209 let segment_means: Vec<f64> = segments
210 .windows(2)
211 .map(|w| {
212 let seg = &values[w[0]..w[1]];
213 seg.iter().sum::<f64>() / seg.len() as f64
214 })
215 .collect();
216
217 let segment_variances: Vec<f64> = segments
218 .windows(2)
219 .map(|w| {
220 let seg = &values[w[0]..w[1]];
221 let mean = seg.iter().sum::<f64>() / seg.len() as f64;
222 if seg.len() <= 1 {
223 0.0
224 } else {
225 seg.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (seg.len() - 1) as f64
226 }
227 })
228 .collect();
229
230 let confidence = vec![0.5; change_points.len()];
232
233 ChangePointResult {
234 change_points,
235 confidence,
236 segment_means,
237 segment_variances,
238 cost: 0.0,
239 }
240 }
241
242 fn binary_segment_recursive(
244 values: &[f64],
245 start: usize,
246 end: usize,
247 penalty: f64,
248 min_segment: usize,
249 change_points: &mut Vec<usize>,
250 ) {
251 if end - start < 2 * min_segment {
252 return;
253 }
254
255 let mut best_gain = 0.0;
257 let mut best_split = 0;
258
259 let segment = &values[start..end];
260 let n = segment.len();
261
262 let total_mean: f64 = segment.iter().sum::<f64>() / n as f64;
264 let total_cost: f64 = segment.iter().map(|x| (x - total_mean).powi(2)).sum();
265
266 for split in min_segment..(n - min_segment + 1) {
267 let left = &segment[..split];
268 let right = &segment[split..];
269
270 let left_mean: f64 = left.iter().sum::<f64>() / left.len() as f64;
271 let right_mean: f64 = right.iter().sum::<f64>() / right.len() as f64;
272
273 let left_cost: f64 = left.iter().map(|x| (x - left_mean).powi(2)).sum();
274 let right_cost: f64 = right.iter().map(|x| (x - right_mean).powi(2)).sum();
275
276 let gain = total_cost - left_cost - right_cost;
277
278 if gain > best_gain {
279 best_gain = gain;
280 best_split = split;
281 }
282 }
283
284 if best_gain > penalty {
286 let cp = start + best_split;
287 change_points.push(cp);
288
289 Self::binary_segment_recursive(values, start, cp, penalty, min_segment, change_points);
290 Self::binary_segment_recursive(values, cp, end, penalty, min_segment, change_points);
291 }
292 }
293
294 fn cusum(series: &TimeSeries, threshold: f64, min_segment: usize) -> ChangePointResult {
296 let values = &series.values;
297 let n = values.len();
298 let mean = series.mean();
299
300 let mut cusum = vec![0.0; n];
302 let mut cum = 0.0;
303 for (i, &v) in values.iter().enumerate() {
304 cum += v - mean;
305 cusum[i] = cum;
306 }
307
308 let mut change_points = Vec::new();
310 let mut confidence = Vec::new();
311 let mut last_cp = 0;
312
313 for i in min_segment..(n - min_segment) {
314 let max_cusum = cusum[i].abs();
315
316 if max_cusum > threshold && i - last_cp >= min_segment {
317 let is_peak = cusum[i - 1].abs() < max_cusum && cusum[i + 1].abs() < max_cusum;
319
320 if is_peak {
321 change_points.push(i);
322 confidence.push((max_cusum / threshold).min(1.0));
323 last_cp = i;
324 }
325 }
326 }
327
328 let mut segments = vec![0];
330 segments.extend(&change_points);
331 segments.push(n);
332
333 let segment_means: Vec<f64> = segments
334 .windows(2)
335 .map(|w| {
336 let seg = &values[w[0]..w[1]];
337 seg.iter().sum::<f64>() / seg.len() as f64
338 })
339 .collect();
340
341 let segment_variances: Vec<f64> = segments
342 .windows(2)
343 .map(|w| {
344 let seg = &values[w[0]..w[1]];
345 let mean = seg.iter().sum::<f64>() / seg.len() as f64;
346 if seg.len() <= 1 {
347 0.0
348 } else {
349 seg.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (seg.len() - 1) as f64
350 }
351 })
352 .collect();
353
354 ChangePointResult {
355 change_points,
356 confidence,
357 segment_means,
358 segment_variances,
359 cost: 0.0,
360 }
361 }
362
363 fn compute_confidence(change_points: &[usize], costs: &[f64], penalty: f64) -> Vec<f64> {
365 change_points
366 .iter()
367 .map(|&cp| {
368 if cp < costs.len() {
369 let cost_reduction = if cp > 0 {
371 (costs[cp - 1] - costs[cp]).max(0.0)
372 } else {
373 penalty
374 };
375 (cost_reduction / penalty).clamp(0.0, 1.0)
376 } else {
377 0.5
378 }
379 })
380 .collect()
381 }
382}
383
384impl GpuKernel for ChangePointDetection {
385 fn metadata(&self) -> &KernelMetadata {
386 &self.metadata
387 }
388}
389
390#[async_trait]
391impl BatchKernel<ChangePointDetectionInput, ChangePointDetectionOutput> for ChangePointDetection {
392 async fn execute(
393 &self,
394 input: ChangePointDetectionInput,
395 ) -> Result<ChangePointDetectionOutput> {
396 let start = Instant::now();
397 let result = Self::compute(
398 &input.series,
399 input.method,
400 input.penalty,
401 input.min_segment,
402 );
403 Ok(ChangePointDetectionOutput {
404 result,
405 compute_time_us: start.elapsed().as_micros() as u64,
406 })
407 }
408}
409
410#[derive(Debug, Clone)]
418pub struct TimeSeriesAnomalyDetection {
419 metadata: KernelMetadata,
420}
421
422impl Default for TimeSeriesAnomalyDetection {
423 fn default() -> Self {
424 Self::new()
425 }
426}
427
428impl TimeSeriesAnomalyDetection {
429 #[must_use]
431 pub fn new() -> Self {
432 Self {
433 metadata: KernelMetadata::ring("temporal/anomaly-detection", Domain::TemporalAnalysis)
434 .with_description("Statistical threshold anomaly detection")
435 .with_throughput(50_000)
436 .with_latency_us(20.0),
437 }
438 }
439
440 pub fn compute(
448 series: &TimeSeries,
449 method: AnomalyMethod,
450 threshold: f64,
451 window: Option<usize>,
452 ) -> TimeSeriesAnomalyResult {
453 if series.is_empty() {
454 return TimeSeriesAnomalyResult {
455 scores: Vec::new(),
456 anomaly_indices: Vec::new(),
457 expected: Vec::new(),
458 threshold,
459 };
460 }
461
462 match method {
463 AnomalyMethod::ZScore => Self::zscore_detection(series, threshold, window),
464 AnomalyMethod::IQR => Self::iqr_detection(series, threshold),
465 AnomalyMethod::MovingAverageDeviation => {
466 Self::moving_average_detection(series, threshold, window.unwrap_or(10))
467 }
468 AnomalyMethod::SeasonalESD => {
469 Self::seasonal_esd_detection(series, threshold, window.unwrap_or(12))
470 }
471 }
472 }
473
474 fn zscore_detection(
476 series: &TimeSeries,
477 threshold: f64,
478 window: Option<usize>,
479 ) -> TimeSeriesAnomalyResult {
480 let n = series.len();
481
482 let (scores, expected) = if let Some(w) = window {
483 Self::rolling_zscore(&series.values, w)
485 } else {
486 let mean = series.mean();
488 let std = series.std_dev();
489
490 let scores: Vec<f64> = series
491 .values
492 .iter()
493 .map(|&v| {
494 if std > 1e-10 {
495 (v - mean).abs() / std
496 } else {
497 0.0
498 }
499 })
500 .collect();
501
502 let expected = vec![mean; n];
503 (scores, expected)
504 };
505
506 let anomaly_indices: Vec<usize> = scores
507 .iter()
508 .enumerate()
509 .filter(|&(_, s)| *s > threshold)
510 .map(|(i, _)| i)
511 .collect();
512
513 TimeSeriesAnomalyResult {
514 scores,
515 anomaly_indices,
516 expected,
517 threshold,
518 }
519 }
520
521 fn rolling_zscore(values: &[f64], window: usize) -> (Vec<f64>, Vec<f64>) {
523 let n = values.len();
524 let w = window.min(n);
525
526 let mut scores = vec![0.0; n];
527 let mut expected = vec![0.0; n];
528
529 for i in 0..n {
530 let start = i.saturating_sub(w);
531 let window_vals: Vec<f64> = values[start..=i.min(n - 1)].to_vec();
532
533 let mean: f64 = window_vals.iter().sum::<f64>() / window_vals.len() as f64;
534 let var: f64 = window_vals.iter().map(|x| (x - mean).powi(2)).sum::<f64>()
535 / window_vals.len() as f64;
536 let std = var.sqrt();
537
538 expected[i] = mean;
539 scores[i] = if std > 1e-10 {
540 (values[i] - mean).abs() / std
541 } else {
542 0.0
543 };
544 }
545
546 (scores, expected)
547 }
548
549 fn iqr_detection(series: &TimeSeries, multiplier: f64) -> TimeSeriesAnomalyResult {
551 let mut sorted = series.values.clone();
552 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
553
554 let n = sorted.len();
555 let q1 = sorted[n / 4];
556 let q3 = sorted[3 * n / 4];
557 let iqr = q3 - q1;
558
559 let lower = q1 - multiplier * iqr;
560 let upper = q3 + multiplier * iqr;
561 let median = sorted[n / 2];
562
563 let scores: Vec<f64> = series
564 .values
565 .iter()
566 .map(|&v| {
567 if v < lower {
568 (lower - v) / iqr
569 } else if v > upper {
570 (v - upper) / iqr
571 } else {
572 0.0
573 }
574 })
575 .collect();
576
577 let anomaly_indices: Vec<usize> = series
578 .values
579 .iter()
580 .enumerate()
581 .filter(|&(_, v)| *v < lower || *v > upper)
582 .map(|(i, _)| i)
583 .collect();
584
585 TimeSeriesAnomalyResult {
586 scores,
587 anomaly_indices,
588 expected: vec![median; n],
589 threshold: multiplier,
590 }
591 }
592
593 #[allow(clippy::needless_range_loop)]
595 fn moving_average_detection(
596 series: &TimeSeries,
597 threshold: f64,
598 window: usize,
599 ) -> TimeSeriesAnomalyResult {
600 let n = series.len();
601 let w = window.min(n);
602
603 let mut expected = vec![0.0; n];
604 let mut scores = vec![0.0; n];
605
606 for i in 0..n {
608 let start = i.saturating_sub(w / 2);
609 let end = (i + w / 2 + 1).min(n);
610 expected[i] = series.values[start..end].iter().sum::<f64>() / (end - start) as f64;
611 }
612
613 let deviations: Vec<f64> = series
615 .values
616 .iter()
617 .zip(expected.iter())
618 .map(|(v, e)| (v - e).abs())
619 .collect();
620
621 let mut sorted_deviations = deviations.clone();
623 sorted_deviations.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
624 let mad = sorted_deviations[n / 2];
625
626 for (i, &dev) in deviations.iter().enumerate() {
627 scores[i] = if mad > 1e-10 { dev / mad } else { 0.0 };
628 }
629
630 let anomaly_indices: Vec<usize> = scores
631 .iter()
632 .enumerate()
633 .filter(|&(_, s)| *s > threshold)
634 .map(|(i, _)| i)
635 .collect();
636
637 TimeSeriesAnomalyResult {
638 scores,
639 anomaly_indices,
640 expected,
641 threshold,
642 }
643 }
644
645 fn seasonal_esd_detection(
648 series: &TimeSeries,
649 threshold: f64,
650 period: usize,
651 ) -> TimeSeriesAnomalyResult {
652 let n = series.len();
653
654 let mut seasonal = vec![0.0; period];
656 let mut counts = vec![0usize; period];
657
658 for (i, &v) in series.values.iter().enumerate() {
659 seasonal[i % period] += v;
660 counts[i % period] += 1;
661 }
662
663 for i in 0..period {
664 if counts[i] > 0 {
665 seasonal[i] /= counts[i] as f64;
666 }
667 }
668
669 let deseasonalized: Vec<f64> = series
671 .values
672 .iter()
673 .enumerate()
674 .map(|(i, &v)| v - seasonal[i % period])
675 .collect();
676
677 let mean: f64 = deseasonalized.iter().sum::<f64>() / n as f64;
679 let std: f64 = (deseasonalized
680 .iter()
681 .map(|x| (x - mean).powi(2))
682 .sum::<f64>()
683 / n as f64)
684 .sqrt();
685
686 let scores: Vec<f64> = deseasonalized
687 .iter()
688 .map(|&v| {
689 if std > 1e-10 {
690 (v - mean).abs() / std
691 } else {
692 0.0
693 }
694 })
695 .collect();
696
697 let expected: Vec<f64> = (0..n).map(|i| mean + seasonal[i % period]).collect();
699
700 let anomaly_indices: Vec<usize> = scores
701 .iter()
702 .enumerate()
703 .filter(|&(_, s)| *s > threshold)
704 .map(|(i, _)| i)
705 .collect();
706
707 TimeSeriesAnomalyResult {
708 scores,
709 anomaly_indices,
710 expected,
711 threshold,
712 }
713 }
714}
715
716impl GpuKernel for TimeSeriesAnomalyDetection {
717 fn metadata(&self) -> &KernelMetadata {
718 &self.metadata
719 }
720}
721
722#[async_trait]
723impl BatchKernel<TimeSeriesAnomalyDetectionInput, TimeSeriesAnomalyDetectionOutput>
724 for TimeSeriesAnomalyDetection
725{
726 async fn execute(
727 &self,
728 input: TimeSeriesAnomalyDetectionInput,
729 ) -> Result<TimeSeriesAnomalyDetectionOutput> {
730 let start = Instant::now();
731 let result = Self::compute(&input.series, input.method, input.threshold, input.window);
732 Ok(TimeSeriesAnomalyDetectionOutput {
733 result,
734 compute_time_us: start.elapsed().as_micros() as u64,
735 })
736 }
737}
738
739#[cfg(test)]
740mod tests {
741 use super::*;
742
743 fn create_step_series() -> TimeSeries {
744 let mut values = vec![10.0; 50];
746 values.extend(vec![20.0; 50]);
747 for (i, v) in values.iter_mut().enumerate() {
749 *v += (i as f64 * 0.1).sin();
750 }
751 TimeSeries::new(values)
752 }
753
754 fn create_anomaly_series() -> TimeSeries {
755 let mut values: Vec<f64> = (0..100).map(|i| 10.0 + (i as f64 * 0.1).sin()).collect();
756 values[25] = 50.0;
758 values[75] = -30.0;
759 TimeSeries::new(values)
760 }
761
762 #[test]
763 fn test_changepoint_metadata() {
764 let kernel = ChangePointDetection::new();
765 assert_eq!(kernel.metadata().id, "temporal/changepoint-detection");
766 assert_eq!(kernel.metadata().domain, Domain::TemporalAnalysis);
767 }
768
769 #[test]
770 fn test_pelt_detection() {
771 let series = create_step_series();
772 let result = ChangePointDetection::compute(&series, ChangePointMethod::PELT, 100.0, 10);
773
774 assert!(!result.change_points.is_empty());
776
777 let near_50 = result
779 .change_points
780 .iter()
781 .any(|&cp| (cp as i32 - 50).abs() < 10);
782 assert!(
783 near_50,
784 "Expected change point near 50, got {:?}",
785 result.change_points
786 );
787
788 assert_eq!(result.segment_means.len(), result.change_points.len() + 1);
790 }
791
792 #[test]
793 fn test_binary_segmentation() {
794 let series = create_step_series();
795 let result =
796 ChangePointDetection::compute(&series, ChangePointMethod::BinarySegmentation, 50.0, 10);
797
798 assert!(!result.change_points.is_empty());
800 }
801
802 #[test]
803 fn test_cusum_detection() {
804 let series = create_step_series();
805 let result = ChangePointDetection::compute(&series, ChangePointMethod::CUSUM, 20.0, 10);
806
807 assert!(!result.segment_means.is_empty());
810 }
811
812 #[test]
813 fn test_anomaly_metadata() {
814 let kernel = TimeSeriesAnomalyDetection::new();
815 assert_eq!(kernel.metadata().id, "temporal/anomaly-detection");
816 }
817
818 #[test]
819 fn test_zscore_anomaly() {
820 let series = create_anomaly_series();
821 let result = TimeSeriesAnomalyDetection::compute(&series, AnomalyMethod::ZScore, 3.0, None);
822
823 assert!(!result.anomaly_indices.is_empty());
825 assert!(result.anomaly_indices.contains(&25) || result.anomaly_indices.contains(&75));
826 }
827
828 #[test]
829 fn test_iqr_anomaly() {
830 let series = create_anomaly_series();
831 let result = TimeSeriesAnomalyDetection::compute(&series, AnomalyMethod::IQR, 1.5, None);
832
833 assert!(!result.anomaly_indices.is_empty());
835 }
836
837 #[test]
838 fn test_moving_average_anomaly() {
839 let series = create_anomaly_series();
840 let result = TimeSeriesAnomalyDetection::compute(
841 &series,
842 AnomalyMethod::MovingAverageDeviation,
843 5.0,
844 Some(10),
845 );
846
847 assert!(result.expected.len() == series.len());
849 }
850
851 #[test]
852 fn test_seasonal_esd() {
853 let values: Vec<f64> = (0..120)
855 .map(|i| {
856 let seasonal = 10.0 * ((2.0 * std::f64::consts::PI * i as f64 / 12.0).sin());
857 let trend = 100.0;
858 if i == 60 {
859 200.0 } else {
861 trend + seasonal
862 }
863 })
864 .collect();
865 let series = TimeSeries::new(values);
866
867 let result =
868 TimeSeriesAnomalyDetection::compute(&series, AnomalyMethod::SeasonalESD, 3.0, Some(12));
869
870 assert!(result.anomaly_indices.contains(&60));
872 }
873
874 #[test]
875 fn test_empty_series() {
876 let empty = TimeSeries::new(Vec::new());
877
878 let cp_result = ChangePointDetection::compute(&empty, ChangePointMethod::PELT, 10.0, 5);
879 assert!(cp_result.change_points.is_empty());
880
881 let anomaly_result =
882 TimeSeriesAnomalyDetection::compute(&empty, AnomalyMethod::ZScore, 3.0, None);
883 assert!(anomaly_result.anomaly_indices.is_empty());
884 }
885}