Skip to main content

fin_stream/ohlcv/
mod.rs

1//! Real-time tick-to-OHLCV aggregation at arbitrary timeframes.
2//!
3//! ## Responsibility
4//! Aggregate incoming NormalizedTicks into OHLCV bars at configurable
5//! timeframes. Handles bar completion detection and partial-bar access.
6//!
7//! ## Guarantees
8//! - Non-panicking: all operations return Result or Option
9//! - Thread-safe: OhlcvAggregator is Send + Sync
10
11use crate::error::StreamError;
12use crate::tick::NormalizedTick;
13use rust_decimal::Decimal;
14
15/// Supported bar timeframes.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub enum Timeframe {
18    Seconds(u64),
19    Minutes(u64),
20    Hours(u64),
21}
22
23impl Timeframe {
24    /// Duration in milliseconds.
25    pub fn duration_ms(self) -> u64 {
26        match self {
27            Timeframe::Seconds(s) => s * 1_000,
28            Timeframe::Minutes(m) => m * 60 * 1_000,
29            Timeframe::Hours(h) => h * 3600 * 1_000,
30        }
31    }
32
33    /// Bar start timestamp for a given ms timestamp.
34    pub fn bar_start_ms(self, ts_ms: u64) -> u64 {
35        let dur = self.duration_ms();
36        (ts_ms / dur) * dur
37    }
38}
39
40/// A completed or partial OHLCV bar.
41#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct OhlcvBar {
43    pub symbol: String,
44    pub timeframe: Timeframe,
45    pub bar_start_ms: u64,
46    pub open: Decimal,
47    pub high: Decimal,
48    pub low: Decimal,
49    pub close: Decimal,
50    pub volume: Decimal,
51    pub trade_count: u64,
52    pub is_complete: bool,
53}
54
55/// Aggregates ticks into OHLCV bars.
56pub struct OhlcvAggregator {
57    symbol: String,
58    timeframe: Timeframe,
59    current_bar: Option<OhlcvBar>,
60}
61
62impl OhlcvAggregator {
63    pub fn new(symbol: impl Into<String>, timeframe: Timeframe) -> Result<Self, StreamError> {
64        let tf_dur = timeframe.duration_ms();
65        if tf_dur == 0 {
66            return Err(StreamError::ParseError {
67                exchange: "OhlcvAggregator".into(),
68                reason: "timeframe duration must be > 0".into(),
69            });
70        }
71        Ok(Self {
72            symbol: symbol.into(),
73            timeframe,
74            current_bar: None,
75        })
76    }
77
78    /// Feed a tick. Returns a completed bar if this tick closed the previous bar.
79    pub fn feed(&mut self, tick: &NormalizedTick) -> Result<Option<OhlcvBar>, StreamError> {
80        if tick.symbol != self.symbol {
81            return Err(StreamError::ParseError {
82                exchange: tick.exchange.to_string(),
83                reason: format!("tick symbol '{}' does not match aggregator '{}'", tick.symbol, self.symbol),
84            });
85        }
86        let bar_start = self.timeframe.bar_start_ms(tick.received_at_ms);
87        let completed = match &self.current_bar {
88            Some(bar) if bar.bar_start_ms != bar_start => {
89                // New bar window — complete the old one
90                let mut completed = bar.clone();
91                completed.is_complete = true;
92                Some(completed)
93            }
94            _ => None,
95        };
96        if completed.is_some() {
97            self.current_bar = None;
98        }
99        match &mut self.current_bar {
100            Some(bar) => {
101                if tick.price > bar.high { bar.high = tick.price; }
102                if tick.price < bar.low { bar.low = tick.price; }
103                bar.close = tick.price;
104                bar.volume += tick.quantity;
105                bar.trade_count += 1;
106            }
107            None => {
108                self.current_bar = Some(OhlcvBar {
109                    symbol: self.symbol.clone(),
110                    timeframe: self.timeframe,
111                    bar_start_ms: bar_start,
112                    open: tick.price,
113                    high: tick.price,
114                    low: tick.price,
115                    close: tick.price,
116                    volume: tick.quantity,
117                    trade_count: 1,
118                    is_complete: false,
119                });
120            }
121        }
122        Ok(completed)
123    }
124
125    /// Current partial bar (if any).
126    pub fn current_bar(&self) -> Option<&OhlcvBar> {
127        self.current_bar.as_ref()
128    }
129
130    /// Flush the current partial bar as complete.
131    pub fn flush(&mut self) -> Option<OhlcvBar> {
132        let mut bar = self.current_bar.take()?;
133        bar.is_complete = true;
134        Some(bar)
135    }
136
137    pub fn symbol(&self) -> &str { &self.symbol }
138    pub fn timeframe(&self) -> Timeframe { self.timeframe }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use crate::tick::{Exchange, NormalizedTick, TradeSide};
145    use rust_decimal_macros::dec;
146
147    fn make_tick(symbol: &str, price: Decimal, qty: Decimal, ts_ms: u64) -> NormalizedTick {
148        NormalizedTick {
149            exchange: Exchange::Binance,
150            symbol: symbol.to_string(),
151            price,
152            quantity: qty,
153            side: Some(TradeSide::Buy),
154            trade_id: None,
155            exchange_ts_ms: None,
156            received_at_ms: ts_ms,
157        }
158    }
159
160    fn agg(symbol: &str, tf: Timeframe) -> OhlcvAggregator {
161        OhlcvAggregator::new(symbol, tf).unwrap()
162    }
163
164    #[test]
165    fn test_timeframe_seconds_duration_ms() {
166        assert_eq!(Timeframe::Seconds(30).duration_ms(), 30_000);
167    }
168
169    #[test]
170    fn test_timeframe_minutes_duration_ms() {
171        assert_eq!(Timeframe::Minutes(5).duration_ms(), 300_000);
172    }
173
174    #[test]
175    fn test_timeframe_hours_duration_ms() {
176        assert_eq!(Timeframe::Hours(1).duration_ms(), 3_600_000);
177    }
178
179    #[test]
180    fn test_timeframe_bar_start_ms_aligns() {
181        let tf = Timeframe::Minutes(1);
182        let ts = 61_500; // 1min 1.5sec
183        assert_eq!(tf.bar_start_ms(ts), 60_000);
184    }
185
186    #[test]
187    fn test_ohlcv_aggregator_first_tick_sets_ohlcv() {
188        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
189        let tick = make_tick("BTC-USD", dec!(50000), dec!(1), 60_000);
190        let result = agg.feed(&tick).unwrap();
191        assert!(result.is_none()); // no completed bar yet
192        let bar = agg.current_bar().unwrap();
193        assert_eq!(bar.open, dec!(50000));
194        assert_eq!(bar.high, dec!(50000));
195        assert_eq!(bar.low, dec!(50000));
196        assert_eq!(bar.close, dec!(50000));
197        assert_eq!(bar.volume, dec!(1));
198        assert_eq!(bar.trade_count, 1);
199    }
200
201    #[test]
202    fn test_ohlcv_aggregator_high_low_tracking() {
203        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
204        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
205        agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 60_100)).unwrap();
206        agg.feed(&make_tick("BTC-USD", dec!(49500), dec!(1), 60_200)).unwrap();
207        let bar = agg.current_bar().unwrap();
208        assert_eq!(bar.high, dec!(51000));
209        assert_eq!(bar.low, dec!(49500));
210        assert_eq!(bar.close, dec!(49500));
211        assert_eq!(bar.trade_count, 3);
212    }
213
214    #[test]
215    fn test_ohlcv_aggregator_bar_completes_on_new_window() {
216        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
217        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
218        agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2), 60_500)).unwrap();
219        // Tick in next minute window closes previous bar
220        let completed = agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
221        let bar = completed.unwrap();
222        assert!(bar.is_complete);
223        assert_eq!(bar.open, dec!(50000));
224        assert_eq!(bar.close, dec!(50100));
225        assert_eq!(bar.volume, dec!(3));
226        assert_eq!(bar.bar_start_ms, 60_000);
227    }
228
229    #[test]
230    fn test_ohlcv_aggregator_new_bar_started_after_completion() {
231        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
232        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
233        agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
234        let bar = agg.current_bar().unwrap();
235        assert_eq!(bar.open, dec!(50200));
236        assert_eq!(bar.bar_start_ms, 120_000);
237    }
238
239    #[test]
240    fn test_ohlcv_aggregator_flush_marks_complete() {
241        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
242        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
243        let flushed = agg.flush().unwrap();
244        assert!(flushed.is_complete);
245        assert!(agg.current_bar().is_none());
246    }
247
248    #[test]
249    fn test_ohlcv_aggregator_flush_empty_returns_none() {
250        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
251        assert!(agg.flush().is_none());
252    }
253
254    #[test]
255    fn test_ohlcv_aggregator_wrong_symbol_returns_error() {
256        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
257        let tick = make_tick("ETH-USD", dec!(3000), dec!(1), 60_000);
258        let result = agg.feed(&tick);
259        assert!(matches!(result, Err(StreamError::ParseError { .. })));
260    }
261
262    #[test]
263    fn test_ohlcv_aggregator_volume_accumulates() {
264        let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
265        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1.5), 60_000)).unwrap();
266        agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2.5), 60_100)).unwrap();
267        let bar = agg.current_bar().unwrap();
268        assert_eq!(bar.volume, dec!(4));
269    }
270
271    #[test]
272    fn test_ohlcv_bar_symbol_and_timeframe() {
273        let mut agg = agg("BTC-USD", Timeframe::Minutes(5));
274        agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 300_000)).unwrap();
275        let bar = agg.current_bar().unwrap();
276        assert_eq!(bar.symbol, "BTC-USD");
277        assert_eq!(bar.timeframe, Timeframe::Minutes(5));
278    }
279
280    #[test]
281    fn test_ohlcv_aggregator_symbol_accessor() {
282        let agg = agg("ETH-USD", Timeframe::Hours(1));
283        assert_eq!(agg.symbol(), "ETH-USD");
284        assert_eq!(agg.timeframe(), Timeframe::Hours(1));
285    }
286}