1use crate::errors::ProcessingError;
5use crate::fixed_point::FixedPoint;
6use crate::types::{AggTrade, RangeBar};
7
8#[derive(Debug, Clone)]
10pub(crate) struct InternalRangeBar {
11 open_time: i64,
12 close_time: i64,
13 open: FixedPoint,
14 high: FixedPoint,
15 low: FixedPoint,
16 close: FixedPoint,
17 volume: i128,
20 turnover: i128,
21 individual_trade_count: i64,
22 agg_record_count: u32,
23 first_trade_id: i64,
24 last_trade_id: i64,
25 first_agg_trade_id: i64,
27 last_agg_trade_id: i64,
29 buy_volume: i128,
32 sell_volume: i128,
35 buy_trade_count: i64,
37 sell_trade_count: i64,
39 vwap: FixedPoint,
41 buy_turnover: i128,
43 sell_turnover: i128,
45}
46
47pub struct ExportRangeBarProcessor {
52 threshold_decimal_bps: u32,
53 current_bar: Option<InternalRangeBar>,
54 completed_bars: Vec<RangeBar>,
55 prevent_same_timestamp_close: bool,
57 defer_open: bool,
59}
60
61impl ExportRangeBarProcessor {
62 pub fn new(threshold_decimal_bps: u32) -> Result<Self, ProcessingError> {
78 Self::with_options(threshold_decimal_bps, true)
79 }
80
81 pub fn with_options(
83 threshold_decimal_bps: u32,
84 prevent_same_timestamp_close: bool,
85 ) -> Result<Self, ProcessingError> {
86 if threshold_decimal_bps < 1 {
90 return Err(ProcessingError::InvalidThreshold {
91 threshold_decimal_bps,
92 });
93 }
94 if threshold_decimal_bps > 100_000 {
95 return Err(ProcessingError::InvalidThreshold {
96 threshold_decimal_bps,
97 });
98 }
99
100 Ok(Self {
101 threshold_decimal_bps,
102 current_bar: None,
103 completed_bars: Vec::new(),
104 prevent_same_timestamp_close,
105 defer_open: false,
106 })
107 }
108
109 pub fn process_trades_continuously(&mut self, trades: &[AggTrade]) {
112 for trade in trades {
113 self.process_single_trade_fixed_point(trade);
114 }
115 }
116
117 fn process_single_trade_fixed_point(&mut self, trade: &AggTrade) {
119 if self.defer_open {
122 self.defer_open = false;
123 self.current_bar = None; }
126
127 if self.current_bar.is_none() {
128 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
130
131 self.current_bar = Some(InternalRangeBar {
132 open_time: trade.timestamp,
133 close_time: trade.timestamp,
134 open: trade.price,
135 high: trade.price,
136 low: trade.price,
137 close: trade.price,
138 volume: trade.volume.0 as i128,
140 turnover: trade_turnover,
141 individual_trade_count: 1,
142 agg_record_count: 1,
143 first_trade_id: trade.first_trade_id,
144 last_trade_id: trade.last_trade_id,
145 first_agg_trade_id: trade.agg_trade_id,
147 last_agg_trade_id: trade.agg_trade_id,
148 buy_volume: if trade.is_buyer_maker {
150 0i128
151 } else {
152 trade.volume.0 as i128
153 },
154 sell_volume: if trade.is_buyer_maker {
155 trade.volume.0 as i128
156 } else {
157 0i128
158 },
159 buy_trade_count: if trade.is_buyer_maker { 0 } else { 1 },
160 sell_trade_count: if trade.is_buyer_maker { 1 } else { 0 },
161 vwap: trade.price,
162 buy_turnover: if trade.is_buyer_maker {
163 0
164 } else {
165 trade_turnover
166 },
167 sell_turnover: if trade.is_buyer_maker {
168 trade_turnover
169 } else {
170 0
171 },
172 });
173 return;
174 }
175
176 let bar = self.current_bar.as_mut().unwrap();
179 let trade_turnover = (trade.price.to_f64() * trade.volume.to_f64()) as i128;
180
181 let price_val = trade.price.0;
184 let bar_open_val = bar.open.0;
185 let threshold_decimal_bps = self.threshold_decimal_bps as i64;
186 let upper_threshold = bar_open_val + (bar_open_val * threshold_decimal_bps) / 100_000;
187 let lower_threshold = bar_open_val - (bar_open_val * threshold_decimal_bps) / 100_000;
188
189 bar.close_time = trade.timestamp;
191 bar.close = trade.price;
192 bar.volume += trade.volume.0 as i128; bar.turnover += trade_turnover;
194 bar.individual_trade_count += 1;
195 bar.agg_record_count += 1;
196 bar.last_trade_id = trade.last_trade_id;
197 bar.last_agg_trade_id = trade.agg_trade_id; if price_val > bar.high.0 {
201 bar.high = trade.price;
202 }
203 if price_val < bar.low.0 {
204 bar.low = trade.price;
205 }
206
207 if trade.is_buyer_maker {
209 bar.sell_volume += trade.volume.0 as i128; bar.sell_turnover += trade_turnover;
211 bar.sell_trade_count += 1;
212 } else {
213 bar.buy_volume += trade.volume.0 as i128; bar.buy_turnover += trade_turnover;
215 bar.buy_trade_count += 1;
216 }
217
218 let price_breaches = price_val >= upper_threshold || price_val <= lower_threshold;
220
221 let timestamp_allows_close =
223 !self.prevent_same_timestamp_close || trade.timestamp != bar.open_time;
224
225 if price_breaches && timestamp_allows_close {
226 let completed_bar = self.current_bar.take().unwrap();
229
230 let mut export_bar = RangeBar {
233 open_time: completed_bar.open_time,
234 close_time: completed_bar.close_time,
235 open: completed_bar.open,
236 high: completed_bar.high,
237 low: completed_bar.low,
238 close: completed_bar.close,
239 volume: completed_bar.volume,
240 turnover: completed_bar.turnover,
241 individual_trade_count: completed_bar.individual_trade_count as u32,
242 agg_record_count: completed_bar.agg_record_count,
243 first_trade_id: completed_bar.first_trade_id,
244 last_trade_id: completed_bar.last_trade_id,
245 first_agg_trade_id: completed_bar.first_agg_trade_id, last_agg_trade_id: completed_bar.last_agg_trade_id,
247 buy_volume: completed_bar.buy_volume,
248 sell_volume: completed_bar.sell_volume,
249 buy_trade_count: completed_bar.buy_trade_count as u32,
250 sell_trade_count: completed_bar.sell_trade_count as u32,
251 vwap: completed_bar.vwap,
252 buy_turnover: completed_bar.buy_turnover,
253 sell_turnover: completed_bar.sell_turnover,
254 ..Default::default() };
256
257 export_bar.compute_microstructure_features();
259
260 self.completed_bars.push(export_bar);
261
262 self.current_bar = None;
265 self.defer_open = true;
266 }
267 }
268
269 pub fn get_all_completed_bars(&mut self) -> Vec<RangeBar> {
272 std::mem::take(&mut self.completed_bars)
273 }
274
275 pub fn get_incomplete_bar(&mut self) -> Option<RangeBar> {
277 self.current_bar.as_ref().map(|incomplete| {
278 let mut bar = RangeBar {
279 open_time: incomplete.open_time,
280 close_time: incomplete.close_time,
281 open: incomplete.open,
282 high: incomplete.high,
283 low: incomplete.low,
284 close: incomplete.close,
285 volume: incomplete.volume,
286 turnover: incomplete.turnover,
287
288 individual_trade_count: incomplete.individual_trade_count as u32,
290 agg_record_count: incomplete.agg_record_count,
291 first_trade_id: incomplete.first_trade_id,
292 last_trade_id: incomplete.last_trade_id,
293 first_agg_trade_id: incomplete.first_agg_trade_id,
294 last_agg_trade_id: incomplete.last_agg_trade_id,
295 data_source: crate::types::DataSource::default(),
296
297 buy_volume: incomplete.buy_volume,
299 sell_volume: incomplete.sell_volume,
300 buy_trade_count: incomplete.buy_trade_count as u32,
301 sell_trade_count: incomplete.sell_trade_count as u32,
302 vwap: incomplete.vwap,
303 buy_turnover: incomplete.buy_turnover,
304 sell_turnover: incomplete.sell_turnover,
305
306 ..Default::default()
308 };
309 bar.compute_microstructure_features();
311 bar
312 })
313 }
314}