Skip to main content

limitless/ws/
stream.rs

1use crate::prelude::*;
2use crate::ws::client::WsClient;
3use crate::ws::PING_INTERVAL;
4
5use futures::{SinkExt, StreamExt};
6use log::{error, trace};
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10
11use tokio_tungstenite::tungstenite::Message as WsMessage;
12
13/// Manages WebSocket streaming connections to the Limitless Exchange.
14///
15/// Connects to `wss://ws.limitless.exchange/markets` and provides
16/// methods for subscribing to public and private data streams.
17///
18/// **Protocol note:** The Limitless WS uses Socket.IO protocol over raw
19/// WebSocket. This implementation handles the raw WebSocket transport;
20/// callers should frame Socket.IO packets (namespace connect, event
21/// emit/receive) on top of this stream.
22///
23/// # Event Reference
24///
25/// | Client → Server (emit)         | Auth | Description                        |
26/// |-------------------------------|------|------------------------------------|
27/// | `subscribe_market_prices`     | No   | AMM prices + CLOB orderbook       |
28/// | `subscribe_positions`         | Yes  | Portfolio position updates         |
29/// | `subscribe_order_events`      | Yes  | OME + settlement lifecycle        |
30/// | `subscribe_market_lifecycle`  | No   | Market creation / resolution       |
31///
32/// | Server → Client (on)  | Auth | Description                          |
33/// |-----------------------|------|--------------------------------------|
34/// | `newPriceData`        | No   | AMM price update                     |
35/// | `orderbookUpdate`     | No   | CLOB orderbook snapshot              |
36/// | `positions`           | Yes  | Position balance change              |
37/// | `orderEvent`          | Yes  | OME state or settlement result       |
38/// | `marketCreated`       | No   | New market funded and visible        |
39/// | `marketResolved`      | No   | Market resolved with winning outcome |
40/// | `system`              | —    | System notifications                 |
41/// | `authenticated`       | Yes  | Auth confirmation                    |
42/// | `exception`           | —    | Error notifications                  |
43#[derive(Clone)]
44pub struct Stream {
45    pub client: Client,
46}
47
48impl Stream {
49    /// Tests connectivity by sending a WebSocket ping.
50    pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
51        let response = self.client.wss_connect(None, false, None).await?;
52
53        let mut ws_client = WsClient::new(response);
54        let _ = ws_client
55            .stream()
56            .send(WsMessage::Ping(vec![].into()))
57            .await;
58
59        let Some(data) = ws_client.stream().next().await else {
60            return Err(LimitlessError::Base(
61                "Failed to receive pong response".to_string(),
62            ));
63        };
64        match data {
65            Ok(WsMessage::Pong(_)) => {
66                trace!("Pong received successfully");
67            }
68            Ok(other) => {
69                trace!("Unexpected WS message on ping: {:?}", other);
70            }
71            Err(e) => {
72                return Err(LimitlessError::Tungstenite(e));
73            }
74        }
75        Ok(())
76    }
77
78    /// Subscribe to a public data stream with an event handler callback.
79    ///
80    /// The `handler` receives raw JSON `Value` for each incoming message
81    /// that is not a control frame (Ping/Pong/Close).
82    ///
83    /// # Example
84    ///
85    /// ```no_run
86    /// use limitless::prelude::*;
87    ///
88    /// #[tokio::main]
89    /// async fn main() {
90    ///     let stream: Stream = Limitless::new(None, None);
91    ///     stream
92    ///         .ws_subscribe(|event| {
93    ///             println!("Received: {:?}", event);
94    ///             Ok(())
95    ///         })
96    ///         .await
97    ///         .unwrap();
98    /// }
99    /// ```
100    pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
101    where
102        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
103    {
104        let response = self.client.wss_connect(None, false, None).await?;
105        let mut ws_client = WsClient::new(response);
106        Self::event_loop(&mut ws_client, handler, None).await?;
107        Ok(())
108    }
109
110    /// Subscribe to a stream with dynamic command support.
111    ///
112    /// Allows emitting subscription commands (subscribe/unsubscribe) after
113    /// the connection is established. Send JSON command strings through
114    /// the `cmd_sender` channel.
115    pub async fn ws_subscribe_with_commands<F>(
116        &self,
117        cmd_receiver: mpsc::UnboundedReceiver<String>,
118        handler: F,
119    ) -> Result<(), LimitlessError>
120    where
121        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
122    {
123        let response = self.client.wss_connect(None, false, None).await?;
124        let mut ws_client = WsClient::new(response);
125        Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
126        Ok(())
127    }
128
129    /// Core event loop: reads WebSocket messages, dispatches to handler,
130    /// sends periodic pings, and processes outgoing subscription commands.
131    pub(crate) async fn event_loop<F>(
132        ws_client: &mut WsClient,
133        mut handler: F,
134        mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
135    ) -> Result<(), LimitlessError>
136    where
137        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
138    {
139        let mut last_ping = Instant::now();
140
141        loop {
142            tokio::select! {
143                // ── Incoming WebSocket message ─────────────────────────
144                msg = ws_client.stream().next() => {
145                    match msg {
146                        Some(Ok(WsMessage::Text(text))) => {
147                            if let Ok(event) = serde_json::from_str::<Value>(&text) {
148                                if let Err(e) = handler(event) {
149                                    error!("WebSocket handler error: {}", e);
150                                }
151                            }
152                        }
153                        Some(Ok(WsMessage::Binary(data))) => {
154                            // Socket.IO may send binary frames
155                            if let Ok(text) = String::from_utf8(data.to_vec()) {
156                                if let Ok(event) = serde_json::from_str::<Value>(&text) {
157                                    if let Err(e) = handler(event) {
158                                        error!("WebSocket handler error: {}", e);
159                                    }
160                                }
161                            }
162                        }
163                        Some(Ok(WsMessage::Ping(data))) => {
164                            let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
165                        }
166                        Some(Ok(WsMessage::Close(_))) => {
167                            trace!("WebSocket closed by server");
168                            return Ok(());
169                        }
170                        Some(Err(e)) => {
171                            return Err(LimitlessError::Tungstenite(e));
172                        }
173                        None => {
174                            return Ok(());
175                        }
176                        _ => {}
177                    }
178                }
179
180                // ── Outgoing subscription command ──────────────────────
181                cmd = async {
182                    match cmd_receiver.as_mut() {
183                        Some(rx) => rx.recv().await,
184                        None => std::future::pending().await,
185                    }
186                } => {
187                    if let Some(cmd) = cmd {
188                        let _ = ws_client
189                            .stream()
190                            .send(WsMessage::Text(cmd.into()))
191                            .await;
192                    }
193                }
194
195                // ── Periodic ping ──────────────────────────────────────
196                _ = tokio::time::sleep(PING_INTERVAL) => {
197                    let now = Instant::now();
198                    if now.duration_since(last_ping) >= PING_INTERVAL {
199                        let _ = ws_client
200                            .stream()
201                            .send(WsMessage::Ping(vec![].into()))
202                            .await;
203                        last_ping = now;
204                    }
205                }
206            }
207        }
208    }
209}
210
211impl Limitless for Stream {
212    fn new(api_key: Option<String>, secret: Option<String>) -> Self {
213        Self::new_with_config(&Config::default(), api_key, secret)
214    }
215
216    fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
217        Self {
218            client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
219        }
220    }
221}