Skip to main content

fdars_core/streaming_depth/
fraiman_muniz.rs

1//! Streaming Fraiman-Muniz depth estimator.
2
3use crate::iter_maybe_parallel;
4use crate::matrix::FdMatrix;
5#[cfg(feature = "parallel")]
6use rayon::iter::ParallelIterator;
7
8use super::sorted_ref::SortedReferenceState;
9use super::StreamingDepth;
10
11/// Streaming Fraiman-Muniz depth estimator.
12///
13/// Uses binary search on sorted columns to compute the empirical CDF at each
14/// time point: Fn(x) = #{ref <= x} / N.
15///
16/// Per-query complexity: **O(T x log N)** instead of O(T x N).
17#[derive(Debug, Clone, PartialEq)]
18pub struct StreamingFraimanMuniz {
19    state: SortedReferenceState,
20    scale: bool,
21}
22
23impl StreamingFraimanMuniz {
24    pub fn new(state: SortedReferenceState, scale: bool) -> Self {
25        Self { state, scale }
26    }
27
28    #[inline]
29    fn fm_one_inner(&self, curve: &[f64]) -> f64 {
30        let n = self.state.nori;
31        if n == 0 {
32            return 0.0;
33        }
34        let t_len = self.state.n_points;
35        if t_len == 0 {
36            return 0.0;
37        }
38        let scale_factor = if self.scale { 2.0 } else { 1.0 };
39        let mut depth_sum = 0.0;
40        for t in 0..t_len {
41            let col = &self.state.sorted_columns[t];
42            let at_or_below = col.partition_point(|&v| v <= curve[t]);
43            let fn_x = at_or_below as f64 / n as f64;
44            depth_sum += fn_x.min(1.0 - fn_x) * scale_factor;
45        }
46        depth_sum / t_len as f64
47    }
48
49    /// Compute FM depth for row `row` of `data` without allocating a temporary Vec.
50    #[inline]
51    fn fm_one_from_row(&self, data: &FdMatrix, row: usize) -> f64 {
52        let n = self.state.nori;
53        if n == 0 {
54            return 0.0;
55        }
56        let t_len = self.state.n_points;
57        if t_len == 0 {
58            return 0.0;
59        }
60        let scale_factor = if self.scale { 2.0 } else { 1.0 };
61        let mut depth_sum = 0.0;
62        for t in 0..t_len {
63            let col = &self.state.sorted_columns[t];
64            let at_or_below = col.partition_point(|&v| v <= data[(row, t)]);
65            let fn_x = at_or_below as f64 / n as f64;
66            depth_sum += fn_x.min(1.0 - fn_x) * scale_factor;
67        }
68        depth_sum / t_len as f64
69    }
70}
71
72impl StreamingDepth for StreamingFraimanMuniz {
73    fn depth_one(&self, curve: &[f64]) -> f64 {
74        self.fm_one_inner(curve)
75    }
76
77    fn depth_batch(&self, data_obj: &FdMatrix) -> Vec<f64> {
78        let nobj = data_obj.nrows();
79        if nobj == 0 || self.state.n_points == 0 || self.state.nori == 0 {
80            return vec![0.0; nobj];
81        }
82        iter_maybe_parallel!(0..nobj)
83            .map(|i| self.fm_one_from_row(data_obj, i))
84            .collect()
85    }
86
87    fn n_points(&self) -> usize {
88        self.state.n_points
89    }
90
91    fn n_reference(&self) -> usize {
92        self.state.nori
93    }
94}