Skip to main content

flowsurface_data/chart/
heatmap.rs

1use super::Basis;
2use super::aggr::time::DataPoint;
3use exchange::unit::MinQtySize;
4use exchange::unit::price::{Price, PriceStep};
5use exchange::unit::qty::{Qty, SizeUnit, volume_size_unit};
6use exchange::{adapter::MarketKind, depth::Depth};
7
8use rustc_hash::{FxBuildHasher, FxHashMap};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11
12pub const CLEANUP_THRESHOLD: usize = 4800;
13
14/// Allow up to 500ms delay in order updates before starting a new order run.
15/// Prevents fragmentation(e.g. network latency) when qty and is_bid remain unchanged.
16const GRACE_PERIOD_MS: u64 = 500;
17
18#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize)]
19pub struct Config {
20    pub trade_size_filter: f32,
21    pub order_size_filter: f32,
22    pub trade_size_scale: Option<i32>,
23    pub coalescing: Option<CoalesceKind>,
24}
25
26impl Default for Config {
27    fn default() -> Self {
28        Config {
29            trade_size_filter: 0.0,
30            order_size_filter: 0.0,
31            trade_size_scale: Some(100),
32            coalescing: Some(CoalesceKind::Average(0.15)),
33        }
34    }
35}
36
37#[derive(Default)]
38pub struct HeatmapDataPoint {
39    pub grouped_trades: Box<[GroupedTrade]>,
40    pub buy_sell: (Qty, Qty),
41}
42
43impl DataPoint for HeatmapDataPoint {
44    fn add_trade(&mut self, trade: &exchange::Trade, step: PriceStep) {
45        let grouped_price: Price = trade.price.round_to_side_step(trade.is_sell, step);
46
47        match self
48            .grouped_trades
49            .binary_search_by(|probe| probe.compare_with(grouped_price, trade.is_sell))
50        {
51            Ok(index) => self.grouped_trades[index].qty += trade.qty,
52            Err(index) => {
53                let mut trades = self.grouped_trades.to_vec();
54                trades.insert(
55                    index,
56                    GroupedTrade {
57                        is_sell: trade.is_sell,
58                        price: grouped_price,
59                        qty: trade.qty,
60                    },
61                );
62                self.grouped_trades = trades.into_boxed_slice();
63            }
64        }
65
66        if trade.is_sell {
67            self.buy_sell.1 += trade.qty;
68        } else {
69            self.buy_sell.0 += trade.qty;
70        }
71    }
72
73    fn clear_trades(&mut self) {
74        self.grouped_trades = Box::new([]);
75        self.buy_sell = (Qty::default(), Qty::default());
76    }
77
78    fn last_trade_time(&self) -> Option<u64> {
79        None
80    }
81
82    fn first_trade_time(&self) -> Option<u64> {
83        None
84    }
85
86    fn kline(&self) -> Option<&exchange::Kline> {
87        None
88    }
89
90    fn last_price(&self) -> Price {
91        self.grouped_trades
92            .last()
93            .map_or(Price { units: 0 }, |t| t.price)
94    }
95
96    fn value_high(&self) -> Price {
97        self.grouped_trades
98            .iter()
99            .map(|t| t.price)
100            .max()
101            .unwrap_or(Price::from_units(0))
102    }
103
104    fn value_low(&self) -> Price {
105        self.grouped_trades
106            .iter()
107            .map(|t| t.price)
108            .min()
109            .unwrap_or(Price::from_units(0))
110    }
111}
112
113#[derive(Default, Debug, Clone, Copy, PartialEq)]
114pub struct OrderRun {
115    pub start_time: u64,
116    pub until_time: u64,
117    pub qty: Qty,
118    pub is_bid: bool,
119}
120
121impl OrderRun {
122    pub fn new(start_time: u64, aggr_time: u64, qty: Qty, is_bid: bool) -> Self {
123        OrderRun {
124            start_time,
125            until_time: start_time + aggr_time,
126            qty,
127            is_bid,
128        }
129    }
130
131    pub fn with_range(&self, earliest: u64, latest: u64) -> Option<&OrderRun> {
132        if self.start_time <= latest && self.until_time >= earliest {
133            Some(self)
134        } else {
135            None
136        }
137    }
138}
139
140#[derive(Debug, Clone, PartialEq)]
141pub struct HistoricalDepth {
142    price_levels: BTreeMap<Price, Vec<OrderRun>>,
143    pub aggr_time: u64,
144    tick_size: PriceStep,
145    min_order_qty: MinQtySize,
146}
147
148impl HistoricalDepth {
149    pub fn new(min_order_qty: MinQtySize, tick_size: PriceStep, basis: Basis) -> Self {
150        Self {
151            price_levels: BTreeMap::new(),
152            aggr_time: match basis {
153                Basis::Time(interval) => interval.into(),
154                Basis::Tick(_) => unimplemented!(),
155            },
156            tick_size,
157            min_order_qty,
158        }
159    }
160
161    pub fn insert_latest_depth(&mut self, depth: &Depth, time: u64) {
162        self.process_side(&depth.bids, time, true);
163        self.process_side(&depth.asks, time, false);
164    }
165
166    pub fn is_empty(&self) -> bool {
167        self.price_levels.is_empty()
168    }
169
170    fn process_side(&mut self, side: &BTreeMap<Price, Qty>, time: u64, is_bid: bool) {
171        let mut current_price = None;
172        let mut current_qty = Qty::from_units(0);
173
174        let step = self.tick_size;
175
176        for (price, qty) in side {
177            let rounded_price = price.round_to_side_step(is_bid, step);
178            if Some(rounded_price) == current_price {
179                current_qty += *qty;
180            } else {
181                if let Some(price) = current_price {
182                    self.update_price_level(time, price, current_qty, is_bid);
183                }
184                current_price = Some(rounded_price);
185                current_qty = *qty;
186            }
187        }
188
189        if let Some(price) = current_price {
190            self.update_price_level(time, price, current_qty, is_bid);
191        }
192    }
193
194    fn update_price_level(&mut self, time: u64, price: Price, qty: Qty, is_bid: bool) {
195        let aggr_time = self.aggr_time;
196
197        let qty_q = qty.round_to_min_qty(self.min_order_qty);
198        let price_level = self.price_levels.entry(price).or_default();
199
200        match price_level.last_mut() {
201            Some(last_run) if last_run.is_bid == is_bid => {
202                if time > last_run.until_time + GRACE_PERIOD_MS {
203                    price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
204                    return;
205                }
206
207                let current_lots = qty_q.to_lots(self.min_order_qty);
208                let last_lots = last_run.qty.to_lots(self.min_order_qty);
209
210                if current_lots == last_lots {
211                    let new_until = time + aggr_time;
212                    if new_until > last_run.until_time {
213                        last_run.until_time = new_until;
214                    }
215                } else {
216                    if last_run.until_time > time {
217                        last_run.until_time = time;
218                    }
219                    price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
220                }
221            }
222            Some(last_run) => {
223                if last_run.until_time > time {
224                    last_run.until_time = time;
225                }
226                price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
227            }
228            None => {
229                price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
230            }
231        }
232    }
233
234    pub fn iter_time_filtered(
235        &self,
236        earliest: u64,
237        latest: u64,
238        highest: Price,
239        lowest: Price,
240    ) -> impl Iterator<Item = (&Price, &Vec<OrderRun>)> {
241        self.price_levels
242            .range(lowest..=highest)
243            .filter(move |(_, runs)| {
244                runs.iter()
245                    .any(|run| run.until_time >= earliest && run.start_time <= latest)
246            })
247    }
248
249    pub fn latest_order_runs(
250        &self,
251        highest: Price,
252        lowest: Price,
253        latest_timestamp: u64,
254    ) -> impl Iterator<Item = (&Price, &OrderRun)> {
255        self.price_levels
256            .range(lowest..=highest)
257            .filter_map(move |(price, runs)| {
258                runs.last()
259                    .filter(|run| run.until_time >= latest_timestamp)
260                    .map(|run| (price, run))
261            })
262    }
263
264    pub fn cleanup_old_price_levels(&mut self, oldest_time: u64) {
265        self.price_levels.iter_mut().for_each(|(_, runs)| {
266            runs.retain(|run| run.until_time >= oldest_time);
267        });
268
269        self.price_levels.retain(|_, runs| !runs.is_empty());
270    }
271
272    pub fn coalesced_runs(
273        &self,
274        earliest: u64,
275        latest: u64,
276        highest: Price,
277        lowest: Price,
278        market_type: MarketKind,
279        order_size_filter: f32,
280        coalesce_kind: CoalesceKind,
281    ) -> Vec<(Price, OrderRun)> {
282        let mut result_runs = Vec::new();
283
284        let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
285
286        for (price_at_level, runs_at_price_level) in
287            self.iter_time_filtered(earliest, latest, highest, lowest)
288        {
289            let candidate_runs = runs_at_price_level
290                .iter()
291                .filter(|run_ref| {
292                    if !(run_ref.until_time >= earliest && run_ref.start_time <= latest) {
293                        return false;
294                    }
295                    let order_size = market_type.qty_in_quote_value(
296                        run_ref.qty,
297                        *price_at_level,
298                        size_in_quote_ccy,
299                    );
300                    order_size > order_size_filter
301                })
302                .collect::<Vec<&OrderRun>>();
303
304            if candidate_runs.is_empty() {
305                continue;
306            }
307
308            let mut current_accumulator_opt: Option<CoalescingRun> = None;
309
310            for run_to_process_ref in candidate_runs {
311                let run_to_process = *run_to_process_ref;
312
313                if let Some(current_accumulator) = current_accumulator_opt.as_mut() {
314                    let comparison_base_qty = current_accumulator.comparison_qty(&coalesce_kind);
315                    let qty_within_threshold = coalesce_kind.is_within_lot_similarity(
316                        comparison_base_qty,
317                        run_to_process.qty,
318                        self.min_order_qty,
319                    );
320
321                    if run_to_process.start_time <= current_accumulator.until_time
322                        && run_to_process.is_bid == current_accumulator.is_bid
323                        && qty_within_threshold
324                    {
325                        current_accumulator.merge_run(&run_to_process);
326                    } else {
327                        result_runs.push((
328                            *price_at_level,
329                            current_accumulator.to_order_run(&coalesce_kind),
330                        ));
331                        current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
332                    }
333                } else {
334                    current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
335                }
336            }
337
338            if let Some(accumulator) = current_accumulator_opt {
339                result_runs.push((*price_at_level, accumulator.to_order_run(&coalesce_kind)));
340            }
341        }
342        result_runs
343    }
344
345    pub fn query_grid_qtys(
346        &self,
347        center_time: u64,
348        center_price: Price,
349        time_interval_offsets: &[i64],
350        price_tick_offsets: &[i64],
351        market_type: MarketKind,
352        order_size_filter: f32,
353        coalesce_kind: Option<CoalesceKind>,
354    ) -> FxHashMap<(u64, Price), (Qty, bool)> {
355        let aggr_time = self.aggr_time;
356
357        let step = self.tick_size;
358
359        let query_earliest_time = time_interval_offsets
360            .iter()
361            .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
362            .min()
363            .unwrap_or(center_time);
364
365        let query_latest_time = time_interval_offsets
366            .iter()
367            .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
368            .max()
369            .map_or(center_time, |t| t.saturating_add(aggr_time));
370
371        let query_lowest = price_tick_offsets
372            .iter()
373            .copied()
374            .min()
375            .map_or(center_price, |offset| center_price.add_steps(offset, step));
376        let query_highest = price_tick_offsets
377            .iter()
378            .copied()
379            .max()
380            .map_or(center_price, |offset| center_price.add_steps(offset, step));
381
382        let runs_in_vicinity: Vec<(Price, OrderRun)> = if let Some(ck) = coalesce_kind {
383            self.coalesced_runs(
384                query_earliest_time,
385                query_latest_time,
386                query_highest,
387                query_lowest,
388                market_type,
389                order_size_filter,
390                ck,
391            )
392        } else {
393            self.iter_time_filtered(
394                query_earliest_time,
395                query_latest_time,
396                query_highest,
397                query_lowest,
398            )
399            .flat_map(|(price_level, runs_at_price)| {
400                runs_at_price.iter().map(move |run| (*price_level, *run))
401            })
402            .collect()
403        };
404
405        let capacity = time_interval_offsets.len() * price_tick_offsets.len();
406        let mut grid_quantities: FxHashMap<(u64, Price), (Qty, bool)> =
407            FxHashMap::with_capacity_and_hasher(capacity, FxBuildHasher);
408        for price_offset in price_tick_offsets {
409            let target_price_key = center_price.add_steps(*price_offset, step);
410
411            for time_offset in time_interval_offsets {
412                let target_time_val =
413                    center_time.saturating_add_signed(*time_offset * aggr_time as i64);
414                let current_grid_key = (target_time_val, target_price_key);
415
416                for (run_price_level, run_data) in &runs_in_vicinity {
417                    if *run_price_level == target_price_key
418                        && run_data.start_time <= target_time_val
419                        && run_data.until_time > target_time_val
420                    {
421                        grid_quantities.insert(current_grid_key, (run_data.qty, run_data.is_bid));
422                        break;
423                    }
424                }
425            }
426        }
427        grid_quantities
428    }
429
430    pub fn max_qty_in_range_raw(
431        &self,
432        earliest: u64,
433        latest: u64,
434        highest: Price,
435        lowest: Price,
436    ) -> Qty {
437        let mut max_qty = Qty::ZERO;
438
439        for (_price, runs) in self.price_levels.range(lowest..=highest) {
440            for run in runs.iter() {
441                if run.until_time < earliest || run.start_time > latest {
442                    continue;
443                }
444                max_qty = max_qty.max(run.qty);
445            }
446        }
447
448        max_qty
449    }
450
451    pub fn max_depth_qty_in_range(
452        &self,
453        earliest: u64,
454        latest: u64,
455        highest: Price,
456        lowest: Price,
457        market_type: MarketKind,
458        order_size_filter: f32,
459    ) -> Qty {
460        let mut max_depth_qty = Qty::ZERO;
461        let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
462
463        for (price, runs) in self.price_levels.range(lowest..=highest) {
464            for run in runs.iter() {
465                if run.until_time < earliest || run.start_time > latest {
466                    continue;
467                }
468
469                let order_size = market_type.qty_in_quote_value(run.qty, *price, size_in_quote_ccy);
470
471                if order_size > order_size_filter {
472                    max_depth_qty = max_depth_qty.max(run.qty);
473                }
474            }
475        }
476
477        max_depth_qty
478    }
479}
480
481#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
482pub enum CoalesceKind {
483    First(f32),
484    Average(f32),
485    Max(f32),
486}
487
488impl CoalesceKind {
489    pub fn threshold(&self) -> f32 {
490        match self {
491            CoalesceKind::Average(t) | CoalesceKind::First(t) | CoalesceKind::Max(t) => *t,
492        }
493    }
494
495    pub fn with_threshold(&self, threshold: f32) -> Self {
496        match self {
497            CoalesceKind::First(_) => CoalesceKind::First(threshold),
498            CoalesceKind::Average(_) => CoalesceKind::Average(threshold),
499            CoalesceKind::Max(_) => CoalesceKind::Max(threshold),
500        }
501    }
502
503    fn is_within_lot_similarity(
504        self,
505        base_qty: Qty,
506        candidate_qty: Qty,
507        min_qty: MinQtySize,
508    ) -> bool {
509        let ratio = self.threshold().max(0.0);
510
511        if !ratio.is_finite() {
512            return false;
513        }
514
515        let base_lots = base_qty.to_lots(min_qty).max(0);
516        let candidate_lots = candidate_qty.to_lots(min_qty).max(0);
517
518        if base_lots == 0 {
519            return candidate_lots == 0;
520        }
521
522        let lots_diff = base_lots.abs_diff(candidate_lots) as f64;
523        let allowed_diff = (base_lots as f64) * (ratio as f64);
524
525        lots_diff <= allowed_diff
526    }
527}
528
529impl PartialEq for CoalesceKind {
530    fn eq(&self, other: &Self) -> bool {
531        std::mem::discriminant(self) == std::mem::discriminant(other)
532    }
533}
534
535impl Eq for CoalesceKind {}
536
537#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)]
538pub struct CoalescingRun {
539    pub start_time: u64,
540    pub until_time: u64,
541    pub is_bid: bool,
542    pub qty_sum: Qty,
543    pub run_count: u32,
544    first_qty: Qty,
545    max_qty: Qty,
546}
547
548impl CoalescingRun {
549    pub fn new(run: &OrderRun) -> Self {
550        let run_qty = run.qty;
551        CoalescingRun {
552            start_time: run.start_time,
553            until_time: run.until_time,
554            is_bid: run.is_bid,
555            qty_sum: run_qty,
556            run_count: 1,
557            first_qty: run_qty,
558            max_qty: run_qty,
559        }
560    }
561
562    pub fn merge_run(&mut self, run: &OrderRun) {
563        self.until_time = self.until_time.max(run.until_time);
564        let run_qty = run.qty;
565        self.qty_sum += run_qty;
566        self.run_count += 1;
567        self.max_qty = self.max_qty.max(run_qty);
568    }
569
570    pub fn comparison_qty(&self, kind: &CoalesceKind) -> Qty {
571        match kind {
572            CoalesceKind::Average(_) => self.current_average_qty(),
573            CoalesceKind::Max(_) | CoalesceKind::First(_) => self.first_qty,
574        }
575    }
576
577    pub fn current_average_qty(&self) -> Qty {
578        if self.run_count == 0 {
579            Qty::ZERO
580        } else {
581            let count = i64::from(self.run_count);
582            let rounded_units = (self.qty_sum.units + (count / 2)) / count;
583            Qty::from_units(rounded_units)
584        }
585    }
586
587    pub fn to_order_run(&self, kind: &CoalesceKind) -> OrderRun {
588        let final_qty = match kind {
589            CoalesceKind::Average(_) => self.current_average_qty(),
590            CoalesceKind::First(_) => self.first_qty,
591            CoalesceKind::Max(_) => self.max_qty,
592        };
593        OrderRun {
594            start_time: self.start_time,
595            until_time: self.until_time,
596            qty: final_qty,
597            is_bid: self.is_bid,
598        }
599    }
600}
601
602#[derive(Default)]
603pub struct QtyScale {
604    pub max_trade_qty: Qty,
605    pub max_aggr_volume: Qty,
606    pub max_depth_qty: Qty,
607}
608
609#[derive(Debug, Clone)]
610pub struct GroupedTrade {
611    pub is_sell: bool,
612    pub price: Price,
613    pub qty: Qty,
614}
615
616impl GroupedTrade {
617    pub fn compare_with(&self, price: Price, is_sell: bool) -> std::cmp::Ordering {
618        if self.is_sell == is_sell {
619            self.price.cmp(&price)
620        } else {
621            self.is_sell.cmp(&is_sell)
622        }
623    }
624}
625
626#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
627pub enum HeatmapStudy {
628    VolumeProfile(ProfileKind),
629}
630
631impl HeatmapStudy {
632    pub const ALL: [HeatmapStudy; 1] = [HeatmapStudy::VolumeProfile(ProfileKind::VisibleRange)];
633}
634
635impl std::fmt::Display for HeatmapStudy {
636    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637        match self {
638            HeatmapStudy::VolumeProfile(kind) => {
639                write!(f, "Volume Profile ({})", kind)
640            }
641        }
642    }
643}
644
645#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
646pub enum ProfileKind {
647    FixedWindow(usize),
648    VisibleRange,
649}
650
651impl std::fmt::Display for ProfileKind {
652    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653        match self {
654            ProfileKind::FixedWindow(_) => write!(f, "Fixed window"),
655            ProfileKind::VisibleRange => write!(f, "Visible range"),
656        }
657    }
658}