Skip to main content

fin_stream/ohlcv/
mod.rs

1//! Real-time tick-to-OHLCV aggregation at arbitrary timeframes.
2//!
3//! ## Responsibility
4//! Aggregate incoming NormalizedTicks into OHLCV bars at configurable
5//! timeframes. Handles bar completion detection and partial-bar access.
6//!
7//! ## Guarantees
8//! - Non-panicking: all operations return Result or Option
9//! - Thread-safe: OhlcvAggregator is Send + Sync
10
11use crate::error::StreamError;
12use crate::tick::NormalizedTick;
13use rust_decimal::Decimal;
14
15/// Supported bar timeframes.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub enum Timeframe {
18    /// Bar duration measured in seconds.
19    Seconds(u64),
20    /// Bar duration measured in minutes.
21    Minutes(u64),
22    /// Bar duration measured in hours.
23    Hours(u64),
24}
25
26impl Timeframe {
27    /// Duration in milliseconds.
28    pub fn duration_ms(self) -> u64 {
29        match self {
30            Timeframe::Seconds(s) => s * 1_000,
31            Timeframe::Minutes(m) => m * 60 * 1_000,
32            Timeframe::Hours(h) => h * 3600 * 1_000,
33        }
34    }
35
36    /// Bar start timestamp for a given ms timestamp.
37    pub fn bar_start_ms(self, ts_ms: u64) -> u64 {
38        let dur = self.duration_ms();
39        (ts_ms / dur) * dur
40    }
41}
42
43/// A completed or partial OHLCV bar.
44#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
45pub struct OhlcvBar {
46    /// Instrument symbol (e.g. `"BTC-USD"`).
47    pub symbol: String,
48    /// Timeframe of this bar.
49    pub timeframe: Timeframe,
50    /// UTC millisecond timestamp of the bar's open boundary.
51    pub bar_start_ms: u64,
52    /// Opening price (first tick's price in the bar window).
53    pub open: Decimal,
54    /// Highest price seen in the bar window.
55    pub high: Decimal,
56    /// Lowest price seen in the bar window.
57    pub low: Decimal,
58    /// Closing price (most recent tick's price in the bar window).
59    pub close: Decimal,
60    /// Total traded volume in this bar.
61    pub volume: Decimal,
62    /// Number of ticks contributing to this bar.
63    pub trade_count: u64,
64    /// `true` once the bar's time window has been closed by a tick in a later window.
65    pub is_complete: bool,
66}
67
68/// Aggregates ticks into OHLCV bars.
69pub struct OhlcvAggregator {
70    symbol: String,
71    timeframe: Timeframe,
72    current_bar: Option<OhlcvBar>,
73    /// When true, `feed` returns synthetic zero-volume bars for any bar windows
74    /// that were skipped between the previous tick and the current one.
75    /// The synthetic bars use the last known close price for all OHLC fields.
76    emit_empty_bars: bool,
77}
78
79impl OhlcvAggregator {
80    /// Create a new aggregator for `symbol` at `timeframe`.
81    ///
82    /// Returns an error if `timeframe.duration_ms()` is zero, which would make
83    /// bar boundary alignment undefined.
84    pub fn new(symbol: impl Into<String>, timeframe: Timeframe) -> Result<Self, StreamError> {
85        let tf_dur = timeframe.duration_ms();
86        if tf_dur == 0 {
87            return Err(StreamError::ParseError {
88                exchange: "OhlcvAggregator".into(),
89                reason: "timeframe duration must be > 0".into(),
90            });
91        }
92        Ok(Self {
93            symbol: symbol.into(),
94            timeframe,
95            current_bar: None,
96            emit_empty_bars: false,
97        })
98    }
99
100    /// Enable emission of synthetic zero-volume bars for skipped bar windows.
101    pub fn with_emit_empty_bars(mut self, enabled: bool) -> Self {
102        self.emit_empty_bars = enabled;
103        self
104    }
105
106    /// Feed a tick. Returns completed bars (including any empty gap bars when
107    /// `emit_empty_bars` is true). At most one real completed bar plus zero or
108    /// more empty bars can be returned per call.
109    pub fn feed(&mut self, tick: &NormalizedTick) -> Result<Vec<OhlcvBar>, StreamError> {
110        if tick.symbol != self.symbol {
111            return Err(StreamError::ParseError {
112                exchange: tick.exchange.to_string(),
113                reason: format!(
114                    "tick symbol '{}' does not match aggregator '{}'",
115                    tick.symbol, self.symbol
116                ),
117            });
118        }
119        let bar_start = self.timeframe.bar_start_ms(tick.received_at_ms);
120        let mut emitted: Vec<OhlcvBar> = Vec::new();
121
122        if let Some(prev) = &self.current_bar {
123            if prev.bar_start_ms != bar_start {
124                // Complete the previous bar.
125                let mut completed = prev.clone();
126                completed.is_complete = true;
127                let prev_close = completed.close;
128                let prev_start = completed.bar_start_ms;
129                emitted.push(completed);
130                self.current_bar = None;
131
132                // Optionally fill any empty bar windows between prev_start and bar_start.
133                if self.emit_empty_bars {
134                    let dur = self.timeframe.duration_ms();
135                    let mut gap_start = prev_start + dur;
136                    while gap_start < bar_start {
137                        emitted.push(OhlcvBar {
138                            symbol: self.symbol.clone(),
139                            timeframe: self.timeframe,
140                            bar_start_ms: gap_start,
141                            open: prev_close,
142                            high: prev_close,
143                            low: prev_close,
144                            close: prev_close,
145                            volume: Decimal::ZERO,
146                            trade_count: 0,
147                            is_complete: true,
148                        });
149                        gap_start += dur;
150                    }
151                }
152            }
153        }
154
155        match &mut self.current_bar {
156            Some(bar) => {
157                if tick.price > bar.high {
158                    bar.high = tick.price;
159                }
160                if tick.price < bar.low {
161                    bar.low = tick.price;
162                }
163                bar.close = tick.price;
164                bar.volume += tick.quantity;
165                bar.trade_count += 1;
166            }
167            None => {
168                self.current_bar = Some(OhlcvBar {
169                    symbol: self.symbol.clone(),
170                    timeframe: self.timeframe,
171                    bar_start_ms: bar_start,
172                    open: tick.price,
173                    high: tick.price,
174                    low: tick.price,
175                    close: tick.price,
176                    volume: tick.quantity,
177                    trade_count: 1,
178                    is_complete: false,
179                });
180            }
181        }
182        Ok(emitted)
183    }
184
185    /// Current partial bar (if any).
186    pub fn current_bar(&self) -> Option<&OhlcvBar> {
187        self.current_bar.as_ref()
188    }
189
190    /// Flush the current partial bar as complete.
191    pub fn flush(&mut self) -> Option<OhlcvBar> {
192        let mut bar = self.current_bar.take()?;
193        bar.is_complete = true;
194        Some(bar)
195    }
196
197    /// The symbol this aggregator tracks.
198    pub fn symbol(&self) -> &str {
199        &self.symbol
200    }
201
202    /// The timeframe used for bar alignment.
203    pub fn timeframe(&self) -> Timeframe {
204        self.timeframe
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use crate::tick::{Exchange, NormalizedTick, TradeSide};
212    use rust_decimal_macros::dec;
213
214    fn make_tick(symbol: &str, price: Decimal, qty: Decimal, ts_ms: u64) -> NormalizedTick {
215        NormalizedTick {
216            exchange: Exchange::Binance,
217            symbol: symbol.to_string(),
218            price,
219            quantity: qty,
220            side: Some(TradeSide::Buy),
221            trade_id: None,
222            exchange_ts_ms: None,
223            received_at_ms: ts_ms,
224        }
225    }
226
227    fn agg(symbol: &str, tf: Timeframe) -> OhlcvAggregator {
228        OhlcvAggregator::new(symbol, tf).unwrap()
229    }
230
231    #[test]
232    fn test_timeframe_seconds_duration_ms() {
233        assert_eq!(Timeframe::Seconds(30).duration_ms(), 30_000);
234    }
235
236    #[test]
237    fn test_timeframe_minutes_duration_ms() {
238        assert_eq!(Timeframe::Minutes(5).duration_ms(), 300_000);
239    }
240
241    #[test]
242    fn test_timeframe_hours_duration_ms() {
243        assert_eq!(Timeframe::Hours(1).duration_ms(), 3_600_000);
244    }
245
246    #[test]
247    fn test_timeframe_bar_start_ms_aligns() {
248        let tf = Timeframe::Minutes(1);
249        let ts = 61_500; // 1min 1.5sec
250        assert_eq!(tf.bar_start_ms(ts), 60_000);
251    }
252
253    #[test]
254    fn test_ohlcv_aggregator_first_tick_sets_ohlcv() {
255        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
256        let tick = make_tick("BTC-USD", dec!(50000), dec!(1), 60_000);
257        let result = agg.feed(&tick).unwrap();
258        assert!(result.is_empty()); // no completed bar yet
259        let bar = agg.current_bar().unwrap();
260        assert_eq!(bar.open, dec!(50000));
261        assert_eq!(bar.high, dec!(50000));
262        assert_eq!(bar.low, dec!(50000));
263        assert_eq!(bar.close, dec!(50000));
264        assert_eq!(bar.volume, dec!(1));
265        assert_eq!(bar.trade_count, 1);
266    }
267
268    #[test]
269    fn test_ohlcv_aggregator_high_low_tracking() {
270        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
271        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
272            .unwrap();
273        agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 60_100))
274            .unwrap();
275        agg.feed(&make_tick("BTC-USD", dec!(49500), dec!(1), 60_200))
276            .unwrap();
277        let bar = agg.current_bar().unwrap();
278        assert_eq!(bar.high, dec!(51000));
279        assert_eq!(bar.low, dec!(49500));
280        assert_eq!(bar.close, dec!(49500));
281        assert_eq!(bar.trade_count, 3);
282    }
283
284    #[test]
285    fn test_ohlcv_aggregator_bar_completes_on_new_window() {
286        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
287        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
288            .unwrap();
289        agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2), 60_500))
290            .unwrap();
291        // Tick in next minute window closes previous bar
292        let mut bars = agg
293            .feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000))
294            .unwrap();
295        assert_eq!(bars.len(), 1);
296        let bar = bars.remove(0);
297        assert!(bar.is_complete);
298        assert_eq!(bar.open, dec!(50000));
299        assert_eq!(bar.close, dec!(50100));
300        assert_eq!(bar.volume, dec!(3));
301        assert_eq!(bar.bar_start_ms, 60_000);
302    }
303
304    #[test]
305    fn test_ohlcv_aggregator_new_bar_started_after_completion() {
306        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
307        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
308            .unwrap();
309        agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000))
310            .unwrap();
311        let bar = agg.current_bar().unwrap();
312        assert_eq!(bar.open, dec!(50200));
313        assert_eq!(bar.bar_start_ms, 120_000);
314    }
315
316    #[test]
317    fn test_ohlcv_aggregator_flush_marks_complete() {
318        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
319        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
320            .unwrap();
321        let flushed = agg.flush().unwrap();
322        assert!(flushed.is_complete);
323        assert!(agg.current_bar().is_none());
324    }
325
326    #[test]
327    fn test_ohlcv_aggregator_flush_empty_returns_none() {
328        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
329        assert!(agg.flush().is_none());
330    }
331
332    #[test]
333    fn test_ohlcv_aggregator_wrong_symbol_returns_error() {
334        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
335        let tick = make_tick("ETH-USD", dec!(3000), dec!(1), 60_000);
336        let result = agg.feed(&tick);
337        assert!(matches!(result, Err(StreamError::ParseError { .. })));
338    }
339
340    #[test]
341    fn test_ohlcv_aggregator_volume_accumulates() {
342        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
343        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1.5), 60_000))
344            .unwrap();
345        agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2.5), 60_100))
346            .unwrap();
347        let bar = agg.current_bar().unwrap();
348        assert_eq!(bar.volume, dec!(4));
349    }
350
351    #[test]
352    fn test_ohlcv_bar_symbol_and_timeframe() {
353        let mut agg = agg("BTC-USD", Timeframe::Minutes(5));
354        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 300_000))
355            .unwrap();
356        let bar = agg.current_bar().unwrap();
357        assert_eq!(bar.symbol, "BTC-USD");
358        assert_eq!(bar.timeframe, Timeframe::Minutes(5));
359    }
360
361    #[test]
362    fn test_ohlcv_aggregator_symbol_accessor() {
363        let agg = agg("ETH-USD", Timeframe::Hours(1));
364        assert_eq!(agg.symbol(), "ETH-USD");
365        assert_eq!(agg.timeframe(), Timeframe::Hours(1));
366    }
367
368    // --- emit_empty_bars tests ---
369
370    #[test]
371    fn test_emit_empty_bars_no_gap_no_empties() {
372        // Consecutive bars — no gap — should not produce empty bars.
373        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
374            .unwrap()
375            .with_emit_empty_bars(true);
376        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
377            .unwrap();
378        let bars = agg
379            .feed(&make_tick("BTC-USD", dec!(50100), dec!(1), 120_000))
380            .unwrap();
381        // Only the completed bar for the first minute; no empties.
382        assert_eq!(bars.len(), 1);
383        assert_eq!(bars[0].bar_start_ms, 60_000);
384        assert_eq!(bars[0].volume, dec!(1));
385    }
386
387    #[test]
388    fn test_emit_empty_bars_two_skipped_windows() {
389        // Gap of 3 minutes: complete bar at 60s, then two empty bars at 120s and 180s,
390        // then the 240s tick starts a new bar.
391        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
392            .unwrap()
393            .with_emit_empty_bars(true);
394        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
395            .unwrap();
396        let bars = agg
397            .feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000))
398            .unwrap();
399        // 1 real completed bar + 2 empty gap bars (120_000, 180_000)
400        assert_eq!(bars.len(), 3);
401        assert_eq!(bars[0].bar_start_ms, 60_000);
402        assert!(!bars[0].volume.is_zero()); // real bar
403        assert_eq!(bars[1].bar_start_ms, 120_000);
404        assert!(bars[1].volume.is_zero()); // empty
405        assert_eq!(bars[1].trade_count, 0);
406        assert_eq!(bars[1].open, dec!(50000)); // last close carried forward
407        assert_eq!(bars[2].bar_start_ms, 180_000);
408        assert!(bars[2].volume.is_zero()); // empty
409    }
410
411    #[test]
412    fn test_emit_empty_bars_disabled_no_empties_on_gap() {
413        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
414            .unwrap()
415            .with_emit_empty_bars(false);
416        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
417            .unwrap();
418        let bars = agg
419            .feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000))
420            .unwrap();
421        assert_eq!(bars.len(), 1); // only real completed bar, no empties
422    }
423
424    #[test]
425    fn test_emit_empty_bars_is_complete_true() {
426        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
427            .unwrap()
428            .with_emit_empty_bars(true);
429        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
430            .unwrap();
431        let bars = agg
432            .feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000))
433            .unwrap();
434        for bar in &bars {
435            assert!(bar.is_complete, "all emitted bars must be marked complete");
436        }
437    }
438}