use crate::prelude::*;
use crate::ws::client::WsClient;
use crate::ws::PING_INTERVAL;
use futures::{SinkExt, StreamExt};
use log::{error, trace};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message as WsMessage;
#[derive(Clone)]
pub struct Stream {
pub client: Client,
}
impl Stream {
pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
let response = self.client.wss_connect(None, false, None).await?;
let mut ws_client = WsClient::new(response);
let _ = ws_client
.stream()
.send(WsMessage::Ping(vec![].into()))
.await;
let Some(data) = ws_client.stream().next().await else {
return Err(LimitlessError::Base(
"Failed to receive pong response".to_string(),
));
};
match data {
Ok(WsMessage::Pong(_)) => {
trace!("Pong received successfully");
}
Ok(other) => {
trace!("Unexpected WS message on ping: {:?}", other);
}
Err(e) => {
return Err(LimitlessError::Tungstenite(e));
}
}
Ok(())
}
pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
where
F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
let response = self.client.wss_connect(None, false, None).await?;
let mut ws_client = WsClient::new(response);
Self::event_loop(&mut ws_client, handler, None).await?;
Ok(())
}
pub async fn ws_subscribe_with_commands<F>(
&self,
cmd_receiver: mpsc::UnboundedReceiver<String>,
handler: F,
) -> Result<(), LimitlessError>
where
F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
let response = self.client.wss_connect(None, false, None).await?;
let mut ws_client = WsClient::new(response);
Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
Ok(())
}
pub(crate) async fn event_loop<F>(
ws_client: &mut WsClient,
mut handler: F,
mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
) -> Result<(), LimitlessError>
where
F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
let mut last_ping = Instant::now();
loop {
tokio::select! {
msg = ws_client.stream().next() => {
match msg {
Some(Ok(WsMessage::Text(text))) => {
if let Ok(event) = serde_json::from_str::<Value>(&text) {
if let Err(e) = handler(event) {
error!("WebSocket handler error: {}", e);
}
}
}
Some(Ok(WsMessage::Binary(data))) => {
if let Ok(text) = String::from_utf8(data.to_vec()) {
if let Ok(event) = serde_json::from_str::<Value>(&text) {
if let Err(e) = handler(event) {
error!("WebSocket handler error: {}", e);
}
}
}
}
Some(Ok(WsMessage::Ping(data))) => {
let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
}
Some(Ok(WsMessage::Close(_))) => {
trace!("WebSocket closed by server");
return Ok(());
}
Some(Err(e)) => {
return Err(LimitlessError::Tungstenite(e));
}
None => {
return Ok(());
}
_ => {}
}
}
cmd = async {
match cmd_receiver.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
if let Some(cmd) = cmd {
let _ = ws_client
.stream()
.send(WsMessage::Text(cmd.into()))
.await;
}
}
_ = tokio::time::sleep(PING_INTERVAL) => {
let now = Instant::now();
if now.duration_since(last_ping) >= PING_INTERVAL {
let _ = ws_client
.stream()
.send(WsMessage::Ping(vec![].into()))
.await;
last_ping = now;
}
}
}
}
}
}
impl Limitless for Stream {
fn new(api_key: Option<String>, secret: Option<String>) -> Self {
Self::new_with_config(&Config::default(), api_key, secret)
}
fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
Self {
client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
}
}
}