1use crate::error::StreamError;
12use crate::tick::NormalizedTick;
13use rust_decimal::Decimal;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub enum Timeframe {
18 Seconds(u64),
19 Minutes(u64),
20 Hours(u64),
21}
22
23impl Timeframe {
24 pub fn duration_ms(self) -> u64 {
26 match self {
27 Timeframe::Seconds(s) => s * 1_000,
28 Timeframe::Minutes(m) => m * 60 * 1_000,
29 Timeframe::Hours(h) => h * 3600 * 1_000,
30 }
31 }
32
33 pub fn bar_start_ms(self, ts_ms: u64) -> u64 {
35 let dur = self.duration_ms();
36 (ts_ms / dur) * dur
37 }
38}
39
40#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct OhlcvBar {
43 pub symbol: String,
44 pub timeframe: Timeframe,
45 pub bar_start_ms: u64,
46 pub open: Decimal,
47 pub high: Decimal,
48 pub low: Decimal,
49 pub close: Decimal,
50 pub volume: Decimal,
51 pub trade_count: u64,
52 pub is_complete: bool,
53}
54
55pub struct OhlcvAggregator {
57 symbol: String,
58 timeframe: Timeframe,
59 current_bar: Option<OhlcvBar>,
60}
61
62impl OhlcvAggregator {
63 pub fn new(symbol: impl Into<String>, timeframe: Timeframe) -> Result<Self, StreamError> {
64 let tf_dur = timeframe.duration_ms();
65 if tf_dur == 0 {
66 return Err(StreamError::ParseError {
67 exchange: "OhlcvAggregator".into(),
68 reason: "timeframe duration must be > 0".into(),
69 });
70 }
71 Ok(Self {
72 symbol: symbol.into(),
73 timeframe,
74 current_bar: None,
75 })
76 }
77
78 pub fn feed(&mut self, tick: &NormalizedTick) -> Result<Option<OhlcvBar>, StreamError> {
80 if tick.symbol != self.symbol {
81 return Err(StreamError::ParseError {
82 exchange: tick.exchange.to_string(),
83 reason: format!("tick symbol '{}' does not match aggregator '{}'", tick.symbol, self.symbol),
84 });
85 }
86 let bar_start = self.timeframe.bar_start_ms(tick.received_at_ms);
87 let completed = match &self.current_bar {
88 Some(bar) if bar.bar_start_ms != bar_start => {
89 let mut completed = bar.clone();
91 completed.is_complete = true;
92 Some(completed)
93 }
94 _ => None,
95 };
96 if completed.is_some() {
97 self.current_bar = None;
98 }
99 match &mut self.current_bar {
100 Some(bar) => {
101 if tick.price > bar.high { bar.high = tick.price; }
102 if tick.price < bar.low { bar.low = tick.price; }
103 bar.close = tick.price;
104 bar.volume += tick.quantity;
105 bar.trade_count += 1;
106 }
107 None => {
108 self.current_bar = Some(OhlcvBar {
109 symbol: self.symbol.clone(),
110 timeframe: self.timeframe,
111 bar_start_ms: bar_start,
112 open: tick.price,
113 high: tick.price,
114 low: tick.price,
115 close: tick.price,
116 volume: tick.quantity,
117 trade_count: 1,
118 is_complete: false,
119 });
120 }
121 }
122 Ok(completed)
123 }
124
125 pub fn current_bar(&self) -> Option<&OhlcvBar> {
127 self.current_bar.as_ref()
128 }
129
130 pub fn flush(&mut self) -> Option<OhlcvBar> {
132 let mut bar = self.current_bar.take()?;
133 bar.is_complete = true;
134 Some(bar)
135 }
136
137 pub fn symbol(&self) -> &str { &self.symbol }
138 pub fn timeframe(&self) -> Timeframe { self.timeframe }
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use crate::tick::{Exchange, NormalizedTick, TradeSide};
145 use rust_decimal_macros::dec;
146
147 fn make_tick(symbol: &str, price: Decimal, qty: Decimal, ts_ms: u64) -> NormalizedTick {
148 NormalizedTick {
149 exchange: Exchange::Binance,
150 symbol: symbol.to_string(),
151 price,
152 quantity: qty,
153 side: Some(TradeSide::Buy),
154 trade_id: None,
155 exchange_ts_ms: None,
156 received_at_ms: ts_ms,
157 }
158 }
159
160 fn agg(symbol: &str, tf: Timeframe) -> OhlcvAggregator {
161 OhlcvAggregator::new(symbol, tf).unwrap()
162 }
163
164 #[test]
165 fn test_timeframe_seconds_duration_ms() {
166 assert_eq!(Timeframe::Seconds(30).duration_ms(), 30_000);
167 }
168
169 #[test]
170 fn test_timeframe_minutes_duration_ms() {
171 assert_eq!(Timeframe::Minutes(5).duration_ms(), 300_000);
172 }
173
174 #[test]
175 fn test_timeframe_hours_duration_ms() {
176 assert_eq!(Timeframe::Hours(1).duration_ms(), 3_600_000);
177 }
178
179 #[test]
180 fn test_timeframe_bar_start_ms_aligns() {
181 let tf = Timeframe::Minutes(1);
182 let ts = 61_500; assert_eq!(tf.bar_start_ms(ts), 60_000);
184 }
185
186 #[test]
187 fn test_ohlcv_aggregator_first_tick_sets_ohlcv() {
188 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
189 let tick = make_tick("BTC-USD", dec!(50000), dec!(1), 60_000);
190 let result = agg.feed(&tick).unwrap();
191 assert!(result.is_none()); let bar = agg.current_bar().unwrap();
193 assert_eq!(bar.open, dec!(50000));
194 assert_eq!(bar.high, dec!(50000));
195 assert_eq!(bar.low, dec!(50000));
196 assert_eq!(bar.close, dec!(50000));
197 assert_eq!(bar.volume, dec!(1));
198 assert_eq!(bar.trade_count, 1);
199 }
200
201 #[test]
202 fn test_ohlcv_aggregator_high_low_tracking() {
203 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
204 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
205 agg.feed(&make_tick("BTC-USD", dec!(51000), dec!(1), 60_100)).unwrap();
206 agg.feed(&make_tick("BTC-USD", dec!(49500), dec!(1), 60_200)).unwrap();
207 let bar = agg.current_bar().unwrap();
208 assert_eq!(bar.high, dec!(51000));
209 assert_eq!(bar.low, dec!(49500));
210 assert_eq!(bar.close, dec!(49500));
211 assert_eq!(bar.trade_count, 3);
212 }
213
214 #[test]
215 fn test_ohlcv_aggregator_bar_completes_on_new_window() {
216 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
217 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
218 agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2), 60_500)).unwrap();
219 let completed = agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
221 let bar = completed.unwrap();
222 assert!(bar.is_complete);
223 assert_eq!(bar.open, dec!(50000));
224 assert_eq!(bar.close, dec!(50100));
225 assert_eq!(bar.volume, dec!(3));
226 assert_eq!(bar.bar_start_ms, 60_000);
227 }
228
229 #[test]
230 fn test_ohlcv_aggregator_new_bar_started_after_completion() {
231 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
232 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
233 agg.feed(&make_tick("BTC-USD", dec!(50200), dec!(1), 120_000)).unwrap();
234 let bar = agg.current_bar().unwrap();
235 assert_eq!(bar.open, dec!(50200));
236 assert_eq!(bar.bar_start_ms, 120_000);
237 }
238
239 #[test]
240 fn test_ohlcv_aggregator_flush_marks_complete() {
241 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
242 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 60_000)).unwrap();
243 let flushed = agg.flush().unwrap();
244 assert!(flushed.is_complete);
245 assert!(agg.current_bar().is_none());
246 }
247
248 #[test]
249 fn test_ohlcv_aggregator_flush_empty_returns_none() {
250 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
251 assert!(agg.flush().is_none());
252 }
253
254 #[test]
255 fn test_ohlcv_aggregator_wrong_symbol_returns_error() {
256 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
257 let tick = make_tick("ETH-USD", dec!(3000), dec!(1), 60_000);
258 let result = agg.feed(&tick);
259 assert!(matches!(result, Err(StreamError::ParseError { .. })));
260 }
261
262 #[test]
263 fn test_ohlcv_aggregator_volume_accumulates() {
264 let mut agg = agg("BTC-USD", Timeframe::Minutes(1));
265 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1.5), 60_000)).unwrap();
266 agg.feed(&make_tick("BTC-USD", dec!(50100), dec!(2.5), 60_100)).unwrap();
267 let bar = agg.current_bar().unwrap();
268 assert_eq!(bar.volume, dec!(4));
269 }
270
271 #[test]
272 fn test_ohlcv_bar_symbol_and_timeframe() {
273 let mut agg = agg("BTC-USD", Timeframe::Minutes(5));
274 agg.feed(&make_tick("BTC-USD", dec!(50000), dec!(1), 300_000)).unwrap();
275 let bar = agg.current_bar().unwrap();
276 assert_eq!(bar.symbol, "BTC-USD");
277 assert_eq!(bar.timeframe, Timeframe::Minutes(5));
278 }
279
280 #[test]
281 fn test_ohlcv_aggregator_symbol_accessor() {
282 let agg = agg("ETH-USD", Timeframe::Hours(1));
283 assert_eq!(agg.symbol(), "ETH-USD");
284 assert_eq!(agg.timeframe(), Timeframe::Hours(1));
285 }
286}