Skip to main content

flowsurface_data/chart/
heatmap.rs

1use super::Basis;
2use super::aggr::time::DataPoint;
3use exchange::unit::MinQtySize;
4use exchange::unit::price::{Price, PriceStep};
5use exchange::unit::qty::{Qty, SizeUnit, volume_size_unit};
6use exchange::{adapter::MarketKind, depth::Depth};
7
8use rustc_hash::{FxBuildHasher, FxHashMap};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11
12pub const CLEANUP_THRESHOLD: usize = 4800;
13
14#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize)]
15pub struct Config {
16    pub trade_size_filter: f32,
17    pub order_size_filter: f32,
18    pub trade_size_scale: Option<i32>,
19    pub coalescing: Option<CoalesceKind>,
20}
21
22impl Default for Config {
23    fn default() -> Self {
24        Config {
25            trade_size_filter: 0.0,
26            order_size_filter: 0.0,
27            trade_size_scale: Some(100),
28            coalescing: Some(CoalesceKind::Average(0.15)),
29        }
30    }
31}
32
33#[derive(Default)]
34pub struct HeatmapDataPoint {
35    pub grouped_trades: Box<[GroupedTrade]>,
36    pub buy_sell: (Qty, Qty),
37}
38
39impl DataPoint for HeatmapDataPoint {
40    fn add_trade(&mut self, trade: &exchange::Trade, step: PriceStep) {
41        let grouped_price: Price = trade.price.round_to_side_step(trade.is_sell, step);
42
43        match self
44            .grouped_trades
45            .binary_search_by(|probe| probe.compare_with(grouped_price, trade.is_sell))
46        {
47            Ok(index) => self.grouped_trades[index].qty += trade.qty,
48            Err(index) => {
49                let mut trades = self.grouped_trades.to_vec();
50                trades.insert(
51                    index,
52                    GroupedTrade {
53                        is_sell: trade.is_sell,
54                        price: grouped_price,
55                        qty: trade.qty,
56                    },
57                );
58                self.grouped_trades = trades.into_boxed_slice();
59            }
60        }
61
62        if trade.is_sell {
63            self.buy_sell.1 += trade.qty;
64        } else {
65            self.buy_sell.0 += trade.qty;
66        }
67    }
68
69    fn clear_trades(&mut self) {
70        self.grouped_trades = Box::new([]);
71        self.buy_sell = (Qty::default(), Qty::default());
72    }
73
74    fn last_trade_time(&self) -> Option<u64> {
75        None
76    }
77
78    fn first_trade_time(&self) -> Option<u64> {
79        None
80    }
81
82    fn kline(&self) -> Option<&exchange::Kline> {
83        None
84    }
85
86    fn last_price(&self) -> Price {
87        self.grouped_trades
88            .last()
89            .map_or(Price { units: 0 }, |t| t.price)
90    }
91
92    fn value_high(&self) -> Price {
93        self.grouped_trades
94            .iter()
95            .map(|t| t.price)
96            .max()
97            .unwrap_or(Price::from_units(0))
98    }
99
100    fn value_low(&self) -> Price {
101        self.grouped_trades
102            .iter()
103            .map(|t| t.price)
104            .min()
105            .unwrap_or(Price::from_units(0))
106    }
107}
108
109#[derive(Default, Debug, Clone, Copy, PartialEq)]
110pub struct OrderRun {
111    pub start_time: u64,
112    pub until_time: u64,
113    pub qty: Qty,
114    pub is_bid: bool,
115}
116
117impl OrderRun {
118    pub fn new(start_time: u64, aggr_time: u64, qty: Qty, is_bid: bool) -> Self {
119        OrderRun {
120            start_time,
121            until_time: start_time + aggr_time,
122            qty,
123            is_bid,
124        }
125    }
126
127    pub fn with_range(&self, earliest: u64, latest: u64) -> Option<&OrderRun> {
128        if self.start_time <= latest && self.until_time >= earliest {
129            Some(self)
130        } else {
131            None
132        }
133    }
134}
135
136#[derive(Debug, Clone, PartialEq)]
137pub struct HistoricalDepth {
138    price_levels: BTreeMap<Price, Vec<OrderRun>>,
139    pub aggr_time: u64,
140    tick_size: PriceStep,
141    min_order_qty: MinQtySize,
142    last_snapshot_time: Option<u64>,
143}
144
145impl HistoricalDepth {
146    pub fn new(min_order_qty: MinQtySize, tick_size: PriceStep, basis: Basis) -> Self {
147        Self {
148            price_levels: BTreeMap::new(),
149            aggr_time: match basis {
150                Basis::Time(interval) => interval.into(),
151                Basis::Tick(_) => unimplemented!(),
152            },
153            tick_size,
154            min_order_qty,
155            last_snapshot_time: None,
156        }
157    }
158
159    pub fn insert_latest_depth(&mut self, depth: &Depth, time: u64) {
160        if let Some(prev_time) = self.last_snapshot_time
161            && time < prev_time
162        {
163            return;
164        }
165
166        let aggr_time = self.aggr_time.max(1);
167        let has_snapshot_gap = self
168            .last_snapshot_time
169            .is_some_and(|prev_time| time > prev_time.saturating_add(aggr_time));
170
171        self.process_side(&depth.bids, time, true, has_snapshot_gap);
172        self.process_side(&depth.asks, time, false, has_snapshot_gap);
173        self.last_snapshot_time = Some(time);
174    }
175
176    fn process_side(
177        &mut self,
178        side: &BTreeMap<Price, Qty>,
179        time: u64,
180        is_bid: bool,
181        has_snapshot_gap: bool,
182    ) {
183        let mut current_price = None;
184        let mut current_qty = Qty::ZERO;
185
186        let step = self.tick_size;
187
188        for (price, qty) in side {
189            let rounded_price = price.round_to_side_step(is_bid, step);
190            if Some(rounded_price) == current_price {
191                current_qty += *qty;
192            } else {
193                if let Some(price) = current_price {
194                    self.update_price_level(time, price, current_qty, is_bid, has_snapshot_gap);
195                }
196                current_price = Some(rounded_price);
197                current_qty = *qty;
198            }
199        }
200
201        if let Some(price) = current_price {
202            self.update_price_level(time, price, current_qty, is_bid, has_snapshot_gap);
203        }
204    }
205
206    fn update_price_level(
207        &mut self,
208        time: u64,
209        price: Price,
210        qty: Qty,
211        is_bid: bool,
212        has_snapshot_gap: bool,
213    ) {
214        let aggr_time = self.aggr_time;
215        let price_level = self.price_levels.entry(price).or_default();
216
217        match price_level.last_mut() {
218            Some(last_run) if last_run.is_bid == is_bid => {
219                if time > last_run.until_time {
220                    if has_snapshot_gap {
221                        if qty == last_run.qty {
222                            last_run.until_time = time.saturating_add(aggr_time);
223                            return;
224                        }
225
226                        last_run.until_time = time;
227                    }
228
229                    price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
230                    return;
231                }
232
233                if qty == last_run.qty {
234                    let new_until = time + aggr_time;
235                    if new_until > last_run.until_time {
236                        last_run.until_time = new_until;
237                    }
238                } else {
239                    if last_run.until_time > time {
240                        last_run.until_time = time;
241                    }
242                    price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
243                }
244            }
245            Some(last_run) => {
246                if last_run.until_time > time {
247                    last_run.until_time = time;
248                }
249                price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
250            }
251            None => {
252                price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
253            }
254        }
255    }
256
257    pub fn is_empty(&self) -> bool {
258        self.price_levels.is_empty()
259    }
260
261    pub fn iter_time_filtered(
262        &self,
263        earliest: u64,
264        latest: u64,
265        highest: Price,
266        lowest: Price,
267    ) -> impl Iterator<Item = (&Price, &Vec<OrderRun>)> {
268        self.price_levels
269            .range(lowest..=highest)
270            .filter(move |(_, runs)| {
271                runs.iter()
272                    .any(|run| run.until_time >= earliest && run.start_time <= latest)
273            })
274    }
275
276    pub fn latest_order_runs(
277        &self,
278        highest: Price,
279        lowest: Price,
280        latest_timestamp: u64,
281    ) -> impl Iterator<Item = (&Price, &OrderRun)> {
282        self.price_levels
283            .range(lowest..=highest)
284            .filter_map(move |(price, runs)| {
285                runs.last()
286                    .filter(|run| run.until_time >= latest_timestamp)
287                    .map(|run| (price, run))
288            })
289    }
290
291    pub fn cleanup_old_price_levels(&mut self, oldest_time: u64) {
292        self.price_levels.iter_mut().for_each(|(_, runs)| {
293            runs.retain(|run| run.until_time >= oldest_time);
294        });
295
296        self.price_levels.retain(|_, runs| !runs.is_empty());
297    }
298
299    pub fn coalesced_runs(
300        &self,
301        earliest: u64,
302        latest: u64,
303        highest: Price,
304        lowest: Price,
305        market_type: MarketKind,
306        order_size_filter: f32,
307        coalesce_kind: CoalesceKind,
308    ) -> Vec<(Price, OrderRun)> {
309        let mut result_runs = Vec::new();
310
311        let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
312
313        for (price_at_level, runs_at_price_level) in
314            self.iter_time_filtered(earliest, latest, highest, lowest)
315        {
316            let candidate_runs = runs_at_price_level
317                .iter()
318                .filter(|run_ref| {
319                    if !(run_ref.until_time >= earliest && run_ref.start_time <= latest) {
320                        return false;
321                    }
322                    let order_size = market_type.qty_in_quote_value(
323                        run_ref.qty,
324                        *price_at_level,
325                        size_in_quote_ccy,
326                    );
327                    order_size > order_size_filter
328                })
329                .collect::<Vec<&OrderRun>>();
330
331            if candidate_runs.is_empty() {
332                continue;
333            }
334
335            let mut current_accumulator_opt: Option<CoalescingRun> = None;
336
337            for run_to_process_ref in candidate_runs {
338                let run_to_process = *run_to_process_ref;
339
340                if let Some(current_accumulator) = current_accumulator_opt.as_mut() {
341                    let comparison_base_qty = current_accumulator.comparison_qty(&coalesce_kind);
342                    let qty_within_threshold = coalesce_kind.is_within_lot_similarity(
343                        comparison_base_qty,
344                        run_to_process.qty,
345                        self.min_order_qty,
346                    );
347
348                    if run_to_process.start_time <= current_accumulator.until_time
349                        && run_to_process.is_bid == current_accumulator.is_bid
350                        && qty_within_threshold
351                    {
352                        current_accumulator.merge_run(&run_to_process);
353                    } else {
354                        result_runs.push((
355                            *price_at_level,
356                            current_accumulator.to_order_run(&coalesce_kind),
357                        ));
358                        current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
359                    }
360                } else {
361                    current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
362                }
363            }
364
365            if let Some(accumulator) = current_accumulator_opt {
366                result_runs.push((*price_at_level, accumulator.to_order_run(&coalesce_kind)));
367            }
368        }
369        result_runs
370    }
371
372    pub fn query_grid_qtys(
373        &self,
374        center_time: u64,
375        center_price: Price,
376        time_interval_offsets: &[i64],
377        price_tick_offsets: &[i64],
378        market_type: MarketKind,
379        order_size_filter: f32,
380        coalesce_kind: Option<CoalesceKind>,
381    ) -> FxHashMap<(u64, Price), (Qty, bool)> {
382        let aggr_time = self.aggr_time;
383
384        let step = self.tick_size;
385
386        let query_earliest_time = time_interval_offsets
387            .iter()
388            .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
389            .min()
390            .unwrap_or(center_time);
391
392        let query_latest_time = time_interval_offsets
393            .iter()
394            .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
395            .max()
396            .map_or(center_time, |t| t.saturating_add(aggr_time));
397
398        let query_lowest = price_tick_offsets
399            .iter()
400            .copied()
401            .min()
402            .map_or(center_price, |offset| center_price.add_steps(offset, step));
403        let query_highest = price_tick_offsets
404            .iter()
405            .copied()
406            .max()
407            .map_or(center_price, |offset| center_price.add_steps(offset, step));
408
409        let runs_in_vicinity: Vec<(Price, OrderRun)> = if let Some(ck) = coalesce_kind {
410            self.coalesced_runs(
411                query_earliest_time,
412                query_latest_time,
413                query_highest,
414                query_lowest,
415                market_type,
416                order_size_filter,
417                ck,
418            )
419        } else {
420            self.iter_time_filtered(
421                query_earliest_time,
422                query_latest_time,
423                query_highest,
424                query_lowest,
425            )
426            .flat_map(|(price_level, runs_at_price)| {
427                runs_at_price.iter().map(move |run| (*price_level, *run))
428            })
429            .collect()
430        };
431
432        let capacity = time_interval_offsets.len() * price_tick_offsets.len();
433        let mut grid_quantities: FxHashMap<(u64, Price), (Qty, bool)> =
434            FxHashMap::with_capacity_and_hasher(capacity, FxBuildHasher);
435        for price_offset in price_tick_offsets {
436            let target_price_key = center_price.add_steps(*price_offset, step);
437
438            for time_offset in time_interval_offsets {
439                let target_time_val =
440                    center_time.saturating_add_signed(*time_offset * aggr_time as i64);
441                let current_grid_key = (target_time_val, target_price_key);
442
443                for (run_price_level, run_data) in &runs_in_vicinity {
444                    if *run_price_level == target_price_key
445                        && run_data.start_time <= target_time_val
446                        && run_data.until_time > target_time_val
447                    {
448                        grid_quantities.insert(current_grid_key, (run_data.qty, run_data.is_bid));
449                        break;
450                    }
451                }
452            }
453        }
454        grid_quantities
455    }
456
457    pub fn max_qty_in_range_raw(
458        &self,
459        earliest: u64,
460        latest: u64,
461        highest: Price,
462        lowest: Price,
463    ) -> Qty {
464        let mut max_qty = Qty::ZERO;
465
466        for (_price, runs) in self.price_levels.range(lowest..=highest) {
467            for run in runs.iter() {
468                if run.until_time < earliest || run.start_time > latest {
469                    continue;
470                }
471                max_qty = max_qty.max(run.qty);
472            }
473        }
474
475        max_qty
476    }
477
478    pub fn max_depth_qty_in_range(
479        &self,
480        earliest: u64,
481        latest: u64,
482        highest: Price,
483        lowest: Price,
484        market_type: MarketKind,
485        order_size_filter: f32,
486    ) -> Qty {
487        let mut max_depth_qty = Qty::ZERO;
488        let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
489
490        for (price, runs) in self.price_levels.range(lowest..=highest) {
491            for run in runs.iter() {
492                if run.until_time < earliest || run.start_time > latest {
493                    continue;
494                }
495
496                let order_size = market_type.qty_in_quote_value(run.qty, *price, size_in_quote_ccy);
497
498                if order_size > order_size_filter {
499                    max_depth_qty = max_depth_qty.max(run.qty);
500                }
501            }
502        }
503
504        max_depth_qty
505    }
506}
507
508#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
509pub enum CoalesceKind {
510    First(f32),
511    Average(f32),
512    Max(f32),
513}
514
515impl CoalesceKind {
516    pub fn threshold(&self) -> f32 {
517        match self {
518            CoalesceKind::Average(t) | CoalesceKind::First(t) | CoalesceKind::Max(t) => *t,
519        }
520    }
521
522    pub fn with_threshold(&self, threshold: f32) -> Self {
523        match self {
524            CoalesceKind::First(_) => CoalesceKind::First(threshold),
525            CoalesceKind::Average(_) => CoalesceKind::Average(threshold),
526            CoalesceKind::Max(_) => CoalesceKind::Max(threshold),
527        }
528    }
529
530    fn is_within_lot_similarity(
531        self,
532        base_qty: Qty,
533        candidate_qty: Qty,
534        min_qty: MinQtySize,
535    ) -> bool {
536        let ratio = self.threshold().max(0.0);
537
538        if !ratio.is_finite() {
539            return false;
540        }
541
542        let base_lots = base_qty.to_lots(min_qty).max(0);
543        let candidate_lots = candidate_qty.to_lots(min_qty).max(0);
544
545        if base_lots == 0 {
546            return candidate_lots == 0;
547        }
548
549        let lots_diff = base_lots.abs_diff(candidate_lots) as f64;
550        let allowed_diff = (base_lots as f64) * (ratio as f64);
551
552        lots_diff <= allowed_diff
553    }
554}
555
556impl PartialEq for CoalesceKind {
557    fn eq(&self, other: &Self) -> bool {
558        std::mem::discriminant(self) == std::mem::discriminant(other)
559    }
560}
561
562impl Eq for CoalesceKind {}
563
564#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)]
565pub struct CoalescingRun {
566    pub start_time: u64,
567    pub until_time: u64,
568    pub is_bid: bool,
569    pub qty_sum: Qty,
570    pub run_count: u32,
571    first_qty: Qty,
572    max_qty: Qty,
573}
574
575impl CoalescingRun {
576    pub fn new(run: &OrderRun) -> Self {
577        let run_qty = run.qty;
578        CoalescingRun {
579            start_time: run.start_time,
580            until_time: run.until_time,
581            is_bid: run.is_bid,
582            qty_sum: run_qty,
583            run_count: 1,
584            first_qty: run_qty,
585            max_qty: run_qty,
586        }
587    }
588
589    pub fn merge_run(&mut self, run: &OrderRun) {
590        self.until_time = self.until_time.max(run.until_time);
591        let run_qty = run.qty;
592        self.qty_sum += run_qty;
593        self.run_count += 1;
594        self.max_qty = self.max_qty.max(run_qty);
595    }
596
597    pub fn comparison_qty(&self, kind: &CoalesceKind) -> Qty {
598        match kind {
599            CoalesceKind::Average(_) => self.current_average_qty(),
600            CoalesceKind::Max(_) | CoalesceKind::First(_) => self.first_qty,
601        }
602    }
603
604    pub fn current_average_qty(&self) -> Qty {
605        if self.run_count == 0 {
606            Qty::ZERO
607        } else {
608            let count = i64::from(self.run_count);
609            let rounded_units = (self.qty_sum.units + (count / 2)) / count;
610            Qty::from_units(rounded_units)
611        }
612    }
613
614    pub fn to_order_run(&self, kind: &CoalesceKind) -> OrderRun {
615        let final_qty = match kind {
616            CoalesceKind::Average(_) => self.current_average_qty(),
617            CoalesceKind::First(_) => self.first_qty,
618            CoalesceKind::Max(_) => self.max_qty,
619        };
620        OrderRun {
621            start_time: self.start_time,
622            until_time: self.until_time,
623            qty: final_qty,
624            is_bid: self.is_bid,
625        }
626    }
627}
628
629#[derive(Default)]
630pub struct QtyScale {
631    pub max_trade_qty: Qty,
632    pub max_aggr_volume: Qty,
633    pub max_depth_qty: Qty,
634}
635
636#[derive(Debug, Clone)]
637pub struct GroupedTrade {
638    pub is_sell: bool,
639    pub price: Price,
640    pub qty: Qty,
641}
642
643impl GroupedTrade {
644    pub fn compare_with(&self, price: Price, is_sell: bool) -> std::cmp::Ordering {
645        if self.is_sell == is_sell {
646            self.price.cmp(&price)
647        } else {
648            self.is_sell.cmp(&is_sell)
649        }
650    }
651}
652
653#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
654pub enum HeatmapStudy {
655    VolumeProfile(ProfileKind),
656}
657
658impl HeatmapStudy {
659    pub const ALL: [HeatmapStudy; 1] = [HeatmapStudy::VolumeProfile(ProfileKind::VisibleRange)];
660}
661
662impl std::fmt::Display for HeatmapStudy {
663    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
664        match self {
665            HeatmapStudy::VolumeProfile(kind) => {
666                write!(f, "Volume Profile ({})", kind)
667            }
668        }
669    }
670}
671
672#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
673pub enum ProfileKind {
674    FixedWindow(usize),
675    #[default]
676    VisibleRange,
677}
678
679impl std::fmt::Display for ProfileKind {
680    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681        match self {
682            ProfileKind::FixedWindow(_) => write!(f, "Fixed window"),
683            ProfileKind::VisibleRange => write!(f, "Visible range"),
684        }
685    }
686}