use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use futures_util::Stream;
use tokio::sync::Mutex as TokioMutex;
use crate::core::types::{
AccountType, ConnectionStatus, OrderbookCapabilities, StreamEvent, StreamType,
SubscriptionRequest, Symbol, WebSocketResult,
};
/// Core interface of a WebSocket connector: connection lifecycle,
/// subscription management and access to the resulting event stream.
///
/// Implementors must be `Send + Sync`. The precise semantics of the required
/// methods are defined by each implementation and are not visible from this
/// trait alone.
#[async_trait]
pub trait WebSocketConnector: Send + Sync {
    /// Connects using the given `account_type`.
    ///
    /// # Errors
    /// Returns whatever error the implementation reports through
    /// [`WebSocketResult`].
    async fn connect(&mut self, account_type: AccountType) -> WebSocketResult<()>;

    /// Disconnects the connector.
    ///
    /// # Errors
    /// Returns whatever error the implementation reports through
    /// [`WebSocketResult`].
    async fn disconnect(&mut self) -> WebSocketResult<()>;

    /// Reports the connector's current [`ConnectionStatus`].
    fn connection_status(&self) -> ConnectionStatus;

    /// Subscribes to the stream described by `request`.
    ///
    /// # Errors
    /// Returns whatever error the implementation reports through
    /// [`WebSocketResult`].
    async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()>;

    /// Unsubscribes from the stream described by `request`.
    ///
    /// # Errors
    /// Returns whatever error the implementation reports through
    /// [`WebSocketResult`].
    async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()>;

    /// Returns a pinned, boxed, `Send` stream yielding
    /// `WebSocketResult<StreamEvent>` items.
    ///
    /// Only `&self` is required, so obtaining the stream does not need
    /// exclusive access to the connector.
    fn event_stream(
        &self,
    ) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>>;

    /// Returns the subscriptions the connector currently considers active,
    /// as an owned `Vec`.
    fn active_subscriptions(&self) -> Vec<SubscriptionRequest>;

    /// Returns `true` when `request` compares equal to one of the entries
    /// returned by [`active_subscriptions`](Self::active_subscriptions).
    ///
    /// The default implementation materialises the full subscription list on
    /// every call and scans it linearly; implementors with a cheaper lookup
    /// may override it.
    fn has_subscription(&self, request: &SubscriptionRequest) -> bool {
        self.active_subscriptions()
            .iter()
            .any(|active| active == request)
    }

    /// Optional shared handle to a `u64` value guarded by a Tokio mutex.
    ///
    /// The default implementation returns `None`.
    // NOTE(review): judging by the name, the value is presumably the latest
    // ping round-trip time; its unit is not visible here — confirm with the
    // implementors.
    fn ping_rtt_handle(&self) -> Option<Arc<TokioMutex<u64>>> {
        None
    }

    /// Returns the [`OrderbookCapabilities`] for `account_type`.
    ///
    /// The default implementation ignores `account_type` and returns
    /// `OrderbookCapabilities::permissive()`.
    fn orderbook_capabilities(&self, account_type: AccountType) -> OrderbookCapabilities {
        // The parameter is intentionally unused by the default; this keeps
        // the public parameter name while avoiding an unused-variable warning.
        let _ = account_type;
        OrderbookCapabilities::permissive()
    }
}
/// Convenience helpers layered on top of [`WebSocketConnector`].
///
/// Every method builds a [`SubscriptionRequest`] and forwards it to
/// [`WebSocketConnector::subscribe`], returning that call's result unchanged.
#[async_trait]
pub trait WebSocketExt: WebSocketConnector {
    /// Subscribes using `SubscriptionRequest::ticker(symbol)`.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_ticker(&mut self, symbol: Symbol) -> WebSocketResult<()> {
        let request = SubscriptionRequest::ticker(symbol);
        self.subscribe(request).await
    }

    /// Subscribes using `SubscriptionRequest::trade(symbol)`.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_trades(&mut self, symbol: Symbol) -> WebSocketResult<()> {
        let request = SubscriptionRequest::trade(symbol);
        self.subscribe(request).await
    }

    /// Subscribes using `SubscriptionRequest::orderbook(symbol)`.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_orderbook(&mut self, symbol: Symbol) -> WebSocketResult<()> {
        let request = SubscriptionRequest::orderbook(symbol);
        self.subscribe(request).await
    }

    /// Subscribes using `SubscriptionRequest::kline(symbol, interval)`.
    ///
    /// The accepted `interval` strings are determined by
    /// `SubscriptionRequest::kline` and the implementor, not by this helper.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_klines(&mut self, symbol: Symbol, interval: &str) -> WebSocketResult<()> {
        let request = SubscriptionRequest::kline(symbol, interval);
        self.subscribe(request).await
    }

    /// Subscribes with a request built from `Symbol::empty()` and
    /// `StreamType::OrderUpdate`.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_orders(&mut self) -> WebSocketResult<()> {
        let request = SubscriptionRequest::new(Symbol::empty(), StreamType::OrderUpdate);
        self.subscribe(request).await
    }

    /// Subscribes with a request built from `Symbol::empty()` and
    /// `StreamType::BalanceUpdate`.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_balance(&mut self) -> WebSocketResult<()> {
        let request = SubscriptionRequest::new(Symbol::empty(), StreamType::BalanceUpdate);
        self.subscribe(request).await
    }

    /// Subscribes with a request built from `Symbol::empty()` and
    /// `StreamType::PositionUpdate`.
    ///
    /// # Errors
    /// Propagates the error returned by `subscribe`.
    async fn subscribe_positions(&mut self) -> WebSocketResult<()> {
        let request = SubscriptionRequest::new(Symbol::empty(), StreamType::PositionUpdate);
        self.subscribe(request).await
    }
}
/// Blanket implementation: every sized type that implements
/// [`WebSocketConnector`] automatically receives the default methods of
/// [`WebSocketExt`].
///
/// `T` carries the implicit `Sized` bound, so unsized types such as
/// `dyn WebSocketConnector` are not covered by this impl.
impl<T> WebSocketExt for T where T: WebSocketConnector {}