1use std::collections::BTreeMap;
2
3use crate::chart::Basis;
4use crate::chart::heatmap::HeatmapDataPoint;
5use crate::chart::kline::{ClusterKind, KlineDataPoint, KlineTrades, NPoc};
6
7use exchange::unit::{Price, PriceStep, Qty};
8use exchange::{Kline, Timeframe, Trade, Volume};
9
10pub trait DataPoint {
11 fn add_trade(&mut self, trade: &Trade, step: PriceStep);
12
13 fn clear_trades(&mut self);
14
15 fn last_trade_time(&self) -> Option<u64>;
16
17 fn first_trade_time(&self) -> Option<u64>;
18
19 fn last_price(&self) -> Price;
20
21 fn kline(&self) -> Option<&Kline>;
22
23 fn value_high(&self) -> Price;
24
25 fn value_low(&self) -> Price;
26}
27
28pub struct TimeSeries<D: DataPoint> {
29 pub datapoints: BTreeMap<u64, D>,
30 pub interval: Timeframe,
31 pub tick_size: PriceStep,
32}
33
34impl<D: DataPoint> TimeSeries<D> {
35 pub fn base_price(&self) -> Price {
36 self.datapoints
37 .values()
38 .last()
39 .map_or(Price::from_f32(0.0), DataPoint::last_price)
40 }
41
42 pub fn latest_timestamp(&self) -> Option<u64> {
43 self.datapoints.keys().last().copied()
44 }
45
46 pub fn latest_kline(&self) -> Option<&Kline> {
47 self.datapoints.values().last().and_then(|dp| dp.kline())
48 }
49
50 pub fn price_scale(&self, lookback: usize) -> (Price, Price) {
51 let mut iter = self.datapoints.iter().rev().take(lookback);
52
53 if let Some((_, first)) = iter.next() {
54 let mut high = first.value_high();
55 let mut low = first.value_low();
56
57 for (_, dp) in iter {
58 let value_high = dp.value_high();
59 let value_low = dp.value_low();
60 if value_high > high {
61 high = value_high;
62 }
63 if value_low < low {
64 low = value_low;
65 }
66 }
67
68 (high, low)
69 } else {
70 (Price::from_f32(0.0), Price::from_f32(0.0))
71 }
72 }
73
74 pub fn volume_data<'a>(&'a self) -> BTreeMap<u64, exchange::Volume>
75 where
76 BTreeMap<u64, exchange::Volume>: From<&'a TimeSeries<D>>,
77 {
78 self.into()
79 }
80
81 pub fn timerange(&self) -> (u64, u64) {
82 let earliest = self.datapoints.keys().next().copied().unwrap_or(0);
83 let latest = self.datapoints.keys().last().copied().unwrap_or(0);
84
85 (earliest, latest)
86 }
87
88 pub fn min_max_price_in_range_prices(
89 &self,
90 earliest: u64,
91 latest: u64,
92 ) -> Option<(Price, Price)> {
93 let mut it = self.datapoints.range(earliest..=latest);
94
95 let (_, first) = it.next()?;
96 let mut min_price = first.value_low();
97 let mut max_price = first.value_high();
98
99 for (_, dp) in it {
100 let low = dp.value_low();
101 let high = dp.value_high();
102 if low < min_price {
103 min_price = low;
104 }
105 if high > max_price {
106 max_price = high;
107 }
108 }
109
110 Some((min_price, max_price))
111 }
112
113 pub fn min_max_price_in_range(&self, earliest: u64, latest: u64) -> Option<(f32, f32)> {
114 self.min_max_price_in_range_prices(earliest, latest)
115 .map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
116 }
117
118 pub fn ingest_trades_bucket(&mut self, rounded_t: u64, trades: &[Trade], step: PriceStep)
120 where
121 D: Default,
122 {
123 let bucket = self.datapoints.entry(rounded_t).or_default();
124
125 for trade in trades {
126 bucket.add_trade(trade, step);
127 }
128 }
129
130 pub fn clear_trades(&mut self) {
131 for data_point in self.datapoints.values_mut() {
132 data_point.clear_trades();
133 }
134 }
135
136 fn align_down_to_phase(time: u64, phase: u64, interval: u64) -> u64 {
137 if time >= phase {
138 time.saturating_sub((time - phase) % interval)
139 } else {
140 phase
141 }
142 }
143
144 fn check_kline_integrity_range(
145 &self,
146 earliest: u64,
147 latest: u64,
148 interval: u64,
149 ) -> Option<Vec<u64>> {
150 let mut time = earliest;
151 let mut missing_count = 0;
152
153 while time < latest {
154 if !self.datapoints.contains_key(&time) {
155 missing_count += 1;
156 break;
157 }
158 time += interval;
159 }
160
161 if missing_count > 0 {
162 let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
163 let mut time = earliest;
164
165 while time < latest {
166 if !self.datapoints.contains_key(&time) {
167 missing_keys.push(time);
168 }
169 time += interval;
170 }
171
172 log::debug!(
173 "Integrity check failed: missing {} klines",
174 missing_keys.len()
175 );
176 return Some(missing_keys);
177 }
178
179 None
180 }
181
182 pub fn check_kline_integrity(&self, earliest: u64, latest: u64) -> Option<Vec<u64>> {
183 if self.datapoints.is_empty() {
184 return None;
185 }
186
187 let interval = self.interval.to_milliseconds();
188 if interval == 0 {
189 return None;
190 }
191
192 let (series_earliest, series_latest) = self.timerange();
193 let phase = series_earliest % interval;
194
195 let check_earliest =
196 Self::align_down_to_phase(earliest.max(series_earliest), phase, interval)
197 .max(series_earliest);
198 let check_latest = Self::align_down_to_phase(latest.min(series_latest), phase, interval)
199 .min(series_latest);
200
201 if check_earliest < check_latest {
202 self.check_kline_integrity_range(check_earliest, check_latest, interval)
203 } else {
204 None
205 }
206 }
207}
208
209impl TimeSeries<KlineDataPoint> {
210 pub fn new(interval: Timeframe, tick_size: PriceStep, klines: &[Kline]) -> Self {
211 let mut timeseries = Self {
212 datapoints: BTreeMap::new(),
213 interval,
214 tick_size,
215 };
216
217 timeseries.insert_klines(klines);
218 timeseries
219 }
220
221 pub fn with_trades(&self, trades: &[Trade]) -> TimeSeries<KlineDataPoint> {
222 let mut new_series = Self {
223 datapoints: self.datapoints.clone(),
224 interval: self.interval,
225 tick_size: self.tick_size,
226 };
227
228 new_series.insert_trades_or_create_bucket(trades);
229 new_series
230 }
231
232 pub fn insert_klines(&mut self, klines: &[Kline]) {
233 for kline in klines {
234 let entry = self
235 .datapoints
236 .entry(kline.time)
237 .or_insert_with(|| KlineDataPoint {
238 kline: *kline,
239 footprint: KlineTrades::new(),
240 });
241
242 entry.kline = *kline;
243 }
244
245 self.update_poc_status();
246 }
247
248 pub fn insert_trades_or_create_bucket(&mut self, buffer: &[Trade]) {
249 if buffer.is_empty() {
250 return;
251 }
252 let aggr_time = self.interval.to_milliseconds();
253 let mut updated_times = Vec::new();
254
255 buffer.iter().for_each(|trade| {
256 let rounded_time = (trade.time / aggr_time) * aggr_time;
257
258 if !updated_times.contains(&rounded_time) {
259 updated_times.push(rounded_time);
260 }
261
262 let entry = self
263 .datapoints
264 .entry(rounded_time)
265 .or_insert_with(|| KlineDataPoint {
266 kline: Kline {
267 time: rounded_time,
268 open: trade.price,
269 high: trade.price,
270 low: trade.price,
271 close: trade.price,
272 volume: Volume::empty_buy_sell(),
273 },
274 footprint: KlineTrades::new(),
275 });
276
277 entry.add_trade(trade, self.tick_size);
278 });
279
280 for time in updated_times {
281 if let Some(data_point) = self.datapoints.get_mut(&time) {
282 data_point.calculate_poc();
283 }
284 }
285 }
286
287 pub fn insert_trades_existing_buckets(&mut self, buffer: &[Trade]) {
288 if buffer.is_empty() {
289 return;
290 }
291 let aggr_time = self.interval.to_milliseconds();
292 let mut updated_times: Vec<u64> = Vec::new();
293
294 for trade in buffer {
295 let rounded_time = (trade.time / aggr_time) * aggr_time;
296
297 if let Some(entry) = self.datapoints.get_mut(&rounded_time) {
298 if !updated_times.contains(&rounded_time) {
299 updated_times.push(rounded_time);
300 }
301 entry.add_trade(trade, self.tick_size);
302 }
303 }
304
305 for time in updated_times {
306 if let Some(data_point) = self.datapoints.get_mut(&time) {
307 data_point.calculate_poc();
308 }
309 }
310 }
311
312 pub fn change_tick_size(&mut self, tick_size: PriceStep, raw_trades: &[Trade]) {
313 self.tick_size = tick_size;
314
315 self.clear_trades();
316
317 if !raw_trades.is_empty() {
318 self.insert_trades_existing_buckets(raw_trades);
319 }
320 }
321
322 pub fn update_poc_status(&mut self) {
323 let updates = self
324 .datapoints
325 .iter()
326 .filter_map(|(&time, dp)| dp.poc_price().map(|price| (time, price)))
327 .collect::<Vec<_>>();
328
329 for (current_time, poc_price) in updates {
330 let mut npoc = NPoc::default();
331
332 for (&next_time, next_dp) in self.datapoints.range((current_time + 1)..) {
333 let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
334 let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
335
336 if next_dp_low <= poc_price && next_dp_high >= poc_price {
337 npoc.filled(next_time);
338 break;
339 } else {
340 npoc.unfilled();
341 }
342 }
343
344 if let Some(data_point) = self.datapoints.get_mut(¤t_time) {
345 data_point.set_poc_status(npoc);
346 }
347 }
348 }
349
350 pub fn suggest_trade_fetch_range(
351 &self,
352 visible_earliest: u64,
353 visible_latest: u64,
354 ) -> Option<(u64, u64)> {
355 if self.datapoints.is_empty() {
356 return None;
357 }
358
359 self.find_trade_gap()
360 .and_then(|(last_t_before_gap, first_t_after_gap)| {
361 if last_t_before_gap.is_none() && first_t_after_gap.is_none() {
362 return None;
363 }
364 let (data_earliest, data_latest) = self.timerange();
365
366 let fetch_from = last_t_before_gap
367 .map_or(data_earliest, |t| t.saturating_add(1))
368 .max(visible_earliest);
369 let fetch_to = first_t_after_gap
370 .map_or(data_latest, |t| t.saturating_sub(1))
371 .min(visible_latest);
372
373 if fetch_from < fetch_to {
374 Some((fetch_from, fetch_to))
375 } else {
376 None
377 }
378 })
379 }
380
381 fn find_trade_gap(&self) -> Option<(Option<u64>, Option<u64>)> {
382 let empty_kline_time = self
383 .datapoints
384 .iter()
385 .rev()
386 .find(|(_, dp)| dp.footprint.trades.is_empty())
387 .map(|(&time, _)| time);
388
389 if let Some(target_time) = empty_kline_time {
390 let last_t_before_gap = self
391 .datapoints
392 .range(..target_time)
393 .rev()
394 .find_map(|(_, dp)| dp.last_trade_time());
395
396 let first_t_after_gap = self
397 .datapoints
398 .range(target_time + 1..)
399 .find_map(|(_, dp)| dp.first_trade_time());
400
401 Some((last_t_before_gap, first_t_after_gap))
402 } else {
403 None
404 }
405 }
406
407 pub fn max_qty_ts_range(
408 &self,
409 cluster_kind: ClusterKind,
410 earliest: u64,
411 latest: u64,
412 highest: Price,
413 lowest: Price,
414 ) -> Qty {
415 let mut max_cluster_qty: Qty = Qty::default();
416
417 self.datapoints
418 .range(earliest..=latest)
419 .for_each(|(_, dp)| {
420 max_cluster_qty =
421 max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
422 });
423
424 max_cluster_qty
425 }
426}
427
428impl TimeSeries<HeatmapDataPoint> {
429 pub fn new(basis: Basis, tick_size: PriceStep) -> Self {
430 let timeframe = match basis {
431 Basis::Time(interval) => interval,
432 Basis::Tick(_) => unimplemented!(),
433 };
434
435 Self {
436 datapoints: BTreeMap::new(),
437 interval: timeframe,
438 tick_size,
439 }
440 }
441
442 pub fn max_trade_qty_and_aggr_volume(&self, earliest: u64, latest: u64) -> (Qty, Qty) {
443 let mut max_trade_qty = Qty::ZERO;
444 let mut max_aggr_volume = Qty::ZERO;
445
446 self.datapoints
447 .range(earliest..=latest)
448 .for_each(|(_, dp)| {
449 let (mut buy_volume, mut sell_volume) = (Qty::ZERO, Qty::ZERO);
450
451 dp.grouped_trades.iter().for_each(|trade| {
452 let trade_qty = trade.qty;
453 max_trade_qty = max_trade_qty.max(trade_qty);
454
455 if trade.is_sell {
456 sell_volume += trade_qty;
457 } else {
458 buy_volume += trade_qty;
459 }
460 });
461
462 max_aggr_volume = max_aggr_volume.max(buy_volume + sell_volume);
463 });
464
465 (max_trade_qty, max_aggr_volume)
466 }
467
468 pub fn max_trade_qty_in_range(
469 &self,
470 earliest: u64,
471 latest: u64,
472 highest: Price,
473 lowest: Price,
474 ) -> Qty {
475 let mut max_trade_qty = Qty::default();
476
477 self.datapoints
478 .range(earliest..=latest)
479 .for_each(|(_, dp)| {
480 dp.grouped_trades.iter().for_each(|trade| {
481 if trade.price >= lowest && trade.price <= highest {
482 max_trade_qty = max_trade_qty.max(trade.qty);
483 }
484 });
485 });
486
487 max_trade_qty
488 }
489}
490
491impl From<&TimeSeries<KlineDataPoint>> for BTreeMap<u64, exchange::Volume> {
492 fn from(timeseries: &TimeSeries<KlineDataPoint>) -> Self {
494 timeseries
495 .datapoints
496 .iter()
497 .map(|(time, dp)| (*time, dp.kline.volume))
498 .collect()
499 }
500}