Skip to main content

fdars_core/streaming_depth/
rolling.rs

1//! Rolling reference window with incremental sorted-column updates.
2
3use std::collections::VecDeque;
4
5use super::c2;
6use super::sorted_ref::SortedReferenceState;
7
/// Fixed-capacity sliding window of reference curves.
///
/// Besides the raw curves, the window keeps one column per evaluation point
/// holding that point's values across every curve currently in the window, in
/// ascending order. Pushing into a full window evicts the oldest curve; each
/// column is then patched in place -- the evicted value is located by binary
/// search and dropped with `Vec::remove`, and the incoming value is located by
/// binary search and placed with `Vec::insert`.
///
/// Cost of one push: O(T x N) for T evaluation points and N curves, dominated by
/// element shifting inside the sorted vectors.
#[derive(Debug, Clone, PartialEq)]
pub struct RollingReference {
    /// Curves currently in the window, oldest at the front.
    curves: VecDeque<Vec<f64>>,
    /// Maximum number of curves the window retains.
    capacity: usize,
    /// Number of evaluation points every curve must have.
    n_points: usize,
    /// One column per evaluation point with that point's window values, sorted.
    sorted_columns: Vec<Vec<f64>>,
}
22
23impl RollingReference {
24    /// Create an empty rolling window.
25    ///
26    /// * `capacity` -- maximum number of curves in the window (must be >= 1).
27    /// * `n_points` -- number of evaluation points per curve.
28    pub fn new(capacity: usize, n_points: usize) -> Self {
29        assert!(capacity >= 1, "capacity must be at least 1");
30        Self {
31            curves: VecDeque::with_capacity(capacity),
32            capacity,
33            n_points,
34            sorted_columns: (0..n_points)
35                .map(|_| Vec::with_capacity(capacity))
36                .collect(),
37        }
38    }
39
40    /// Push a new curve into the window.
41    ///
42    /// If the window is at capacity, the oldest curve is evicted and returned.
43    /// For each time point, the sorted column is updated incrementally.
44    pub fn push(&mut self, curve: &[f64]) -> Option<Vec<f64>> {
45        assert_eq!(
46            curve.len(),
47            self.n_points,
48            "curve length {} does not match n_points {}",
49            curve.len(),
50            self.n_points
51        );
52
53        let evicted = if self.curves.len() == self.capacity {
54            let old = self
55                .curves
56                .pop_front()
57                .expect("capacity invariant: deque is non-empty");
58            // Remove old values from sorted columns
59            for t in 0..self.n_points {
60                let col = &mut self.sorted_columns[t];
61                let old_val = old[t];
62                let pos = col.partition_point(|&v| v < old_val);
63                // Find exact match (handles duplicates by scanning nearby)
64                let mut found = false;
65                for idx in pos..col.len() {
66                    if col[idx] == old_val {
67                        col.remove(idx);
68                        found = true;
69                        break;
70                    }
71                    if col[idx] > old_val {
72                        break;
73                    }
74                }
75                if !found {
76                    // Fallback: scan from pos backwards for floating-point edge cases
77                    for idx in (0..pos).rev() {
78                        if col[idx] == old_val {
79                            col.remove(idx);
80                            break;
81                        }
82                        if col[idx] < old_val {
83                            break;
84                        }
85                    }
86                }
87            }
88            Some(old)
89        } else {
90            None
91        };
92
93        // Insert new values into sorted columns
94        let new_curve: Vec<f64> = curve.to_vec();
95        for t in 0..self.n_points {
96            let col = &mut self.sorted_columns[t];
97            let val = new_curve[t];
98            let pos = col.partition_point(|&v| v < val);
99            col.insert(pos, val);
100        }
101        self.curves.push_back(new_curve);
102
103        evicted
104    }
105
106    /// Take a snapshot of the current sorted reference state.
107    ///
108    /// This clones the sorted columns. For repeated queries, prefer
109    /// [`mbd_one`](Self::mbd_one) which queries the window directly.
110    #[must_use = "expensive computation whose result should not be discarded"]
111    pub fn snapshot(&self) -> SortedReferenceState {
112        SortedReferenceState {
113            sorted_columns: self.sorted_columns.clone(),
114            nori: self.curves.len(),
115            n_points: self.n_points,
116        }
117    }
118
119    /// Compute rank-based MBD for a single curve directly against the current window.
120    ///
121    /// Avoids the overhead of cloning sorted columns into a snapshot.
122    pub fn mbd_one(&self, curve: &[f64]) -> f64 {
123        let n = self.curves.len();
124        if n < 2 || self.n_points == 0 {
125            return 0.0;
126        }
127        assert_eq!(
128            curve.len(),
129            self.n_points,
130            "curve length {} does not match n_points {}",
131            curve.len(),
132            self.n_points
133        );
134        let cn2 = c2(n);
135        let mut total = 0usize;
136        for t in 0..self.n_points {
137            let col = &self.sorted_columns[t];
138            let below = col.partition_point(|&v| v < curve[t]);
139            let at_or_below = col.partition_point(|&v| v <= curve[t]);
140            let above = n - at_or_below;
141            total += cn2 - c2(below) - c2(above);
142        }
143        total as f64 / (cn2 as f64 * self.n_points as f64)
144    }
145
146    /// Number of curves currently in the window.
147    #[inline]
148    pub fn len(&self) -> usize {
149        self.curves.len()
150    }
151
152    /// Whether the window is empty.
153    #[inline]
154    pub fn is_empty(&self) -> bool {
155        self.curves.is_empty()
156    }
157
158    /// Maximum capacity of the window.
159    #[inline]
160    pub fn capacity(&self) -> usize {
161        self.capacity
162    }
163}