Skip to main content

fin_stream/ohlcv/
mod.rs

1//! Real-time tick-to-OHLCV aggregation at arbitrary timeframes.
2//!
3//! ## Responsibility
4//! Aggregate incoming NormalizedTicks into OHLCV bars at configurable
5//! timeframes. Handles bar completion detection and partial-bar access.
6//!
7//! ## Guarantees
8//! - Non-panicking: all operations return Result or Option
9//! - Thread-safe: OhlcvAggregator is Send + Sync
10
11use crate::error::StreamError;
12use crate::tick::NormalizedTick;
13use rust_decimal::Decimal;
14
15/// Supported bar timeframes.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub enum Timeframe {
18    /// Bar duration measured in seconds.
19    Seconds(u64),
20    /// Bar duration measured in minutes.
21    Minutes(u64),
22    /// Bar duration measured in hours.
23    Hours(u64),
24}
25
26impl Timeframe {
27    /// Duration in milliseconds.
28    pub fn duration_ms(self) -> u64 {
29        match self {
30            Timeframe::Seconds(s) => s * 1_000,
31            Timeframe::Minutes(m) => m * 60 * 1_000,
32            Timeframe::Hours(h) => h * 3600 * 1_000,
33        }
34    }
35
36    /// Bar start timestamp for a given ms timestamp.
37    pub fn bar_start_ms(self, ts_ms: u64) -> u64 {
38        let dur = self.duration_ms();
39        (ts_ms / dur) * dur
40    }
41}
42
43/// A completed or partial OHLCV bar.
44#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
45pub struct OhlcvBar {
46    /// Instrument symbol (e.g. `"BTC-USD"`).
47    pub symbol: String,
48    /// Timeframe of this bar.
49    pub timeframe: Timeframe,
50    /// UTC millisecond timestamp of the bar's open boundary.
51    pub bar_start_ms: u64,
52    /// Opening price (first tick's price in the bar window).
53    pub open: Decimal,
54    /// Highest price seen in the bar window.
55    pub high: Decimal,
56    /// Lowest price seen in the bar window.
57    pub low: Decimal,
58    /// Closing price (most recent tick's price in the bar window).
59    pub close: Decimal,
60    /// Total traded volume in this bar.
61    pub volume: Decimal,
62    /// Number of ticks contributing to this bar.
63    pub trade_count: u64,
64    /// `true` once the bar's time window has been closed by a tick in a later window.
65    pub is_complete: bool,
66}
67
68/// Aggregates ticks into OHLCV bars.
69pub struct OhlcvAggregator {
70    symbol: String,
71    timeframe: Timeframe,
72    current_bar: Option<OhlcvBar>,
73    /// When true, `feed` returns synthetic zero-volume bars for any bar windows
74    /// that were skipped between the previous tick and the current one.
75    /// The synthetic bars use the last known close price for all OHLC fields.
76    emit_empty_bars: bool,
77}
78
79impl OhlcvAggregator {
80    /// Create a new aggregator for `symbol` at `timeframe`.
81    ///
82    /// Returns an error if `timeframe.duration_ms()` is zero, which would make
83    /// bar boundary alignment undefined.
84    pub fn new(symbol: impl Into<String>, timeframe: Timeframe) -> Result<Self, StreamError> {
85        let tf_dur = timeframe.duration_ms();
86        if tf_dur == 0 {
87            return Err(StreamError::ParseError {
88                exchange: "OhlcvAggregator".into(),
89                reason: "timeframe duration must be > 0".into(),
90            });
91        }
92        Ok(Self {
93            symbol: symbol.into(),
94            timeframe,
95            current_bar: None,
96            emit_empty_bars: false,
97        })
98    }
99
100    /// Enable emission of synthetic zero-volume bars for skipped bar windows.
101    pub fn with_emit_empty_bars(mut self, enabled: bool) -> Self {
102        self.emit_empty_bars = enabled;
103        self
104    }
105
106    /// Feed a tick. Returns completed bars (including any empty gap bars when
107    /// `emit_empty_bars` is true). At most one real completed bar plus zero or
108    /// more empty bars can be returned per call.
109    pub fn feed(&mut self, tick: &NormalizedTick) -> Result<Vec<OhlcvBar>, StreamError> {
110        if tick.symbol != self.symbol {
111            return Err(StreamError::ParseError {
112                exchange: tick.exchange.to_string(),
113                reason: format!("tick symbol '{}' does not match aggregator '{}'", tick.symbol, self.symbol),
114            });
115        }
116        let bar_start = self.timeframe.bar_start_ms(tick.received_at_ms);
117        let mut emitted: Vec<OhlcvBar> = Vec::new();
118
119        if let Some(prev) = &self.current_bar {
120            if prev.bar_start_ms != bar_start {
121                // Complete the previous bar.
122                let mut completed = prev.clone();
123                completed.is_complete = true;
124                let prev_close = completed.close;
125                let prev_start = completed.bar_start_ms;
126                emitted.push(completed);
127                self.current_bar = None;
128
129                // Optionally fill any empty bar windows between prev_start and bar_start.
130                if self.emit_empty_bars {
131                    let dur = self.timeframe.duration_ms();
132                    let mut gap_start = prev_start + dur;
133                    while gap_start < bar_start {
134                        emitted.push(OhlcvBar {
135                            symbol: self.symbol.clone(),
136                            timeframe: self.timeframe,
137                            bar_start_ms: gap_start,
138                            open: prev_close,
139                            high: prev_close,
140                            low: prev_close,
141                            close: prev_close,
142                            volume: Decimal::ZERO,
143                            trade_count: 0,
144                            is_complete: true,
145                        });
146                        gap_start += dur;
147                    }
148                }
149            }
150        }
151
152        match &mut self.current_bar {
153            Some(bar) => {
154                if tick.price > bar.high { bar.high = tick.price; }
155                if tick.price < bar.low { bar.low = tick.price; }
156                bar.close = tick.price;
157                bar.volume += tick.quantity;
158                bar.trade_count += 1;
159            }
160            None => {
161                self.current_bar = Some(OhlcvBar {
162                    symbol: self.symbol.clone(),
163                    timeframe: self.timeframe,
164                    bar_start_ms: bar_start,
165                    open: tick.price,
166                    high: tick.price,
167                    low: tick.price,
168                    close: tick.price,
169                    volume: tick.quantity,
170                    trade_count: 1,
171                    is_complete: false,
172                });
173            }
174        }
175        Ok(emitted)
176    }
177
178    /// Current partial bar (if any).
179    pub fn current_bar(&self) -> Option<&OhlcvBar> {
180        self.current_bar.as_ref()
181    }
182
183    /// Flush the current partial bar as complete.
184    pub fn flush(&mut self) -> Option<OhlcvBar> {
185        let mut bar = self.current_bar.take()?;
186        bar.is_complete = true;
187        Some(bar)
188    }
189
190    /// The symbol this aggregator tracks.
191    pub fn symbol(&self) -> &str { &self.symbol }
192
193    /// The timeframe used for bar alignment.
194    pub fn timeframe(&self) -> Timeframe { self.timeframe }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200    use crate::tick::{Exchange, NormalizedTick, TradeSide};
201    use rust_decimal_macros::dec;
202
203    fn make_tick(symbol: &str, price: Decimal, qty: Decimal, ts_ms: u64) -> NormalizedTick {
204        NormalizedTick {
205            exchange: Exchange::Binance,
206            symbol: symbol.to_string(),
207            price,
208            quantity: qty,
209            side: Some(TradeSide::Buy),
210            trade_id: None,
211            exchange_ts_ms: None,
212            received_at_ms: ts_ms,
213        }
214    }
215
216    fn agg(symbol: &str, tf: Timeframe) -> OhlcvAggregator {
217        OhlcvAggregator::new(symbol, tf).unwrap()
218    }
219
220    #[test]
221    fn test_timeframe_seconds_duration_ms() {
222        assert_eq!(Timeframe::Seconds(30).duration_ms(), 30_000);
223    }
224
225    #[test]
226    fn test_timeframe_minutes_duration_ms() {
227        assert_eq!(Timeframe::Minutes(5).duration_ms(), 300_000);
228    }
229
230    #[test]
231    fn test_timeframe_hours_duration_ms() {
232        assert_eq!(Timeframe::Hours(1).duration_ms(), 3_600_000);
233    }
234
235    #[test]
236    fn test_timeframe_bar_start_ms_aligns() {
237        let tf = Timeframe::Minutes(1);
238        let ts = 61_500; // 1min 1.5sec
239        assert_eq!(tf.bar_start_ms(ts), 60_000);
240    }
241
242    #[test]
243    fn test_ohlcv_aggregator_first_tick_sets_ohlcv() {
244        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
245        let tick = make_tick("BTC-USD", dec!(50000), dec!(1), 60_000);
246        let result = agg.feed(&tick).unwrap();
247        assert!(result.is_empty()); // no completed bar yet
248        let bar = agg.current_bar().unwrap();
249        assert_eq!(bar.open, dec!(50000));
250        assert_eq!(bar.high, dec!(50000));
251        assert_eq!(bar.low, dec!(50000));
252        assert_eq!(bar.close, dec!(50000));
253        assert_eq!(bar.volume, dec!(1));
254        assert_eq!(bar.trade_count, 1);
255    }
256
257    #[test]
258    fn test_ohlcv_aggregator_high_low_tracking() {
259        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
260        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
261        agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 60_100)).unwrap();
262        agg.feed(&make_tick("BTC-USD", dec!(49500), dec!(1), 60_200)).unwrap();
263        let bar = agg.current_bar().unwrap();
264        assert_eq!(bar.high, dec!(51000));
265        assert_eq!(bar.low, dec!(49500));
266        assert_eq!(bar.close, dec!(49500));
267        assert_eq!(bar.trade_count, 3);
268    }
269
270    #[test]
271    fn test_ohlcv_aggregator_bar_completes_on_new_window() {
272        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
273        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
274        agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2), 60_500)).unwrap();
275        // Tick in next minute window closes previous bar
276        let mut bars = agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
277        assert_eq!(bars.len(), 1);
278        let bar = bars.remove(0);
279        assert!(bar.is_complete);
280        assert_eq!(bar.open, dec!(50000));
281        assert_eq!(bar.close, dec!(50100));
282        assert_eq!(bar.volume, dec!(3));
283        assert_eq!(bar.bar_start_ms, 60_000);
284    }
285
286    #[test]
287    fn test_ohlcv_aggregator_new_bar_started_after_completion() {
288        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
289        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
290        agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
291        let bar = agg.current_bar().unwrap();
292        assert_eq!(bar.open, dec!(50200));
293        assert_eq!(bar.bar_start_ms, 120_000);
294    }
295
296    #[test]
297    fn test_ohlcv_aggregator_flush_marks_complete() {
298        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
299        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
300        let flushed = agg.flush().unwrap();
301        assert!(flushed.is_complete);
302        assert!(agg.current_bar().is_none());
303    }
304
305    #[test]
306    fn test_ohlcv_aggregator_flush_empty_returns_none() {
307        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
308        assert!(agg.flush().is_none());
309    }
310
311    #[test]
312    fn test_ohlcv_aggregator_wrong_symbol_returns_error() {
313        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
314        let tick = make_tick("ETH-USD", dec!(3000), dec!(1), 60_000);
315        let result = agg.feed(&tick);
316        assert!(matches!(result, Err(StreamError::ParseError { .. })));
317    }
318
319    #[test]
320    fn test_ohlcv_aggregator_volume_accumulates() {
321        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
322        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1.5), 60_000)).unwrap();
323        agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2.5), 60_100)).unwrap();
324        let bar = agg.current_bar().unwrap();
325        assert_eq!(bar.volume, dec!(4));
326    }
327
328    #[test]
329    fn test_ohlcv_bar_symbol_and_timeframe() {
330        let mut agg = agg("BTC-USD", Timeframe::Minutes(5));
331        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 300_000)).unwrap();
332        let bar = agg.current_bar().unwrap();
333        assert_eq!(bar.symbol, "BTC-USD");
334        assert_eq!(bar.timeframe, Timeframe::Minutes(5));
335    }
336
337    #[test]
338    fn test_ohlcv_aggregator_symbol_accessor() {
339        let agg = agg("ETH-USD", Timeframe::Hours(1));
340        assert_eq!(agg.symbol(), "ETH-USD");
341        assert_eq!(agg.timeframe(), Timeframe::Hours(1));
342    }
343
344    // --- emit_empty_bars tests ---
345
346    #[test]
347    fn test_emit_empty_bars_no_gap_no_empties() {
348        // Consecutive bars — no gap — should not produce empty bars.
349        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
350            .unwrap()
351            .with_emit_empty_bars(true);
352        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
353        let bars = agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(1), 120_000)).unwrap();
354        // Only the completed bar for the first minute; no empties.
355        assert_eq!(bars.len(), 1);
356        assert_eq!(bars[0].bar_start_ms, 60_000);
357        assert_eq!(bars[0].volume, dec!(1));
358    }
359
360    #[test]
361    fn test_emit_empty_bars_two_skipped_windows() {
362        // Gap of 3 minutes: complete bar at 60s, then two empty bars at 120s and 180s,
363        // then the 240s tick starts a new bar.
364        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
365            .unwrap()
366            .with_emit_empty_bars(true);
367        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
368        let bars = agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000)).unwrap();
369        // 1 real completed bar + 2 empty gap bars (120_000, 180_000)
370        assert_eq!(bars.len(), 3);
371        assert_eq!(bars[0].bar_start_ms, 60_000);
372        assert!(!bars[0].volume.is_zero()); // real bar
373        assert_eq!(bars[1].bar_start_ms, 120_000);
374        assert!(bars[1].volume.is_zero()); // empty
375        assert_eq!(bars[1].trade_count, 0);
376        assert_eq!(bars[1].open, dec!(50000)); // last close carried forward
377        assert_eq!(bars[2].bar_start_ms, 180_000);
378        assert!(bars[2].volume.is_zero()); // empty
379    }
380
381    #[test]
382    fn test_emit_empty_bars_disabled_no_empties_on_gap() {
383        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
384            .unwrap()
385            .with_emit_empty_bars(false);
386        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
387        let bars = agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000)).unwrap();
388        assert_eq!(bars.len(), 1); // only real completed bar, no empties
389    }
390
391    #[test]
392    fn test_emit_empty_bars_is_complete_true() {
393        let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
394            .unwrap()
395            .with_emit_empty_bars(true);
396        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
397        let bars = agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000)).unwrap();
398        for bar in &bars {
399            assert!(bar.is_complete, "all emitted bars must be marked complete");
400        }
401    }
402}