1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//! Python bindings.
use pyo3::{PyErr, exceptions::PyRuntimeError};
use crate::Error;
/// Convert Rust to Python error.
impl From<Error> for PyErr {
fn from(err: Error) -> Self {
PyRuntimeError::new_err(err.to_string())
}
}
#[pyo3::pymodule]
mod pelt {
use std::num::NonZero;
use numpy::{Ix1, Ix2, PyArray1, PyArrayLikeDyn};
use pyo3::{exceptions::PyValueError, prelude::*};
use crate::{Pelt, SegmentCostFunction};
/// Calculate the changepoints.
///
/// Arguments
/// ---------
/// signal : :py:class:`numpy.typing.NDArray[numpy.float64] <numpy.typing.NDArray>`
/// 1D or 2D input signal array. Can only contain numbers. ``None`` values are not accepted.
/// penalty : float
/// Penalty value for each changepoint added. Larger values result in fewer
/// changepoints detected.
/// segment_cost_function : str, optional
/// Determines how the cost of each potential segment is calculated.
/// Must be one of:
///
/// * ``"l1"`` - L1 cost function (least absolute deviation)
/// * ``"l2"`` - L2 cost function (least squared deviation)
///
/// Defaults to ``"l1"``.
/// jump : int, optional
/// Step size between candidate changepoint positions. Must be > 0.
/// If ``jump = 1``, a check is done every possible prior change point, guaranteeing an exact solution, finding the true minimum of the objective function.
/// If ``jump > 1``, previous change points are considered at intervals of ``jump``. This speeds up the computation, but the solution becomes approximate.
/// Defaults to 10.
/// minimum_segment_length : int, optional
/// Minimum number of allowable number of data points within a segment.
/// Must be positive. Defaults to 2.
///
/// Returns
/// -------
/// :py:class:`numpy.typing.NDArray[numpy.uint64] <numpy.typing.NDArray>`
/// 1D array of zero-based indices where changes in the signal were detected.
///
/// Raises
/// ------
/// ValueError
/// If the array has invalid dimensions or if any of the parameters are
/// outside their valid ranges.
///
/// Examples
/// --------
/// >>> from pelt import predict
/// >>> changepoints = predict(signal, penalty=20.0, segment_cost_function="l1", jump=10, minimum_segment_length=2)
/// >>> print(changepoints)
///
#[pyfunction(signature = (signal, penalty, segment_cost_function = "l1", jump = 10, minimum_segment_length = 2))]
fn predict<'py>(
py: Python<'py>,
signal: PyArrayLikeDyn<'py, f64>,
penalty: f64,
segment_cost_function: &str,
jump: usize,
minimum_segment_length: usize,
) -> PyResult<Bound<'py, PyArray1<usize>>> {
// Map input parameter to enum
let segment_cost_function = match segment_cost_function {
"l1" => SegmentCostFunction::L1,
"l2" => SegmentCostFunction::L2,
// Handle unknown case
_ => {
return Err(PyValueError::new_err(
"segment_cost_function must be 'l1' or 'l2'",
));
}
};
// Convert types
let jump = NonZero::new(jump).ok_or_else(|| PyValueError::new_err("jump must be > 0"))?;
let minimum_segment_length = NonZero::new(minimum_segment_length)
.ok_or_else(|| PyValueError::new_err("minimum_segment_length must be > 0"))?;
// Do calculation
let setup = Pelt::new()
.with_segment_cost_function(segment_cost_function)
.with_jump(jump)
.with_minimum_segment_length(minimum_segment_length);
// Try to coerce the input into a dimension we can use
let signal = signal.as_array();
let indices = match signal.ndim() {
1 => setup.predict(
signal
.into_dimensionality::<Ix1>()
.map_err(|_| PyValueError::new_err("dimension mismatch"))?,
penalty,
)?,
2 => setup.predict(
signal
.into_dimensionality::<Ix2>()
.map_err(|_| PyValueError::new_err("dimension mismatch"))?,
penalty,
)?,
_ => {
return Err(PyValueError::new_err(
"signal array dimensions must be 1 or 2",
));
}
};
Ok(PyArray1::from_vec(py, indices))
}
}