use std::collections::BTreeMap;
use crate::chart::Basis;
use crate::chart::heatmap::HeatmapDataPoint;
use crate::chart::kline::{ClusterKind, KlineDataPoint, KlineTrades, NPoc};
use exchange::unit::{Price, PriceStep, Qty};
use exchange::{Kline, Timeframe, Trade, Volume};
pub trait DataPoint {
fn add_trade(&mut self, trade: &Trade, step: PriceStep);
fn clear_trades(&mut self);
fn last_trade_time(&self) -> Option<u64>;
fn first_trade_time(&self) -> Option<u64>;
fn last_price(&self) -> Price;
fn kline(&self) -> Option<&Kline>;
fn value_high(&self) -> Price;
fn value_low(&self) -> Price;
}
pub struct TimeSeries<D: DataPoint> {
pub datapoints: BTreeMap<u64, D>,
pub interval: Timeframe,
pub tick_size: PriceStep,
}
impl<D: DataPoint> TimeSeries<D> {
pub fn base_price(&self) -> Price {
self.datapoints
.values()
.last()
.map_or(Price::from_f32(0.0), DataPoint::last_price)
}
pub fn latest_timestamp(&self) -> Option<u64> {
self.datapoints.keys().last().copied()
}
pub fn latest_kline(&self) -> Option<&Kline> {
self.datapoints.values().last().and_then(|dp| dp.kline())
}
pub fn price_scale(&self, lookback: usize) -> (Price, Price) {
let mut iter = self.datapoints.iter().rev().take(lookback);
if let Some((_, first)) = iter.next() {
let mut high = first.value_high();
let mut low = first.value_low();
for (_, dp) in iter {
let value_high = dp.value_high();
let value_low = dp.value_low();
if value_high > high {
high = value_high;
}
if value_low < low {
low = value_low;
}
}
(high, low)
} else {
(Price::from_f32(0.0), Price::from_f32(0.0))
}
}
pub fn volume_data<'a>(&'a self) -> BTreeMap<u64, exchange::Volume>
where
BTreeMap<u64, exchange::Volume>: From<&'a TimeSeries<D>>,
{
self.into()
}
pub fn timerange(&self) -> (u64, u64) {
let earliest = self.datapoints.keys().next().copied().unwrap_or(0);
let latest = self.datapoints.keys().last().copied().unwrap_or(0);
(earliest, latest)
}
pub fn min_max_price_in_range_prices(
&self,
earliest: u64,
latest: u64,
) -> Option<(Price, Price)> {
let mut it = self.datapoints.range(earliest..=latest);
let (_, first) = it.next()?;
let mut min_price = first.value_low();
let mut max_price = first.value_high();
for (_, dp) in it {
let low = dp.value_low();
let high = dp.value_high();
if low < min_price {
min_price = low;
}
if high > max_price {
max_price = high;
}
}
Some((min_price, max_price))
}
pub fn min_max_price_in_range(&self, earliest: u64, latest: u64) -> Option<(f32, f32)> {
self.min_max_price_in_range_prices(earliest, latest)
.map(|(min_p, max_p)| (min_p.to_f32(), max_p.to_f32()))
}
pub fn ingest_trades_bucket(&mut self, rounded_t: u64, trades: &[Trade], step: PriceStep)
where
D: Default,
{
let bucket = self.datapoints.entry(rounded_t).or_default();
for trade in trades {
bucket.add_trade(trade, step);
}
}
pub fn clear_trades(&mut self) {
for data_point in self.datapoints.values_mut() {
data_point.clear_trades();
}
}
fn align_down_to_phase(time: u64, phase: u64, interval: u64) -> u64 {
if time >= phase {
time.saturating_sub((time - phase) % interval)
} else {
phase
}
}
fn check_kline_integrity_range(
&self,
earliest: u64,
latest: u64,
interval: u64,
) -> Option<Vec<u64>> {
let mut time = earliest;
let mut missing_count = 0;
while time < latest {
if !self.datapoints.contains_key(&time) {
missing_count += 1;
break;
}
time += interval;
}
if missing_count > 0 {
let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
let mut time = earliest;
while time < latest {
if !self.datapoints.contains_key(&time) {
missing_keys.push(time);
}
time += interval;
}
log::debug!(
"Integrity check failed: missing {} klines",
missing_keys.len()
);
return Some(missing_keys);
}
None
}
pub fn check_kline_integrity(&self, earliest: u64, latest: u64) -> Option<Vec<u64>> {
if self.datapoints.is_empty() {
return None;
}
let interval = self.interval.to_milliseconds();
if interval == 0 {
return None;
}
let (series_earliest, series_latest) = self.timerange();
let phase = series_earliest % interval;
let check_earliest =
Self::align_down_to_phase(earliest.max(series_earliest), phase, interval)
.max(series_earliest);
let check_latest = Self::align_down_to_phase(latest.min(series_latest), phase, interval)
.min(series_latest);
if check_earliest < check_latest {
self.check_kline_integrity_range(check_earliest, check_latest, interval)
} else {
None
}
}
}
impl TimeSeries<KlineDataPoint> {
pub fn new(interval: Timeframe, tick_size: PriceStep, klines: &[Kline]) -> Self {
let mut timeseries = Self {
datapoints: BTreeMap::new(),
interval,
tick_size,
};
timeseries.insert_klines(klines);
timeseries
}
pub fn with_trades(&self, trades: &[Trade]) -> TimeSeries<KlineDataPoint> {
let mut new_series = Self {
datapoints: self.datapoints.clone(),
interval: self.interval,
tick_size: self.tick_size,
};
new_series.insert_trades_or_create_bucket(trades);
new_series
}
pub fn insert_klines(&mut self, klines: &[Kline]) {
for kline in klines {
let entry = self
.datapoints
.entry(kline.time)
.or_insert_with(|| KlineDataPoint {
kline: *kline,
footprint: KlineTrades::new(),
});
entry.kline = *kline;
}
self.update_poc_status();
}
pub fn insert_trades_or_create_bucket(&mut self, buffer: &[Trade]) {
if buffer.is_empty() {
return;
}
let aggr_time = self.interval.to_milliseconds();
let mut updated_times = Vec::new();
buffer.iter().for_each(|trade| {
let rounded_time = (trade.time / aggr_time) * aggr_time;
if !updated_times.contains(&rounded_time) {
updated_times.push(rounded_time);
}
let entry = self
.datapoints
.entry(rounded_time)
.or_insert_with(|| KlineDataPoint {
kline: Kline {
time: rounded_time,
open: trade.price,
high: trade.price,
low: trade.price,
close: trade.price,
volume: Volume::empty_buy_sell(),
},
footprint: KlineTrades::new(),
});
entry.add_trade(trade, self.tick_size);
});
for time in updated_times {
if let Some(data_point) = self.datapoints.get_mut(&time) {
data_point.calculate_poc();
}
}
}
pub fn insert_trades_existing_buckets(&mut self, buffer: &[Trade]) {
if buffer.is_empty() {
return;
}
let aggr_time = self.interval.to_milliseconds();
let mut updated_times: Vec<u64> = Vec::new();
for trade in buffer {
let rounded_time = (trade.time / aggr_time) * aggr_time;
if let Some(entry) = self.datapoints.get_mut(&rounded_time) {
if !updated_times.contains(&rounded_time) {
updated_times.push(rounded_time);
}
entry.add_trade(trade, self.tick_size);
}
}
for time in updated_times {
if let Some(data_point) = self.datapoints.get_mut(&time) {
data_point.calculate_poc();
}
}
}
pub fn change_tick_size(&mut self, tick_size: PriceStep, raw_trades: &[Trade]) {
self.tick_size = tick_size;
self.clear_trades();
if !raw_trades.is_empty() {
self.insert_trades_existing_buckets(raw_trades);
}
}
pub fn update_poc_status(&mut self) {
let updates = self
.datapoints
.iter()
.filter_map(|(&time, dp)| dp.poc_price().map(|price| (time, price)))
.collect::<Vec<_>>();
for (current_time, poc_price) in updates {
let mut npoc = NPoc::default();
for (&next_time, next_dp) in self.datapoints.range((current_time + 1)..) {
let next_dp_low = next_dp.kline.low.round_to_side_step(true, self.tick_size);
let next_dp_high = next_dp.kline.high.round_to_side_step(false, self.tick_size);
if next_dp_low <= poc_price && next_dp_high >= poc_price {
npoc.filled(next_time);
break;
} else {
npoc.unfilled();
}
}
if let Some(data_point) = self.datapoints.get_mut(¤t_time) {
data_point.set_poc_status(npoc);
}
}
}
pub fn suggest_trade_fetch_range(
&self,
visible_earliest: u64,
visible_latest: u64,
) -> Option<(u64, u64)> {
if self.datapoints.is_empty() {
return None;
}
self.find_trade_gap()
.and_then(|(last_t_before_gap, first_t_after_gap)| {
if last_t_before_gap.is_none() && first_t_after_gap.is_none() {
return None;
}
let (data_earliest, data_latest) = self.timerange();
let fetch_from = last_t_before_gap
.map_or(data_earliest, |t| t.saturating_add(1))
.max(visible_earliest);
let fetch_to = first_t_after_gap
.map_or(data_latest, |t| t.saturating_sub(1))
.min(visible_latest);
if fetch_from < fetch_to {
Some((fetch_from, fetch_to))
} else {
None
}
})
}
fn find_trade_gap(&self) -> Option<(Option<u64>, Option<u64>)> {
let empty_kline_time = self
.datapoints
.iter()
.rev()
.find(|(_, dp)| dp.footprint.trades.is_empty())
.map(|(&time, _)| time);
if let Some(target_time) = empty_kline_time {
let last_t_before_gap = self
.datapoints
.range(..target_time)
.rev()
.find_map(|(_, dp)| dp.last_trade_time());
let first_t_after_gap = self
.datapoints
.range(target_time + 1..)
.find_map(|(_, dp)| dp.first_trade_time());
Some((last_t_before_gap, first_t_after_gap))
} else {
None
}
}
pub fn max_qty_ts_range(
&self,
cluster_kind: ClusterKind,
earliest: u64,
latest: u64,
highest: Price,
lowest: Price,
) -> Qty {
let mut max_cluster_qty: Qty = Qty::default();
self.datapoints
.range(earliest..=latest)
.for_each(|(_, dp)| {
max_cluster_qty =
max_cluster_qty.max(dp.max_cluster_qty(cluster_kind, highest, lowest));
});
max_cluster_qty
}
}
impl TimeSeries<HeatmapDataPoint> {
pub fn new(basis: Basis, tick_size: PriceStep) -> Self {
let timeframe = match basis {
Basis::Time(interval) => interval,
Basis::Tick(_) => unimplemented!(),
};
Self {
datapoints: BTreeMap::new(),
interval: timeframe,
tick_size,
}
}
pub fn max_trade_qty_and_aggr_volume(&self, earliest: u64, latest: u64) -> (Qty, Qty) {
let mut max_trade_qty = Qty::ZERO;
let mut max_aggr_volume = Qty::ZERO;
self.datapoints
.range(earliest..=latest)
.for_each(|(_, dp)| {
let (mut buy_volume, mut sell_volume) = (Qty::ZERO, Qty::ZERO);
dp.grouped_trades.iter().for_each(|trade| {
let trade_qty = trade.qty;
max_trade_qty = max_trade_qty.max(trade_qty);
if trade.is_sell {
sell_volume += trade_qty;
} else {
buy_volume += trade_qty;
}
});
max_aggr_volume = max_aggr_volume.max(buy_volume + sell_volume);
});
(max_trade_qty, max_aggr_volume)
}
pub fn max_trade_qty_in_range(
&self,
earliest: u64,
latest: u64,
highest: Price,
lowest: Price,
) -> Qty {
let mut max_trade_qty = Qty::default();
self.datapoints
.range(earliest..=latest)
.for_each(|(_, dp)| {
dp.grouped_trades.iter().for_each(|trade| {
if trade.price >= lowest && trade.price <= highest {
max_trade_qty = max_trade_qty.max(trade.qty);
}
});
});
max_trade_qty
}
}
impl From<&TimeSeries<KlineDataPoint>> for BTreeMap<u64, exchange::Volume> {
fn from(timeseries: &TimeSeries<KlineDataPoint>) -> Self {
timeseries
.datapoints
.iter()
.map(|(time, dp)| (*time, dp.kline.volume))
.collect()
}
}