1use crate::error::StreamError;
12use crate::tick::NormalizedTick;
13use rust_decimal::Decimal;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub enum Timeframe {
18 Seconds(u64),
20 Minutes(u64),
22 Hours(u64),
24}
25
26impl Timeframe {
27 pub fn duration_ms(self) -> u64 {
29 match self {
30 Timeframe::Seconds(s) => s * 1_000,
31 Timeframe::Minutes(m) => m * 60 * 1_000,
32 Timeframe::Hours(h) => h * 3600 * 1_000,
33 }
34 }
35
36 pub fn bar_start_ms(self, ts_ms: u64) -> u64 {
38 let dur = self.duration_ms();
39 (ts_ms / dur) * dur
40 }
41}
42
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
45pub struct OhlcvBar {
46 pub symbol: String,
48 pub timeframe: Timeframe,
50 pub bar_start_ms: u64,
52 pub open: Decimal,
54 pub high: Decimal,
56 pub low: Decimal,
58 pub close: Decimal,
60 pub volume: Decimal,
62 pub trade_count: u64,
64 pub is_complete: bool,
66}
67
68pub struct OhlcvAggregator {
70 symbol: String,
71 timeframe: Timeframe,
72 current_bar: Option<OhlcvBar>,
73 emit_empty_bars: bool,
77}
78
79impl OhlcvAggregator {
80 pub fn new(symbol: impl Into<String>, timeframe: Timeframe) -> Result<Self, StreamError> {
85 let tf_dur = timeframe.duration_ms();
86 if tf_dur == 0 {
87 return Err(StreamError::ParseError {
88 exchange: "OhlcvAggregator".into(),
89 reason: "timeframe duration must be > 0".into(),
90 });
91 }
92 Ok(Self {
93 symbol: symbol.into(),
94 timeframe,
95 current_bar: None,
96 emit_empty_bars: false,
97 })
98 }
99
100 pub fn with_emit_empty_bars(mut self, enabled: bool) -> Self {
102 self.emit_empty_bars = enabled;
103 self
104 }
105
106 pub fn feed(&mut self, tick: &NormalizedTick) -> Result<Vec<OhlcvBar>, StreamError> {
110 if tick.symbol != self.symbol {
111 return Err(StreamError::ParseError {
112 exchange: tick.exchange.to_string(),
113 reason: format!("tick symbol '{}' does not match aggregator '{}'", tick.symbol, self.symbol),
114 });
115 }
116 let bar_start = self.timeframe.bar_start_ms(tick.received_at_ms);
117 let mut emitted: Vec<OhlcvBar> = Vec::new();
118
119 if let Some(prev) = &self.current_bar {
120 if prev.bar_start_ms != bar_start {
121 let mut completed = prev.clone();
123 completed.is_complete = true;
124 let prev_close = completed.close;
125 let prev_start = completed.bar_start_ms;
126 emitted.push(completed);
127 self.current_bar = None;
128
129 if self.emit_empty_bars {
131 let dur = self.timeframe.duration_ms();
132 let mut gap_start = prev_start + dur;
133 while gap_start < bar_start {
134 emitted.push(OhlcvBar {
135 symbol: self.symbol.clone(),
136 timeframe: self.timeframe,
137 bar_start_ms: gap_start,
138 open: prev_close,
139 high: prev_close,
140 low: prev_close,
141 close: prev_close,
142 volume: Decimal::ZERO,
143 trade_count: 0,
144 is_complete: true,
145 });
146 gap_start += dur;
147 }
148 }
149 }
150 }
151
152 match &mut self.current_bar {
153 Some(bar) => {
154 if tick.price > bar.high { bar.high = tick.price; }
155 if tick.price < bar.low { bar.low = tick.price; }
156 bar.close = tick.price;
157 bar.volume += tick.quantity;
158 bar.trade_count += 1;
159 }
160 None => {
161 self.current_bar = Some(OhlcvBar {
162 symbol: self.symbol.clone(),
163 timeframe: self.timeframe,
164 bar_start_ms: bar_start,
165 open: tick.price,
166 high: tick.price,
167 low: tick.price,
168 close: tick.price,
169 volume: tick.quantity,
170 trade_count: 1,
171 is_complete: false,
172 });
173 }
174 }
175 Ok(emitted)
176 }
177
178 pub fn current_bar(&self) -> Option<&OhlcvBar> {
180 self.current_bar.as_ref()
181 }
182
183 pub fn flush(&mut self) -> Option<OhlcvBar> {
185 let mut bar = self.current_bar.take()?;
186 bar.is_complete = true;
187 Some(bar)
188 }
189
190 pub fn symbol(&self) -> &str { &self.symbol }
192
193 pub fn timeframe(&self) -> Timeframe { self.timeframe }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200 use crate::tick::{Exchange, NormalizedTick, TradeSide};
201 use rust_decimal_macros::dec;
202
203 fn make_tick(symbol: &str, price: Decimal, qty: Decimal, ts_ms: u64) -> NormalizedTick {
204 NormalizedTick {
205 exchange: Exchange::Binance,
206 symbol: symbol.to_string(),
207 price,
208 quantity: qty,
209 side: Some(TradeSide::Buy),
210 trade_id: None,
211 exchange_ts_ms: None,
212 received_at_ms: ts_ms,
213 }
214 }
215
216 fn agg(symbol: &str, tf: Timeframe) -> OhlcvAggregator {
217 OhlcvAggregator::new(symbol, tf).unwrap()
218 }
219
220 #[test]
221 fn test_timeframe_seconds_duration_ms() {
222 assert_eq!(Timeframe::Seconds(30).duration_ms(), 30_000);
223 }
224
225 #[test]
226 fn test_timeframe_minutes_duration_ms() {
227 assert_eq!(Timeframe::Minutes(5).duration_ms(), 300_000);
228 }
229
230 #[test]
231 fn test_timeframe_hours_duration_ms() {
232 assert_eq!(Timeframe::Hours(1).duration_ms(), 3_600_000);
233 }
234
235 #[test]
236 fn test_timeframe_bar_start_ms_aligns() {
237 let tf = Timeframe::Minutes(1);
238 let ts = 61_500; assert_eq!(tf.bar_start_ms(ts), 60_000);
240 }
241
242 #[test]
243 fn test_ohlcv_aggregator_first_tick_sets_ohlcv() {
244 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
245 let tick = make_tick("BTC-USD", dec!(50000), dec!(1), 60_000);
246 let result = agg.feed(&tick).unwrap();
247 assert!(result.is_empty()); let bar = agg.current_bar().unwrap();
249 assert_eq!(bar.open, dec!(50000));
250 assert_eq!(bar.high, dec!(50000));
251 assert_eq!(bar.low, dec!(50000));
252 assert_eq!(bar.close, dec!(50000));
253 assert_eq!(bar.volume, dec!(1));
254 assert_eq!(bar.trade_count, 1);
255 }
256
257 #[test]
258 fn test_ohlcv_aggregator_high_low_tracking() {
259 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
260 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
261 agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 60_100)).unwrap();
262 agg.feed(&make_tick("BTC-USD", dec!(49500), dec!(1), 60_200)).unwrap();
263 let bar = agg.current_bar().unwrap();
264 assert_eq!(bar.high, dec!(51000));
265 assert_eq!(bar.low, dec!(49500));
266 assert_eq!(bar.close, dec!(49500));
267 assert_eq!(bar.trade_count, 3);
268 }
269
270 #[test]
271 fn test_ohlcv_aggregator_bar_completes_on_new_window() {
272 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
273 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
274 agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2), 60_500)).unwrap();
275 let mut bars = agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
277 assert_eq!(bars.len(), 1);
278 let bar = bars.remove(0);
279 assert!(bar.is_complete);
280 assert_eq!(bar.open, dec!(50000));
281 assert_eq!(bar.close, dec!(50100));
282 assert_eq!(bar.volume, dec!(3));
283 assert_eq!(bar.bar_start_ms, 60_000);
284 }
285
286 #[test]
287 fn test_ohlcv_aggregator_new_bar_started_after_completion() {
288 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
289 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
290 agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
291 let bar = agg.current_bar().unwrap();
292 assert_eq!(bar.open, dec!(50200));
293 assert_eq!(bar.bar_start_ms, 120_000);
294 }
295
296 #[test]
297 fn test_ohlcv_aggregator_flush_marks_complete() {
298 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
299 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
300 let flushed = agg.flush().unwrap();
301 assert!(flushed.is_complete);
302 assert!(agg.current_bar().is_none());
303 }
304
305 #[test]
306 fn test_ohlcv_aggregator_flush_empty_returns_none() {
307 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
308 assert!(agg.flush().is_none());
309 }
310
311 #[test]
312 fn test_ohlcv_aggregator_wrong_symbol_returns_error() {
313 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
314 let tick = make_tick("ETH-USD", dec!(3000), dec!(1), 60_000);
315 let result = agg.feed(&tick);
316 assert!(matches!(result, Err(StreamError::ParseError { .. })));
317 }
318
319 #[test]
320 fn test_ohlcv_aggregator_volume_accumulates() {
321 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
322 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1.5), 60_000)).unwrap();
323 agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2.5), 60_100)).unwrap();
324 let bar = agg.current_bar().unwrap();
325 assert_eq!(bar.volume, dec!(4));
326 }
327
328 #[test]
329 fn test_ohlcv_bar_symbol_and_timeframe() {
330 let mut agg = agg("BTC-USD", Timeframe::Minutes(5));
331 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 300_000)).unwrap();
332 let bar = agg.current_bar().unwrap();
333 assert_eq!(bar.symbol, "BTC-USD");
334 assert_eq!(bar.timeframe, Timeframe::Minutes(5));
335 }
336
337 #[test]
338 fn test_ohlcv_aggregator_symbol_accessor() {
339 let agg = agg("ETH-USD", Timeframe::Hours(1));
340 assert_eq!(agg.symbol(), "ETH-USD");
341 assert_eq!(agg.timeframe(), Timeframe::Hours(1));
342 }
343
344 #[test]
347 fn test_emit_empty_bars_no_gap_no_empties() {
348 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
350 .unwrap()
351 .with_emit_empty_bars(true);
352 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
353 let bars = agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(1), 120_000)).unwrap();
354 assert_eq!(bars.len(), 1);
356 assert_eq!(bars[0].bar_start_ms, 60_000);
357 assert_eq!(bars[0].volume, dec!(1));
358 }
359
360 #[test]
361 fn test_emit_empty_bars_two_skipped_windows() {
362 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
365 .unwrap()
366 .with_emit_empty_bars(true);
367 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
368 let bars = agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000)).unwrap();
369 assert_eq!(bars.len(), 3);
371 assert_eq!(bars[0].bar_start_ms, 60_000);
372 assert!(!bars[0].volume.is_zero()); assert_eq!(bars[1].bar_start_ms, 120_000);
374 assert!(bars[1].volume.is_zero()); assert_eq!(bars[1].trade_count, 0);
376 assert_eq!(bars[1].open, dec!(50000)); assert_eq!(bars[2].bar_start_ms, 180_000);
378 assert!(bars[2].volume.is_zero()); }
380
381 #[test]
382 fn test_emit_empty_bars_disabled_no_empties_on_gap() {
383 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
384 .unwrap()
385 .with_emit_empty_bars(false);
386 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
387 let bars = agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000)).unwrap();
388 assert_eq!(bars.len(), 1); }
390
391 #[test]
392 fn test_emit_empty_bars_is_complete_true() {
393 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
394 .unwrap()
395 .with_emit_empty_bars(true);
396 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
397 let bars = agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000)).unwrap();
398 for bar in &bars {
399 assert!(bar.is_complete, "all emitted bars must be marked complete");
400 }
401 }
402}