flowsurface_data/aggr/
ticks.rs

1use crate::aggr;
2use crate::chart::kline::{ClusterKind, KlineTrades, NPoc};
3use exchange::util::{Price, PriceStep};
4use exchange::{Kline, Trade};
5
6use std::collections::BTreeMap;
7
8#[derive(Debug, Clone)]
9pub struct TickAccumulation {
10    pub tick_count: usize,
11    pub kline: Kline,
12    pub footprint: KlineTrades,
13}
14
15impl TickAccumulation {
16    pub fn new(trade: &Trade, step: PriceStep) -> Self {
17        let mut footprint = KlineTrades::new();
18        footprint.add_trade_to_nearest_bin(trade, step);
19
20        let kline = Kline {
21            time: trade.time,
22            open: trade.price,
23            high: trade.price,
24            low: trade.price,
25            close: trade.price,
26            volume: (
27                if trade.is_sell { 0.0 } else { trade.qty },
28                if trade.is_sell { trade.qty } else { 0.0 },
29            ),
30        };
31
32        Self {
33            tick_count: 1,
34            kline,
35            footprint,
36        }
37    }
38
39    pub fn update_with_trade(&mut self, trade: &Trade, step: PriceStep) {
40        self.tick_count += 1;
41        self.kline.high = self.kline.high.max(trade.price);
42        self.kline.low = self.kline.low.min(trade.price);
43        self.kline.close = trade.price;
44
45        if trade.is_sell {
46            self.kline.volume.1 += trade.qty;
47        } else {
48            self.kline.volume.0 += trade.qty;
49        }
50
51        self.add_trade(trade, step);
52    }
53
54    fn add_trade(&mut self, trade: &Trade, step: PriceStep) {
55        self.footprint.add_trade_to_nearest_bin(trade, step);
56    }
57
58    pub fn max_cluster_qty(&self, cluster_kind: ClusterKind, highest: Price, lowest: Price) -> f32 {
59        match cluster_kind {
60            ClusterKind::BidAsk => self.footprint.max_qty_by(highest, lowest, f32::max),
61            ClusterKind::DeltaProfile => self
62                .footprint
63                .max_qty_by(highest, lowest, |buy, sell| (buy - sell).abs()),
64            ClusterKind::VolumeProfile => {
65                self.footprint
66                    .max_qty_by(highest, lowest, |buy, sell| buy + sell)
67            }
68        }
69    }
70
71    pub fn is_full(&self, interval: aggr::TickCount) -> bool {
72        self.tick_count >= interval.0 as usize
73    }
74
75    pub fn poc_price(&self) -> Option<Price> {
76        self.footprint.poc_price()
77    }
78
79    pub fn set_poc_status(&mut self, status: NPoc) {
80        self.footprint.set_poc_status(status);
81    }
82
83    pub fn calculate_poc(&mut self) {
84        self.footprint.calculate_poc();
85    }
86}
87
88pub struct TickAggr {
89    pub datapoints: Vec<TickAccumulation>,
90    pub interval: aggr::TickCount,
91    pub tick_size: PriceStep,
92}
93
94impl TickAggr {
95    pub fn new(interval: aggr::TickCount, tick_size: PriceStep, raw_trades: &[Trade]) -> Self {
96        let mut tick_aggr = Self {
97            datapoints: Vec::new(),
98            interval,
99            tick_size,
100        };
101
102        if !raw_trades.is_empty() {
103            tick_aggr.insert_trades(raw_trades);
104        }
105
106        tick_aggr
107    }
108
109    pub fn change_tick_size(&mut self, tick_size: f32, raw_trades: &[Trade]) {
110        self.tick_size = PriceStep::from_f32(tick_size);
111
112        self.datapoints.clear();
113
114        if !raw_trades.is_empty() {
115            self.insert_trades(raw_trades);
116        }
117    }
118
119    /// return latest data point and its index
120    pub fn latest_dp(&self) -> Option<(&TickAccumulation, usize)> {
121        self.datapoints
122            .last()
123            .map(|dp| (dp, self.datapoints.len() - 1))
124    }
125
126    pub fn volume_data(&self) -> BTreeMap<u64, (f32, f32)> {
127        self.into()
128    }
129
130    pub fn insert_trades(&mut self, buffer: &[Trade]) {
131        let mut updated_indices = Vec::new();
132
133        for trade in buffer {
134            if self.datapoints.is_empty() {
135                self.datapoints
136                    .push(TickAccumulation::new(trade, self.tick_size));
137                updated_indices.push(0);
138            } else {
139                let last_idx = self.datapoints.len() - 1;
140
141                if self.datapoints[last_idx].is_full(self.interval) {
142                    self.datapoints
143                        .push(TickAccumulation::new(trade, self.tick_size));
144                    updated_indices.push(self.datapoints.len() - 1);
145                } else {
146                    self.datapoints[last_idx].update_with_trade(trade, self.tick_size);
147                    if !updated_indices.contains(&last_idx) {
148                        updated_indices.push(last_idx);
149                    }
150                }
151            }
152        }
153
154        for idx in updated_indices {
155            if idx < self.datapoints.len() {
156                self.datapoints[idx].calculate_poc();
157            }
158        }
159
160        self.update_poc_status();
161    }
162
163    pub fn update_poc_status(&mut self) {
164        let updates = self
165            .datapoints
166            .iter()
167            .enumerate()
168            .filter_map(|(idx, dp)| dp.poc_price().map(|price| (idx, price)))
169            .collect::<Vec<_>>();
170
171        let total_points = self.datapoints.len();
172
173        for (current_idx, poc_price) in updates {
174            let mut npoc = NPoc::default();
175
176            for next_idx in (current_idx + 1)..total_points {
177                let next_dp = &self.datapoints[next_idx];
178
179                let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
180                let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
181
182                if next_dp_low <= poc_price && next_dp_high >= poc_price {
183                    // on render we reverse the order of the points
184                    // as it is easier to just take the idx=0 as latest candle for coords
185                    let reversed_idx = (total_points - 1) - next_idx;
186                    npoc.filled(reversed_idx as u64);
187                    break;
188                } else {
189                    npoc.unfilled();
190                }
191            }
192
193            if current_idx < total_points {
194                let data_point = &mut self.datapoints[current_idx];
195                data_point.set_poc_status(npoc);
196            }
197        }
198    }
199
200    pub fn min_max_price_in_range_prices(
201        &self,
202        earliest: usize,
203        latest: usize,
204    ) -> Option<(Price, Price)> {
205        if earliest > latest {
206            return None;
207        }
208
209        let mut min_p: Option<Price> = None;
210        let mut max_p: Option<Price> = None;
211
212        self.datapoints
213            .iter()
214            .rev()
215            .enumerate()
216            .filter(|(idx, _)| *idx >= earliest && *idx <= latest)
217            .for_each(|(_, dp)| {
218                let low = dp.kline.low;
219                let high = dp.kline.high;
220
221                min_p = Some(match min_p {
222                    Some(value) => value.min(low),
223                    None => low,
224                });
225                max_p = Some(match max_p {
226                    Some(value) => value.max(high),
227                    None => high,
228                });
229            });
230
231        match (min_p, max_p) {
232            (Some(low), Some(high)) => Some((low, high)),
233            _ => None,
234        }
235    }
236
237    pub fn min_max_price_in_range(&self, earliest: usize, latest: usize) -> Option<(f32, f32)> {
238        self.min_max_price_in_range_prices(earliest, latest)
239            .map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
240    }
241
242    pub fn max_qty_idx_range(
243        &self,
244        cluster_kind: ClusterKind,
245        earliest: usize,
246        latest: usize,
247        highest: Price,
248        lowest: Price,
249    ) -> f32 {
250        let mut max_cluster_qty: f32 = 0.0;
251
252        self.datapoints
253            .iter()
254            .rev()
255            .enumerate()
256            .filter(|(index, _)| *index <= latest && *index >= earliest)
257            .for_each(|(_, dp)| {
258                max_cluster_qty =
259                    max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
260            });
261
262        max_cluster_qty
263    }
264}
265
266impl From<&TickAggr> for BTreeMap<u64, (f32, f32)> {
267    /// Converts datapoints into a map of timestamps and volume data
268    fn from(tick_aggr: &TickAggr) -> Self {
269        tick_aggr
270            .datapoints
271            .iter()
272            .enumerate()
273            .map(|(idx, dp)| (idx as u64, (dp.kline.volume.0, dp.kline.volume.1)))
274            .collect()
275    }
276}