fin_stream/norm/mod.rs
1//! Rolling-window coordinate normalization for financial time series.
2//!
3//! ## Purpose
4//!
5//! Machine-learning pipelines require features in a bounded numeric range.
6//! For streaming market data, a global min/max is unavailable; this module
7//! maintains a **rolling window** of the last `W` observations and maps each
8//! new sample into `[0.0, 1.0]` using the running min and max.
9//!
10//! ## Formula
11//!
12//! Given a window of observations `x_1 ... x_W` with minimum `m` and maximum
13//! `M`, the normalized value of a new sample `x` is:
14//!
15//! ```text
16//! x_norm = (x - m) / (M - m) if M != m
17//! x_norm = 0.0 if M == m (degenerate; single-valued window)
18//! ```
19//!
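//! The result is clamped to `[0.0, 1.0]` to handle the case where `x` falls
//! outside the current window range. Clamping does not apply to `NaN`: outside
//! the degenerate case, a `NaN` input produces a `NaN` result.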
22//!
23//! ## Guarantees
24//!
//! - Non-panicking after construction: all operations return `Result` or `Option`.
//!   [`MinMaxNormalizer::new`] panics if `window_size` is 0.
26//! - The window is a fixed-size ring buffer; once full, the oldest value is
27//! evicted on each new observation.
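//! - `MinMaxNormalizer` is `Send` and `Sync`. Its mutating methods take `&mut self`,
//!   and this includes `min_max` and `normalize`, which refresh an internal cache.
//!   Wrap it in a `Mutex` for shared multi-thread access.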
30
31use crate::error::StreamError;
32use std::collections::VecDeque;
33
34/// Rolling min-max normalizer over a sliding window of `f64` observations.
35///
36/// # Example
37///
38/// ```rust
39/// use fin_stream::norm::MinMaxNormalizer;
40///
41/// let mut norm = MinMaxNormalizer::new(4);
42/// norm.update(10.0);
43/// norm.update(20.0);
44/// norm.update(30.0);
45/// norm.update(40.0);
46///
47/// // 40.0 is the current max; 10.0 is the current min
48/// let v = norm.normalize(40.0).unwrap();
49/// assert!((v - 1.0).abs() < 1e-10);
50/// ```
51pub struct MinMaxNormalizer {
52 window_size: usize,
53 window: VecDeque<f64>,
54 cached_min: f64,
55 cached_max: f64,
56 dirty: bool,
57}
58
59impl MinMaxNormalizer {
60 /// Create a new normalizer with the given rolling window size.
61 ///
62 /// `window_size` must be at least 1. Passing 0 is a programming error;
63 /// this function will panic in debug mode and is flagged by the
64 /// `clippy::panic` lint configured in `Cargo.toml`.
65 ///
66 /// # Panics
67 ///
68 /// Panics if `window_size == 0`.
69 pub fn new(window_size: usize) -> Self {
70 // This is an API misuse guard; the window size of 0 makes the
71 // normalizer semantically undefined. The panic is intentional and
72 // documented. Production callers should validate before calling.
73 if window_size == 0 {
74 panic!("MinMaxNormalizer::new: window_size must be > 0");
75 }
76 Self {
77 window_size,
78 window: VecDeque::with_capacity(window_size),
79 cached_min: f64::MAX,
80 cached_max: f64::MIN,
81 dirty: false,
82 }
83 }
84
85 /// Add a new observation to the rolling window.
86 ///
87 /// If the window is full, the oldest value is evicted. After the call,
88 /// the internal min/max cache is marked dirty and will be recomputed lazily
89 /// on the next call to [`normalize`](Self::normalize) or
90 /// [`min_max`](Self::min_max).
91 ///
92 /// # Complexity: O(1) amortized
93 pub fn update(&mut self, value: f64) {
94 if self.window.len() == self.window_size {
95 self.window.pop_front();
96 self.dirty = true;
97 }
98 self.window.push_back(value);
99 // Eager update is cheaper than a full recompute when we don't evict.
100 if !self.dirty {
101 if value < self.cached_min { self.cached_min = value; }
102 if value > self.cached_max { self.cached_max = value; }
103 }
104 }
105
106 /// Recompute min and max from the full window.
107 ///
108 /// Called lazily when `dirty` is set (eviction occurred). O(W).
109 fn recompute(&mut self) {
110 self.cached_min = f64::MAX;
111 self.cached_max = f64::MIN;
112 for &v in &self.window {
113 if v < self.cached_min { self.cached_min = v; }
114 if v > self.cached_max { self.cached_max = v; }
115 }
116 self.dirty = false;
117 }
118
119 /// Return the current `(min, max)` of the window.
120 ///
121 /// Returns `None` if the window is empty.
122 ///
123 /// # Complexity: O(1) when the cache is clean; O(W) after an eviction.
124 pub fn min_max(&mut self) -> Option<(f64, f64)> {
125 if self.window.is_empty() {
126 return None;
127 }
128 if self.dirty {
129 self.recompute();
130 }
131 Some((self.cached_min, self.cached_max))
132 }
133
134 /// Normalize `value` into `[0.0, 1.0]` using the current window.
135 ///
136 /// The value is clamped so that even if `value` falls outside the window
137 /// range the result is always in `[0.0, 1.0]`.
138 ///
139 /// # Errors
140 ///
141 /// Returns `NormalizationError` if the window is empty (no observations
142 /// have been fed yet).
143 ///
144 /// # Complexity: O(1) when cache is clean; O(W) after an eviction.
145 pub fn normalize(&mut self, value: f64) -> Result<f64, StreamError> {
146 let (min, max) = self.min_max().ok_or_else(|| StreamError::NormalizationError {
147 reason: "window is empty; call update() before normalize()".into(),
148 })?;
149 if (max - min).abs() < f64::EPSILON {
150 // Degenerate: all values in the window are identical.
151 return Ok(0.0);
152 }
153 let normalized = (value - min) / (max - min);
154 Ok(normalized.clamp(0.0, 1.0))
155 }
156
157 /// Reset the normalizer, clearing all observations and the cache.
158 pub fn reset(&mut self) {
159 self.window.clear();
160 self.cached_min = f64::MAX;
161 self.cached_max = f64::MIN;
162 self.dirty = false;
163 }
164
165 /// Number of observations currently in the window.
166 pub fn len(&self) -> usize {
167 self.window.len()
168 }
169
170 /// Returns `true` if no observations have been added since construction or
171 /// the last reset.
172 pub fn is_empty(&self) -> bool {
173 self.window.is_empty()
174 }
175
176 /// The configured window size.
177 pub fn window_size(&self) -> usize {
178 self.window_size
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a normalizer of the given window size and feed it `values` in order.
    fn filled(window_size: usize, values: &[f64]) -> MinMaxNormalizer {
        let mut n = MinMaxNormalizer::new(window_size);
        for &v in values {
            n.update(v);
        }
        n
    }

    // ── Construction ─────────────────────────────────────────────────────────

    #[test]
    fn test_new_normalizer_is_empty() {
        let n = MinMaxNormalizer::new(4);
        assert!(n.is_empty());
        assert_eq!(n.len(), 0);
    }

    #[test]
    fn test_window_size_accessor() {
        let n = MinMaxNormalizer::new(7);
        assert_eq!(n.window_size(), 7);
    }

    /// The documented `# Panics` contract of `new`.
    #[test]
    #[should_panic(expected = "window_size must be > 0")]
    fn test_new_zero_window_panics() {
        let _ = MinMaxNormalizer::new(0);
    }

    // ── Normalization range [0, 1] ────────────────────────────────────────────

    #[test]
    fn test_normalize_min_is_zero() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.update(20.0);
        n.update(30.0);
        n.update(40.0);
        let v = n.normalize(10.0).unwrap();
        assert!((v - 0.0).abs() < 1e-10, "min should normalize to 0.0, got {v}");
    }

    #[test]
    fn test_normalize_max_is_one() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.update(20.0);
        n.update(30.0);
        n.update(40.0);
        let v = n.normalize(40.0).unwrap();
        assert!((v - 1.0).abs() < 1e-10, "max should normalize to 1.0, got {v}");
    }

    #[test]
    fn test_normalize_midpoint_is_half() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(0.0);
        n.update(100.0);
        let v = n.normalize(50.0).unwrap();
        assert!((v - 0.5).abs() < 1e-10);
    }

    #[test]
    fn test_normalize_result_clamped_below_zero() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(50.0);
        n.update(100.0);
        // 10.0 is below the window min of 50.0
        let v = n.normalize(10.0).unwrap();
        assert!(v >= 0.0);
        assert_eq!(v, 0.0);
    }

    #[test]
    fn test_normalize_result_clamped_above_one() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(50.0);
        n.update(100.0);
        // 200.0 is above the window max of 100.0
        let v = n.normalize(200.0).unwrap();
        assert!(v <= 1.0);
        assert_eq!(v, 1.0);
    }

    #[test]
    fn test_normalize_all_same_values_returns_zero() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(5.0);
        n.update(5.0);
        n.update(5.0);
        let v = n.normalize(5.0).unwrap();
        assert_eq!(v, 0.0);
    }

    // ── Empty window error ────────────────────────────────────────────────────

    #[test]
    fn test_normalize_empty_window_returns_error() {
        let mut n = MinMaxNormalizer::new(4);
        let err = n.normalize(1.0).unwrap_err();
        assert!(matches!(err, StreamError::NormalizationError { .. }));
    }

    #[test]
    fn test_min_max_empty_returns_none() {
        let mut n = MinMaxNormalizer::new(4);
        assert!(n.min_max().is_none());
    }

    // ── Rolling window eviction ───────────────────────────────────────────────

    /// After the window fills and the minimum is evicted, the new min must
    /// reflect the remaining values.
    #[test]
    fn test_rolling_window_evicts_oldest() {
        let mut n = MinMaxNormalizer::new(3);
        n.update(1.0); // will be evicted
        n.update(5.0);
        n.update(10.0);
        n.update(20.0); // evicts 1.0
        let (min, max) = n.min_max().unwrap();
        assert_eq!(min, 5.0);
        assert_eq!(max, 20.0);
    }

    #[test]
    fn test_rolling_window_len_does_not_exceed_capacity() {
        let mut n = MinMaxNormalizer::new(3);
        for i in 0..10 {
            n.update(i as f64);
        }
        assert_eq!(n.len(), 3);
    }

    /// Evicting an interior value must leave the extremes intact; evicting the
    /// current min or max must expose the next extreme.
    #[test]
    fn test_eviction_of_interior_and_extreme_values() {
        let mut n = filled(3, &[5.0, 1.0, 9.0]);
        assert_eq!(n.min_max(), Some((1.0, 9.0)));
        n.update(4.0); // evicts interior 5.0 -> window [1, 9, 4]
        assert_eq!(n.min_max(), Some((1.0, 9.0)));
        n.update(6.0); // evicts min 1.0 -> window [9, 4, 6]
        assert_eq!(n.min_max(), Some((4.0, 9.0)));
        n.update(2.0); // evicts max 9.0 -> window [4, 6, 2]
        assert_eq!(n.min_max(), Some((2.0, 6.0)));
    }

    /// With a one-slot window, every update replaces the whole window.
    #[test]
    fn test_window_size_one_tracks_latest() {
        let mut n = filled(1, &[3.0]);
        assert_eq!(n.min_max(), Some((3.0, 3.0)));
        n.update(8.0);
        assert_eq!(n.min_max(), Some((8.0, 8.0)));
        assert_eq!(n.normalize(8.0).unwrap(), 0.0);
    }

    /// The cached min/max must always agree with a brute-force scan of the last
    /// `w` observations, including after several evictions between queries.
    #[test]
    fn test_min_max_matches_brute_force_under_streaming() {
        let w = 5;
        let mut n = MinMaxNormalizer::new(w);
        let mut seen: Vec<f64> = Vec::new();
        // Small deterministic LCG so the test needs no external crates.
        let mut state: u64 = 0x2545_F491_4F6C_DD1D;
        for i in 0..300 {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1_442_695_040_888_963_407);
            let v = ((state >> 33) % 23) as f64 - 11.0;
            n.update(v);
            seen.push(v);
            if i % 3 == 0 {
                let tail = &seen[seen.len().saturating_sub(w)..];
                let lo = tail.iter().copied().fold(f64::INFINITY, f64::min);
                let hi = tail.iter().copied().fold(f64::NEG_INFINITY, f64::max);
                assert_eq!(n.min_max(), Some((lo, hi)), "mismatch at step {i}");
            }
        }
    }

    // ── Reset behavior ────────────────────────────────────────────────────────

    #[test]
    fn test_reset_clears_window() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.update(20.0);
        n.reset();
        assert!(n.is_empty());
        assert!(n.min_max().is_none());
    }

    #[test]
    fn test_normalize_works_after_reset() {
        let mut n = MinMaxNormalizer::new(4);
        n.update(10.0);
        n.reset();
        n.update(0.0);
        n.update(100.0);
        let v = n.normalize(100.0).unwrap();
        assert!((v - 1.0).abs() < 1e-10);
    }

    // ── Streaming update ──────────────────────────────────────────────────────

    #[test]
    fn test_streaming_updates_monotone_sequence() {
        let mut n = MinMaxNormalizer::new(5);
        let prices = [100.0, 101.0, 102.0, 103.0, 104.0, 105.0];
        for &p in &prices {
            n.update(p);
        }
        // Window now holds [101, 102, 103, 104, 105]; min=101, max=105
        let v_min = n.normalize(101.0).unwrap();
        let v_max = n.normalize(105.0).unwrap();
        assert!((v_min - 0.0).abs() < 1e-10);
        assert!((v_max - 1.0).abs() < 1e-10);
    }

    #[test]
    fn test_normalization_monotonicity_in_window() {
        let mut n = MinMaxNormalizer::new(10);
        for i in 0..10 {
            n.update(i as f64 * 10.0);
        }
        // Values 0, 10, 20, ..., 90 in window; min=0, max=90
        let v0 = n.normalize(0.0).unwrap();
        let v50 = n.normalize(50.0).unwrap();
        let v90 = n.normalize(90.0).unwrap();
        assert!(v0 < v50, "normalized values should be monotone");
        assert!(v50 < v90, "normalized values should be monotone");
    }
}
352}