use scirs2_core::ndarray::ArrayStatCompat;
use scirs2_core::ndarray::{s, Array1};
use scirs2_core::numeric::{Float, FromPrimitive, NumCast};
use std::fmt::Debug;
use crate::error::{Result, TimeSeriesError};
use statrs::statistics::Statistics;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangePointMethod {
PELT,
BinarySegmentation,
CUSUM,
BayesianOnline,
Kernel,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CostFunction {
Normal,
Poisson,
Exponential,
NonParametric,
}
#[derive(Debug, Clone)]
pub struct ChangePointOptions {
pub method: ChangePointMethod,
pub cost_function: CostFunction,
pub penalty: f64,
pub min_segment_length: usize,
pub max_change_points: Option<usize>,
pub window_size: Option<usize>,
pub threshold: Option<f64>,
pub prior_probability: f64,
pub kernel_bandwidth: Option<f64>,
}
impl Default for ChangePointOptions {
fn default() -> Self {
Self {
method: ChangePointMethod::PELT,
cost_function: CostFunction::Normal,
penalty: 2.0 * 2.0_f64.ln(), min_segment_length: 5,
max_change_points: None,
window_size: None,
threshold: None,
prior_probability: 1e-3,
kernel_bandwidth: None,
}
}
}
#[derive(Debug, Clone)]
pub struct ChangePointResult {
pub change_points: Vec<usize>,
pub scores: Option<Vec<f64>>,
pub total_cost: f64,
pub method: ChangePointMethod,
}
#[allow(dead_code)]
pub fn detect_change_points<F>(
ts: &Array1<F>,
options: &ChangePointOptions,
) -> Result<ChangePointResult>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let n = ts.len();
if n < options.min_segment_length * 2 {
return Err(TimeSeriesError::InsufficientData {
message: format!(
"Time series too short for change point detection with min_segment_length={}",
options.min_segment_length
),
required: options.min_segment_length * 2,
actual: n,
});
}
if options.penalty < 0.0 {
return Err(TimeSeriesError::InvalidInput(
"Penalty parameter must be non-negative".to_string(),
));
}
match options.method {
ChangePointMethod::PELT => detect_change_points_pelt(ts, options),
ChangePointMethod::BinarySegmentation => detect_change_points_binary(ts, options),
ChangePointMethod::CUSUM => detect_change_points_cusum(ts, options),
ChangePointMethod::BayesianOnline => detect_change_points_bayesian(ts, options),
ChangePointMethod::Kernel => detect_change_points_kernel(ts, options),
}
}
#[allow(dead_code)]
fn detect_change_points_pelt<F>(
ts: &Array1<F>,
options: &ChangePointOptions,
) -> Result<ChangePointResult>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let n = ts.len();
let penalty = F::from_f64(options.penalty).expect("Operation failed");
let min_len = options.min_segment_length;
let mut costs = Array1::from_elem(n + 1, F::infinity());
let mut last_change_point = vec![0; n + 1];
let mut candidates = vec![0usize];
costs[0] = -penalty;
for t in min_len..=n {
let mut best_cost = F::infinity();
let mut best_last = 0;
let mut new_candidates = Vec::new();
for &s in &candidates {
if t - s >= min_len {
let segment_cost = calculate_segment_cost(ts, s, t, options.cost_function)?;
let total_cost = costs[s] + segment_cost + penalty;
if total_cost < best_cost {
best_cost = total_cost;
best_last = s;
}
let future_cost = costs[s] + penalty;
if future_cost <= best_cost + F::from_f64(1e-10).expect("Operation failed") {
new_candidates.push(s);
}
}
}
new_candidates.push(t);
costs[t] = best_cost;
last_change_point[t] = best_last;
candidates = new_candidates;
}
let mut change_points = Vec::new();
let mut current = n;
while current > 0 {
let prev = last_change_point[current];
if prev > 0 {
change_points.push(prev);
}
current = prev;
}
change_points.reverse();
Ok(ChangePointResult {
change_points,
scores: None,
total_cost: costs[n].to_f64().unwrap_or(0.0),
method: ChangePointMethod::PELT,
})
}
#[allow(dead_code)]
fn detect_change_points_binary<F>(
ts: &Array1<F>,
options: &ChangePointOptions,
) -> Result<ChangePointResult>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let n = ts.len();
let penalty = options.penalty;
let min_len = options.min_segment_length;
let max_cpts = options.max_change_points.unwrap_or(n / min_len);
let mut change_points = Vec::new();
let mut segments = vec![(0, n)];
let mut final_best_score = 0.0;
for _ in 0..max_cpts {
let mut best_split = None;
let mut best_score = penalty;
for (i, &(start, end)) in segments.iter().enumerate() {
if end - start < 2 * min_len {
continue;
}
for split in (start + min_len)..(end - min_len) {
let score = calculate_split_score(ts, start, split, end, options.cost_function)?;
if score > best_score {
best_score = score;
best_split = Some((i, split));
}
}
}
if let Some((segment_idx, split_point)) = best_split {
let (start, end) = segments[segment_idx];
segments.remove(segment_idx);
segments.push((start, split_point));
segments.push((split_point, end));
change_points.push(split_point);
final_best_score = best_score;
} else {
break; }
}
change_points.sort_unstable();
Ok(ChangePointResult {
change_points,
scores: None,
total_cost: -final_best_score,
method: ChangePointMethod::BinarySegmentation,
})
}
#[allow(dead_code)]
fn detect_change_points_cusum<F>(
ts: &Array1<F>,
options: &ChangePointOptions,
) -> Result<ChangePointResult>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let n = ts.len();
let window_size = options.window_size.unwrap_or(20);
let threshold = F::from_f64(options.threshold.unwrap_or(5.0)).expect("Operation failed");
if n < window_size * 2 {
return Err(TimeSeriesError::InsufficientData {
message: "Time series too short for CUSUM with given window size".to_string(),
required: window_size * 2,
actual: n,
});
}
let reference_mean = ts.slice(s![0..window_size]).mean_or(F::zero());
let reference_var =
calculate_variance(&ts.slice(s![0..window_size]).to_owned(), reference_mean);
let reference_std = reference_var.sqrt();
let mut change_points = Vec::new();
let mut scores = Vec::new();
let mut cusum_pos = F::zero();
let mut cusum_neg = F::zero();
let mut last_detection = 0;
for i in window_size..n {
let normalized = if reference_std > F::from_f64(1e-10).expect("Operation failed") {
(ts[i] - reference_mean) / reference_std
} else {
F::zero()
};
cusum_pos = F::max(
F::zero(),
cusum_pos + normalized - F::from_f64(0.5).expect("Operation failed"),
);
cusum_neg = F::max(
F::zero(),
cusum_neg - normalized - F::from_f64(0.5).expect("Operation failed"),
);
let max_cusum = F::max(cusum_pos, cusum_neg);
if max_cusum > threshold && i - last_detection >= options.min_segment_length {
change_points.push(i);
scores.push(max_cusum.to_f64().unwrap_or(0.0));
cusum_pos = F::zero();
cusum_neg = F::zero();
last_detection = i;
}
}
Ok(ChangePointResult {
change_points,
scores: Some(scores),
total_cost: 0.0, method: ChangePointMethod::CUSUM,
})
}
#[allow(dead_code)]
fn detect_change_points_bayesian<F>(
ts: &Array1<F>,
options: &ChangePointOptions,
) -> Result<ChangePointResult>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let n = ts.len();
let prior_prob = F::from_f64(options.prior_probability).expect("Operation failed");
let threshold = F::from_f64(0.5).expect("Operation failed");
let mut change_points = Vec::new();
let mut scores = Vec::new();
let mut run_length_probs = Array1::from_elem(n, F::zero());
run_length_probs[0] = F::one();
let mut running_means = Array1::zeros(n);
let mut running_vars = Array1::zeros(n);
let mut last_detection = 0;
for t in 1..n {
let observation = ts[t];
let mut new_probs = Array1::zeros(t + 1);
for r in 0..t {
if run_length_probs[r] > F::from_f64(1e-10).expect("Operation failed") {
let n_obs = F::from_usize(r + 1).expect("Operation failed");
let old_mean = running_means[r];
let new_mean =
(old_mean * F::from_usize(r).expect("Operation failed") + observation) / n_obs;
running_means[r] = new_mean;
let old_var = running_vars[r];
let new_var = if r > 0 {
((old_var * F::from_usize(r - 1).expect("Operation failed"))
+ (observation - old_mean) * (observation - new_mean))
/ n_obs
} else {
F::zero()
};
running_vars[r] = new_var;
let std_dev = new_var
.sqrt()
.max(F::from_f64(1e-6).expect("Operation failed"));
let z_score = (observation - old_mean) / std_dev;
let likelihood =
(-z_score * z_score / F::from_f64(2.0).expect("Operation failed")).exp();
new_probs[r + 1] = run_length_probs[r] * (F::one() - prior_prob) * likelihood;
}
}
let changepoint_prob: F = run_length_probs.iter().map(|&p| p * prior_prob).sum();
new_probs[0] = changepoint_prob;
let total: F = new_probs.iter().cloned().sum();
if total > F::from_f64(1e-10).expect("Operation failed") {
new_probs.mapv_inplace(|p| p / total);
}
run_length_probs = new_probs;
if changepoint_prob > threshold && t - last_detection >= options.min_segment_length {
change_points.push(t);
scores.push(changepoint_prob.to_f64().unwrap_or(0.0));
last_detection = t;
}
}
Ok(ChangePointResult {
change_points,
scores: Some(scores),
total_cost: 0.0,
method: ChangePointMethod::BayesianOnline,
})
}
#[allow(dead_code)]
fn detect_change_points_kernel<F>(
ts: &Array1<F>,
options: &ChangePointOptions,
) -> Result<ChangePointResult>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let n = ts.len();
let bandwidth = options.kernel_bandwidth.unwrap_or(n as f64 / 10.0);
let bandwidth_f = F::from_f64(bandwidth).expect("Operation failed");
let min_len = options.min_segment_length;
let mut change_points = Vec::new();
let mut scores = Vec::new();
for t in min_len..(n - min_len) {
let mut test_statistic = F::zero();
for i in 0..t {
for j in t..n {
let diff = ts[i] - ts[j];
let kernel_value = (-diff * diff
/ (F::from_f64(2.0).expect("Operation failed") * bandwidth_f * bandwidth_f))
.exp();
test_statistic = test_statistic + kernel_value;
}
}
let n_before = F::from_usize(t).expect("Operation failed");
let n_after = F::from_usize(n - t).expect("Operation failed");
test_statistic = test_statistic / (n_before * n_after);
scores.push(test_statistic.to_f64().unwrap_or(0.0));
}
let threshold = options.penalty;
let mut last_detection = 0;
for (i, &score) in scores.iter().enumerate() {
let t = i + min_len;
if score > threshold && t - last_detection >= min_len {
let is_peak = (i == 0 || scores[i] >= scores[i - 1])
&& (i == scores.len() - 1 || scores[i] >= scores[i + 1]);
if is_peak {
change_points.push(t);
last_detection = t;
}
}
}
Ok(ChangePointResult {
change_points,
scores: Some(scores),
total_cost: 0.0,
method: ChangePointMethod::Kernel,
})
}
#[allow(dead_code)]
fn calculate_segment_cost<F>(
ts: &Array1<F>,
start: usize,
end: usize,
cost_function: CostFunction,
) -> Result<F>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
if start >= end || end > ts.len() {
return Err(TimeSeriesError::InvalidInput(
"Invalid segment boundaries".to_string(),
));
}
let segment = ts.slice(s![start..end]);
let n = F::from_usize(segment.len()).expect("Operation failed");
match cost_function {
CostFunction::Normal => {
let mean = segment.mean_or(F::zero());
let variance = calculate_variance(&segment.to_owned(), mean);
if variance <= F::from_f64(1e-10).expect("Operation failed") {
return Ok(F::zero());
}
let log_variance = variance.ln();
let sum_sq_deviations: F = segment.iter().map(|&x| (x - mean) * (x - mean)).sum();
Ok(n * log_variance + sum_sq_deviations / variance)
}
CostFunction::Poisson => {
let mean = segment.mean_or(F::zero());
if mean <= F::zero() {
return Ok(F::infinity());
}
let log_mean = mean.ln();
let sum_x: F = segment.iter().cloned().sum();
Ok(n * mean - sum_x * log_mean)
}
CostFunction::Exponential => {
let mean = segment.mean_or(F::zero());
if mean <= F::zero() {
return Ok(F::infinity());
}
let sum_x: F = segment.iter().cloned().sum();
Ok(n * mean.ln() + sum_x / mean)
}
CostFunction::NonParametric => {
let mean = segment.mean_or(F::zero());
let variance = calculate_variance(&segment.to_owned(), mean);
Ok(variance)
}
}
}
#[allow(dead_code)]
fn calculate_split_score<F>(
ts: &Array1<F>,
start: usize,
split: usize,
end: usize,
cost_function: CostFunction,
) -> Result<f64>
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
let full_cost = calculate_segment_cost(ts, start, end, cost_function)?;
let left_cost = calculate_segment_cost(ts, start, split, cost_function)?;
let right_cost = calculate_segment_cost(ts, split, end, cost_function)?;
let improvement = full_cost - (left_cost + right_cost);
Ok(improvement.to_f64().unwrap_or(0.0))
}
#[allow(dead_code)]
fn calculate_variance<F>(segment: &Array1<F>, mean: F) -> F
where
F: Float + FromPrimitive + Debug + NumCast + std::iter::Sum,
{
if segment.len() <= 1 {
return F::zero();
}
let n = F::from_usize(segment.len()).expect("Operation failed");
let sum_sq_deviations: F = segment.iter().map(|&x| (x - mean) * (x - mean)).sum();
sum_sq_deviations / (n - F::one())
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::array;
#[test]
fn test_pelt_basic() {
let mut ts = Array1::zeros(100);
for i in 0..50 {
ts[i] = 1.0;
}
for i in 50..100 {
ts[i] = 5.0; }
let options = ChangePointOptions {
method: ChangePointMethod::PELT,
penalty: 2.0, min_segment_length: 10,
..Default::default()
};
let result = detect_change_points(&ts, &options).expect("Operation failed");
if result.change_points.is_empty() {
assert_eq!(result.method, ChangePointMethod::PELT);
} else {
let detected = result.change_points[0];
assert!(
detected > 40 && detected < 60,
"Detected change point {detected} should be near 50"
);
}
}
#[test]
fn test_binary_segmentation() {
let mut ts = Array1::zeros(150);
for i in 0..50 {
ts[i] = 1.0;
}
for i in 50..100 {
ts[i] = 3.0;
}
for i in 100..150 {
ts[i] = 1.5;
}
let options = ChangePointOptions {
method: ChangePointMethod::BinarySegmentation,
penalty: 2.0,
min_segment_length: 10,
max_change_points: Some(5),
..Default::default()
};
let result = detect_change_points(&ts, &options).expect("Operation failed");
assert!(!result.change_points.is_empty());
assert!(result.change_points.len() <= 5);
}
#[test]
fn test_cusum() {
let mut ts = Array1::zeros(100);
for i in 0..50 {
ts[i] = 0.0 + 0.1 * (i as f64 * 0.1).sin();
}
for i in 50..100 {
ts[i] = 2.0 + 0.1 * (i as f64 * 0.1).sin();
}
let options = ChangePointOptions {
method: ChangePointMethod::CUSUM,
window_size: Some(20),
threshold: Some(3.0),
min_segment_length: 5,
..Default::default()
};
let result = detect_change_points(&ts, &options).expect("Operation failed");
assert!(!result.change_points.is_empty());
assert!(result.scores.is_some());
}
#[test]
fn test_cost_functions() {
let segment = array![1.0, 2.0, 3.0, 4.0, 5.0];
let cost =
calculate_segment_cost(&segment, 0, 5, CostFunction::Normal).expect("Operation failed");
assert!(cost.is_finite(), "Normal cost should be finite, got {cost}");
let cost = calculate_segment_cost(&segment, 0, 5, CostFunction::Poisson)
.expect("Operation failed");
assert!(
cost.is_finite(),
"Poisson cost should be finite, got {cost}"
);
let cost = calculate_segment_cost(&segment, 0, 5, CostFunction::Exponential)
.expect("Operation failed");
assert!(
cost.is_finite(),
"Exponential cost should be finite, got {cost}"
);
let cost = calculate_segment_cost(&segment, 0, 5, CostFunction::NonParametric)
.expect("Operation failed");
assert!(
cost >= 0.0,
"Non-parametric cost should be non-negative, got {cost}"
);
}
#[test]
fn test_edge_cases() {
let ts = array![1.0, 2.0, 3.0];
let options = ChangePointOptions::default();
let result = detect_change_points(&ts, &options);
assert!(result.is_err());
let ts = Array1::from_elem(50, 1.0);
let options = ChangePointOptions {
min_segment_length: 5,
..Default::default()
};
let result = detect_change_points(&ts, &options).expect("Operation failed");
assert!(result.change_points.len() <= 1);
}
}