1use super::Basis;
2use super::aggr::time::DataPoint;
3use exchange::unit::MinQtySize;
4use exchange::unit::price::{Price, PriceStep};
5use exchange::unit::qty::{Qty, SizeUnit, volume_size_unit};
6use exchange::{adapter::MarketKind, depth::Depth};
7
8use rustc_hash::{FxBuildHasher, FxHashMap};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11
12pub const CLEANUP_THRESHOLD: usize = 4800;
13
14#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize)]
15pub struct Config {
16 pub trade_size_filter: f32,
17 pub order_size_filter: f32,
18 pub trade_size_scale: Option<i32>,
19 pub coalescing: Option<CoalesceKind>,
20}
21
22impl Default for Config {
23 fn default() -> Self {
24 Config {
25 trade_size_filter: 0.0,
26 order_size_filter: 0.0,
27 trade_size_scale: Some(100),
28 coalescing: Some(CoalesceKind::Average(0.15)),
29 }
30 }
31}
32
33#[derive(Default)]
34pub struct HeatmapDataPoint {
35 pub grouped_trades: Box<[GroupedTrade]>,
36 pub buy_sell: (Qty, Qty),
37}
38
39impl DataPoint for HeatmapDataPoint {
40 fn add_trade(&mut self, trade: &exchange::Trade, step: PriceStep) {
41 let grouped_price: Price = trade.price.round_to_side_step(trade.is_sell, step);
42
43 match self
44 .grouped_trades
45 .binary_search_by(|probe| probe.compare_with(grouped_price, trade.is_sell))
46 {
47 Ok(index) => self.grouped_trades[index].qty += trade.qty,
48 Err(index) => {
49 let mut trades = self.grouped_trades.to_vec();
50 trades.insert(
51 index,
52 GroupedTrade {
53 is_sell: trade.is_sell,
54 price: grouped_price,
55 qty: trade.qty,
56 },
57 );
58 self.grouped_trades = trades.into_boxed_slice();
59 }
60 }
61
62 if trade.is_sell {
63 self.buy_sell.1 += trade.qty;
64 } else {
65 self.buy_sell.0 += trade.qty;
66 }
67 }
68
69 fn clear_trades(&mut self) {
70 self.grouped_trades = Box::new([]);
71 self.buy_sell = (Qty::default(), Qty::default());
72 }
73
74 fn last_trade_time(&self) -> Option<u64> {
75 None
76 }
77
78 fn first_trade_time(&self) -> Option<u64> {
79 None
80 }
81
82 fn kline(&self) -> Option<&exchange::Kline> {
83 None
84 }
85
86 fn last_price(&self) -> Price {
87 self.grouped_trades
88 .last()
89 .map_or(Price { units: 0 }, |t| t.price)
90 }
91
92 fn value_high(&self) -> Price {
93 self.grouped_trades
94 .iter()
95 .map(|t| t.price)
96 .max()
97 .unwrap_or(Price::from_units(0))
98 }
99
100 fn value_low(&self) -> Price {
101 self.grouped_trades
102 .iter()
103 .map(|t| t.price)
104 .min()
105 .unwrap_or(Price::from_units(0))
106 }
107}
108
109#[derive(Default, Debug, Clone, Copy, PartialEq)]
110pub struct OrderRun {
111 pub start_time: u64,
112 pub until_time: u64,
113 pub qty: Qty,
114 pub is_bid: bool,
115}
116
117impl OrderRun {
118 pub fn new(start_time: u64, aggr_time: u64, qty: Qty, is_bid: bool) -> Self {
119 OrderRun {
120 start_time,
121 until_time: start_time + aggr_time,
122 qty,
123 is_bid,
124 }
125 }
126
127 pub fn with_range(&self, earliest: u64, latest: u64) -> Option<&OrderRun> {
128 if self.start_time <= latest && self.until_time >= earliest {
129 Some(self)
130 } else {
131 None
132 }
133 }
134}
135
136#[derive(Debug, Clone, PartialEq)]
137pub struct HistoricalDepth {
138 price_levels: BTreeMap<Price, Vec<OrderRun>>,
139 pub aggr_time: u64,
140 tick_size: PriceStep,
141 min_order_qty: MinQtySize,
142 last_snapshot_time: Option<u64>,
143}
144
145impl HistoricalDepth {
146 pub fn new(min_order_qty: MinQtySize, tick_size: PriceStep, basis: Basis) -> Self {
147 Self {
148 price_levels: BTreeMap::new(),
149 aggr_time: match basis {
150 Basis::Time(interval) => interval.into(),
151 Basis::Tick(_) => unimplemented!(),
152 },
153 tick_size,
154 min_order_qty,
155 last_snapshot_time: None,
156 }
157 }
158
159 pub fn insert_latest_depth(&mut self, depth: &Depth, time: u64) {
160 if let Some(prev_time) = self.last_snapshot_time
161 && time < prev_time
162 {
163 return;
164 }
165
166 let aggr_time = self.aggr_time.max(1);
167 let has_snapshot_gap = self
168 .last_snapshot_time
169 .is_some_and(|prev_time| time > prev_time.saturating_add(aggr_time));
170
171 self.process_side(&depth.bids, time, true, has_snapshot_gap);
172 self.process_side(&depth.asks, time, false, has_snapshot_gap);
173 self.last_snapshot_time = Some(time);
174 }
175
176 fn process_side(
177 &mut self,
178 side: &BTreeMap<Price, Qty>,
179 time: u64,
180 is_bid: bool,
181 has_snapshot_gap: bool,
182 ) {
183 let mut current_price = None;
184 let mut current_qty = Qty::ZERO;
185
186 let step = self.tick_size;
187
188 for (price, qty) in side {
189 let rounded_price = price.round_to_side_step(is_bid, step);
190 if Some(rounded_price) == current_price {
191 current_qty += *qty;
192 } else {
193 if let Some(price) = current_price {
194 self.update_price_level(time, price, current_qty, is_bid, has_snapshot_gap);
195 }
196 current_price = Some(rounded_price);
197 current_qty = *qty;
198 }
199 }
200
201 if let Some(price) = current_price {
202 self.update_price_level(time, price, current_qty, is_bid, has_snapshot_gap);
203 }
204 }
205
206 fn update_price_level(
207 &mut self,
208 time: u64,
209 price: Price,
210 qty: Qty,
211 is_bid: bool,
212 has_snapshot_gap: bool,
213 ) {
214 let aggr_time = self.aggr_time;
215 let price_level = self.price_levels.entry(price).or_default();
216
217 match price_level.last_mut() {
218 Some(last_run) if last_run.is_bid == is_bid => {
219 if time > last_run.until_time {
220 if has_snapshot_gap {
221 if qty == last_run.qty {
222 last_run.until_time = time.saturating_add(aggr_time);
223 return;
224 }
225
226 last_run.until_time = time;
227 }
228
229 price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
230 return;
231 }
232
233 if qty == last_run.qty {
234 let new_until = time + aggr_time;
235 if new_until > last_run.until_time {
236 last_run.until_time = new_until;
237 }
238 } else {
239 if last_run.until_time > time {
240 last_run.until_time = time;
241 }
242 price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
243 }
244 }
245 Some(last_run) => {
246 if last_run.until_time > time {
247 last_run.until_time = time;
248 }
249 price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
250 }
251 None => {
252 price_level.push(OrderRun::new(time, aggr_time, qty, is_bid));
253 }
254 }
255 }
256
257 pub fn is_empty(&self) -> bool {
258 self.price_levels.is_empty()
259 }
260
261 pub fn iter_time_filtered(
262 &self,
263 earliest: u64,
264 latest: u64,
265 highest: Price,
266 lowest: Price,
267 ) -> impl Iterator<Item = (&Price, &Vec<OrderRun>)> {
268 self.price_levels
269 .range(lowest..=highest)
270 .filter(move |(_, runs)| {
271 runs.iter()
272 .any(|run| run.until_time >= earliest && run.start_time <= latest)
273 })
274 }
275
276 pub fn latest_order_runs(
277 &self,
278 highest: Price,
279 lowest: Price,
280 latest_timestamp: u64,
281 ) -> impl Iterator<Item = (&Price, &OrderRun)> {
282 self.price_levels
283 .range(lowest..=highest)
284 .filter_map(move |(price, runs)| {
285 runs.last()
286 .filter(|run| run.until_time >= latest_timestamp)
287 .map(|run| (price, run))
288 })
289 }
290
291 pub fn cleanup_old_price_levels(&mut self, oldest_time: u64) {
292 self.price_levels.iter_mut().for_each(|(_, runs)| {
293 runs.retain(|run| run.until_time >= oldest_time);
294 });
295
296 self.price_levels.retain(|_, runs| !runs.is_empty());
297 }
298
299 pub fn coalesced_runs(
300 &self,
301 earliest: u64,
302 latest: u64,
303 highest: Price,
304 lowest: Price,
305 market_type: MarketKind,
306 order_size_filter: f32,
307 coalesce_kind: CoalesceKind,
308 ) -> Vec<(Price, OrderRun)> {
309 let mut result_runs = Vec::new();
310
311 let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
312
313 for (price_at_level, runs_at_price_level) in
314 self.iter_time_filtered(earliest, latest, highest, lowest)
315 {
316 let candidate_runs = runs_at_price_level
317 .iter()
318 .filter(|run_ref| {
319 if !(run_ref.until_time >= earliest && run_ref.start_time <= latest) {
320 return false;
321 }
322 let order_size = market_type.qty_in_quote_value(
323 run_ref.qty,
324 *price_at_level,
325 size_in_quote_ccy,
326 );
327 order_size > order_size_filter
328 })
329 .collect::<Vec<&OrderRun>>();
330
331 if candidate_runs.is_empty() {
332 continue;
333 }
334
335 let mut current_accumulator_opt: Option<CoalescingRun> = None;
336
337 for run_to_process_ref in candidate_runs {
338 let run_to_process = *run_to_process_ref;
339
340 if let Some(current_accumulator) = current_accumulator_opt.as_mut() {
341 let comparison_base_qty = current_accumulator.comparison_qty(&coalesce_kind);
342 let qty_within_threshold = coalesce_kind.is_within_lot_similarity(
343 comparison_base_qty,
344 run_to_process.qty,
345 self.min_order_qty,
346 );
347
348 if run_to_process.start_time <= current_accumulator.until_time
349 && run_to_process.is_bid == current_accumulator.is_bid
350 && qty_within_threshold
351 {
352 current_accumulator.merge_run(&run_to_process);
353 } else {
354 result_runs.push((
355 *price_at_level,
356 current_accumulator.to_order_run(&coalesce_kind),
357 ));
358 current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
359 }
360 } else {
361 current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
362 }
363 }
364
365 if let Some(accumulator) = current_accumulator_opt {
366 result_runs.push((*price_at_level, accumulator.to_order_run(&coalesce_kind)));
367 }
368 }
369 result_runs
370 }
371
372 pub fn query_grid_qtys(
373 &self,
374 center_time: u64,
375 center_price: Price,
376 time_interval_offsets: &[i64],
377 price_tick_offsets: &[i64],
378 market_type: MarketKind,
379 order_size_filter: f32,
380 coalesce_kind: Option<CoalesceKind>,
381 ) -> FxHashMap<(u64, Price), (Qty, bool)> {
382 let aggr_time = self.aggr_time;
383
384 let step = self.tick_size;
385
386 let query_earliest_time = time_interval_offsets
387 .iter()
388 .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
389 .min()
390 .unwrap_or(center_time);
391
392 let query_latest_time = time_interval_offsets
393 .iter()
394 .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
395 .max()
396 .map_or(center_time, |t| t.saturating_add(aggr_time));
397
398 let query_lowest = price_tick_offsets
399 .iter()
400 .copied()
401 .min()
402 .map_or(center_price, |offset| center_price.add_steps(offset, step));
403 let query_highest = price_tick_offsets
404 .iter()
405 .copied()
406 .max()
407 .map_or(center_price, |offset| center_price.add_steps(offset, step));
408
409 let runs_in_vicinity: Vec<(Price, OrderRun)> = if let Some(ck) = coalesce_kind {
410 self.coalesced_runs(
411 query_earliest_time,
412 query_latest_time,
413 query_highest,
414 query_lowest,
415 market_type,
416 order_size_filter,
417 ck,
418 )
419 } else {
420 self.iter_time_filtered(
421 query_earliest_time,
422 query_latest_time,
423 query_highest,
424 query_lowest,
425 )
426 .flat_map(|(price_level, runs_at_price)| {
427 runs_at_price.iter().map(move |run| (*price_level, *run))
428 })
429 .collect()
430 };
431
432 let capacity = time_interval_offsets.len() * price_tick_offsets.len();
433 let mut grid_quantities: FxHashMap<(u64, Price), (Qty, bool)> =
434 FxHashMap::with_capacity_and_hasher(capacity, FxBuildHasher);
435 for price_offset in price_tick_offsets {
436 let target_price_key = center_price.add_steps(*price_offset, step);
437
438 for time_offset in time_interval_offsets {
439 let target_time_val =
440 center_time.saturating_add_signed(*time_offset * aggr_time as i64);
441 let current_grid_key = (target_time_val, target_price_key);
442
443 for (run_price_level, run_data) in &runs_in_vicinity {
444 if *run_price_level == target_price_key
445 && run_data.start_time <= target_time_val
446 && run_data.until_time > target_time_val
447 {
448 grid_quantities.insert(current_grid_key, (run_data.qty, run_data.is_bid));
449 break;
450 }
451 }
452 }
453 }
454 grid_quantities
455 }
456
457 pub fn max_qty_in_range_raw(
458 &self,
459 earliest: u64,
460 latest: u64,
461 highest: Price,
462 lowest: Price,
463 ) -> Qty {
464 let mut max_qty = Qty::ZERO;
465
466 for (_price, runs) in self.price_levels.range(lowest..=highest) {
467 for run in runs.iter() {
468 if run.until_time < earliest || run.start_time > latest {
469 continue;
470 }
471 max_qty = max_qty.max(run.qty);
472 }
473 }
474
475 max_qty
476 }
477
478 pub fn max_depth_qty_in_range(
479 &self,
480 earliest: u64,
481 latest: u64,
482 highest: Price,
483 lowest: Price,
484 market_type: MarketKind,
485 order_size_filter: f32,
486 ) -> Qty {
487 let mut max_depth_qty = Qty::ZERO;
488 let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
489
490 for (price, runs) in self.price_levels.range(lowest..=highest) {
491 for run in runs.iter() {
492 if run.until_time < earliest || run.start_time > latest {
493 continue;
494 }
495
496 let order_size = market_type.qty_in_quote_value(run.qty, *price, size_in_quote_ccy);
497
498 if order_size > order_size_filter {
499 max_depth_qty = max_depth_qty.max(run.qty);
500 }
501 }
502 }
503
504 max_depth_qty
505 }
506}
507
508#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
509pub enum CoalesceKind {
510 First(f32),
511 Average(f32),
512 Max(f32),
513}
514
515impl CoalesceKind {
516 pub fn threshold(&self) -> f32 {
517 match self {
518 CoalesceKind::Average(t) | CoalesceKind::First(t) | CoalesceKind::Max(t) => *t,
519 }
520 }
521
522 pub fn with_threshold(&self, threshold: f32) -> Self {
523 match self {
524 CoalesceKind::First(_) => CoalesceKind::First(threshold),
525 CoalesceKind::Average(_) => CoalesceKind::Average(threshold),
526 CoalesceKind::Max(_) => CoalesceKind::Max(threshold),
527 }
528 }
529
530 fn is_within_lot_similarity(
531 self,
532 base_qty: Qty,
533 candidate_qty: Qty,
534 min_qty: MinQtySize,
535 ) -> bool {
536 let ratio = self.threshold().max(0.0);
537
538 if !ratio.is_finite() {
539 return false;
540 }
541
542 let base_lots = base_qty.to_lots(min_qty).max(0);
543 let candidate_lots = candidate_qty.to_lots(min_qty).max(0);
544
545 if base_lots == 0 {
546 return candidate_lots == 0;
547 }
548
549 let lots_diff = base_lots.abs_diff(candidate_lots) as f64;
550 let allowed_diff = (base_lots as f64) * (ratio as f64);
551
552 lots_diff <= allowed_diff
553 }
554}
555
556impl PartialEq for CoalesceKind {
557 fn eq(&self, other: &Self) -> bool {
558 std::mem::discriminant(self) == std::mem::discriminant(other)
559 }
560}
561
562impl Eq for CoalesceKind {}
563
564#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)]
565pub struct CoalescingRun {
566 pub start_time: u64,
567 pub until_time: u64,
568 pub is_bid: bool,
569 pub qty_sum: Qty,
570 pub run_count: u32,
571 first_qty: Qty,
572 max_qty: Qty,
573}
574
575impl CoalescingRun {
576 pub fn new(run: &OrderRun) -> Self {
577 let run_qty = run.qty;
578 CoalescingRun {
579 start_time: run.start_time,
580 until_time: run.until_time,
581 is_bid: run.is_bid,
582 qty_sum: run_qty,
583 run_count: 1,
584 first_qty: run_qty,
585 max_qty: run_qty,
586 }
587 }
588
589 pub fn merge_run(&mut self, run: &OrderRun) {
590 self.until_time = self.until_time.max(run.until_time);
591 let run_qty = run.qty;
592 self.qty_sum += run_qty;
593 self.run_count += 1;
594 self.max_qty = self.max_qty.max(run_qty);
595 }
596
597 pub fn comparison_qty(&self, kind: &CoalesceKind) -> Qty {
598 match kind {
599 CoalesceKind::Average(_) => self.current_average_qty(),
600 CoalesceKind::Max(_) | CoalesceKind::First(_) => self.first_qty,
601 }
602 }
603
604 pub fn current_average_qty(&self) -> Qty {
605 if self.run_count == 0 {
606 Qty::ZERO
607 } else {
608 let count = i64::from(self.run_count);
609 let rounded_units = (self.qty_sum.units + (count / 2)) / count;
610 Qty::from_units(rounded_units)
611 }
612 }
613
614 pub fn to_order_run(&self, kind: &CoalesceKind) -> OrderRun {
615 let final_qty = match kind {
616 CoalesceKind::Average(_) => self.current_average_qty(),
617 CoalesceKind::First(_) => self.first_qty,
618 CoalesceKind::Max(_) => self.max_qty,
619 };
620 OrderRun {
621 start_time: self.start_time,
622 until_time: self.until_time,
623 qty: final_qty,
624 is_bid: self.is_bid,
625 }
626 }
627}
628
629#[derive(Default)]
630pub struct QtyScale {
631 pub max_trade_qty: Qty,
632 pub max_aggr_volume: Qty,
633 pub max_depth_qty: Qty,
634}
635
636#[derive(Debug, Clone)]
637pub struct GroupedTrade {
638 pub is_sell: bool,
639 pub price: Price,
640 pub qty: Qty,
641}
642
643impl GroupedTrade {
644 pub fn compare_with(&self, price: Price, is_sell: bool) -> std::cmp::Ordering {
645 if self.is_sell == is_sell {
646 self.price.cmp(&price)
647 } else {
648 self.is_sell.cmp(&is_sell)
649 }
650 }
651}
652
653#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
654pub enum HeatmapStudy {
655 VolumeProfile(ProfileKind),
656}
657
658impl HeatmapStudy {
659 pub const ALL: [HeatmapStudy; 1] = [HeatmapStudy::VolumeProfile(ProfileKind::VisibleRange)];
660}
661
662impl std::fmt::Display for HeatmapStudy {
663 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
664 match self {
665 HeatmapStudy::VolumeProfile(kind) => {
666 write!(f, "Volume Profile ({})", kind)
667 }
668 }
669 }
670}
671
672#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
673pub enum ProfileKind {
674 FixedWindow(usize),
675 #[default]
676 VisibleRange,
677}
678
679impl std::fmt::Display for ProfileKind {
680 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681 match self {
682 ProfileKind::FixedWindow(_) => write!(f, "Fixed window"),
683 ProfileKind::VisibleRange => write!(f, "Visible range"),
684 }
685 }
686}