limitless/ws/stream.rs
1use crate::prelude::*;
2use crate::ws::client::WsClient;
3use crate::ws::PING_INTERVAL;
4
5use futures::{SinkExt, StreamExt};
6use log::{error, trace};
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10
11use tokio_tungstenite::tungstenite::Message as WsMessage;
12
13/// Manages WebSocket streaming connections to the Limitless Exchange.
14///
15/// Connects to `wss://ws.limitless.exchange/markets` and provides
16/// methods for subscribing to public and private data streams.
17///
18/// **Protocol note:** The Limitless WS uses Socket.IO protocol over raw
19/// WebSocket. This implementation handles the raw WebSocket transport;
20/// callers should frame Socket.IO packets (namespace connect, event
21/// emit/receive) on top of this stream.
22///
23/// # Event Reference
24///
25/// | Client → Server (emit) | Auth | Description |
26/// |-------------------------------|------|------------------------------------|
27/// | `subscribe_market_prices` | No | AMM prices + CLOB orderbook |
28/// | `subscribe_positions` | Yes | Portfolio position updates |
29/// | `subscribe_order_events` | Yes | OME + settlement lifecycle |
30/// | `subscribe_market_lifecycle` | No | Market creation / resolution |
31///
32/// | Server → Client (on) | Auth | Description |
33/// |-----------------------|------|--------------------------------------|
34/// | `newPriceData` | No | AMM price update |
35/// | `orderbookUpdate` | No | CLOB orderbook snapshot |
36/// | `positions` | Yes | Position balance change |
37/// | `orderEvent` | Yes | OME state or settlement result |
38/// | `marketCreated` | No | New market funded and visible |
39/// | `marketResolved` | No | Market resolved with winning outcome |
40/// | `system` | — | System notifications |
41/// | `authenticated` | Yes | Auth confirmation |
42/// | `exception` | — | Error notifications |
43#[derive(Clone)]
44pub struct Stream {
45 pub client: Client,
46}
47
48impl Stream {
49 /// Tests connectivity by sending a WebSocket ping.
50 pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
51 let response = self.client.wss_connect(None, false, None).await?;
52
53 let mut ws_client = WsClient::new(response);
54 let _ = ws_client
55 .stream()
56 .send(WsMessage::Ping(vec![].into()))
57 .await;
58
59 let Some(data) = ws_client.stream().next().await else {
60 return Err(LimitlessError::Base(
61 "Failed to receive pong response".to_string(),
62 ));
63 };
64 match data {
65 Ok(WsMessage::Pong(_)) => {
66 trace!("Pong received successfully");
67 }
68 Ok(other) => {
69 trace!("Unexpected WS message on ping: {:?}", other);
70 }
71 Err(e) => {
72 return Err(LimitlessError::Tungstenite(e));
73 }
74 }
75 Ok(())
76 }
77
78 /// Subscribe to a public data stream with an event handler callback.
79 ///
80 /// The `handler` receives raw JSON `Value` for each incoming message
81 /// that is not a control frame (Ping/Pong/Close).
82 ///
83 /// # Example
84 ///
85 /// ```no_run
86 /// use limitless::prelude::*;
87 ///
88 /// #[tokio::main]
89 /// async fn main() {
90 /// let stream: Stream = Limitless::new(None, None);
91 /// stream
92 /// .ws_subscribe(|event| {
93 /// println!("Received: {:?}", event);
94 /// Ok(())
95 /// })
96 /// .await
97 /// .unwrap();
98 /// }
99 /// ```
100 pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
101 where
102 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
103 {
104 let response = self.client.wss_connect(None, false, None).await?;
105 let mut ws_client = WsClient::new(response);
106 Self::event_loop(&mut ws_client, handler, None).await?;
107 Ok(())
108 }
109
110 /// Subscribe to a stream with dynamic command support.
111 ///
112 /// Allows emitting subscription commands (subscribe/unsubscribe) after
113 /// the connection is established. Send JSON command strings through
114 /// the `cmd_sender` channel.
115 pub async fn ws_subscribe_with_commands<F>(
116 &self,
117 cmd_receiver: mpsc::UnboundedReceiver<String>,
118 handler: F,
119 ) -> Result<(), LimitlessError>
120 where
121 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
122 {
123 let response = self.client.wss_connect(None, false, None).await?;
124 let mut ws_client = WsClient::new(response);
125 Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
126 Ok(())
127 }
128
129 /// Core event loop: reads WebSocket messages, dispatches to handler,
130 /// sends periodic pings, and processes outgoing subscription commands.
131 pub(crate) async fn event_loop<F>(
132 ws_client: &mut WsClient,
133 mut handler: F,
134 mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
135 ) -> Result<(), LimitlessError>
136 where
137 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
138 {
139 let mut last_ping = Instant::now();
140
141 loop {
142 tokio::select! {
143 // ── Incoming WebSocket message ─────────────────────────
144 msg = ws_client.stream().next() => {
145 match msg {
146 Some(Ok(WsMessage::Text(text))) => {
147 if let Ok(event) = serde_json::from_str::<Value>(&text) {
148 if let Err(e) = handler(event) {
149 error!("WebSocket handler error: {}", e);
150 }
151 }
152 }
153 Some(Ok(WsMessage::Binary(data))) => {
154 // Socket.IO may send binary frames
155 if let Ok(text) = String::from_utf8(data.to_vec()) {
156 if let Ok(event) = serde_json::from_str::<Value>(&text) {
157 if let Err(e) = handler(event) {
158 error!("WebSocket handler error: {}", e);
159 }
160 }
161 }
162 }
163 Some(Ok(WsMessage::Ping(data))) => {
164 let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
165 }
166 Some(Ok(WsMessage::Close(_))) => {
167 trace!("WebSocket closed by server");
168 return Ok(());
169 }
170 Some(Err(e)) => {
171 return Err(LimitlessError::Tungstenite(e));
172 }
173 None => {
174 return Ok(());
175 }
176 _ => {}
177 }
178 }
179
180 // ── Outgoing subscription command ──────────────────────
181 cmd = async {
182 match cmd_receiver.as_mut() {
183 Some(rx) => rx.recv().await,
184 None => std::future::pending().await,
185 }
186 } => {
187 if let Some(cmd) = cmd {
188 let _ = ws_client
189 .stream()
190 .send(WsMessage::Text(cmd.into()))
191 .await;
192 }
193 }
194
195 // ── Periodic ping ──────────────────────────────────────
196 _ = tokio::time::sleep(PING_INTERVAL) => {
197 let now = Instant::now();
198 if now.duration_since(last_ping) >= PING_INTERVAL {
199 let _ = ws_client
200 .stream()
201 .send(WsMessage::Ping(vec![].into()))
202 .await;
203 last_ping = now;
204 }
205 }
206 }
207 }
208 }
209}
210
211impl Limitless for Stream {
212 fn new(api_key: Option<String>, secret: Option<String>) -> Self {
213 Self::new_with_config(&Config::default(), api_key, secret)
214 }
215
216 fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
217 Self {
218 client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
219 }
220 }
221}