1use super::Basis;
2use super::aggr::time::DataPoint;
3use exchange::unit::MinQtySize;
4use exchange::unit::price::{Price, PriceStep};
5use exchange::unit::qty::{Qty, SizeUnit, volume_size_unit};
6use exchange::{adapter::MarketKind, depth::Depth};
7
8use rustc_hash::{FxBuildHasher, FxHashMap};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11
/// Size threshold associated with cleaning up stored heatmap data.
///
/// NOTE(review): nothing in this file reads the constant, so what it counts (data points,
/// price levels, ...) has to be confirmed at its use sites.
pub const CLEANUP_THRESHOLD: usize = 4_800;

/// Slack added to a run's `until_time` when a depth update arrives for its price level.
///
/// An update whose timestamp is later than `until_time + GRACE_PERIOD_MS` always starts a new
/// run rather than continuing the previous one. Per the name the unit is milliseconds, which
/// assumes depth timestamps are in milliseconds as well.
const GRACE_PERIOD_MS: u64 = 500;
17
18#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize)]
19pub struct Config {
20 pub trade_size_filter: f32,
21 pub order_size_filter: f32,
22 pub trade_size_scale: Option<i32>,
23 pub coalescing: Option<CoalesceKind>,
24}
25
26impl Default for Config {
27 fn default() -> Self {
28 Config {
29 trade_size_filter: 0.0,
30 order_size_filter: 0.0,
31 trade_size_scale: Some(100),
32 coalescing: Some(CoalesceKind::Average(0.15)),
33 }
34 }
35}
36
37#[derive(Default)]
38pub struct HeatmapDataPoint {
39 pub grouped_trades: Box<[GroupedTrade]>,
40 pub buy_sell: (Qty, Qty),
41}
42
43impl DataPoint for HeatmapDataPoint {
44 fn add_trade(&mut self, trade: &exchange::Trade, step: PriceStep) {
45 let grouped_price: Price = trade.price.round_to_side_step(trade.is_sell, step);
46
47 match self
48 .grouped_trades
49 .binary_search_by(|probe| probe.compare_with(grouped_price, trade.is_sell))
50 {
51 Ok(index) => self.grouped_trades[index].qty += trade.qty,
52 Err(index) => {
53 let mut trades = self.grouped_trades.to_vec();
54 trades.insert(
55 index,
56 GroupedTrade {
57 is_sell: trade.is_sell,
58 price: grouped_price,
59 qty: trade.qty,
60 },
61 );
62 self.grouped_trades = trades.into_boxed_slice();
63 }
64 }
65
66 if trade.is_sell {
67 self.buy_sell.1 += trade.qty;
68 } else {
69 self.buy_sell.0 += trade.qty;
70 }
71 }
72
73 fn clear_trades(&mut self) {
74 self.grouped_trades = Box::new([]);
75 self.buy_sell = (Qty::default(), Qty::default());
76 }
77
78 fn last_trade_time(&self) -> Option<u64> {
79 None
80 }
81
82 fn first_trade_time(&self) -> Option<u64> {
83 None
84 }
85
86 fn kline(&self) -> Option<&exchange::Kline> {
87 None
88 }
89
90 fn last_price(&self) -> Price {
91 self.grouped_trades
92 .last()
93 .map_or(Price { units: 0 }, |t| t.price)
94 }
95
96 fn value_high(&self) -> Price {
97 self.grouped_trades
98 .iter()
99 .map(|t| t.price)
100 .max()
101 .unwrap_or(Price::from_units(0))
102 }
103
104 fn value_low(&self) -> Price {
105 self.grouped_trades
106 .iter()
107 .map(|t| t.price)
108 .min()
109 .unwrap_or(Price::from_units(0))
110 }
111}
112
113#[derive(Default, Debug, Clone, Copy, PartialEq)]
114pub struct OrderRun {
115 pub start_time: u64,
116 pub until_time: u64,
117 pub qty: Qty,
118 pub is_bid: bool,
119}
120
121impl OrderRun {
122 pub fn new(start_time: u64, aggr_time: u64, qty: Qty, is_bid: bool) -> Self {
123 OrderRun {
124 start_time,
125 until_time: start_time + aggr_time,
126 qty,
127 is_bid,
128 }
129 }
130
131 pub fn with_range(&self, earliest: u64, latest: u64) -> Option<&OrderRun> {
132 if self.start_time <= latest && self.until_time >= earliest {
133 Some(self)
134 } else {
135 None
136 }
137 }
138}
139
140#[derive(Debug, Clone, PartialEq)]
141pub struct HistoricalDepth {
142 price_levels: BTreeMap<Price, Vec<OrderRun>>,
143 pub aggr_time: u64,
144 tick_size: PriceStep,
145 min_order_qty: MinQtySize,
146}
147
148impl HistoricalDepth {
149 pub fn new(min_order_qty: MinQtySize, tick_size: PriceStep, basis: Basis) -> Self {
150 Self {
151 price_levels: BTreeMap::new(),
152 aggr_time: match basis {
153 Basis::Time(interval) => interval.into(),
154 Basis::Tick(_) => unimplemented!(),
155 },
156 tick_size,
157 min_order_qty,
158 }
159 }
160
161 pub fn insert_latest_depth(&mut self, depth: &Depth, time: u64) {
162 self.process_side(&depth.bids, time, true);
163 self.process_side(&depth.asks, time, false);
164 }
165
166 pub fn is_empty(&self) -> bool {
167 self.price_levels.is_empty()
168 }
169
170 fn process_side(&mut self, side: &BTreeMap<Price, Qty>, time: u64, is_bid: bool) {
171 let mut current_price = None;
172 let mut current_qty = Qty::from_units(0);
173
174 let step = self.tick_size;
175
176 for (price, qty) in side {
177 let rounded_price = price.round_to_side_step(is_bid, step);
178 if Some(rounded_price) == current_price {
179 current_qty += *qty;
180 } else {
181 if let Some(price) = current_price {
182 self.update_price_level(time, price, current_qty, is_bid);
183 }
184 current_price = Some(rounded_price);
185 current_qty = *qty;
186 }
187 }
188
189 if let Some(price) = current_price {
190 self.update_price_level(time, price, current_qty, is_bid);
191 }
192 }
193
194 fn update_price_level(&mut self, time: u64, price: Price, qty: Qty, is_bid: bool) {
195 let aggr_time = self.aggr_time;
196
197 let qty_q = qty.round_to_min_qty(self.min_order_qty);
198 let price_level = self.price_levels.entry(price).or_default();
199
200 match price_level.last_mut() {
201 Some(last_run) if last_run.is_bid == is_bid => {
202 if time > last_run.until_time + GRACE_PERIOD_MS {
203 price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
204 return;
205 }
206
207 let current_lots = qty_q.to_lots(self.min_order_qty);
208 let last_lots = last_run.qty.to_lots(self.min_order_qty);
209
210 if current_lots == last_lots {
211 let new_until = time + aggr_time;
212 if new_until > last_run.until_time {
213 last_run.until_time = new_until;
214 }
215 } else {
216 if last_run.until_time > time {
217 last_run.until_time = time;
218 }
219 price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
220 }
221 }
222 Some(last_run) => {
223 if last_run.until_time > time {
224 last_run.until_time = time;
225 }
226 price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
227 }
228 None => {
229 price_level.push(OrderRun::new(time, aggr_time, qty_q, is_bid));
230 }
231 }
232 }
233
234 pub fn iter_time_filtered(
235 &self,
236 earliest: u64,
237 latest: u64,
238 highest: Price,
239 lowest: Price,
240 ) -> impl Iterator<Item = (&Price, &Vec<OrderRun>)> {
241 self.price_levels
242 .range(lowest..=highest)
243 .filter(move |(_, runs)| {
244 runs.iter()
245 .any(|run| run.until_time >= earliest && run.start_time <= latest)
246 })
247 }
248
249 pub fn latest_order_runs(
250 &self,
251 highest: Price,
252 lowest: Price,
253 latest_timestamp: u64,
254 ) -> impl Iterator<Item = (&Price, &OrderRun)> {
255 self.price_levels
256 .range(lowest..=highest)
257 .filter_map(move |(price, runs)| {
258 runs.last()
259 .filter(|run| run.until_time >= latest_timestamp)
260 .map(|run| (price, run))
261 })
262 }
263
264 pub fn cleanup_old_price_levels(&mut self, oldest_time: u64) {
265 self.price_levels.iter_mut().for_each(|(_, runs)| {
266 runs.retain(|run| run.until_time >= oldest_time);
267 });
268
269 self.price_levels.retain(|_, runs| !runs.is_empty());
270 }
271
272 pub fn coalesced_runs(
273 &self,
274 earliest: u64,
275 latest: u64,
276 highest: Price,
277 lowest: Price,
278 market_type: MarketKind,
279 order_size_filter: f32,
280 coalesce_kind: CoalesceKind,
281 ) -> Vec<(Price, OrderRun)> {
282 let mut result_runs = Vec::new();
283
284 let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
285
286 for (price_at_level, runs_at_price_level) in
287 self.iter_time_filtered(earliest, latest, highest, lowest)
288 {
289 let candidate_runs = runs_at_price_level
290 .iter()
291 .filter(|run_ref| {
292 if !(run_ref.until_time >= earliest && run_ref.start_time <= latest) {
293 return false;
294 }
295 let order_size = market_type.qty_in_quote_value(
296 run_ref.qty,
297 *price_at_level,
298 size_in_quote_ccy,
299 );
300 order_size > order_size_filter
301 })
302 .collect::<Vec<&OrderRun>>();
303
304 if candidate_runs.is_empty() {
305 continue;
306 }
307
308 let mut current_accumulator_opt: Option<CoalescingRun> = None;
309
310 for run_to_process_ref in candidate_runs {
311 let run_to_process = *run_to_process_ref;
312
313 if let Some(current_accumulator) = current_accumulator_opt.as_mut() {
314 let comparison_base_qty = current_accumulator.comparison_qty(&coalesce_kind);
315 let qty_within_threshold = coalesce_kind.is_within_lot_similarity(
316 comparison_base_qty,
317 run_to_process.qty,
318 self.min_order_qty,
319 );
320
321 if run_to_process.start_time <= current_accumulator.until_time
322 && run_to_process.is_bid == current_accumulator.is_bid
323 && qty_within_threshold
324 {
325 current_accumulator.merge_run(&run_to_process);
326 } else {
327 result_runs.push((
328 *price_at_level,
329 current_accumulator.to_order_run(&coalesce_kind),
330 ));
331 current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
332 }
333 } else {
334 current_accumulator_opt = Some(CoalescingRun::new(&run_to_process));
335 }
336 }
337
338 if let Some(accumulator) = current_accumulator_opt {
339 result_runs.push((*price_at_level, accumulator.to_order_run(&coalesce_kind)));
340 }
341 }
342 result_runs
343 }
344
345 pub fn query_grid_qtys(
346 &self,
347 center_time: u64,
348 center_price: Price,
349 time_interval_offsets: &[i64],
350 price_tick_offsets: &[i64],
351 market_type: MarketKind,
352 order_size_filter: f32,
353 coalesce_kind: Option<CoalesceKind>,
354 ) -> FxHashMap<(u64, Price), (Qty, bool)> {
355 let aggr_time = self.aggr_time;
356
357 let step = self.tick_size;
358
359 let query_earliest_time = time_interval_offsets
360 .iter()
361 .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
362 .min()
363 .unwrap_or(center_time);
364
365 let query_latest_time = time_interval_offsets
366 .iter()
367 .map(|offset| center_time.saturating_add_signed(*offset * aggr_time as i64))
368 .max()
369 .map_or(center_time, |t| t.saturating_add(aggr_time));
370
371 let query_lowest = price_tick_offsets
372 .iter()
373 .copied()
374 .min()
375 .map_or(center_price, |offset| center_price.add_steps(offset, step));
376 let query_highest = price_tick_offsets
377 .iter()
378 .copied()
379 .max()
380 .map_or(center_price, |offset| center_price.add_steps(offset, step));
381
382 let runs_in_vicinity: Vec<(Price, OrderRun)> = if let Some(ck) = coalesce_kind {
383 self.coalesced_runs(
384 query_earliest_time,
385 query_latest_time,
386 query_highest,
387 query_lowest,
388 market_type,
389 order_size_filter,
390 ck,
391 )
392 } else {
393 self.iter_time_filtered(
394 query_earliest_time,
395 query_latest_time,
396 query_highest,
397 query_lowest,
398 )
399 .flat_map(|(price_level, runs_at_price)| {
400 runs_at_price.iter().map(move |run| (*price_level, *run))
401 })
402 .collect()
403 };
404
405 let capacity = time_interval_offsets.len() * price_tick_offsets.len();
406 let mut grid_quantities: FxHashMap<(u64, Price), (Qty, bool)> =
407 FxHashMap::with_capacity_and_hasher(capacity, FxBuildHasher);
408 for price_offset in price_tick_offsets {
409 let target_price_key = center_price.add_steps(*price_offset, step);
410
411 for time_offset in time_interval_offsets {
412 let target_time_val =
413 center_time.saturating_add_signed(*time_offset * aggr_time as i64);
414 let current_grid_key = (target_time_val, target_price_key);
415
416 for (run_price_level, run_data) in &runs_in_vicinity {
417 if *run_price_level == target_price_key
418 && run_data.start_time <= target_time_val
419 && run_data.until_time > target_time_val
420 {
421 grid_quantities.insert(current_grid_key, (run_data.qty, run_data.is_bid));
422 break;
423 }
424 }
425 }
426 }
427 grid_quantities
428 }
429
430 pub fn max_qty_in_range_raw(
431 &self,
432 earliest: u64,
433 latest: u64,
434 highest: Price,
435 lowest: Price,
436 ) -> Qty {
437 let mut max_qty = Qty::ZERO;
438
439 for (_price, runs) in self.price_levels.range(lowest..=highest) {
440 for run in runs.iter() {
441 if run.until_time < earliest || run.start_time > latest {
442 continue;
443 }
444 max_qty = max_qty.max(run.qty);
445 }
446 }
447
448 max_qty
449 }
450
451 pub fn max_depth_qty_in_range(
452 &self,
453 earliest: u64,
454 latest: u64,
455 highest: Price,
456 lowest: Price,
457 market_type: MarketKind,
458 order_size_filter: f32,
459 ) -> Qty {
460 let mut max_depth_qty = Qty::ZERO;
461 let size_in_quote_ccy = volume_size_unit() == SizeUnit::Quote;
462
463 for (price, runs) in self.price_levels.range(lowest..=highest) {
464 for run in runs.iter() {
465 if run.until_time < earliest || run.start_time > latest {
466 continue;
467 }
468
469 let order_size = market_type.qty_in_quote_value(run.qty, *price, size_in_quote_ccy);
470
471 if order_size > order_size_filter {
472 max_depth_qty = max_depth_qty.max(run.qty);
473 }
474 }
475 }
476
477 max_depth_qty
478 }
479}
480
481#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
482pub enum CoalesceKind {
483 First(f32),
484 Average(f32),
485 Max(f32),
486}
487
488impl CoalesceKind {
489 pub fn threshold(&self) -> f32 {
490 match self {
491 CoalesceKind::Average(t) | CoalesceKind::First(t) | CoalesceKind::Max(t) => *t,
492 }
493 }
494
495 pub fn with_threshold(&self, threshold: f32) -> Self {
496 match self {
497 CoalesceKind::First(_) => CoalesceKind::First(threshold),
498 CoalesceKind::Average(_) => CoalesceKind::Average(threshold),
499 CoalesceKind::Max(_) => CoalesceKind::Max(threshold),
500 }
501 }
502
503 fn is_within_lot_similarity(
504 self,
505 base_qty: Qty,
506 candidate_qty: Qty,
507 min_qty: MinQtySize,
508 ) -> bool {
509 let ratio = self.threshold().max(0.0);
510
511 if !ratio.is_finite() {
512 return false;
513 }
514
515 let base_lots = base_qty.to_lots(min_qty).max(0);
516 let candidate_lots = candidate_qty.to_lots(min_qty).max(0);
517
518 if base_lots == 0 {
519 return candidate_lots == 0;
520 }
521
522 let lots_diff = base_lots.abs_diff(candidate_lots) as f64;
523 let allowed_diff = (base_lots as f64) * (ratio as f64);
524
525 lots_diff <= allowed_diff
526 }
527}
528
529impl PartialEq for CoalesceKind {
530 fn eq(&self, other: &Self) -> bool {
531 std::mem::discriminant(self) == std::mem::discriminant(other)
532 }
533}
534
535impl Eq for CoalesceKind {}
536
537#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)]
538pub struct CoalescingRun {
539 pub start_time: u64,
540 pub until_time: u64,
541 pub is_bid: bool,
542 pub qty_sum: Qty,
543 pub run_count: u32,
544 first_qty: Qty,
545 max_qty: Qty,
546}
547
548impl CoalescingRun {
549 pub fn new(run: &OrderRun) -> Self {
550 let run_qty = run.qty;
551 CoalescingRun {
552 start_time: run.start_time,
553 until_time: run.until_time,
554 is_bid: run.is_bid,
555 qty_sum: run_qty,
556 run_count: 1,
557 first_qty: run_qty,
558 max_qty: run_qty,
559 }
560 }
561
562 pub fn merge_run(&mut self, run: &OrderRun) {
563 self.until_time = self.until_time.max(run.until_time);
564 let run_qty = run.qty;
565 self.qty_sum += run_qty;
566 self.run_count += 1;
567 self.max_qty = self.max_qty.max(run_qty);
568 }
569
570 pub fn comparison_qty(&self, kind: &CoalesceKind) -> Qty {
571 match kind {
572 CoalesceKind::Average(_) => self.current_average_qty(),
573 CoalesceKind::Max(_) | CoalesceKind::First(_) => self.first_qty,
574 }
575 }
576
577 pub fn current_average_qty(&self) -> Qty {
578 if self.run_count == 0 {
579 Qty::ZERO
580 } else {
581 let count = i64::from(self.run_count);
582 let rounded_units = (self.qty_sum.units + (count / 2)) / count;
583 Qty::from_units(rounded_units)
584 }
585 }
586
587 pub fn to_order_run(&self, kind: &CoalesceKind) -> OrderRun {
588 let final_qty = match kind {
589 CoalesceKind::Average(_) => self.current_average_qty(),
590 CoalesceKind::First(_) => self.first_qty,
591 CoalesceKind::Max(_) => self.max_qty,
592 };
593 OrderRun {
594 start_time: self.start_time,
595 until_time: self.until_time,
596 qty: final_qty,
597 is_bid: self.is_bid,
598 }
599 }
600}
601
602#[derive(Default)]
603pub struct QtyScale {
604 pub max_trade_qty: Qty,
605 pub max_aggr_volume: Qty,
606 pub max_depth_qty: Qty,
607}
608
609#[derive(Debug, Clone)]
610pub struct GroupedTrade {
611 pub is_sell: bool,
612 pub price: Price,
613 pub qty: Qty,
614}
615
616impl GroupedTrade {
617 pub fn compare_with(&self, price: Price, is_sell: bool) -> std::cmp::Ordering {
618 if self.is_sell == is_sell {
619 self.price.cmp(&price)
620 } else {
621 self.is_sell.cmp(&is_sell)
622 }
623 }
624}
625
626#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
627pub enum HeatmapStudy {
628 VolumeProfile(ProfileKind),
629}
630
631impl HeatmapStudy {
632 pub const ALL: [HeatmapStudy; 1] = [HeatmapStudy::VolumeProfile(ProfileKind::VisibleRange)];
633}
634
635impl std::fmt::Display for HeatmapStudy {
636 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637 match self {
638 HeatmapStudy::VolumeProfile(kind) => {
639 write!(f, "Volume Profile ({})", kind)
640 }
641 }
642 }
643}
644
645#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
646pub enum ProfileKind {
647 FixedWindow(usize),
648 VisibleRange,
649}
650
651impl std::fmt::Display for ProfileKind {
652 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653 match self {
654 ProfileKind::FixedWindow(_) => write!(f, "Fixed window"),
655 ProfileKind::VisibleRange => write!(f, "Visible range"),
656 }
657 }
658}