use std::collections::{HashMap, VecDeque};
use crate::bar_indicators::indicator_value::IndicatorValue;
use crate::bar_indicators::liquidation_consumer::LiquidationConsumer;
use crate::core::types::Liquidation;
#[derive(Clone)]
pub struct LiquidationClusterDetector {
price_bucket: f64,
window_ms: i64,
min_cluster_count: usize,
events: VecDeque<(i64, i64, f64)>,
last_cluster_price: f64,
last_cluster_count: f64,
last_cluster_volume: f64,
}
impl LiquidationClusterDetector {
pub fn new(price_bucket: f64, window_ms: i64, min_cluster_count: usize) -> Self {
Self {
price_bucket: price_bucket.max(f64::MIN_POSITIVE),
window_ms: window_ms.max(1),
min_cluster_count: min_cluster_count.max(1),
events: VecDeque::new(),
last_cluster_price: 0.0,
last_cluster_count: 0.0,
last_cluster_volume: 0.0,
}
}
fn evict(&mut self, now: i64) {
while let Some(&(ts, _, _)) = self.events.front() {
if now - ts > self.window_ms {
self.events.pop_front();
} else {
break;
}
}
}
fn price_to_bucket(&self, price: f64) -> i64 {
(price / self.price_bucket).floor() as i64
}
fn recompute(&mut self) {
let mut bucket_count: HashMap<i64, usize> = HashMap::new();
let mut bucket_volume: HashMap<i64, f64> = HashMap::new();
for &(_, bucket, vol) in &self.events {
*bucket_count.entry(bucket).or_insert(0) += 1;
*bucket_volume.entry(bucket).or_insert(0.0) += vol;
}
let best = bucket_count
.iter()
.max_by_key(|&(_, &cnt)| cnt);
if let Some((&bucket_key, &count)) = best {
if count >= self.min_cluster_count {
let vol = bucket_volume.get(&bucket_key).copied().unwrap_or(0.0);
let cluster_price = bucket_key as f64 * self.price_bucket + self.price_bucket * 0.5;
self.last_cluster_price = cluster_price;
self.last_cluster_count = count as f64;
self.last_cluster_volume = vol;
return;
}
}
self.last_cluster_price = 0.0;
self.last_cluster_count = 0.0;
self.last_cluster_volume = 0.0;
}
}
impl LiquidationConsumer for LiquidationClusterDetector {
fn update_liquidation(&mut self, liq: &Liquidation) -> IndicatorValue {
let bucket = self.price_to_bucket(liq.price);
self.events.push_back((liq.timestamp, bucket, liq.quote_value()));
self.evict(liq.timestamp);
self.recompute();
IndicatorValue::Triple(
self.last_cluster_price,
self.last_cluster_count,
self.last_cluster_volume,
)
}
fn value(&self) -> IndicatorValue {
IndicatorValue::Triple(
self.last_cluster_price,
self.last_cluster_count,
self.last_cluster_volume,
)
}
fn reset(&mut self) {
self.events.clear();
self.last_cluster_price = 0.0;
self.last_cluster_count = 0.0;
self.last_cluster_volume = 0.0;
}
fn is_ready(&self) -> bool {
!self.events.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::types::TradeSide;
fn liq(ts: i64, price: f64, qty: f64) -> Liquidation {
Liquidation { symbol: String::new(), side: TradeSide::Buy, price, quantity: qty, timestamp: ts, value: None }
}
#[test]
fn no_cluster_initially() {
let lcd = LiquidationClusterDetector::new(100.0, 60_000, 3);
assert_eq!(lcd.value(), IndicatorValue::Triple(0.0, 0.0, 0.0));
assert!(!lcd.is_ready());
}
#[test]
fn cluster_detected_at_threshold() {
let mut lcd = LiquidationClusterDetector::new(100.0, 60_000, 3);
lcd.update_liquidation(&liq(0, 30_050.0, 1.0));
lcd.update_liquidation(&liq(1_000, 30_070.0, 1.0));
lcd.update_liquidation(&liq(2_000, 30_020.0, 1.0));
if let IndicatorValue::Triple(cp, cc, _cv) = lcd.value() {
assert!(cc >= 3.0, "count={cc}");
assert!((cp - 30_050.0).abs() < 1.0, "cluster_price={cp}");
} else {
panic!("expected Triple");
}
}
#[test]
fn below_threshold_returns_zeros() {
let mut lcd = LiquidationClusterDetector::new(100.0, 60_000, 3);
lcd.update_liquidation(&liq(0, 30_050.0, 1.0));
lcd.update_liquidation(&liq(1_000, 30_070.0, 1.0));
if let IndicatorValue::Triple(cp, cc, cv) = lcd.value() {
assert_eq!(cp, 0.0);
assert_eq!(cc, 0.0);
assert_eq!(cv, 0.0);
}
}
#[test]
fn dominant_bucket_wins() {
let mut lcd = LiquidationClusterDetector::new(100.0, 60_000, 2);
lcd.update_liquidation(&liq(0, 30_050.0, 1.0));
lcd.update_liquidation(&liq(1_000, 30_070.0, 1.0));
lcd.update_liquidation(&liq(2_000, 31_010.0, 1.0));
lcd.update_liquidation(&liq(3_000, 31_020.0, 1.0));
lcd.update_liquidation(&liq(4_000, 31_030.0, 1.0));
if let IndicatorValue::Triple(_cp, cc, _cv) = lcd.value() {
assert_eq!(cc, 3.0, "bucket B should win with 3 events");
}
}
#[test]
fn old_events_evicted() {
let mut lcd = LiquidationClusterDetector::new(100.0, 5_000, 2);
lcd.update_liquidation(&liq(0, 30_050.0, 1.0));
lcd.update_liquidation(&liq(1_000, 30_070.0, 1.0));
lcd.update_liquidation(&liq(20_000, 31_010.0, 1.0));
if let IndicatorValue::Triple(cp, cc, cv) = lcd.value() {
assert_eq!(cp, 0.0, "old cluster evicted");
assert_eq!(cc, 0.0);
assert_eq!(cv, 0.0);
}
}
#[test]
fn reset_clears_state() {
let mut lcd = LiquidationClusterDetector::new(100.0, 60_000, 3);
lcd.update_liquidation(&liq(0, 30_050.0, 1.0));
lcd.update_liquidation(&liq(1_000, 30_070.0, 1.0));
lcd.update_liquidation(&liq(2_000, 30_020.0, 1.0));
lcd.reset();
assert!(!lcd.is_ready());
assert_eq!(lcd.value(), IndicatorValue::Triple(0.0, 0.0, 0.0));
}
}