use std::collections::HashMap;
use std::sync::Arc;
use futures_util::StreamExt;
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, info, warn};
use super::handle::{FeedEvent, FeedHandle};
use super::options::FeedOptions;
use crate::connector_manager::ExchangeHub;
use crate::core::types::{
AccountType, ExchangeId, StreamEvent, StreamType, SubscriptionRequest, Symbol,
};
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct StreamKey {
pub exchange: ExchangeId,
pub account_type: AccountType,
pub symbol: String,
pub kind: StreamKind,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum StreamKind {
Ticker,
Trade,
Orderbook,
Kline(String),
}
impl StreamKind {
fn to_stream_type(&self) -> StreamType {
match self {
StreamKind::Ticker => StreamType::Ticker,
StreamKind::Trade => StreamType::Trade,
StreamKind::Orderbook => StreamType::Orderbook,
StreamKind::Kline(iv) => StreamType::Kline { interval: iv.clone() },
}
}
}
#[allow(dead_code)]
struct UpstreamEntry {
tx: broadcast::Sender<FeedEvent>,
refcount: Arc<()>,
pump: tokio::task::JoinHandle<()>,
}
pub struct MarketFeed {
hub: Arc<ExchangeHub>,
opts: FeedOptions,
upstreams: Arc<Mutex<HashMap<StreamKey, UpstreamEntry>>>,
}
impl MarketFeed {
pub(crate) fn new(hub: Arc<ExchangeHub>, opts: FeedOptions) -> Self {
Self {
hub,
opts,
upstreams: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn builder(hub: impl Into<Arc<ExchangeHub>>) -> super::builder::FeedBuilder {
super::builder::FeedBuilder::new(hub.into())
}
pub async fn subscribe_ticker(
&self,
exchange: ExchangeId,
symbol: impl Into<String>,
account_type: AccountType,
) -> Result<FeedHandle, FeedError> {
self.subscribe(StreamKey {
exchange,
account_type,
symbol: symbol.into(),
kind: StreamKind::Ticker,
})
.await
}
pub async fn subscribe_trades(
&self,
exchange: ExchangeId,
symbol: impl Into<String>,
account_type: AccountType,
) -> Result<FeedHandle, FeedError> {
self.subscribe(StreamKey {
exchange,
account_type,
symbol: symbol.into(),
kind: StreamKind::Trade,
})
.await
}
pub async fn subscribe_orderbook(
&self,
exchange: ExchangeId,
symbol: impl Into<String>,
account_type: AccountType,
) -> Result<FeedHandle, FeedError> {
self.subscribe(StreamKey {
exchange,
account_type,
symbol: symbol.into(),
kind: StreamKind::Orderbook,
})
.await
}
pub async fn subscribe_kline(
&self,
exchange: ExchangeId,
symbol: impl Into<String>,
account_type: AccountType,
interval: impl Into<String>,
) -> Result<FeedHandle, FeedError> {
self.subscribe(StreamKey {
exchange,
account_type,
symbol: symbol.into(),
kind: StreamKind::Kline(interval.into()),
})
.await
}
async fn subscribe(&self, key: StreamKey) -> Result<FeedHandle, FeedError> {
let mut upstreams = self.upstreams.lock().await;
if let Some(entry) = upstreams.get(&key) {
return Ok(FeedHandle {
rx: entry.tx.subscribe(),
_keep_alive: entry.refcount.clone(),
});
}
let ws = self
.hub
.ws(key.exchange, key.account_type)
.ok_or(FeedError::NoConnector {
exchange: key.exchange,
account_type: key.account_type,
})?;
let sym = Symbol::with_raw("", "", key.symbol.clone());
let mut sub_req = SubscriptionRequest::new(sym, key.kind.to_stream_type());
sub_req.account_type = key.account_type;
ws.subscribe(sub_req.clone())
.await
.map_err(|e| FeedError::Subscribe(format!("{e}")))?;
let (tx, _rx0) = broadcast::channel(self.opts.broadcast_capacity);
let refcount = Arc::new(());
let pump = spawn_pump(
tx.clone(),
ws.clone(),
key.clone(),
Arc::downgrade(&refcount),
);
let entry = UpstreamEntry {
tx: tx.clone(),
refcount: refcount.clone(),
pump,
};
upstreams.insert(key.clone(), entry);
info!(
exchange = ?key.exchange,
account = ?key.account_type,
symbol = %key.symbol,
kind = ?key.kind,
"feed upstream started"
);
Ok(FeedHandle {
rx: tx.subscribe(),
_keep_alive: refcount,
})
}
pub async fn active_upstreams(&self) -> usize {
self.upstreams.lock().await.len()
}
}
fn spawn_pump(
tx: broadcast::Sender<FeedEvent>,
ws: Arc<dyn crate::core::traits::WebSocketConnector>,
key: StreamKey,
_refcount_weak: std::sync::Weak<()>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut stream = ws.event_stream();
while let Some(item) = stream.next().await {
match item {
Ok(event) => {
if !matches_kind(&event, &key.kind) {
continue;
}
let symbol = event_symbol(&event).unwrap_or_else(|| key.symbol.clone());
if symbol != key.symbol && !key.symbol.is_empty() {
continue;
}
let fe = FeedEvent {
exchange: key.exchange,
account_type: key.account_type,
symbol: symbol.clone(),
event,
};
if tx.send(fe).is_err() {
debug!(
exchange = ?key.exchange,
symbol = %key.symbol,
"feed pump: no receivers, exiting"
);
return;
}
}
Err(e) => {
warn!(
exchange = ?key.exchange,
symbol = %key.symbol,
"ws error: {e}"
);
}
}
}
debug!(
exchange = ?key.exchange,
symbol = %key.symbol,
"feed pump: upstream stream closed"
);
})
}
fn matches_kind(event: &StreamEvent, kind: &StreamKind) -> bool {
matches!(
(event, kind),
(StreamEvent::Ticker { .. }, StreamKind::Ticker)
| (StreamEvent::Trade { .. }, StreamKind::Trade)
| (StreamEvent::OrderbookSnapshot { .. }, StreamKind::Orderbook)
| (StreamEvent::OrderbookDelta { .. }, StreamKind::Orderbook)
| (StreamEvent::Kline { .. }, StreamKind::Kline(_))
)
}
fn event_symbol(event: &StreamEvent) -> Option<String> {
match event {
StreamEvent::Ticker { symbol, .. } => Some(symbol.clone()),
StreamEvent::Trade { symbol, .. } => Some(symbol.clone()),
_ => None,
}
}
#[derive(Debug, thiserror::Error)]
pub enum FeedError {
#[error("no websocket connector registered for {exchange:?} {account_type:?} — call hub.connect_full first")]
NoConnector { exchange: ExchangeId, account_type: AccountType },
#[error("subscribe failed: {0}")]
Subscribe(String),
}