use crate::prelude::*;
use crate::ws::client::WsClient;
use crate::ws::PING_INTERVAL;
use futures::{SinkExt, StreamExt};
use log::{debug, error, trace, warn};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message as WsMessage;
/// Socket.IO namespace prepended to outgoing frames and recognised on incoming ones.
pub const SOCKET_NAMESPACE: &str = "/markets";
// The `EIO_*` constants are the leading ASCII byte of an Engine.IO packet,
// one constant per packet type handled in this file.
/// Engine.IO "open" packet marker (first packet expected during the handshake).
const EIO_OPEN: u8 = b'0';
/// Engine.IO "close" packet marker.
const EIO_CLOSE: u8 = b'1';
/// Engine.IO "ping" packet marker; answered with a "pong".
const EIO_PING: u8 = b'2';
/// Engine.IO "pong" packet marker.
const EIO_PONG: u8 = b'3';
/// Engine.IO "message" packet marker; it wraps every Socket.IO packet.
const EIO_MESSAGE: u8 = b'4';
/// Socket.IO "event" packet marker; it is the byte right after `EIO_MESSAGE` in an event frame.
const SIO_EVENT: u8 = b'2';
/// Builds the text frame for a Socket.IO event addressed to [`SOCKET_NAMESPACE`].
///
/// The frame has the shape `42<namespace>,["<event>",<data>]`. Should JSON
/// serialisation fail, the payload falls back to `["<event>",null]`.
pub fn frame_socketio_event(event: &str, data: &Value) -> String {
    let envelope = serde_json::json!([event, data]);
    let payload = match serde_json::to_string(&envelope) {
        Ok(encoded) => encoded,
        Err(_) => format!(r#"["{}",null]"#, event),
    };

    // "42" + namespace + "," + payload, assembled without a second format pass.
    let mut frame = String::with_capacity(2 + SOCKET_NAMESPACE.len() + 1 + payload.len());
    frame.push_str("42");
    frame.push_str(SOCKET_NAMESPACE);
    frame.push(',');
    frame.push_str(&payload);
    frame
}
/// Builds the Socket.IO namespace-connect frame: `40<namespace>,`.
pub fn frame_socketio_connect() -> String {
    ["40", SOCKET_NAMESPACE, ","].concat()
}
/// Extracts `(event_name, payload)` from a Socket.IO event frame.
///
/// Accepts frames that start with the Engine.IO message marker followed by the
/// Socket.IO event marker, optionally followed by `<namespace>,`, and then a
/// JSON array whose first element is the event name. The second array element
/// is the payload; it defaults to `Value::Null` when absent.
///
/// Returns `None` for anything else: other packet types, a namespace that is
/// not followed by a comma, invalid JSON, an empty array, or a non-string
/// event name.
pub fn parse_socketio_message(text: &str) -> Option<(String, Value)> {
    // Both markers are single ASCII bytes, so slicing at offset 2 is always
    // on a character boundary once they have been matched.
    let body = match text.as_bytes() {
        [eio, sio, ..] if *eio == EIO_MESSAGE && *sio == SIO_EVENT => &text[2..],
        _ => return None,
    };

    let json = match body.strip_prefix(SOCKET_NAMESPACE) {
        Some(tail) => tail.strip_prefix(',')?,
        None => body,
    };

    let mut items = serde_json::from_str::<Vec<Value>>(json).ok()?.into_iter();
    let name = items.next()?.as_str()?.to_string();
    let data = items.next().unwrap_or(Value::Null);
    Some((name, data))
}
/// True when `text` is exactly the one-byte Engine.IO ping packet.
fn is_eio_ping(text: &str) -> bool {
    matches!(text.as_bytes(), [only] if *only == EIO_PING)
}
/// True when `text` is exactly the one-byte Engine.IO close packet.
fn is_eio_close(text: &str) -> bool {
    let raw = text.as_bytes();
    raw.len() == 1 && raw[0] == EIO_CLOSE
}
/// True when `text` begins with the Engine.IO open marker; anything may follow it.
fn is_eio_open(text: &str) -> bool {
    text.bytes().next() == Some(EIO_OPEN)
}
/// True when `text` starts with `40<namespace>,`, i.e. a connect packet for [`SOCKET_NAMESPACE`].
fn is_namespace_connect_ack(text: &str) -> bool {
    text.strip_prefix("40")
        .and_then(|rest| rest.strip_prefix(SOCKET_NAMESPACE))
        .map_or(false, |rest| rest.starts_with(','))
}
/// True when `text` starts with `41<namespace>`, i.e. a disconnect packet for [`SOCKET_NAMESPACE`].
fn is_namespace_disconnect(text: &str) -> bool {
    match text.strip_prefix("41") {
        Some(rest) => rest.starts_with(SOCKET_NAMESPACE),
        None => false,
    }
}
/// Entry point for the WebSocket streaming API.
///
/// Each `ws_*` method opens its own connection through `client`, performs the
/// Engine.IO / Socket.IO handshake and then either disconnects (`ws_ping`) or
/// runs an event loop that feeds incoming events to a caller-supplied handler.
#[derive(Clone)]
pub struct Stream {
/// Client whose `wss_connect` is used to open every WebSocket connection.
pub client: Client,
}
impl Stream {
/// Checks that the WebSocket endpoint is reachable and completes the handshake.
///
/// Opens a connection, waits for the Engine.IO open packet, sends the
/// namespace-connect frame for [`SOCKET_NAMESPACE`], waits for the matching
/// acknowledgement, and then sends a namespace disconnect and closes the
/// connection. The final disconnect and close are best-effort: their errors
/// are ignored because the check has already succeeded by then.
///
/// # Errors
///
/// Returns an error when the connection cannot be opened, when either
/// handshake frame cannot be read, when the connect frame cannot be sent, or
/// when the server answers with an unexpected packet. In the last case the
/// error message quotes at most the first 80 characters of that packet.
pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
    // Upper bound, in characters, on how much of an unexpected frame is
    // echoed back inside an error message.
    const PREVIEW_CHARS: usize = 80;

    let stream = self.client.wss_connect(None, false, None).await?;
    let mut ws_client = WsClient::new(stream);

    let open_text = Self::read_text_message(ws_client.stream()).await?;
    if !is_eio_open(&open_text) {
        // Truncate by characters, not bytes: slicing a `str` at a byte offset
        // that falls inside a multi-byte UTF-8 sequence panics.
        let preview: String = open_text.chars().take(PREVIEW_CHARS).collect();
        return Err(LimitlessError::Base(format!(
            "Expected Engine.IO open packet (0{{...}}), got: {}",
            preview
        )));
    }
    trace!("Engine.IO open received: {}", open_text);

    let connect_frame = frame_socketio_connect();
    ws_client
        .stream()
        .send(WsMessage::Text(connect_frame.into()))
        .await
        .map_err(|e| {
            LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
        })?;

    let ack_text = Self::read_text_message(ws_client.stream()).await?;
    if !is_namespace_connect_ack(&ack_text) {
        let preview: String = ack_text.chars().take(PREVIEW_CHARS).collect();
        return Err(LimitlessError::Base(format!(
            "Expected namespace connect ack (40/markets,), got: {}",
            preview
        )));
    }
    trace!("Namespace connected: {}", ack_text);

    // Best-effort teardown: leave the namespace, then close the socket.
    let _ = ws_client
        .stream()
        .send(WsMessage::Text(
            format!("41{namespace},", namespace = SOCKET_NAMESPACE).into(),
        ))
        .await;
    let _ = ws_client.disconnect().await;
    Ok(())
}
/// Opens a connection and runs the generic event loop without a command channel.
///
/// `handler` is invoked by the event loop for every message it forwards;
/// Socket.IO events arrive as a JSON array `[event_name, payload]`.
///
/// # Errors
///
/// Propagates connection errors and whatever error the event loop ends with.
pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
where
    F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
    let socket = self.client.wss_connect(None, false, None).await?;
    let mut session = WsClient::new(socket);
    Self::event_loop(&mut session, handler, None).await
}
/// Opens a connection and runs the generic event loop with a command channel.
///
/// Every string received on `cmd_receiver` is sent to the server verbatim as
/// a text frame, so callers are expected to pass already-framed packets.
/// `handler` is invoked for every message the event loop forwards.
///
/// # Errors
///
/// Propagates connection errors and whatever error the event loop ends with.
pub async fn ws_subscribe_with_commands<F>(
    &self,
    cmd_receiver: mpsc::UnboundedReceiver<String>,
    handler: F,
) -> Result<(), LimitlessError>
where
    F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
    let socket = self.client.wss_connect(None, false, None).await?;
    let mut session = WsClient::new(socket);
    let commands = Some(cmd_receiver);
    Self::event_loop(&mut session, handler, commands).await
}
/// Subscribes to price updates for one market and streams events to `handler`.
///
/// After the handshake a `subscribe_market_prices` event carrying
/// `{"marketSlugs": [market_slug]}` is sent; then the typed event loop runs,
/// calling `handler(event_name, payload)` for every Socket.IO event received.
///
/// # Errors
///
/// Fails when the connection, the handshake or the subscription send fails,
/// or when the event loop ends with an error.
pub async fn ws_subscribe_market<F>(
    &self,
    market_slug: &str,
    mut handler: F,
) -> Result<(), LimitlessError>
where
    F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
{
    let socket = self.client.wss_connect(None, false, None).await?;
    let mut session = WsClient::new(socket);
    Self::perform_handshake(&mut session).await?;

    let request = serde_json::json!({"marketSlugs": [market_slug]});
    let frame = frame_socketio_event("subscribe_market_prices", &request);
    if let Err(e) = session.stream().send(WsMessage::Text(frame.into())).await {
        return Err(LimitlessError::Base(format!(
            "Failed to send subscription: {}",
            e
        )));
    }
    debug!("Subscribed to market prices for: {}", market_slug);

    Self::typed_event_loop(&mut session, &mut handler, None).await
}
/// Streams strongly-typed events to `handler`.
///
/// Each received Socket.IO event is passed through `deserialize_event`; when
/// that yields a `WsEventKind` it is handed to `handler`, otherwise the event
/// is logged at debug level and skipped.
///
/// # Errors
///
/// Fails when the connection or the handshake fails, or when the event loop
/// ends with an error.
pub async fn ws_subscribe_events<F>(&self, mut handler: F) -> Result<(), LimitlessError>
where
    F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
{
    let socket = self.client.wss_connect(None, false, None).await?;
    let mut session = WsClient::new(socket);
    Self::perform_handshake(&mut session).await?;

    // Bridges the untyped (name, payload) callback to the typed handler.
    let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
        if let Some(kind) = deserialize_event(event_name, payload) {
            return handler(kind);
        }
        debug!("Failed to deserialize event '{}', skipping", event_name);
        Ok(())
    };

    Self::typed_event_loop(&mut session, &mut adapter, None).await
}
/// Performs the Engine.IO / Socket.IO handshake on a freshly opened connection.
///
/// Reads the Engine.IO open packet, sends the namespace-connect frame for
/// [`SOCKET_NAMESPACE`], and reads the namespace-connect acknowledgement.
///
/// # Errors
///
/// Returns an error when a frame cannot be read or sent, or when the server
/// answers with an unexpected packet. In the latter case the error message
/// quotes at most the first 120 characters of that packet.
async fn perform_handshake(ws_client: &mut WsClient) -> Result<(), LimitlessError> {
    // Upper bound, in characters, on how much of an unexpected frame is
    // echoed back inside an error message.
    const PREVIEW_CHARS: usize = 120;

    let open_text = Self::read_text_message(ws_client.stream()).await?;
    if !is_eio_open(&open_text) {
        // Truncate by characters, not bytes: slicing a `str` at a byte offset
        // that falls inside a multi-byte UTF-8 sequence panics.
        let preview: String = open_text.chars().take(PREVIEW_CHARS).collect();
        return Err(LimitlessError::Base(format!(
            "Expected Engine.IO open packet (0{{...}}), got: {}",
            preview
        )));
    }
    debug!("Engine.IO open: {}", open_text);

    let connect_frame = frame_socketio_connect();
    ws_client
        .stream()
        .send(WsMessage::Text(connect_frame.into()))
        .await
        .map_err(|e| {
            LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
        })?;

    let ack_text = Self::read_text_message(ws_client.stream()).await?;
    if !is_namespace_connect_ack(&ack_text) {
        let preview: String = ack_text.chars().take(PREVIEW_CHARS).collect();
        return Err(LimitlessError::Base(format!(
            "Expected namespace connect ack (40/markets,), got: {}",
            preview
        )));
    }
    debug!("Namespace connected: {}", ack_text);
    Ok(())
}
/// Reads the next frame from `stream` and returns it as text.
///
/// Text frames are returned as-is and binary frames are decoded as UTF-8.
///
/// # Errors
///
/// Returns an error when the stream has ended, when the transport reports an
/// error, when a binary frame is not valid UTF-8, or when the frame is of any
/// other kind.
async fn read_text_message(
    stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
) -> Result<String, LimitlessError> {
    // First peel off the "no frame" outcomes, then classify the frame itself.
    let frame = match stream.next().await {
        None => {
            return Err(LimitlessError::Base(
                "WebSocket connection closed during handshake".to_string(),
            ));
        }
        Some(Err(e)) => return Err(LimitlessError::Tungstenite(e)),
        Some(Ok(frame)) => frame,
    };

    match frame {
        WsMessage::Text(text) => Ok(text.to_string()),
        WsMessage::Binary(data) => String::from_utf8(data.to_vec())
            .map_err(|e| LimitlessError::Base(format!("Invalid UTF-8 in binary frame: {}", e))),
        other => Err(LimitlessError::Base(format!(
            "Expected text frame, got: {:?}",
            other
        ))),
    }
}
pub(crate) async fn event_loop<F>(
ws_client: &mut WsClient,
mut handler: F,
mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
) -> Result<(), LimitlessError>
where
F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
Self::perform_handshake(ws_client).await?;
let mut last_ping = Instant::now();
loop {
tokio::select! {
msg = ws_client.stream().next() => {
match msg {
Some(Ok(WsMessage::Text(text))) => {
Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
}
Some(Ok(WsMessage::Binary(data))) => {
if let Ok(text) = String::from_utf8(data.to_vec()) {
Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
}
}
Some(Ok(WsMessage::Ping(data))) => {
let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
}
Some(Ok(WsMessage::Close(_))) => {
trace!("WebSocket closed by server");
return Ok(());
}
Some(Err(e)) => {
return Err(LimitlessError::Tungstenite(e));
}
None => {
trace!("WebSocket stream ended");
return Ok(());
}
_ => {}
}
}
cmd = async {
match cmd_receiver.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
if let Some(cmd) = cmd {
debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
if let Err(e) = ws_client
.stream()
.send(WsMessage::Text(cmd.into()))
.await
{
error!("Failed to send command: {}", e);
}
}
}
_ = tokio::time::sleep(PING_INTERVAL) => {
let now = Instant::now();
if now.duration_since(last_ping) >= PING_INTERVAL {
let _ = ws_client
.stream()
.send(WsMessage::Text(String::from("2").into()))
.await;
last_ping = now;
}
}
}
}
}
pub(crate) async fn typed_event_loop<F>(
ws_client: &mut WsClient,
handler: &mut F,
mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
) -> Result<(), LimitlessError>
where
F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
{
let mut last_ping = Instant::now();
loop {
tokio::select! {
msg = ws_client.stream().next() => {
match msg {
Some(Ok(WsMessage::Text(text))) => {
if let Some((event_name, payload)) = parse_socketio_message(&text) {
if let Err(e) = handler(&event_name, &payload) {
error!("WS handler error on '{event_name}': {}", e);
}
} else if is_eio_ping(&text) {
let _ = ws_client.stream()
.send(WsMessage::Text(String::from("3").into()))
.await;
} else if is_eio_close(&text) || is_namespace_disconnect(&text) {
trace!("Socket.IO close/disconnect received");
return Ok(());
}
}
Some(Ok(WsMessage::Binary(data))) => {
if let Ok(text) = String::from_utf8(data.to_vec()) {
if let Some((event_name, payload)) = parse_socketio_message(&text) {
if let Err(e) = handler(&event_name, &payload) {
error!("WS handler error on '{event_name}': {}", e);
}
}
}
}
Some(Ok(WsMessage::Ping(data))) => {
let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
}
Some(Ok(WsMessage::Close(_))) => {
trace!("WebSocket closed by server");
return Ok(());
}
Some(Err(e)) => {
return Err(LimitlessError::Tungstenite(e));
}
None => {
trace!("WebSocket stream ended");
return Ok(());
}
_ => {}
}
}
cmd = async {
match cmd_receiver.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
if let Some(cmd) = cmd {
debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
if let Err(e) = ws_client
.stream()
.send(WsMessage::Text(cmd.into()))
.await
{
error!("Failed to send command: {}", e);
}
}
}
_ = tokio::time::sleep(PING_INTERVAL) => {
let now = Instant::now();
if now.duration_since(last_ping) >= PING_INTERVAL {
let _ = ws_client
.stream()
.send(WsMessage::Text(String::from("2").into()))
.await;
last_ping = now;
}
}
}
}
}
async fn handle_incoming_text<F>(
text: &str,
handler: &mut F,
ws_client: &mut WsClient,
) -> Result<(), LimitlessError>
where
F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
{
if is_eio_ping(text) {
let _ = ws_client
.stream()
.send(WsMessage::Text(String::from("3").into()))
.await;
return Ok(());
}
if is_eio_close(text) || is_namespace_disconnect(text) {
trace!("Socket.IO close/disconnect");
return Err(LimitlessError::Base(
"Server closed the Socket.IO connection".to_string(),
));
}
if text.as_bytes() == [EIO_PONG] {
return Ok(());
}
if let Some((event_name, payload)) = parse_socketio_message(text) {
let event_array = serde_json::json!([event_name, payload]);
if let Err(e) = handler(event_array) {
error!("WS handler error on '{event_name}': {}", e);
}
return Ok(());
}
if is_namespace_connect_ack(text) || is_eio_open(text) {
return Ok(());
}
warn!("Unhandled WS message: {}", &text[..text.len().min(200)]);
if let Ok(value) = serde_json::from_str::<Value>(text) {
let _ = handler(value);
}
Ok(())
}
}
/// Constructors for [`Stream`] required by the `Limitless` trait.
impl Limitless for Stream {
    /// Creates a `Stream` using `Config::default()`.
    fn new(api_key: Option<String>, secret: Option<String>) -> Self {
        let defaults = Config::default();
        Self::new_with_config(&defaults, api_key, secret)
    }

    /// Creates a `Stream` whose client is built from the given credentials and
    /// `config.rest_api_endpoint`.
    fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
        // NOTE(review): the REST endpoint is handed to the client even though
        // this type only opens WebSocket connections — confirm that
        // `wss_connect` derives its own URL.
        let endpoint = config.rest_api_endpoint.to_string();
        let client = Client::new(api_key, secret, endpoint);
        Self { client }
    }
}