flowsurface_data/aggr/
time.rs

1use std::collections::BTreeMap;
2
3use crate::chart::Basis;
4use crate::chart::heatmap::HeatmapDataPoint;
5use crate::chart::kline::{ClusterKind, KlineDataPoint, KlineTrades, NPoc};
6
7use exchange::util::{Price, PriceStep};
8use exchange::{Kline, Timeframe, Trade};
9
10pub trait DataPoint {
11    fn add_trade(&mut self, trade: &Trade, step: PriceStep);
12
13    fn clear_trades(&mut self);
14
15    fn last_trade_time(&self) -> Option<u64>;
16
17    fn first_trade_time(&self) -> Option<u64>;
18
19    fn last_price(&self) -> Price;
20
21    fn kline(&self) -> Option<&Kline>;
22
23    fn value_high(&self) -> Price;
24
25    fn value_low(&self) -> Price;
26}
27
28pub struct TimeSeries<D: DataPoint> {
29    pub datapoints: BTreeMap<u64, D>,
30    pub interval: Timeframe,
31    pub tick_size: PriceStep,
32}
33
34impl<D: DataPoint> TimeSeries<D> {
35    pub fn base_price(&self) -> Price {
36        self.datapoints
37            .values()
38            .last()
39            .map_or(Price::from_f32(0.0), DataPoint::last_price)
40    }
41
42    pub fn latest_timestamp(&self) -> Option<u64> {
43        self.datapoints.keys().last().copied()
44    }
45
46    pub fn latest_kline(&self) -> Option<&Kline> {
47        self.datapoints.values().last().and_then(|dp| dp.kline())
48    }
49
50    pub fn price_scale(&self, lookback: usize) -> (Price, Price) {
51        let mut iter = self.datapoints.iter().rev().take(lookback);
52
53        if let Some((_, first)) = iter.next() {
54            let mut high = first.value_high();
55            let mut low = first.value_low();
56
57            for (_, dp) in iter {
58                let value_high = dp.value_high();
59                let value_low = dp.value_low();
60                if value_high > high {
61                    high = value_high;
62                }
63                if value_low < low {
64                    low = value_low;
65                }
66            }
67
68            (high, low)
69        } else {
70            (Price::from_f32(0.0), Price::from_f32(0.0))
71        }
72    }
73
74    pub fn volume_data<'a>(&'a self) -> BTreeMap<u64, (f32, f32)>
75    where
76        BTreeMap<u64, (f32, f32)>: From<&'a TimeSeries<D>>,
77    {
78        self.into()
79    }
80
81    pub fn timerange(&self) -> (u64, u64) {
82        let earliest = self.datapoints.keys().next().copied().unwrap_or(0);
83        let latest = self.datapoints.keys().last().copied().unwrap_or(0);
84
85        (earliest, latest)
86    }
87
88    pub fn min_max_price_in_range_prices(
89        &self,
90        earliest: u64,
91        latest: u64,
92    ) -> Option<(Price, Price)> {
93        let mut it = self.datapoints.range(earliest..=latest);
94
95        let (_, first) = it.next()?;
96        let mut min_price = first.value_low();
97        let mut max_price = first.value_high();
98
99        for (_, dp) in it {
100            let low = dp.value_low();
101            let high = dp.value_high();
102            if low < min_price {
103                min_price = low;
104            }
105            if high > max_price {
106                max_price = high;
107            }
108        }
109
110        Some((min_price, max_price))
111    }
112
113    pub fn min_max_price_in_range(&self, earliest: u64, latest: u64) -> Option<(f32, f32)> {
114        self.min_max_price_in_range_prices(earliest, latest)
115            .map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
116    }
117
118    pub fn clear_trades(&mut self) {
119        for data_point in self.datapoints.values_mut() {
120            data_point.clear_trades();
121        }
122    }
123
124    pub fn check_kline_integrity(
125        &self,
126        earliest: u64,
127        latest: u64,
128        interval: u64,
129    ) -> Option<Vec<u64>> {
130        let mut time = earliest;
131        let mut missing_count = 0;
132
133        while time < latest {
134            if !self.datapoints.contains_key(&time) {
135                missing_count += 1;
136                break;
137            }
138            time += interval;
139        }
140
141        if missing_count > 0 {
142            let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
143            let mut time = earliest;
144
145            while time < latest {
146                if !self.datapoints.contains_key(&time) {
147                    missing_keys.push(time);
148                }
149                time += interval;
150            }
151
152            log::warn!(
153                "Integrity check failed: missing {} klines",
154                missing_keys.len()
155            );
156            return Some(missing_keys);
157        }
158
159        None
160    }
161}
162
163impl TimeSeries<KlineDataPoint> {
164    pub fn new(interval: Timeframe, tick_size: PriceStep, klines: &[Kline]) -> Self {
165        let mut timeseries = Self {
166            datapoints: BTreeMap::new(),
167            interval,
168            tick_size,
169        };
170
171        timeseries.insert_klines(klines);
172        timeseries
173    }
174
175    pub fn with_trades(&self, trades: &[Trade]) -> TimeSeries<KlineDataPoint> {
176        let mut new_series = Self {
177            datapoints: self.datapoints.clone(),
178            interval: self.interval,
179            tick_size: self.tick_size,
180        };
181
182        new_series.insert_trades_or_create_bucket(trades);
183        new_series
184    }
185
186    pub fn insert_klines(&mut self, klines: &[Kline]) {
187        for kline in klines {
188            let entry = self
189                .datapoints
190                .entry(kline.time)
191                .or_insert_with(|| KlineDataPoint {
192                    kline: *kline,
193                    footprint: KlineTrades::new(),
194                });
195
196            entry.kline = *kline;
197        }
198
199        self.update_poc_status();
200    }
201
202    pub fn insert_trades_or_create_bucket(&mut self, buffer: &[Trade]) {
203        if buffer.is_empty() {
204            return;
205        }
206        let aggr_time = self.interval.to_milliseconds();
207        let mut updated_times = Vec::new();
208
209        buffer.iter().for_each(|trade| {
210            let rounded_time = (trade.time / aggr_time) * aggr_time;
211
212            if !updated_times.contains(&rounded_time) {
213                updated_times.push(rounded_time);
214            }
215
216            let entry = self
217                .datapoints
218                .entry(rounded_time)
219                .or_insert_with(|| KlineDataPoint {
220                    kline: Kline {
221                        time: rounded_time,
222                        open: trade.price,
223                        high: trade.price,
224                        low: trade.price,
225                        close: trade.price,
226                        volume: (0.0, 0.0),
227                    },
228                    footprint: KlineTrades::new(),
229                });
230
231            entry.add_trade(trade, self.tick_size);
232        });
233
234        for time in updated_times {
235            if let Some(data_point) = self.datapoints.get_mut(&time) {
236                data_point.calculate_poc();
237            }
238        }
239    }
240
241    pub fn insert_trades_existing_buckets(&mut self, buffer: &[Trade]) {
242        if buffer.is_empty() {
243            return;
244        }
245        let aggr_time = self.interval.to_milliseconds();
246        let mut updated_times: Vec<u64> = Vec::new();
247
248        for trade in buffer {
249            let rounded_time = (trade.time / aggr_time) * aggr_time;
250
251            if let Some(entry) = self.datapoints.get_mut(&rounded_time) {
252                if !updated_times.contains(&rounded_time) {
253                    updated_times.push(rounded_time);
254                }
255                entry.add_trade(trade, self.tick_size);
256            }
257        }
258
259        for time in updated_times {
260            if let Some(data_point) = self.datapoints.get_mut(&time) {
261                data_point.calculate_poc();
262            }
263        }
264    }
265
266    pub fn change_tick_size(&mut self, tick_size: f32, raw_trades: &[Trade]) {
267        self.tick_size = PriceStep::from_f32(tick_size);
268        self.clear_trades();
269
270        if !raw_trades.is_empty() {
271            self.insert_trades_existing_buckets(raw_trades);
272        }
273    }
274
275    pub fn update_poc_status(&mut self) {
276        let updates = self
277            .datapoints
278            .iter()
279            .filter_map(|(&time, dp)| dp.poc_price().map(|price| (time, price)))
280            .collect::<Vec<_>>();
281
282        for (current_time, poc_price) in updates {
283            let mut npoc = NPoc::default();
284
285            for (&next_time, next_dp) in self.datapoints.range((current_time + 1)..) {
286                let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
287                let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
288
289                if next_dp_low <= poc_price && next_dp_high >= poc_price {
290                    npoc.filled(next_time);
291                    break;
292                } else {
293                    npoc.unfilled();
294                }
295            }
296
297            if let Some(data_point) = self.datapoints.get_mut(&current_time) {
298                data_point.set_poc_status(npoc);
299            }
300        }
301    }
302
303    pub fn suggest_trade_fetch_range(
304        &self,
305        visible_earliest: u64,
306        visible_latest: u64,
307    ) -> Option<(u64, u64)> {
308        if self.datapoints.is_empty() {
309            return None;
310        }
311
312        self.find_trade_gap()
313            .and_then(|(last_t_before_gap, first_t_after_gap)| {
314                if last_t_before_gap.is_none() && first_t_after_gap.is_none() {
315                    return None;
316                }
317                let (data_earliest, data_latest) = self.timerange();
318
319                let fetch_from = last_t_before_gap
320                    .map_or(data_earliest, |t| t.saturating_add(1))
321                    .max(visible_earliest);
322                let fetch_to = first_t_after_gap
323                    .map_or(data_latest, |t| t.saturating_sub(1))
324                    .min(visible_latest);
325
326                if fetch_from < fetch_to {
327                    Some((fetch_from, fetch_to))
328                } else {
329                    None
330                }
331            })
332    }
333
334    fn find_trade_gap(&self) -> Option<(Option<u64>, Option<u64>)> {
335        let empty_kline_time = self
336            .datapoints
337            .iter()
338            .rev()
339            .find(|(_, dp)| dp.footprint.trades.is_empty())
340            .map(|(&time, _)| time);
341
342        if let Some(target_time) = empty_kline_time {
343            let last_t_before_gap = self
344                .datapoints
345                .range(..target_time)
346                .rev()
347                .find_map(|(_, dp)| dp.last_trade_time());
348
349            let first_t_after_gap = self
350                .datapoints
351                .range(target_time + 1..)
352                .find_map(|(_, dp)| dp.first_trade_time());
353
354            Some((last_t_before_gap, first_t_after_gap))
355        } else {
356            None
357        }
358    }
359
360    pub fn max_qty_ts_range(
361        &self,
362        cluster_kind: ClusterKind,
363        earliest: u64,
364        latest: u64,
365        highest: Price,
366        lowest: Price,
367    ) -> f32 {
368        let mut max_cluster_qty: f32 = 0.0;
369
370        self.datapoints
371            .range(earliest..=latest)
372            .for_each(|(_, dp)| {
373                max_cluster_qty =
374                    max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
375            });
376
377        max_cluster_qty
378    }
379}
380
381impl TimeSeries<HeatmapDataPoint> {
382    pub fn new(basis: Basis, tick_size: PriceStep) -> Self {
383        let timeframe = match basis {
384            Basis::Time(interval) => interval,
385            Basis::Tick(_) => unimplemented!(),
386        };
387
388        Self {
389            datapoints: BTreeMap::new(),
390            interval: timeframe,
391            tick_size,
392        }
393    }
394
395    pub fn max_trade_qty_and_aggr_volume(&self, earliest: u64, latest: u64) -> (f32, f32) {
396        let mut max_trade_qty = 0.0f32;
397        let mut max_aggr_volume = 0.0f32;
398
399        self.datapoints
400            .range(earliest..=latest)
401            .for_each(|(_, dp)| {
402                let (mut buy_volume, mut sell_volume) = (0.0, 0.0);
403
404                dp.grouped_trades.iter().for_each(|trade| {
405                    max_trade_qty = max_trade_qty.max(trade.qty);
406
407                    if trade.is_sell {
408                        sell_volume += trade.qty;
409                    } else {
410                        buy_volume += trade.qty;
411                    }
412                });
413
414                max_aggr_volume = max_aggr_volume.max(buy_volume + sell_volume);
415            });
416
417        (max_trade_qty, max_aggr_volume)
418    }
419}
420
421impl From<&TimeSeries<KlineDataPoint>> for BTreeMap<u64, (f32, f32)> {
422    /// Converts datapoints into a map of timestamps and volume data
423    fn from(timeseries: &TimeSeries<KlineDataPoint>) -> Self {
424        timeseries
425            .datapoints
426            .iter()
427            .map(|(time, dp)| (*time, (dp.kline.volume.0, dp.kline.volume.1)))
428            .collect()
429    }
430}