1use std::collections::BTreeMap;
2
3use crate::chart::Basis;
4use crate::chart::heatmap::HeatmapDataPoint;
5use crate::chart::kline::{ClusterKind, KlineDataPoint, KlineTrades, NPoc};
6
7use exchange::util::{Price, PriceStep};
8use exchange::{Kline, Timeframe, Trade};
9
10pub trait DataPoint {
11 fn add_trade(&mut self, trade: &Trade, step: PriceStep);
12
13 fn clear_trades(&mut self);
14
15 fn last_trade_time(&self) -> Option<u64>;
16
17 fn first_trade_time(&self) -> Option<u64>;
18
19 fn last_price(&self) -> Price;
20
21 fn kline(&self) -> Option<&Kline>;
22
23 fn value_high(&self) -> Price;
24
25 fn value_low(&self) -> Price;
26}
27
28pub struct TimeSeries<D: DataPoint> {
29 pub datapoints: BTreeMap<u64, D>,
30 pub interval: Timeframe,
31 pub tick_size: PriceStep,
32}
33
34impl<D: DataPoint> TimeSeries<D> {
35 pub fn base_price(&self) -> Price {
36 self.datapoints
37 .values()
38 .last()
39 .map_or(Price::from_f32(0.0), DataPoint::last_price)
40 }
41
42 pub fn latest_timestamp(&self) -> Option<u64> {
43 self.datapoints.keys().last().copied()
44 }
45
46 pub fn latest_kline(&self) -> Option<&Kline> {
47 self.datapoints.values().last().and_then(|dp| dp.kline())
48 }
49
50 pub fn price_scale(&self, lookback: usize) -> (Price, Price) {
51 let mut iter = self.datapoints.iter().rev().take(lookback);
52
53 if let Some((_, first)) = iter.next() {
54 let mut high = first.value_high();
55 let mut low = first.value_low();
56
57 for (_, dp) in iter {
58 let value_high = dp.value_high();
59 let value_low = dp.value_low();
60 if value_high > high {
61 high = value_high;
62 }
63 if value_low < low {
64 low = value_low;
65 }
66 }
67
68 (high, low)
69 } else {
70 (Price::from_f32(0.0), Price::from_f32(0.0))
71 }
72 }
73
74 pub fn volume_data<'a>(&'a self) -> BTreeMap<u64, (f32, f32)>
75 where
76 BTreeMap<u64, (f32, f32)>: From<&'a TimeSeries<D>>,
77 {
78 self.into()
79 }
80
81 pub fn timerange(&self) -> (u64, u64) {
82 let earliest = self.datapoints.keys().next().copied().unwrap_or(0);
83 let latest = self.datapoints.keys().last().copied().unwrap_or(0);
84
85 (earliest, latest)
86 }
87
88 pub fn min_max_price_in_range_prices(
89 &self,
90 earliest: u64,
91 latest: u64,
92 ) -> Option<(Price, Price)> {
93 let mut it = self.datapoints.range(earliest..=latest);
94
95 let (_, first) = it.next()?;
96 let mut min_price = first.value_low();
97 let mut max_price = first.value_high();
98
99 for (_, dp) in it {
100 let low = dp.value_low();
101 let high = dp.value_high();
102 if low < min_price {
103 min_price = low;
104 }
105 if high > max_price {
106 max_price = high;
107 }
108 }
109
110 Some((min_price, max_price))
111 }
112
113 pub fn min_max_price_in_range(&self, earliest: u64, latest: u64) -> Option<(f32, f32)> {
114 self.min_max_price_in_range_prices(earliest, latest)
115 .map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
116 }
117
118 pub fn clear_trades(&mut self) {
119 for data_point in self.datapoints.values_mut() {
120 data_point.clear_trades();
121 }
122 }
123
124 pub fn check_kline_integrity(
125 &self,
126 earliest: u64,
127 latest: u64,
128 interval: u64,
129 ) -> Option<Vec<u64>> {
130 let mut time = earliest;
131 let mut missing_count = 0;
132
133 while time < latest {
134 if !self.datapoints.contains_key(&time) {
135 missing_count += 1;
136 break;
137 }
138 time += interval;
139 }
140
141 if missing_count > 0 {
142 let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
143 let mut time = earliest;
144
145 while time < latest {
146 if !self.datapoints.contains_key(&time) {
147 missing_keys.push(time);
148 }
149 time += interval;
150 }
151
152 log::warn!(
153 "Integrity check failed: missing {} klines",
154 missing_keys.len()
155 );
156 return Some(missing_keys);
157 }
158
159 None
160 }
161}
162
163impl TimeSeries<KlineDataPoint> {
164 pub fn new(interval: Timeframe, tick_size: PriceStep, klines: &[Kline]) -> Self {
165 let mut timeseries = Self {
166 datapoints: BTreeMap::new(),
167 interval,
168 tick_size,
169 };
170
171 timeseries.insert_klines(klines);
172 timeseries
173 }
174
175 pub fn with_trades(&self, trades: &[Trade]) -> TimeSeries<KlineDataPoint> {
176 let mut new_series = Self {
177 datapoints: self.datapoints.clone(),
178 interval: self.interval,
179 tick_size: self.tick_size,
180 };
181
182 new_series.insert_trades_or_create_bucket(trades);
183 new_series
184 }
185
186 pub fn insert_klines(&mut self, klines: &[Kline]) {
187 for kline in klines {
188 let entry = self
189 .datapoints
190 .entry(kline.time)
191 .or_insert_with(|| KlineDataPoint {
192 kline: *kline,
193 footprint: KlineTrades::new(),
194 });
195
196 entry.kline = *kline;
197 }
198
199 self.update_poc_status();
200 }
201
202 pub fn insert_trades_or_create_bucket(&mut self, buffer: &[Trade]) {
203 if buffer.is_empty() {
204 return;
205 }
206 let aggr_time = self.interval.to_milliseconds();
207 let mut updated_times = Vec::new();
208
209 buffer.iter().for_each(|trade| {
210 let rounded_time = (trade.time / aggr_time) * aggr_time;
211
212 if !updated_times.contains(&rounded_time) {
213 updated_times.push(rounded_time);
214 }
215
216 let entry = self
217 .datapoints
218 .entry(rounded_time)
219 .or_insert_with(|| KlineDataPoint {
220 kline: Kline {
221 time: rounded_time,
222 open: trade.price,
223 high: trade.price,
224 low: trade.price,
225 close: trade.price,
226 volume: (0.0, 0.0),
227 },
228 footprint: KlineTrades::new(),
229 });
230
231 entry.add_trade(trade, self.tick_size);
232 });
233
234 for time in updated_times {
235 if let Some(data_point) = self.datapoints.get_mut(&time) {
236 data_point.calculate_poc();
237 }
238 }
239 }
240
241 pub fn insert_trades_existing_buckets(&mut self, buffer: &[Trade]) {
242 if buffer.is_empty() {
243 return;
244 }
245 let aggr_time = self.interval.to_milliseconds();
246 let mut updated_times: Vec<u64> = Vec::new();
247
248 for trade in buffer {
249 let rounded_time = (trade.time / aggr_time) * aggr_time;
250
251 if let Some(entry) = self.datapoints.get_mut(&rounded_time) {
252 if !updated_times.contains(&rounded_time) {
253 updated_times.push(rounded_time);
254 }
255 entry.add_trade(trade, self.tick_size);
256 }
257 }
258
259 for time in updated_times {
260 if let Some(data_point) = self.datapoints.get_mut(&time) {
261 data_point.calculate_poc();
262 }
263 }
264 }
265
266 pub fn change_tick_size(&mut self, tick_size: f32, raw_trades: &[Trade]) {
267 self.tick_size = PriceStep::from_f32(tick_size);
268 self.clear_trades();
269
270 if !raw_trades.is_empty() {
271 self.insert_trades_existing_buckets(raw_trades);
272 }
273 }
274
275 pub fn update_poc_status(&mut self) {
276 let updates = self
277 .datapoints
278 .iter()
279 .filter_map(|(&time, dp)| dp.poc_price().map(|price| (time, price)))
280 .collect::<Vec<_>>();
281
282 for (current_time, poc_price) in updates {
283 let mut npoc = NPoc::default();
284
285 for (&next_time, next_dp) in self.datapoints.range((current_time + 1)..) {
286 let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
287 let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
288
289 if next_dp_low <= poc_price && next_dp_high >= poc_price {
290 npoc.filled(next_time);
291 break;
292 } else {
293 npoc.unfilled();
294 }
295 }
296
297 if let Some(data_point) = self.datapoints.get_mut(¤t_time) {
298 data_point.set_poc_status(npoc);
299 }
300 }
301 }
302
303 pub fn suggest_trade_fetch_range(
304 &self,
305 visible_earliest: u64,
306 visible_latest: u64,
307 ) -> Option<(u64, u64)> {
308 if self.datapoints.is_empty() {
309 return None;
310 }
311
312 self.find_trade_gap()
313 .and_then(|(last_t_before_gap, first_t_after_gap)| {
314 if last_t_before_gap.is_none() && first_t_after_gap.is_none() {
315 return None;
316 }
317 let (data_earliest, data_latest) = self.timerange();
318
319 let fetch_from = last_t_before_gap
320 .map_or(data_earliest, |t| t.saturating_add(1))
321 .max(visible_earliest);
322 let fetch_to = first_t_after_gap
323 .map_or(data_latest, |t| t.saturating_sub(1))
324 .min(visible_latest);
325
326 if fetch_from < fetch_to {
327 Some((fetch_from, fetch_to))
328 } else {
329 None
330 }
331 })
332 }
333
334 fn find_trade_gap(&self) -> Option<(Option<u64>, Option<u64>)> {
335 let empty_kline_time = self
336 .datapoints
337 .iter()
338 .rev()
339 .find(|(_, dp)| dp.footprint.trades.is_empty())
340 .map(|(&time, _)| time);
341
342 if let Some(target_time) = empty_kline_time {
343 let last_t_before_gap = self
344 .datapoints
345 .range(..target_time)
346 .rev()
347 .find_map(|(_, dp)| dp.last_trade_time());
348
349 let first_t_after_gap = self
350 .datapoints
351 .range(target_time + 1..)
352 .find_map(|(_, dp)| dp.first_trade_time());
353
354 Some((last_t_before_gap, first_t_after_gap))
355 } else {
356 None
357 }
358 }
359
360 pub fn max_qty_ts_range(
361 &self,
362 cluster_kind: ClusterKind,
363 earliest: u64,
364 latest: u64,
365 highest: Price,
366 lowest: Price,
367 ) -> f32 {
368 let mut max_cluster_qty: f32 = 0.0;
369
370 self.datapoints
371 .range(earliest..=latest)
372 .for_each(|(_, dp)| {
373 max_cluster_qty =
374 max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
375 });
376
377 max_cluster_qty
378 }
379}
380
381impl TimeSeries<HeatmapDataPoint> {
382 pub fn new(basis: Basis, tick_size: PriceStep) -> Self {
383 let timeframe = match basis {
384 Basis::Time(interval) => interval,
385 Basis::Tick(_) => unimplemented!(),
386 };
387
388 Self {
389 datapoints: BTreeMap::new(),
390 interval: timeframe,
391 tick_size,
392 }
393 }
394
395 pub fn max_trade_qty_and_aggr_volume(&self, earliest: u64, latest: u64) -> (f32, f32) {
396 let mut max_trade_qty = 0.0f32;
397 let mut max_aggr_volume = 0.0f32;
398
399 self.datapoints
400 .range(earliest..=latest)
401 .for_each(|(_, dp)| {
402 let (mut buy_volume, mut sell_volume) = (0.0, 0.0);
403
404 dp.grouped_trades.iter().for_each(|trade| {
405 max_trade_qty = max_trade_qty.max(trade.qty);
406
407 if trade.is_sell {
408 sell_volume += trade.qty;
409 } else {
410 buy_volume += trade.qty;
411 }
412 });
413
414 max_aggr_volume = max_aggr_volume.max(buy_volume + sell_volume);
415 });
416
417 (max_trade_qty, max_aggr_volume)
418 }
419}
420
421impl From<&TimeSeries<KlineDataPoint>> for BTreeMap<u64, (f32, f32)> {
422 fn from(timeseries: &TimeSeries<KlineDataPoint>) -> Self {
424 timeseries
425 .datapoints
426 .iter()
427 .map(|(time, dp)| (*time, (dp.kline.volume.0, dp.kline.volume.1)))
428 .collect()
429 }
430}