#![allow(unused_imports)]
use serde_json::Value;
use std::sync::{Arc, atomic::Ordering};
use tokio::spawn;
use crate::common::config::ConfigurationWebsocketStreams;
use crate::common::websocket::{
Subscription, WebsocketBase, WebsocketStream, WebsocketStreams as WebsocketStreamsBase,
create_stream_handler,
};
use crate::models::{StreamId, WebsocketEvent, WebsocketMode};
mod apis;
mod handle;
mod models;
pub use apis::*;
pub use handle::*;
pub use models::*;
pub struct WebsocketStreams {
websocket_streams_base: Arc<WebsocketStreamsBase>,
}
impl WebsocketStreams {
pub(crate) async fn connect(
config: ConfigurationWebsocketStreams,
streams: Vec<String>,
mode: Option<WebsocketMode>,
) -> anyhow::Result<Self> {
let mut cfg = config;
if let Some(m) = mode {
cfg.mode = m;
}
let websocket_streams_base = WebsocketStreamsBase::new(cfg, vec![], vec![]);
websocket_streams_base.clone().connect(streams).await?;
Ok(Self {
websocket_streams_base: websocket_streams_base.clone(),
})
}
pub fn subscribe_on_ws_events<F>(&self, callback: F) -> Subscription
where
F: FnMut(WebsocketEvent) + Send + 'static,
{
let base = Arc::clone(&self.websocket_streams_base);
base.common.events.subscribe(callback)
}
pub fn unsubscribe_from_ws_events(&self, subscription: Subscription) {
subscription.unsubscribe();
}
pub async fn disconnect(&self) -> anyhow::Result<()> {
self.websocket_streams_base
.disconnect()
.await
.map_err(anyhow::Error::msg)
}
pub async fn is_connected(&self) -> bool {
self.websocket_streams_base.is_connected().await
}
pub async fn ping_server(&self) {
self.websocket_streams_base.ping_server().await;
}
pub fn subscribe(&self, streams: Vec<String>, id: Option<String>) {
let base = Arc::clone(&self.websocket_streams_base);
spawn(async move { base.subscribe(streams, id.map(StreamId::from), None).await });
}
pub fn unsubscribe(&self, streams: Vec<String>, id: Option<String>) {
let base = Arc::clone(&self.websocket_streams_base);
spawn(async move {
base.unsubscribe(streams, id.map(StreamId::from), None)
.await;
});
}
pub async fn is_subscribed(&self, stream: &str) -> bool {
self.websocket_streams_base.is_subscribed(stream).await
}
pub async fn risk_data(
&self,
listen_key: String,
id: Option<String>,
) -> anyhow::Result<Arc<WebsocketStream<RiskDataStreamEventsResponse>>> {
Ok(create_stream_handler::<RiskDataStreamEventsResponse>(
WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
listen_key,
id.map(StreamId::from),
None,
)
.await)
}
pub async fn trade_data(
&self,
listen_key: String,
id: Option<String>,
) -> anyhow::Result<Arc<WebsocketStream<TradeDataStreamEventsResponse>>> {
Ok(create_stream_handler::<TradeDataStreamEventsResponse>(
WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
listen_key,
id.map(StreamId::from),
None,
)
.await)
}
}