Skip to main content

flowsurface_data/aggr/
ticks.rs

1use crate::aggr;
2use crate::chart::kline::{ClusterKind, KlineTrades, NPoc};
3use exchange::unit::Qty;
4use exchange::unit::price::{Price, PriceStep};
5use exchange::{Kline, Trade, Volume};
6
7use std::collections::BTreeMap;
8
9#[derive(Debug, Clone)]
10pub struct TickAccumulation {
11    pub tick_count: usize,
12    pub kline: Kline,
13    pub footprint: KlineTrades,
14}
15
16impl TickAccumulation {
17    pub fn new(trade: &Trade, step: PriceStep) -> Self {
18        let mut footprint = KlineTrades::new();
19        footprint.add_trade_to_nearest_bin(trade, step);
20
21        let kline = Kline {
22            time: trade.time,
23            open: trade.price,
24            high: trade.price,
25            low: trade.price,
26            close: trade.price,
27            volume: Volume::empty_buy_sell().add_trade_qty(trade.is_sell, trade.qty),
28        };
29
30        Self {
31            tick_count: 1,
32            kline,
33            footprint,
34        }
35    }
36
37    pub fn update_with_trade(&mut self, trade: &Trade, step: PriceStep) {
38        self.tick_count += 1;
39        self.kline.high = self.kline.high.max(trade.price);
40        self.kline.low = self.kline.low.min(trade.price);
41        self.kline.close = trade.price;
42
43        self.kline.volume = self.kline.volume.add_trade_qty(trade.is_sell, trade.qty);
44
45        self.add_trade(trade, step);
46    }
47
48    fn add_trade(&mut self, trade: &Trade, step: PriceStep) {
49        self.footprint.add_trade_to_nearest_bin(trade, step);
50    }
51
52    pub fn max_cluster_qty(&self, cluster_kind: ClusterKind, highest: Price, lowest: Price) -> Qty {
53        self.footprint
54            .max_cluster_qty(cluster_kind, highest, lowest)
55    }
56
57    pub fn is_full(&self, interval: aggr::TickCount) -> bool {
58        self.tick_count >= interval.0 as usize
59    }
60
61    pub fn poc_price(&self) -> Option<Price> {
62        self.footprint.poc_price()
63    }
64
65    pub fn set_poc_status(&mut self, status: NPoc) {
66        self.footprint.set_poc_status(status);
67    }
68
69    pub fn calculate_poc(&mut self) {
70        self.footprint.calculate_poc();
71    }
72}
73
74pub struct TickAggr {
75    pub datapoints: Vec<TickAccumulation>,
76    pub interval: aggr::TickCount,
77    pub tick_size: PriceStep,
78}
79
80impl TickAggr {
81    pub fn new(interval: aggr::TickCount, tick_size: PriceStep, raw_trades: &[Trade]) -> Self {
82        let mut tick_aggr = Self {
83            datapoints: Vec::new(),
84            interval,
85            tick_size,
86        };
87
88        if !raw_trades.is_empty() {
89            tick_aggr.insert_trades(raw_trades);
90        }
91
92        tick_aggr
93    }
94
95    pub fn change_tick_size(&mut self, tick_size: PriceStep, raw_trades: &[Trade]) {
96        self.tick_size = tick_size;
97
98        self.datapoints.clear();
99
100        if !raw_trades.is_empty() {
101            self.insert_trades(raw_trades);
102        }
103    }
104
105    /// return latest data point and its index
106    pub fn latest_dp(&self) -> Option<(&TickAccumulation, usize)> {
107        self.datapoints
108            .last()
109            .map(|dp| (dp, self.datapoints.len() - 1))
110    }
111
112    pub fn volume_data(&self) -> BTreeMap<u64, exchange::Volume> {
113        self.into()
114    }
115
116    pub fn insert_trades(&mut self, buffer: &[Trade]) {
117        let mut updated_indices = Vec::new();
118
119        for trade in buffer {
120            if self.datapoints.is_empty() {
121                self.datapoints
122                    .push(TickAccumulation::new(trade, self.tick_size));
123                updated_indices.push(0);
124            } else {
125                let last_idx = self.datapoints.len() - 1;
126
127                if self.datapoints[last_idx].is_full(self.interval) {
128                    self.datapoints
129                        .push(TickAccumulation::new(trade, self.tick_size));
130                    updated_indices.push(self.datapoints.len() - 1);
131                } else {
132                    self.datapoints[last_idx].update_with_trade(trade, self.tick_size);
133                    if !updated_indices.contains(&last_idx) {
134                        updated_indices.push(last_idx);
135                    }
136                }
137            }
138        }
139
140        for idx in updated_indices {
141            if idx < self.datapoints.len() {
142                self.datapoints[idx].calculate_poc();
143            }
144        }
145
146        self.update_poc_status();
147    }
148
149    pub fn update_poc_status(&mut self) {
150        let updates = self
151            .datapoints
152            .iter()
153            .enumerate()
154            .filter_map(|(idx, dp)| dp.poc_price().map(|price| (idx, price)))
155            .collect::<Vec<_>>();
156
157        let total_points = self.datapoints.len();
158
159        for (current_idx, poc_price) in updates {
160            let mut npoc = NPoc::default();
161
162            for next_idx in (current_idx + 1)..total_points {
163                let next_dp = &self.datapoints[next_idx];
164
165                let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
166                let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
167
168                if next_dp_low <= poc_price && next_dp_high >= poc_price {
169                    // on render we reverse the order of the points
170                    // as it is easier to just take the idx=0 as latest candle for coords
171                    let reversed_idx = (total_points - 1) - next_idx;
172                    npoc.filled(reversed_idx as u64);
173                    break;
174                } else {
175                    npoc.unfilled();
176                }
177            }
178
179            if current_idx < total_points {
180                let data_point = &mut self.datapoints[current_idx];
181                data_point.set_poc_status(npoc);
182            }
183        }
184    }
185
186    pub fn min_max_price_in_range_prices(
187        &self,
188        earliest: usize,
189        latest: usize,
190    ) -> Option<(Price, Price)> {
191        if earliest > latest {
192            return None;
193        }
194
195        let mut min_p: Option<Price> = None;
196        let mut max_p: Option<Price> = None;
197
198        self.datapoints
199            .iter()
200            .rev()
201            .enumerate()
202            .filter(|(idx, _)| *idx >= earliest && *idx <= latest)
203            .for_each(|(_, dp)| {
204                let low = dp.kline.low;
205                let high = dp.kline.high;
206
207                min_p = Some(match min_p {
208                    Some(value) => value.min(low),
209                    None => low,
210                });
211                max_p = Some(match max_p {
212                    Some(value) => value.max(high),
213                    None => high,
214                });
215            });
216
217        match (min_p, max_p) {
218            (Some(low), Some(high)) => Some((low, high)),
219            _ => None,
220        }
221    }
222
223    pub fn min_max_price_in_range(&self, earliest: usize, latest: usize) -> Option<(f32, f32)> {
224        self.min_max_price_in_range_prices(earliest, latest)
225            .map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
226    }
227
228    pub fn max_qty_idx_range(
229        &self,
230        cluster_kind: ClusterKind,
231        earliest: usize,
232        latest: usize,
233        highest: Price,
234        lowest: Price,
235    ) -> Qty {
236        let mut max_cluster_qty: Qty = Qty::default();
237
238        self.datapoints
239            .iter()
240            .rev()
241            .enumerate()
242            .filter(|(index, _)| *index <= latest && *index >= earliest)
243            .for_each(|(_, dp)| {
244                max_cluster_qty =
245                    max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
246            });
247
248        max_cluster_qty
249    }
250}
251
252impl From<&TickAggr> for BTreeMap<u64, exchange::Volume> {
253    /// Converts datapoints into a map of timestamps and volume data
254    fn from(tick_aggr: &TickAggr) -> Self {
255        tick_aggr
256            .datapoints
257            .iter()
258            .enumerate()
259            .map(|(idx, dp)| (idx as u64, dp.kline.volume))
260            .collect()
261    }
262}