Skip to main content

anomstream_core/
matrix_profile.rs

1//! Matrix profile — batch time-series discord / motif detector
2//! (STOMP, Zhu et al. 2016).
3//!
4//! For a univariate series `T` of length `n` and a window length
5//! `m`, the matrix profile `P[i]` is the z-normalised Euclidean
6//! distance from the subsequence `T[i..i+m]` to its nearest
7//! *non-trivial* neighbour in `T` (i.e. skipping a small exclusion
8//! zone around `i`). `P` localises anomalies two ways:
9//!
10//! - **Discord** — `argmax P[i]`: subsequence least similar to
11//!   anything else in the series. Analogue of a point-wise outlier,
12//!   but at the shape level. Ideal for "this one window looks
13//!   unlike anything we've seen before" detection.
14//! - **Motif** — `argmin P[i]`: most-repeated shape. Useful for
15//!   carving out the dominant beaconing or periodic pattern before
16//!   feeding residuals to another detector.
17//!
18//! Complements [`crate::ShingledForest`]: the shingled forest is an
19//! online, approximate, tree-based detector; the matrix profile is
20//! an exact, batch, distance-based detector. Run the forest on the
21//! hot stream, run the matrix profile on a captured window when
22//! forensic-grade exactness matters.
23//!
24//! STOMP computes the profile in `O(n²)` time with `O(n)` memory,
25//! using the diagonal recurrence
26//! `QT[i, j] = QT[i-1, j-1] + T[i+m-1]·T[j+m-1] - T[i-1]·T[j-1]`
27//! over pre-computed per-subsequence means / standard deviations.
28//!
29//! # References
30//!
31//! 1. Y. Zhu, Z. Zimmerman, N. Senobari, C. Yeh, G. Funning,
32//!    A. Mueen, P. Brisk, E. Keogh, "Matrix Profile II: Exploiting
33//!    a Novel Algorithm and GPUs to Break the One Hundred Million
34//!    Barrier for Time Series Motifs and Joins", ICDM 2016.
35//! 2. C. Yeh, Y. Zhu, L. Ulanova, N. Begum, Y. Ding, H. A. Dau,
36//!    D. F. Silva, A. Mueen, E. Keogh, "Matrix Profile I: All
37//!    Pairs Similarity Joins for Time Series", ICDM 2016.
38
39use alloc::vec;
40use alloc::vec::Vec;
41
42#[cfg(not(feature = "std"))]
43#[allow(unused_imports)]
44use num_traits::Float;
45
46use crate::error::{RcfError, RcfResult};
47
48/// Minimum window length. STOMP requires at least `m = 4` for the
49/// z-normalisation to be meaningful.
50pub const MIN_WINDOW: usize = 4;
51
52/// Upper bound on window length. STOMP is `O(n²)` in the series
53/// length and `O(n · m)` in the window-seed step; a runaway
54/// `window` turns a forensic call into a compute bomb. `10 000`
55/// keeps the worst-case first-column seed at `~10⁸` multiplies
56/// (sub-second on a modern core); callers that legitimately need
57/// longer windows should pre-downsample or chunk the series.
58pub const MAX_WINDOW: usize = 10_000;
59
60/// Computed matrix profile for a fixed `(series, window)` pair.
61///
62/// The profile array is always in 1-to-1 correspondence with the
63/// `n − m + 1` candidate subsequences; `profile[i]` is the nearest
64/// non-trivial-neighbour distance for subsequence `T[i..i+m]`, and
65/// `index[i]` is that neighbour's starting offset.
66///
67/// # Examples
68///
69/// ```
70/// use anomstream_core::MatrixProfile;
71///
72/// // Synthetic: smooth cosine with one injected spike near i=48.
73/// let mut series: Vec<f64> = (0..128)
74///     .map(|i| (f64::from(i as i32) * 0.3).cos())
75///     .collect();
76/// for v in &mut series[48..56] {
77///     *v += 5.0;
78/// }
79/// let mp = MatrixProfile::compute(&series, 8, None).expect("mp");
80/// let (pos, _score) = mp.discord();
81/// assert!((40..=56).contains(&pos));
82/// ```
83#[derive(Clone, Debug)]
84#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85pub struct MatrixProfile {
86    /// Per-subsequence nearest-neighbour distance.
87    profile: Vec<f64>,
88    /// Per-subsequence nearest-neighbour index.
89    index: Vec<usize>,
90    /// Window length used to compute this profile.
91    window: usize,
92    /// Exclusion-zone half-width used during the join.
93    exclusion_zone: usize,
94}
95
96impl MatrixProfile {
97    /// Run STOMP over `series` with subsequence length `window`.
98    ///
99    /// `exclusion_zone` is the half-width of the trivial-match
100    /// band around each query (`|i − j| < exclusion_zone` is
101    /// skipped). Pass `None` for the conventional `ceil(window / 4)`
102    /// default (Keogh / Mueen matrix-profile tutorials).
103    ///
104    /// # Complexity
105    ///
106    /// Time: `O(n²)` over the series length `n`, with an extra
107    /// `O(n · m)` one-time cost on the first-column seed (where
108    /// `m = window`). Practical wall-clock: ~3 ms at
109    /// `(n = 1 024, m = 32)`, ~49 ms at `(n = 4 096, m = 128)`
110    /// on a modern core. Budget aggressively — doubling `n`
111    /// quadruples the cost. Do **not** call on hot-path streams;
112    /// reserve for forensic windows captured by the online
113    /// [`crate::ShingledForest`] or triage batch jobs.
114    ///
115    /// Memory: `O(n)` for the profile + `O(n)` for the scratch
116    /// dot-product column.
117    ///
118    /// # Errors
119    ///
120    /// Returns [`RcfError::InvalidConfig`] when
121    /// `window < MIN_WINDOW`, `window > MAX_WINDOW`, when the
122    /// series is too short (`series.len() < 2 · window`), when
123    /// `series` contains a non-finite value, or when
124    /// `exclusion_zone` would leave zero valid neighbours.
125    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
126    pub fn compute(
127        series: &[f64],
128        window: usize,
129        exclusion_zone: Option<usize>,
130    ) -> RcfResult<Self> {
131        if window < MIN_WINDOW {
132            return Err(RcfError::InvalidConfig(
133                alloc::format!("MatrixProfile: window {window} < MIN_WINDOW {MIN_WINDOW}").into(),
134            ));
135        }
136        if window > MAX_WINDOW {
137            return Err(RcfError::InvalidConfig(
138                alloc::format!("MatrixProfile: window {window} > MAX_WINDOW {MAX_WINDOW}").into(),
139            ));
140        }
141        let n = series.len();
142        if n < window * 2 {
143            return Err(RcfError::InvalidConfig(
144                alloc::format!(
145                    "MatrixProfile: series len {n} must be ≥ 2·window ({})",
146                    window * 2
147                )
148                .into(),
149            ));
150        }
151        if series.iter().any(|v| !v.is_finite()) {
152            return Err(RcfError::InvalidConfig(
153                alloc::string::ToString::to_string(
154                    "MatrixProfile: series contains non-finite values",
155                )
156                .into(),
157            ));
158        }
159        let subseq_n = n - window + 1;
160        let exclusion_zone = exclusion_zone.unwrap_or_else(|| window.div_ceil(4));
161        if exclusion_zone >= subseq_n {
162            return Err(RcfError::InvalidConfig(
163                alloc::format!(
164                    "MatrixProfile: exclusion_zone {exclusion_zone} ≥ subseq count {subseq_n}"
165                )
166                .into(),
167            ));
168        }
169
170        let (means, stds) = sliding_stats(series, window);
171        // First column of the QT matrix — sliding dot products of
172        // `series` against the prefix `series[0..window]`.
173        let qt_first = sliding_dot_product(series, &series[0..window]);
174        let mut qt = qt_first.clone();
175
176        let mut profile = vec![f64::INFINITY; subseq_n];
177        let mut index = vec![0_usize; subseq_n];
178
179        update_row(
180            &mut profile,
181            &mut index,
182            &qt,
183            0,
184            window,
185            &means,
186            &stds,
187            exclusion_zone,
188        );
189
190        for j in 1..subseq_n {
191            // Diagonal recurrence — must iterate top-down in
192            // reverse so `qt[i]` reads the previous-iteration
193            // `qt[i-1]` before it is overwritten.
194            for i in (1..subseq_n).rev() {
195                qt[i] = qt[i - 1] + series[i + window - 1] * series[j + window - 1]
196                    - series[i - 1] * series[j - 1];
197            }
198            qt[0] = qt_first[j];
199            update_row(
200                &mut profile,
201                &mut index,
202                &qt,
203                j,
204                window,
205                &means,
206                &stds,
207                exclusion_zone,
208            );
209        }
210
211        Ok(Self {
212            profile,
213            index,
214            window,
215            exclusion_zone,
216        })
217    }
218
219    /// Window length used when computing this profile.
220    #[must_use]
221    pub fn window(&self) -> usize {
222        self.window
223    }
224
225    /// Exclusion-zone half-width used when computing this profile.
226    #[must_use]
227    pub fn exclusion_zone(&self) -> usize {
228        self.exclusion_zone
229    }
230
231    /// Number of subsequences (`n − m + 1`).
232    #[must_use]
233    pub fn len(&self) -> usize {
234        self.profile.len()
235    }
236
237    /// `true` when the profile holds zero subsequences — never
238    /// returned by [`Self::compute`], provided as a total accessor.
239    #[must_use]
240    pub fn is_empty(&self) -> bool {
241        self.profile.is_empty()
242    }
243
244    /// Per-subsequence nearest-neighbour distance vector.
245    #[must_use]
246    pub fn profile(&self) -> &[f64] {
247        &self.profile
248    }
249
250    /// Per-subsequence nearest-neighbour index vector.
251    #[must_use]
252    pub fn profile_index(&self) -> &[usize] {
253        &self.index
254    }
255
256    /// Discord — subsequence whose nearest neighbour is farthest.
257    /// Returns `(start_index, distance)`.
258    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
259    pub fn discord(&self) -> (usize, f64) {
260        self.profile
261            .iter()
262            .enumerate()
263            .max_by(|a, b| a.1.total_cmp(b.1))
264            .map_or((0, f64::NAN), |(i, d)| (i, *d))
265    }
266
267    /// Top-`k` discords ranked by descending distance. `k` is
268    /// clamped to [`Self::len`]. Uses a greedy suppression pass
269    /// that skips any candidate within the exclusion zone of an
270    /// already-emitted discord — prevents the top-`k` from
271    /// clustering inside a single anomalous region.
272    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
273    pub fn discord_topk(&self, k: usize) -> Vec<(usize, f64)> {
274        let mut candidates: Vec<(usize, f64)> = self.profile.iter().copied().enumerate().collect();
275        candidates.sort_by(|a, b| b.1.total_cmp(&a.1));
276        let mut out: Vec<(usize, f64)> = Vec::with_capacity(k.min(candidates.len()));
277        for (pos, dist) in candidates {
278            if out.len() >= k {
279                break;
280            }
281            if !dist.is_finite() {
282                continue;
283            }
284            if out
285                .iter()
286                .any(|(p, _)| p.abs_diff(pos) < self.exclusion_zone)
287            {
288                continue;
289            }
290            out.push((pos, dist));
291        }
292        out
293    }
294
295    /// Motif — subsequence whose nearest neighbour is closest.
296    /// Returns `(start_index, distance)`.
297    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
298    pub fn motif(&self) -> (usize, f64) {
299        self.profile
300            .iter()
301            .enumerate()
302            .min_by(|a, b| a.1.total_cmp(b.1))
303            .map_or((0, f64::NAN), |(i, d)| (i, *d))
304    }
305}
306
307/// Sliding mean and standard deviation of every length-`window`
308/// window of `series`. Output has length `n − window + 1`.
309#[allow(clippy::cast_precision_loss)]
310fn sliding_stats(series: &[f64], window: usize) -> (Vec<f64>, Vec<f64>) {
311    let n = series.len();
312    let subseq_n = n - window + 1;
313    let w = window as f64;
314
315    let mut sum = 0.0_f64;
316    let mut sum_sq = 0.0_f64;
317    for &v in &series[0..window] {
318        sum += v;
319        sum_sq += v * v;
320    }
321
322    let mut means = vec![0.0_f64; subseq_n];
323    let mut stds = vec![0.0_f64; subseq_n];
324    means[0] = sum / w;
325    let var0 = (sum_sq / w - means[0] * means[0]).max(0.0);
326    stds[0] = var0.sqrt();
327
328    for i in 1..subseq_n {
329        let drop = series[i - 1];
330        let add = series[i + window - 1];
331        sum += add - drop;
332        sum_sq += add * add - drop * drop;
333        let mean = sum / w;
334        let var = (sum_sq / w - mean * mean).max(0.0);
335        means[i] = mean;
336        stds[i] = var.sqrt();
337    }
338    (means, stds)
339}
340
341/// Sliding dot product of `series` against a fixed `query` of
342/// length `m`. Naïve `O(n · m)` — used once to seed the first
343/// column of `QT`; subsequent columns ride the diagonal recurrence.
344fn sliding_dot_product(series: &[f64], query: &[f64]) -> Vec<f64> {
345    let m = query.len();
346    let subseq_n = series.len() - m + 1;
347    let mut out = vec![0.0_f64; subseq_n];
348    for i in 0..subseq_n {
349        let mut acc = 0.0_f64;
350        for k in 0..m {
351            acc += series[i + k] * query[k];
352        }
353        out[i] = acc;
354    }
355    out
356}
357
358/// Fold one row of the current `QT` column into the running
359/// profile. `j` is the query index; `qt[i]` is the dot product of
360/// `series[i..i+m]` with `series[j..j+m]`.
361#[allow(clippy::too_many_arguments, clippy::cast_precision_loss)]
362fn update_row(
363    profile: &mut [f64],
364    index: &mut [usize],
365    qt: &[f64],
366    j: usize,
367    window: usize,
368    means: &[f64],
369    stds: &[f64],
370    exclusion_zone: usize,
371) {
372    let m = window as f64;
373    let sigma_j = stds[j];
374    for i in 0..qt.len() {
375        if i.abs_diff(j) < exclusion_zone {
376            continue;
377        }
378        let sigma_i = stds[i];
379        // Flat (constant) subsequence → distance undefined. Skip
380        // rather than propagate `NaN`.
381        if sigma_i == 0.0 || sigma_j == 0.0 {
382            continue;
383        }
384        let numer = qt[i] - m * means[i] * means[j];
385        let denom = m * sigma_i * sigma_j;
386        // Clamp to the `[−1, 1]` Pearson range. QT noise in flat
387        // regions can push the ratio slightly out of band.
388        let corr = (numer / denom).clamp(-1.0, 1.0);
389        let dist_sq = (2.0 * m * (1.0 - corr)).max(0.0);
390        let dist = dist_sq.sqrt();
391        if dist < profile[i] {
392            profile[i] = dist;
393            index[i] = j;
394        }
395    }
396}
397
398#[cfg(test)]
399#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
400mod tests {
401    use super::*;
402
403    fn cosine_series(n: usize, freq: f64) -> Vec<f64> {
404        (0..n).map(|i| (i as f64 * freq).cos()).collect()
405    }
406
407    #[test]
408    fn compute_rejects_tiny_window() {
409        let data = cosine_series(128, 0.3);
410        assert!(MatrixProfile::compute(&data, 3, None).is_err());
411    }
412
413    #[test]
414    fn compute_rejects_oversized_window() {
415        let data = cosine_series(32_000, 0.3);
416        assert!(MatrixProfile::compute(&data, MAX_WINDOW + 1, None).is_err());
417    }
418
419    #[test]
420    fn compute_rejects_short_series() {
421        let data = cosine_series(10, 0.3);
422        assert!(MatrixProfile::compute(&data, 8, None).is_err());
423    }
424
425    #[test]
426    fn compute_rejects_non_finite_input() {
427        let mut data = cosine_series(128, 0.3);
428        data[42] = f64::NAN;
429        assert!(MatrixProfile::compute(&data, 8, None).is_err());
430    }
431
432    #[test]
433    fn profile_length_equals_subsequence_count() {
434        let data = cosine_series(128, 0.3);
435        let mp = MatrixProfile::compute(&data, 16, None).unwrap();
436        assert_eq!(mp.len(), 128 - 16 + 1);
437        assert_eq!(mp.profile().len(), mp.len());
438        assert_eq!(mp.profile_index().len(), mp.len());
439    }
440
441    #[test]
442    fn discord_finds_injected_anomaly() {
443        let mut data = cosine_series(256, 0.25);
444        // Inject a shape anomaly — large triangular pulse.
445        for (k, v) in data.iter_mut().enumerate().skip(120).take(16) {
446            *v += (k - 120) as f64 * 0.8;
447        }
448        let mp = MatrixProfile::compute(&data, 16, None).unwrap();
449        let (pos, dist) = mp.discord();
450        assert!(
451            (100..=140).contains(&pos),
452            "discord at unexpected position {pos}"
453        );
454        assert!(dist.is_finite() && dist > 0.0);
455    }
456
457    #[test]
458    fn motif_finds_repeated_shape() {
459        // Pure cosine → every window is a near-copy of some other
460        // window → motif distance is tiny.
461        let data = cosine_series(256, 0.2);
462        let mp = MatrixProfile::compute(&data, 16, None).unwrap();
463        let (_, d) = mp.motif();
464        assert!(d < 0.5, "motif dist {d} unexpectedly large");
465    }
466
467    #[test]
468    fn exclusion_zone_respected() {
469        let data = cosine_series(128, 0.3);
470        let mp = MatrixProfile::compute(&data, 16, Some(8)).unwrap();
471        for (i, &j) in mp.profile_index().iter().enumerate() {
472            // Skip entries whose profile is infinite (can happen if
473            // every candidate is flat).
474            if mp.profile()[i].is_finite() {
475                assert!(
476                    i.abs_diff(j) >= 8,
477                    "neighbour inside exclusion zone: i={i} j={j}"
478                );
479            }
480        }
481    }
482
483    #[test]
484    fn discord_topk_suppresses_within_exclusion_zone() {
485        let mut data = cosine_series(512, 0.25);
486        for (k, v) in data.iter_mut().enumerate().skip(128).take(16) {
487            *v += (k - 128) as f64 * 0.8;
488        }
489        for (k, v) in data.iter_mut().enumerate().skip(320).take(16) {
490            *v -= (k - 320) as f64 * 0.8;
491        }
492        let mp = MatrixProfile::compute(&data, 16, None).unwrap();
493        let top = mp.discord_topk(2);
494        assert_eq!(top.len(), 2);
495        assert!(top[0].0.abs_diff(top[1].0) >= mp.exclusion_zone());
496    }
497
498    #[test]
499    fn accessors_mirror_inputs() {
500        let data = cosine_series(128, 0.3);
501        let mp = MatrixProfile::compute(&data, 16, Some(6)).unwrap();
502        assert_eq!(mp.window(), 16);
503        assert_eq!(mp.exclusion_zone(), 6);
504        assert!(!mp.is_empty());
505    }
506
507    #[cfg(all(feature = "serde", feature = "postcard"))]
508    #[test]
509    fn postcard_roundtrip_preserves_profile() {
510        let data = cosine_series(128, 0.3);
511        let mp = MatrixProfile::compute(&data, 16, None).unwrap();
512        let bytes = postcard::to_allocvec(&mp).expect("serde ok");
513        let back: MatrixProfile = postcard::from_bytes(&bytes).expect("serde ok");
514        assert_eq!(mp.profile(), back.profile());
515        assert_eq!(mp.profile_index(), back.profile_index());
516        assert_eq!(mp.window(), back.window());
517        assert_eq!(mp.exclusion_zone(), back.exclusion_zone());
518    }
519}