Skip to main content

flowsurface_data/aggr/
time.rs

1use std::collections::BTreeMap;
2
3use crate::chart::Basis;
4use crate::chart::heatmap::HeatmapDataPoint;
5use crate::chart::kline::{ClusterKind, KlineDataPoint, KlineTrades, NPoc};
6
7use exchange::unit::{Price, PriceStep, Qty};
8use exchange::{Kline, Timeframe, Trade, Volume};
9
10pub trait DataPoint {
11    fn add_trade(&mut self, trade: &Trade, step: PriceStep);
12
13    fn clear_trades(&mut self);
14
15    fn last_trade_time(&self) -> Option<u64>;
16
17    fn first_trade_time(&self) -> Option<u64>;
18
19    fn last_price(&self) -> Price;
20
21    fn kline(&self) -> Option<&Kline>;
22
23    fn value_high(&self) -> Price;
24
25    fn value_low(&self) -> Price;
26}
27
28pub struct TimeSeries<D: DataPoint> {
29    pub datapoints: BTreeMap<u64, D>,
30    pub interval: Timeframe,
31    pub tick_size: PriceStep,
32}
33
34impl<D: DataPoint> TimeSeries<D> {
35    pub fn base_price(&self) -> Price {
36        self.datapoints
37            .values()
38            .last()
39            .map_or(Price::from_f32(0.0), DataPoint::last_price)
40    }
41
42    pub fn latest_timestamp(&self) -> Option<u64> {
43        self.datapoints.keys().last().copied()
44    }
45
46    pub fn latest_kline(&self) -> Option<&Kline> {
47        self.datapoints.values().last().and_then(|dp| dp.kline())
48    }
49
50    pub fn price_scale(&self, lookback: usize) -> (Price, Price) {
51        let mut iter = self.datapoints.iter().rev().take(lookback);
52
53        if let Some((_, first)) = iter.next() {
54            let mut high = first.value_high();
55            let mut low = first.value_low();
56
57            for (_, dp) in iter {
58                let value_high = dp.value_high();
59                let value_low = dp.value_low();
60                if value_high > high {
61                    high = value_high;
62                }
63                if value_low < low {
64                    low = value_low;
65                }
66            }
67
68            (high, low)
69        } else {
70            (Price::from_f32(0.0), Price::from_f32(0.0))
71        }
72    }
73
74    pub fn volume_data<'a>(&'a self) -> BTreeMap<u64, exchange::Volume>
75    where
76        BTreeMap<u64, exchange::Volume>: From<&'a TimeSeries<D>>,
77    {
78        self.into()
79    }
80
81    pub fn timerange(&self) -> (u64, u64) {
82        let earliest = self.datapoints.keys().next().copied().unwrap_or(0);
83        let latest = self.datapoints.keys().last().copied().unwrap_or(0);
84
85        (earliest, latest)
86    }
87
88    pub fn min_max_price_in_range_prices(
89        &self,
90        earliest: u64,
91        latest: u64,
92    ) -> Option<(Price, Price)> {
93        let mut it = self.datapoints.range(earliest..=latest);
94
95        let (_, first) = it.next()?;
96        let mut min_price = first.value_low();
97        let mut max_price = first.value_high();
98
99        for (_, dp) in it {
100            let low = dp.value_low();
101            let high = dp.value_high();
102            if low < min_price {
103                min_price = low;
104            }
105            if high > max_price {
106                max_price = high;
107            }
108        }
109
110        Some((min_price, max_price))
111    }
112
113    pub fn min_max_price_in_range(&self, earliest: u64, latest: u64) -> Option<(f32, f32)> {
114        self.min_max_price_in_range_prices(earliest, latest)
115            .map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
116    }
117
118    /// Ensures a datapoint bucket exists at `rounded_t` and ingests all trades into it.
119    pub fn ingest_trades_bucket(&mut self, rounded_t: u64, trades: &[Trade], step: PriceStep)
120    where
121        D: Default,
122    {
123        let bucket = self.datapoints.entry(rounded_t).or_default();
124
125        for trade in trades {
126            bucket.add_trade(trade, step);
127        }
128    }
129
130    pub fn clear_trades(&mut self) {
131        for data_point in self.datapoints.values_mut() {
132            data_point.clear_trades();
133        }
134    }
135
136    fn align_down_to_phase(time: u64, phase: u64, interval: u64) -> u64 {
137        if time >= phase {
138            time.saturating_sub((time - phase) % interval)
139        } else {
140            phase
141        }
142    }
143
144    fn check_kline_integrity_range(
145        &self,
146        earliest: u64,
147        latest: u64,
148        interval: u64,
149    ) -> Option<Vec<u64>> {
150        let mut time = earliest;
151        let mut missing_count = 0;
152
153        while time < latest {
154            if !self.datapoints.contains_key(&time) {
155                missing_count += 1;
156                break;
157            }
158            time += interval;
159        }
160
161        if missing_count > 0 {
162            let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
163            let mut time = earliest;
164
165            while time < latest {
166                if !self.datapoints.contains_key(&time) {
167                    missing_keys.push(time);
168                }
169                time += interval;
170            }
171
172            log::debug!(
173                "Integrity check failed: missing {} klines",
174                missing_keys.len()
175            );
176            return Some(missing_keys);
177        }
178
179        None
180    }
181
182    pub fn check_kline_integrity(&self, earliest: u64, latest: u64) -> Option<Vec<u64>> {
183        if self.datapoints.is_empty() {
184            return None;
185        }
186
187        let interval = self.interval.to_milliseconds();
188        if interval == 0 {
189            return None;
190        }
191
192        let (series_earliest, series_latest) = self.timerange();
193        let phase = series_earliest % interval;
194
195        let check_earliest =
196            Self::align_down_to_phase(earliest.max(series_earliest), phase, interval)
197                .max(series_earliest);
198        let check_latest = Self::align_down_to_phase(latest.min(series_latest), phase, interval)
199            .min(series_latest);
200
201        if check_earliest < check_latest {
202            self.check_kline_integrity_range(check_earliest, check_latest, interval)
203        } else {
204            None
205        }
206    }
207}
208
209impl TimeSeries<KlineDataPoint> {
210    pub fn new(interval: Timeframe, tick_size: PriceStep, klines: &[Kline]) -> Self {
211        let mut timeseries = Self {
212            datapoints: BTreeMap::new(),
213            interval,
214            tick_size,
215        };
216
217        timeseries.insert_klines(klines);
218        timeseries
219    }
220
221    pub fn with_trades(&self, trades: &[Trade]) -> TimeSeries<KlineDataPoint> {
222        let mut new_series = Self {
223            datapoints: self.datapoints.clone(),
224            interval: self.interval,
225            tick_size: self.tick_size,
226        };
227
228        new_series.insert_trades_or_create_bucket(trades);
229        new_series
230    }
231
232    pub fn insert_klines(&mut self, klines: &[Kline]) {
233        for kline in klines {
234            let entry = self
235                .datapoints
236                .entry(kline.time)
237                .or_insert_with(|| KlineDataPoint {
238                    kline: *kline,
239                    footprint: KlineTrades::new(),
240                });
241
242            entry.kline = *kline;
243        }
244
245        self.update_poc_status();
246    }
247
248    pub fn insert_trades_or_create_bucket(&mut self, buffer: &[Trade]) {
249        if buffer.is_empty() {
250            return;
251        }
252        let aggr_time = self.interval.to_milliseconds();
253        let mut updated_times = Vec::new();
254
255        buffer.iter().for_each(|trade| {
256            let rounded_time = (trade.time / aggr_time) * aggr_time;
257
258            if !updated_times.contains(&rounded_time) {
259                updated_times.push(rounded_time);
260            }
261
262            let entry = self
263                .datapoints
264                .entry(rounded_time)
265                .or_insert_with(|| KlineDataPoint {
266                    kline: Kline {
267                        time: rounded_time,
268                        open: trade.price,
269                        high: trade.price,
270                        low: trade.price,
271                        close: trade.price,
272                        volume: Volume::empty_buy_sell(),
273                    },
274                    footprint: KlineTrades::new(),
275                });
276
277            entry.add_trade(trade, self.tick_size);
278        });
279
280        for time in updated_times {
281            if let Some(data_point) = self.datapoints.get_mut(&time) {
282                data_point.calculate_poc();
283            }
284        }
285    }
286
287    pub fn insert_trades_existing_buckets(&mut self, buffer: &[Trade]) {
288        if buffer.is_empty() {
289            return;
290        }
291        let aggr_time = self.interval.to_milliseconds();
292        let mut updated_times: Vec<u64> = Vec::new();
293
294        for trade in buffer {
295            let rounded_time = (trade.time / aggr_time) * aggr_time;
296
297            if let Some(entry) = self.datapoints.get_mut(&rounded_time) {
298                if !updated_times.contains(&rounded_time) {
299                    updated_times.push(rounded_time);
300                }
301                entry.add_trade(trade, self.tick_size);
302            }
303        }
304
305        for time in updated_times {
306            if let Some(data_point) = self.datapoints.get_mut(&time) {
307                data_point.calculate_poc();
308            }
309        }
310    }
311
312    pub fn change_tick_size(&mut self, tick_size: PriceStep, raw_trades: &[Trade]) {
313        self.tick_size = tick_size;
314
315        self.clear_trades();
316
317        if !raw_trades.is_empty() {
318            self.insert_trades_existing_buckets(raw_trades);
319        }
320    }
321
322    pub fn update_poc_status(&mut self) {
323        let updates = self
324            .datapoints
325            .iter()
326            .filter_map(|(&time, dp)| dp.poc_price().map(|price| (time, price)))
327            .collect::<Vec<_>>();
328
329        for (current_time, poc_price) in updates {
330            let mut npoc = NPoc::default();
331
332            for (&next_time, next_dp) in self.datapoints.range((current_time + 1)..) {
333                let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
334                let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
335
336                if next_dp_low <= poc_price && next_dp_high >= poc_price {
337                    npoc.filled(next_time);
338                    break;
339                } else {
340                    npoc.unfilled();
341                }
342            }
343
344            if let Some(data_point) = self.datapoints.get_mut(&current_time) {
345                data_point.set_poc_status(npoc);
346            }
347        }
348    }
349
350    pub fn suggest_trade_fetch_range(
351        &self,
352        visible_earliest: u64,
353        visible_latest: u64,
354    ) -> Option<(u64, u64)> {
355        if self.datapoints.is_empty() {
356            return None;
357        }
358
359        self.find_trade_gap()
360            .and_then(|(last_t_before_gap, first_t_after_gap)| {
361                if last_t_before_gap.is_none() && first_t_after_gap.is_none() {
362                    return None;
363                }
364                let (data_earliest, data_latest) = self.timerange();
365
366                let fetch_from = last_t_before_gap
367                    .map_or(data_earliest, |t| t.saturating_add(1))
368                    .max(visible_earliest);
369                let fetch_to = first_t_after_gap
370                    .map_or(data_latest, |t| t.saturating_sub(1))
371                    .min(visible_latest);
372
373                if fetch_from < fetch_to {
374                    Some((fetch_from, fetch_to))
375                } else {
376                    None
377                }
378            })
379    }
380
381    fn find_trade_gap(&self) -> Option<(Option<u64>, Option<u64>)> {
382        let empty_kline_time = self
383            .datapoints
384            .iter()
385            .rev()
386            .find(|(_, dp)| dp.footprint.trades.is_empty())
387            .map(|(&time, _)| time);
388
389        if let Some(target_time) = empty_kline_time {
390            let last_t_before_gap = self
391                .datapoints
392                .range(..target_time)
393                .rev()
394                .find_map(|(_, dp)| dp.last_trade_time());
395
396            let first_t_after_gap = self
397                .datapoints
398                .range(target_time + 1..)
399                .find_map(|(_, dp)| dp.first_trade_time());
400
401            Some((last_t_before_gap, first_t_after_gap))
402        } else {
403            None
404        }
405    }
406
407    pub fn max_qty_ts_range(
408        &self,
409        cluster_kind: ClusterKind,
410        earliest: u64,
411        latest: u64,
412        highest: Price,
413        lowest: Price,
414    ) -> Qty {
415        let mut max_cluster_qty: Qty = Qty::default();
416
417        self.datapoints
418            .range(earliest..=latest)
419            .for_each(|(_, dp)| {
420                max_cluster_qty =
421                    max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
422            });
423
424        max_cluster_qty
425    }
426}
427
428impl TimeSeries<HeatmapDataPoint> {
429    pub fn new(basis: Basis, tick_size: PriceStep) -> Self {
430        let timeframe = match basis {
431            Basis::Time(interval) => interval,
432            Basis::Tick(_) => unimplemented!(),
433        };
434
435        Self {
436            datapoints: BTreeMap::new(),
437            interval: timeframe,
438            tick_size,
439        }
440    }
441
442    pub fn max_trade_qty_and_aggr_volume(&self, earliest: u64, latest: u64) -> (Qty, Qty) {
443        let mut max_trade_qty = Qty::ZERO;
444        let mut max_aggr_volume = Qty::ZERO;
445
446        self.datapoints
447            .range(earliest..=latest)
448            .for_each(|(_, dp)| {
449                let (mut buy_volume, mut sell_volume) = (Qty::ZERO, Qty::ZERO);
450
451                dp.grouped_trades.iter().for_each(|trade| {
452                    let trade_qty = trade.qty;
453                    max_trade_qty = max_trade_qty.max(trade_qty);
454
455                    if trade.is_sell {
456                        sell_volume += trade_qty;
457                    } else {
458                        buy_volume += trade_qty;
459                    }
460                });
461
462                max_aggr_volume = max_aggr_volume.max(buy_volume + sell_volume);
463            });
464
465        (max_trade_qty, max_aggr_volume)
466    }
467
468    pub fn max_trade_qty_in_range(
469        &self,
470        earliest: u64,
471        latest: u64,
472        highest: Price,
473        lowest: Price,
474    ) -> Qty {
475        let mut max_trade_qty = Qty::default();
476
477        self.datapoints
478            .range(earliest..=latest)
479            .for_each(|(_, dp)| {
480                dp.grouped_trades.iter().for_each(|trade| {
481                    if trade.price >= lowest && trade.price <= highest {
482                        max_trade_qty = max_trade_qty.max(trade.qty);
483                    }
484                });
485            });
486
487        max_trade_qty
488    }
489}
490
491impl From<&TimeSeries<KlineDataPoint>> for BTreeMap<u64, exchange::Volume> {
492    /// Converts datapoints into a map of timestamps and volume data
493    fn from(timeseries: &TimeSeries<KlineDataPoint>) -> Self {
494        timeseries
495            .datapoints
496            .iter()
497            .map(|(time, dp)| (*time, dp.kline.volume))
498            .collect()
499    }
500}