use std::sync::Arc;
use async_trait::async_trait;
use digdigdig3::{
AccountType, ExchangeId, Symbol, SymbolInput,
connector_manager::ExchangeHub,
};
use crate::core::types::{Bar, Tick};
use crate::data_loader::{RestFetcher, StreamKind, TimedEvent};
pub struct ExchangeHubFetcher {
hub: Arc<ExchangeHub>,
}
impl ExchangeHubFetcher {
pub fn new(hub: Arc<ExchangeHub>) -> Self {
Self { hub }
}
}
#[async_trait]
impl RestFetcher for ExchangeHubFetcher {
async fn fetch(
&self,
exchange: ExchangeId,
account_type: AccountType,
symbol: &str,
kind: StreamKind,
from_ts: i64,
to_ts: i64,
) -> Result<Vec<TimedEvent>, String> {
let conn = self
.hub
.rest(exchange)
.ok_or_else(|| format!("ExchangeHubFetcher: no REST connector for {:?}", exchange))?;
let sym = Symbol::with_raw("", "", symbol.to_string());
match kind {
StreamKind::Bar => {
let elapsed_ms = to_ts.saturating_sub(from_ts).max(0);
let limit = u16::try_from((elapsed_ms / 60_000).min(1500).max(1))
.unwrap_or(1500);
let klines = conn
.get_klines(SymbolInput::Canonical(&sym), "1m", Some(limit), account_type, Some(to_ts))
.await
.map_err(|e| format!("get_klines: {e}"))?;
Ok(klines
.into_iter()
.filter(|k| k.open_time >= from_ts && k.open_time <= to_ts)
.map(|k| {
TimedEvent::Bar(Bar::new(
k.open_time,
k.open,
k.high,
k.low,
k.close,
k.volume,
))
})
.collect())
}
StreamKind::Tick => {
let caps = self.hub.capabilities(exchange).unwrap_or_default();
if !caps.has_recent_trades {
return Err(format!(
"ExchangeHubFetcher: {:?} does not support recent_trades (capability check)",
exchange,
));
}
let trades = conn
.get_recent_trades(SymbolInput::Canonical(&sym), Some(1000), account_type)
.await
.map_err(|e| format!("get_recent_trades: {e}"))?;
Ok(trades
.into_iter()
.filter(|t| t.timestamp >= from_ts && t.timestamp <= to_ts)
.map(|t| {
TimedEvent::Tick(Tick {
time: t.timestamp,
price: t.price,
size: t.quantity,
is_buy: t.side == digdigdig3::core::types::TradeSide::Buy,
bid: None,
ask: None,
})
})
.collect())
}
StreamKind::OrderBook => {
let book = conn
.get_orderbook(SymbolInput::Canonical(&sym), Some(50), account_type)
.await
.map_err(|e| format!("get_orderbook: {e}"))?;
if book.timestamp >= from_ts && book.timestamp <= to_ts {
Ok(vec![TimedEvent::OrderBook(book)])
} else {
Ok(vec![])
}
}
StreamKind::Funding => {
let rates = conn
.get_funding_rate_history(SymbolInput::Canonical(&sym), Some(from_ts), Some(to_ts), Some(1000), account_type)
.await
.map_err(|e| format!("get_funding_rate_history: {e}"))?;
Ok(rates
.into_iter()
.filter(|r| r.timestamp >= from_ts && r.timestamp <= to_ts)
.map(TimedEvent::Funding)
.collect())
}
StreamKind::Liquidation => {
let liqs = conn
.get_liquidation_history(Some(SymbolInput::Canonical(&sym)), Some(from_ts), Some(to_ts), Some(1000), account_type)
.await
.map_err(|e| format!("get_liquidation_history: {e}"))?;
Ok(liqs
.into_iter()
.filter(|l| l.timestamp >= from_ts && l.timestamp <= to_ts)
.map(TimedEvent::Liquidation)
.collect())
}
StreamKind::OpenInterest => {
let history = conn
.get_open_interest_history(SymbolInput::Canonical(&sym), "5m", Some(from_ts), Some(to_ts), Some(500), account_type)
.await
.map_err(|e| format!("get_open_interest_history: {e}"))?;
Ok(history
.into_iter()
.filter(|oi| oi.timestamp >= from_ts && oi.timestamp <= to_ts)
.map(TimedEvent::OpenInterest)
.collect())
}
StreamKind::LongShortRatio => {
let ratios = conn
.get_long_short_ratio_history(SymbolInput::Canonical(&sym), "5m", Some(from_ts), Some(to_ts), Some(500), account_type)
.await
.map_err(|e| format!("get_long_short_ratio_history: {e}"))?;
Ok(ratios
.into_iter()
.filter(|r| r.timestamp >= from_ts && r.timestamp <= to_ts)
.map(TimedEvent::LongShortRatio)
.collect())
}
StreamKind::OrderbookDelta
| StreamKind::AggTrade
| StreamKind::Ticker
| StreamKind::MarkPrice
| StreamKind::OptionGreeks
| StreamKind::VolatilityIndex
| StreamKind::Basis
| StreamKind::IndexPrice
| StreamKind::CompositeIndex
| StreamKind::InsuranceFund
| StreamKind::Settlement
| StreamKind::BlockTrade
| StreamKind::OrderbookL3
| StreamKind::RiskLimit
| StreamKind::PredictedFunding
| StreamKind::FundingSettlement
| StreamKind::Auction
| StreamKind::MarketWarning
| StreamKind::HistoricalVolatility => Err(format!(
"ExchangeHubFetcher: StreamKind::{:?} is WS-only — no REST history endpoint",
kind,
)),
}
}
}
#[cfg(test)]
mod tests {
use crate::data_loader::StreamKind;
#[test]
fn ws_only_kinds_error_message_contains_ws_only() {
let ws_only = [
StreamKind::OrderbookDelta,
StreamKind::AggTrade,
StreamKind::Ticker,
StreamKind::MarkPrice,
StreamKind::OptionGreeks,
StreamKind::VolatilityIndex,
StreamKind::Basis,
StreamKind::IndexPrice,
StreamKind::CompositeIndex,
StreamKind::InsuranceFund,
StreamKind::Settlement,
StreamKind::BlockTrade,
StreamKind::OrderbookL3,
StreamKind::RiskLimit,
StreamKind::PredictedFunding,
StreamKind::FundingSettlement,
StreamKind::Auction,
StreamKind::MarketWarning,
StreamKind::HistoricalVolatility,
];
for kind in ws_only {
let msg = format!(
"ExchangeHubFetcher: StreamKind::{:?} is WS-only — no REST history endpoint",
kind,
);
assert!(
msg.contains("WS-only"),
"Error message for {:?} should mention WS-only",
kind,
);
}
}
#[test]
fn rest_kinds_are_not_ws_only() {
let rest_kinds = [
StreamKind::Bar,
StreamKind::Tick,
StreamKind::OrderBook,
StreamKind::Funding,
StreamKind::Liquidation,
StreamKind::OpenInterest,
StreamKind::LongShortRatio,
];
for kind in rest_kinds {
assert_ne!(kind.as_str(), "", "all REST kinds must have a storage name");
}
}
}