1use crate::error::StreamError;
12use crate::tick::NormalizedTick;
13use rust_decimal::Decimal;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub enum Timeframe {
18 Seconds(u64),
20 Minutes(u64),
22 Hours(u64),
24}
25
26impl Timeframe {
27 pub fn duration_ms(self) -> u64 {
29 match self {
30 Timeframe::Seconds(s) => s * 1_000,
31 Timeframe::Minutes(m) => m * 60 * 1_000,
32 Timeframe::Hours(h) => h * 3600 * 1_000,
33 }
34 }
35
36 pub fn bar_start_ms(self, ts_ms: u64) -> u64 {
38 let dur = self.duration_ms();
39 (ts_ms / dur) * dur
40 }
41}
42
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
45pub struct OhlcvBar {
46 pub symbol: String,
48 pub timeframe: Timeframe,
50 pub bar_start_ms: u64,
52 pub open: Decimal,
54 pub high: Decimal,
56 pub low: Decimal,
58 pub close: Decimal,
60 pub volume: Decimal,
62 pub trade_count: u64,
64 pub is_complete: bool,
66}
67
68pub struct OhlcvAggregator {
70 symbol: String,
71 timeframe: Timeframe,
72 current_bar: Option<OhlcvBar>,
73 emit_empty_bars: bool,
77}
78
79impl OhlcvAggregator {
80 pub fn new(symbol: impl Into<String>, timeframe: Timeframe) -> Result<Self, StreamError> {
85 let tf_dur = timeframe.duration_ms();
86 if tf_dur == 0 {
87 return Err(StreamError::ParseError {
88 exchange: "OhlcvAggregator".into(),
89 reason: "timeframe duration must be > 0".into(),
90 });
91 }
92 Ok(Self {
93 symbol: symbol.into(),
94 timeframe,
95 current_bar: None,
96 emit_empty_bars: false,
97 })
98 }
99
100 pub fn with_emit_empty_bars(mut self, enabled: bool) -> Self {
102 self.emit_empty_bars = enabled;
103 self
104 }
105
106 pub fn feed(&mut self, tick: &NormalizedTick) -> Result<Vec<OhlcvBar>, StreamError> {
110 if tick.symbol != self.symbol {
111 return Err(StreamError::ParseError {
112 exchange: tick.exchange.to_string(),
113 reason: format!(
114 "tick symbol '{}' does not match aggregator '{}'",
115 tick.symbol, self.symbol
116 ),
117 });
118 }
119 let bar_start = self.timeframe.bar_start_ms(tick.received_at_ms);
120 let mut emitted: Vec<OhlcvBar> = Vec::new();
121
122 if let Some(prev) = &self.current_bar {
123 if prev.bar_start_ms != bar_start {
124 let mut completed = prev.clone();
126 completed.is_complete = true;
127 let prev_close = completed.close;
128 let prev_start = completed.bar_start_ms;
129 emitted.push(completed);
130 self.current_bar = None;
131
132 if self.emit_empty_bars {
134 let dur = self.timeframe.duration_ms();
135 let mut gap_start = prev_start + dur;
136 while gap_start < bar_start {
137 emitted.push(OhlcvBar {
138 symbol: self.symbol.clone(),
139 timeframe: self.timeframe,
140 bar_start_ms: gap_start,
141 open: prev_close,
142 high: prev_close,
143 low: prev_close,
144 close: prev_close,
145 volume: Decimal::ZERO,
146 trade_count: 0,
147 is_complete: true,
148 });
149 gap_start += dur;
150 }
151 }
152 }
153 }
154
155 match &mut self.current_bar {
156 Some(bar) => {
157 if tick.price > bar.high {
158 bar.high = tick.price;
159 }
160 if tick.price < bar.low {
161 bar.low = tick.price;
162 }
163 bar.close = tick.price;
164 bar.volume += tick.quantity;
165 bar.trade_count += 1;
166 }
167 None => {
168 self.current_bar = Some(OhlcvBar {
169 symbol: self.symbol.clone(),
170 timeframe: self.timeframe,
171 bar_start_ms: bar_start,
172 open: tick.price,
173 high: tick.price,
174 low: tick.price,
175 close: tick.price,
176 volume: tick.quantity,
177 trade_count: 1,
178 is_complete: false,
179 });
180 }
181 }
182 Ok(emitted)
183 }
184
185 pub fn current_bar(&self) -> Option<&OhlcvBar> {
187 self.current_bar.as_ref()
188 }
189
190 pub fn flush(&mut self) -> Option<OhlcvBar> {
192 let mut bar = self.current_bar.take()?;
193 bar.is_complete = true;
194 Some(bar)
195 }
196
197 pub fn symbol(&self) -> &str {
199 &self.symbol
200 }
201
202 pub fn timeframe(&self) -> Timeframe {
204 self.timeframe
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211 use crate::tick::{Exchange, NormalizedTick, TradeSide};
212 use rust_decimal_macros::dec;
213
214 fn make_tick(symbol: &str, price: Decimal, qty: Decimal, ts_ms: u64) -> NormalizedTick {
215 NormalizedTick {
216 exchange: Exchange::Binance,
217 symbol: symbol.to_string(),
218 price,
219 quantity: qty,
220 side: Some(TradeSide::Buy),
221 trade_id: None,
222 exchange_ts_ms: None,
223 received_at_ms: ts_ms,
224 }
225 }
226
227 fn agg(symbol: &str, tf: Timeframe) -> OhlcvAggregator {
228 OhlcvAggregator::new(symbol, tf).unwrap()
229 }
230
231 #[test]
232 fn test_timeframe_seconds_duration_ms() {
233 assert_eq!(Timeframe::Seconds(30).duration_ms(), 30_000);
234 }
235
236 #[test]
237 fn test_timeframe_minutes_duration_ms() {
238 assert_eq!(Timeframe::Minutes(5).duration_ms(), 300_000);
239 }
240
241 #[test]
242 fn test_timeframe_hours_duration_ms() {
243 assert_eq!(Timeframe::Hours(1).duration_ms(), 3_600_000);
244 }
245
246 #[test]
247 fn test_timeframe_bar_start_ms_aligns() {
248 let tf = Timeframe::Minutes(1);
249 let ts = 61_500; assert_eq!(tf.bar_start_ms(ts), 60_000);
251 }
252
253 #[test]
254 fn test_ohlcv_aggregator_first_tick_sets_ohlcv() {
255 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
256 let tick = make_tick("BTC-USD", dec!(50000), dec!(1), 60_000);
257 let result = agg.feed(&tick).unwrap();
258 assert!(result.is_empty()); let bar = agg.current_bar().unwrap();
260 assert_eq!(bar.open, dec!(50000));
261 assert_eq!(bar.high, dec!(50000));
262 assert_eq!(bar.low, dec!(50000));
263 assert_eq!(bar.close, dec!(50000));
264 assert_eq!(bar.volume, dec!(1));
265 assert_eq!(bar.trade_count, 1);
266 }
267
268 #[test]
269 fn test_ohlcv_aggregator_high_low_tracking() {
270 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
271 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
272 .unwrap();
273 agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 60_100))
274 .unwrap();
275 agg.feed(&make_tick("BTC-USD", dec!(49500), dec!(1), 60_200))
276 .unwrap();
277 let bar = agg.current_bar().unwrap();
278 assert_eq!(bar.high, dec!(51000));
279 assert_eq!(bar.low, dec!(49500));
280 assert_eq!(bar.close, dec!(49500));
281 assert_eq!(bar.trade_count, 3);
282 }
283
284 #[test]
285 fn test_ohlcv_aggregator_bar_completes_on_new_window() {
286 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
287 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
288 .unwrap();
289 agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2), 60_500))
290 .unwrap();
291 let mut bars = agg
293 .feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000))
294 .unwrap();
295 assert_eq!(bars.len(), 1);
296 let bar = bars.remove(0);
297 assert!(bar.is_complete);
298 assert_eq!(bar.open, dec!(50000));
299 assert_eq!(bar.close, dec!(50100));
300 assert_eq!(bar.volume, dec!(3));
301 assert_eq!(bar.bar_start_ms, 60_000);
302 }
303
304 #[test]
305 fn test_ohlcv_aggregator_new_bar_started_after_completion() {
306 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
307 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
308 .unwrap();
309 agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000))
310 .unwrap();
311 let bar = agg.current_bar().unwrap();
312 assert_eq!(bar.open, dec!(50200));
313 assert_eq!(bar.bar_start_ms, 120_000);
314 }
315
316 #[test]
317 fn test_ohlcv_aggregator_flush_marks_complete() {
318 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
319 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
320 .unwrap();
321 let flushed = agg.flush().unwrap();
322 assert!(flushed.is_complete);
323 assert!(agg.current_bar().is_none());
324 }
325
326 #[test]
327 fn test_ohlcv_aggregator_flush_empty_returns_none() {
328 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
329 assert!(agg.flush().is_none());
330 }
331
332 #[test]
333 fn test_ohlcv_aggregator_wrong_symbol_returns_error() {
334 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
335 let tick = make_tick("ETH-USD", dec!(3000), dec!(1), 60_000);
336 let result = agg.feed(&tick);
337 assert!(matches!(result, Err(StreamError::ParseError { .. })));
338 }
339
340 #[test]
341 fn test_ohlcv_aggregator_volume_accumulates() {
342 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
343 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1.5), 60_000))
344 .unwrap();
345 agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2.5), 60_100))
346 .unwrap();
347 let bar = agg.current_bar().unwrap();
348 assert_eq!(bar.volume, dec!(4));
349 }
350
351 #[test]
352 fn test_ohlcv_bar_symbol_and_timeframe() {
353 let mut agg = agg("BTC-USD", Timeframe::Minutes(5));
354 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 300_000))
355 .unwrap();
356 let bar = agg.current_bar().unwrap();
357 assert_eq!(bar.symbol, "BTC-USD");
358 assert_eq!(bar.timeframe, Timeframe::Minutes(5));
359 }
360
361 #[test]
362 fn test_ohlcv_aggregator_symbol_accessor() {
363 let agg = agg("ETH-USD", Timeframe::Hours(1));
364 assert_eq!(agg.symbol(), "ETH-USD");
365 assert_eq!(agg.timeframe(), Timeframe::Hours(1));
366 }
367
368 #[test]
371 fn test_emit_empty_bars_no_gap_no_empties() {
372 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
374 .unwrap()
375 .with_emit_empty_bars(true);
376 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
377 .unwrap();
378 let bars = agg
379 .feed(&make_tick("BTC-USD", dec!(50100), dec!(1), 120_000))
380 .unwrap();
381 assert_eq!(bars.len(), 1);
383 assert_eq!(bars[0].bar_start_ms, 60_000);
384 assert_eq!(bars[0].volume, dec!(1));
385 }
386
387 #[test]
388 fn test_emit_empty_bars_two_skipped_windows() {
389 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
392 .unwrap()
393 .with_emit_empty_bars(true);
394 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
395 .unwrap();
396 let bars = agg
397 .feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000))
398 .unwrap();
399 assert_eq!(bars.len(), 3);
401 assert_eq!(bars[0].bar_start_ms, 60_000);
402 assert!(!bars[0].volume.is_zero()); assert_eq!(bars[1].bar_start_ms, 120_000);
404 assert!(bars[1].volume.is_zero()); assert_eq!(bars[1].trade_count, 0);
406 assert_eq!(bars[1].open, dec!(50000)); assert_eq!(bars[2].bar_start_ms, 180_000);
408 assert!(bars[2].volume.is_zero()); }
410
411 #[test]
412 fn test_emit_empty_bars_disabled_no_empties_on_gap() {
413 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
414 .unwrap()
415 .with_emit_empty_bars(false);
416 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
417 .unwrap();
418 let bars = agg
419 .feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000))
420 .unwrap();
421 assert_eq!(bars.len(), 1); }
423
424 #[test]
425 fn test_emit_empty_bars_is_complete_true() {
426 let mut agg = OhlcvAggregator::new("BTC-USD", Timeframe::Minutes(1))
427 .unwrap()
428 .with_emit_empty_bars(true);
429 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000))
430 .unwrap();
431 let bars = agg
432 .feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 240_000))
433 .unwrap();
434 for bar in &bars {
435 assert!(bar.is_complete, "all emitted bars must be marked complete");
436 }
437 }
438}