Skip to main content

fin_stream/norm/
mod.rs

1//! Rolling-window coordinate normalization for financial time series.
2//!
3//! ## Purpose
4//!
5//! Machine-learning pipelines require features in a bounded numeric range.
6//! For streaming market data, a global min/max is unavailable; this module
7//! maintains a **rolling window** of the last `W` observations and maps each
8//! new sample into `[0.0, 1.0]` using the running min and max.
9//!
10//! ## Formula
11//!
12//! Given a window of observations `x_1 ... x_W` with minimum `m` and maximum
13//! `M`, the normalized value of a new sample `x` is:
14//!
15//! ```text
16//!     x_norm = (x - m) / (M - m)    if M != m
17//!     x_norm = 0.0                  if M == m  (degenerate; single-valued window)
18//! ```
19//!
20//! The result is clamped to `[0.0, 1.0]` to handle the case where `x` falls
21//! outside the current window range.
22//!
23//! ## Guarantees
24//!
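//! - Non-panicking after construction: `normalize` returns `Result`,
//!   `min_max` returns `Option`, and `update`/`reset` have no failure modes.
//!   The only panic is the documented guard in `MinMaxNormalizer::new` for a
//!   window size of 0.
//! - The window is a fixed-size ring buffer (`VecDeque`); once full, the
//!   oldest value is evicted on each new observation.
//! - `MinMaxNormalizer` holds only plain data, so it is both `Send` and
//!   `Sync`. However, every method that touches the window or the cached
//!   bounds (`update`, `normalize`, `min_max`, `reset`) takes `&mut self`, so
//!   wrap it in a `Mutex` for shared multi-threaded access.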
30
31use crate::error::StreamError;
32use std::collections::VecDeque;
33
/// Rolling min-max normalizer over a sliding window of `f64` observations.
///
/// Keeps the most recent `window_size` observations and scales new samples
/// into `[0.0, 1.0]` relative to the window's minimum and maximum. The bounds
/// are cached and recomputed lazily after evictions.
///
/// `Clone` yields an independent snapshot of the window and cache; `Debug`
/// prints the full internal state for diagnostics.
///
/// # Example
///
/// ```rust
/// use fin_stream::norm::MinMaxNormalizer;
///
/// let mut norm = MinMaxNormalizer::new(4);
/// norm.update(10.0);
/// norm.update(20.0);
/// norm.update(30.0);
/// norm.update(40.0);
///
/// // 40.0 is the current max; 10.0 is the current min
/// let v = norm.normalize(40.0).unwrap();
/// assert!((v - 1.0).abs() < 1e-10);
/// ```
#[derive(Debug, Clone)]
pub struct MinMaxNormalizer {
    /// Maximum number of observations retained (the constructor rejects 0).
    window_size: usize,
    /// Retained observations: new values are pushed at the back and the
    /// oldest is evicted from the front.
    window: VecDeque<f64>,
    /// Cached window minimum; only meaningful for a non-empty window while
    /// `dirty` is `false`.
    cached_min: f64,
    /// Cached window maximum; only meaningful for a non-empty window while
    /// `dirty` is `false`.
    cached_max: f64,
    /// Set when an eviction may have invalidated the cached bounds, forcing a
    /// full recompute on the next bounds query.
    dirty: bool,
}
58
59impl MinMaxNormalizer {
60    /// Create a new normalizer with the given rolling window size.
61    ///
62    /// `window_size` must be at least 1. Passing 0 is a programming error;
63    /// this function will panic in debug mode and is flagged by the
64    /// `clippy::panic` lint configured in `Cargo.toml`.
65    ///
66    /// # Panics
67    ///
68    /// Panics if `window_size == 0`.
69    pub fn new(window_size: usize) -> Self {
70        // This is an API misuse guard; the window size of 0 makes the
71        // normalizer semantically undefined. The panic is intentional and
72        // documented. Production callers should validate before calling.
73        if window_size == 0 {
74            panic!("MinMaxNormalizer::new: window_size must be > 0");
75        }
76        Self {
77            window_size,
78            window: VecDeque::with_capacity(window_size),
79            cached_min: f64::MAX,
80            cached_max: f64::MIN,
81            dirty: false,
82        }
83    }
84
85    /// Add a new observation to the rolling window.
86    ///
87    /// If the window is full, the oldest value is evicted. After the call,
88    /// the internal min/max cache is marked dirty and will be recomputed lazily
89    /// on the next call to [`normalize`](Self::normalize) or
90    /// [`min_max`](Self::min_max).
91    ///
92    /// # Complexity: O(1) amortized
93    pub fn update(&mut self, value: f64) {
94        if self.window.len() == self.window_size {
95            self.window.pop_front();
96            self.dirty = true;
97        }
98        self.window.push_back(value);
99        // Eager update is cheaper than a full recompute when we don't evict.
100        if !self.dirty {
101            if value < self.cached_min { self.cached_min = value; }
102            if value > self.cached_max { self.cached_max = value; }
103        }
104    }
105
106    /// Recompute min and max from the full window.
107    ///
108    /// Called lazily when `dirty` is set (eviction occurred). O(W).
109    fn recompute(&mut self) {
110        self.cached_min = f64::MAX;
111        self.cached_max = f64::MIN;
112        for &v in &self.window {
113            if v < self.cached_min { self.cached_min = v; }
114            if v > self.cached_max { self.cached_max = v; }
115        }
116        self.dirty = false;
117    }
118
119    /// Return the current `(min, max)` of the window.
120    ///
121    /// Returns `None` if the window is empty.
122    ///
123    /// # Complexity: O(1) when the cache is clean; O(W) after an eviction.
124    pub fn min_max(&mut self) -> Option<(f64, f64)> {
125        if self.window.is_empty() {
126            return None;
127        }
128        if self.dirty {
129            self.recompute();
130        }
131        Some((self.cached_min, self.cached_max))
132    }
133
134    /// Normalize `value` into `[0.0, 1.0]` using the current window.
135    ///
136    /// The value is clamped so that even if `value` falls outside the window
137    /// range the result is always in `[0.0, 1.0]`.
138    ///
139    /// # Errors
140    ///
141    /// Returns `NormalizationError` if the window is empty (no observations
142    /// have been fed yet).
143    ///
144    /// # Complexity: O(1) when cache is clean; O(W) after an eviction.
145    pub fn normalize(&mut self, value: f64) -> Result<f64, StreamError> {
146        let (min, max) = self.min_max().ok_or_else(|| StreamError::NormalizationError {
147            reason: "window is empty; call update() before normalize()".into(),
148        })?;
149        if (max - min).abs() < f64::EPSILON {
150            // Degenerate: all values in the window are identical.
151            return Ok(0.0);
152        }
153        let normalized = (value - min) / (max - min);
154        Ok(normalized.clamp(0.0, 1.0))
155    }
156
157    /// Reset the normalizer, clearing all observations and the cache.
158    pub fn reset(&mut self) {
159        self.window.clear();
160        self.cached_min = f64::MAX;
161        self.cached_max = f64::MIN;
162        self.dirty = false;
163    }
164
165    /// Number of observations currently in the window.
166    pub fn len(&self) -> usize {
167        self.window.len()
168    }
169
170    /// Returns `true` if no observations have been added since construction or
171    /// the last reset.
172    pub fn is_empty(&self) -> bool {
173        self.window.is_empty()
174    }
175
176    /// The configured window size.
177    pub fn window_size(&self) -> usize {
178        self.window_size
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    // ── Construction ─────────────────────────────────────────────────────────

    #[test]
    fn test_new_normalizer_is_empty() {
        let n = MinMaxNormalizer::new(4);
        assert!(n.is_empty());
        assert_eq!(n.len(), 0);
    }

    #[test]
    fn test_new_reports_window_size() {
        let n = MinMaxNormalizer::new(7);
        assert_eq!(n.window_size(), 7);
    }

    /// `new(0)` is documented to panic; pin that contract.
    #[test]
    #[should_panic(expected = "window_size must be > 0")]
    fn test_new_zero_window_panics() {
        let _ = MinMaxNormalizer::new(0);
    }

    // ── Normalization range [0, 1] ────────────────────────────────────────────

    #[test]
    fn test_normalize_min_is_zero() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.update(20.0);
        n.update(30.0);
        n.update(40.0);
        let v = n.normalize(10.0).unwrap();
        assert!((v - 0.0).abs() < 1e-10, "min should normalize to 0.0, got {v}");
    }

    #[test]
    fn test_normalize_max_is_one() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.update(20.0);
        n.update(30.0);
        n.update(40.0);
        let v = n.normalize(40.0).unwrap();
        assert!((v - 1.0).abs() < 1e-10, "max should normalize to 1.0, got {v}");
    }

    #[test]
    fn test_normalize_midpoint_is_half() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(0.0);
        n.update(100.0);
        let v = n.normalize(50.0).unwrap();
        assert!((v - 0.5).abs() < 1e-10);
    }

    #[test]
    fn test_normalize_result_clamped_below_zero() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(50.0);
        n.update(100.0);
        // 10.0 is below the window min of 50.0
        let v = n.normalize(10.0).unwrap();
        assert_eq!(v, 0.0);
    }

    #[test]
    fn test_normalize_result_clamped_above_one() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(50.0);
        n.update(100.0);
        // 200.0 is above the window max of 100.0
        let v = n.normalize(200.0).unwrap();
        assert_eq!(v, 1.0);
    }

    #[test]
    fn test_normalize_all_same_values_returns_zero() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(5.0);
        n.update(5.0);
        n.update(5.0);
        let v = n.normalize(5.0).unwrap();
        assert_eq!(v, 0.0);
    }

    // ── Empty window error ────────────────────────────────────────────────────

    #[test]
    fn test_normalize_empty_window_returns_error() {
        let mut n = MinMaxNormalizer::new(4);
        let err = n.normalize(1.0).unwrap_err();
        assert!(matches!(err, StreamError::NormalizationError { .. }));
    }

    #[test]
    fn test_min_max_empty_returns_none() {
        let mut n = MinMaxNormalizer::new(4);
        assert!(n.min_max().is_none());
    }

    // ── Rolling window eviction ───────────────────────────────────────────────

    /// After the window fills and the minimum is evicted, the new min must
    /// reflect the remaining values.
    #[test]
    fn test_rolling_window_evicts_oldest() {
        let mut n = MinMaxNormalizer::new(3);
        n.update(1.0); // will be evicted
        n.update(5.0);
        n.update(10.0);
        n.update(20.0); // evicts 1.0
        let (min, max) = n.min_max().unwrap();
        assert_eq!(min, 5.0);
        assert_eq!(max, 20.0);
    }

    /// Evicting the current maximum must lower the reported max.
    #[test]
    fn test_rolling_window_evicts_max() {
        let mut n = MinMaxNormalizer::new(3);
        n.update(20.0); // the max; will be evicted
        n.update(5.0);
        n.update(10.0);
        n.update(1.0); // evicts 20.0
        assert_eq!(n.min_max(), Some((1.0, 10.0)));
    }

    /// Evicting a value strictly between min and max leaves the bounds intact.
    #[test]
    fn test_evicting_interior_value_keeps_bounds() {
        let mut n = MinMaxNormalizer::new(3);
        n.update(5.0); // interior once 1.0 and 10.0 are present; evicted below
        n.update(10.0);
        n.update(1.0);
        assert_eq!(n.min_max(), Some((1.0, 10.0)));
        n.update(7.0); // evicts 5.0
        assert_eq!(n.min_max(), Some((1.0, 10.0)));
    }

    #[test]
    fn test_rolling_window_len_does_not_exceed_capacity() {
        let mut n = MinMaxNormalizer::new(3);
        for i in 0..10 {
            n.update(i as f64);
        }
        assert_eq!(n.len(), 3);
    }

    // ── Reset behavior ────────────────────────────────────────────────────────

    #[test]
    fn test_reset_clears_window() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.update(20.0);
        n.reset();
        assert!(n.is_empty());
        assert!(n.min_max().is_none());
    }

    #[test]
    fn test_normalize_works_after_reset() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.reset();
        n.update(0.0);
        n.update(100.0);
        let v = n.normalize(100.0).unwrap();
        assert!((v - 1.0).abs() < 1e-10);
    }

    // ── Streaming update ──────────────────────────────────────────────────────

    #[test]
    fn test_streaming_updates_monotone_sequence() {
        let mut n = MinMaxNormalizer::new(5);
        let prices = [100.0, 101.0, 102.0, 103.0, 104.0, 105.0];
        for &p in &prices {
            n.update(p);
        }
        // Window now holds [101, 102, 103, 104, 105]; min=101, max=105
        let v_min = n.normalize(101.0).unwrap();
        let v_max = n.normalize(105.0).unwrap();
        assert!((v_min - 0.0).abs() < 1e-10);
        assert!((v_max - 1.0).abs() < 1e-10);
    }

    #[test]
    fn test_normalization_monotonicity_in_window() {
        let mut n = MinMaxNormalizer::new(10);
        for i in 0..10 {
            n.update(i as f64 * 10.0);
        }
        // Values 0, 10, 20, ..., 90 in window; min=0, max=90
        let v0 = n.normalize(0.0).unwrap();
        let v50 = n.normalize(50.0).unwrap();
        let v90 = n.normalize(90.0).unwrap();
        assert!(v0 < v50, "normalized values should be monotone");
        assert!(v50 < v90, "normalized values should be monotone");
    }
}