limitless/ws/stream.rs
1use crate::prelude::*;
2use crate::ws::client::WsClient;
3use crate::ws::PING_INTERVAL;
4
5use futures::{SinkExt, StreamExt};
6use log::{debug, error, trace, warn};
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10use tokio_tungstenite::tungstenite::Message as WsMessage;
11
12// ── Socket.IO / Engine.IO protocol constants ─────────────────────────────
13
14/// The Socket.IO namespace used by the Limitless Exchange.
15pub const SOCKET_NAMESPACE: &str = "/markets";
16
17/// Engine.IO open packet prefix (server → client handshake).
18const EIO_OPEN: u8 = b'0';
19
20/// Engine.IO close packet.
21const EIO_CLOSE: u8 = b'1';
22
23/// Engine.IO ping packet (server → client).
24const EIO_PING: u8 = b'2';
25
26/// Engine.IO pong packet (client → server).
27const EIO_PONG: u8 = b'3';
28
29/// Engine.IO message packet (carries Socket.IO payload).
30const EIO_MESSAGE: u8 = b'4';
31
32/// Socket.IO connect packet type.
33/// Socket.IO disconnect packet type.
34/// Socket.IO event packet type.
35const SIO_EVENT: u8 = b'2';
36
37/// Socket.IO error packet type.
38// ── Helpers ──────────────────────────────────────────────────────────────
39
40/// Build a Socket.IO event frame for emission to the server.
41///
42/// Format: `42{namespace},["{event}",{data}]`
43///
44/// Returns a complete frame ready to send as a WebSocket text message.
45pub fn frame_socketio_event(event: &str, data: &Value) -> String {
46 let payload = serde_json::to_string(&serde_json::json!([event, data]))
47 .unwrap_or_else(|_| format!(r#"["{}",null]"#, event));
48 format!("42{namespace},{payload}", namespace = SOCKET_NAMESPACE)
49}
50
51/// Build a Socket.IO namespace connect frame.
52///
53/// Format: `40{namespace},`
54pub fn frame_socketio_connect() -> String {
55 format!("40{namespace},", namespace = SOCKET_NAMESPACE)
56}
57
58/// Parse an Engine.IO text message and extract the Socket.IO event name
59/// and payload if this is an event (`42{namespace},[...]`).
60///
61/// Returns `Some((event_name, payload_value))` for events, `None` for
62/// non-event messages (open, close, ping/pong, connect ack, etc.).
63pub fn parse_socketio_message(text: &str) -> Option<(String, Value)> {
64 let bytes = text.as_bytes();
65
66 // Must start with '4' (Engine.IO message)
67 if bytes.is_empty() || bytes[0] != EIO_MESSAGE {
68 return None;
69 }
70
71 let after_eio = &text[1..];
72
73 // Must start with a digit (Socket.IO packet type)
74 let first_sio = after_eio.as_bytes().first()?;
75 if *first_sio != SIO_EVENT {
76 // '40' = connect, '41' = disconnect, '43' = ack, '44' = error
77 // None of these carry user events
78 return None;
79 }
80
81 // Skip the Socket.IO type digit: now we have "2{namespace},[...]"
82 let after_sio_type = &after_eio[1..];
83
84 // Strip namespace prefix: "{namespace},"
85 let event_payload = if let Some(rest) = after_sio_type.strip_prefix(SOCKET_NAMESPACE) {
86 // Strip the comma after namespace
87 rest.strip_prefix(',')?
88 } else {
89 // No namespace prefix — payload starts right after the type digit
90 // (e.g., "2[...]")
91 after_sio_type
92 };
93
94 // Parse the JSON array: ["eventName", {...}]
95 let values: Vec<Value> = serde_json::from_str(event_payload).ok()?;
96 if values.is_empty() {
97 return None;
98 }
99 let event_name = values[0].as_str()?.to_string();
100 let payload = values.get(1).cloned().unwrap_or(Value::Null);
101
102 Some((event_name, payload))
103}
104
105/// Check if a text message is an Engine.IO ping (just the character '2').
106fn is_eio_ping(text: &str) -> bool {
107 text.as_bytes() == [EIO_PING]
108}
109
110/// Check if a text message is an Engine.IO close (just the character '1').
111fn is_eio_close(text: &str) -> bool {
112 text.as_bytes() == [EIO_CLOSE]
113}
114
115/// Check if a text message is the Engine.IO open packet (starts with '0').
116fn is_eio_open(text: &str) -> bool {
117 text.as_bytes().first() == Some(&EIO_OPEN)
118}
119
120/// Check if a text message is a Socket.IO namespace connect ack ('40{namespace},').
121fn is_namespace_connect_ack(text: &str) -> bool {
122 text.starts_with(&format!("40{namespace},", namespace = SOCKET_NAMESPACE))
123}
124
125/// Check if a text message is a Socket.IO namespace disconnect.
126fn is_namespace_disconnect(text: &str) -> bool {
127 text.starts_with(&format!("41{namespace}", namespace = SOCKET_NAMESPACE))
128}
129
130// ── Stream ───────────────────────────────────────────────────────────────
131
132/// Manages WebSocket streaming connections to the Limitless Exchange.
133///
134/// Connects to `wss://ws.limitless.exchange/socket.io/?EIO=4&transport=websocket`
135/// and handles the Socket.IO protocol (Engine.IO v4 + Socket.IO) over the
136/// raw WebSocket transport.
137///
138/// # Event Reference
139///
140/// | Client → Server (emit) | Auth | Description |
141/// |-------------------------------|------|------------------------------------|
142/// | `subscribe_market_prices` | No | AMM prices + CLOB orderbook |
143/// | `subscribe_positions` | Yes | Portfolio position updates |
144/// | `subscribe_order_events` | Yes | OME + settlement lifecycle |
145/// | `subscribe_market_lifecycle` | No | Market creation / resolution |
146///
147/// | Server → Client (on) | Auth | Description |
148/// |-----------------------|------|--------------------------------------|
149/// | `newPriceData` | No | AMM price update |
150/// | `orderbookUpdate` | No | CLOB orderbook snapshot |
151/// | `positions` | Yes | Position balance change |
152/// | `orderEvent` | Yes | OME state or settlement result |
153/// | `marketCreated` | No | New market funded and visible |
154/// | `marketResolved` | No | Market resolved with winning outcome |
155/// | `system` | — | System notifications |
156/// | `authenticated` | Yes | Auth confirmation |
157/// | `exception` | — | Error notifications |
158#[derive(Clone)]
159pub struct Stream {
160 pub client: Client,
161}
162
163impl Stream {
164 /// Tests connectivity by performing the full Socket.IO handshake.
165 ///
166 /// Connects, reads the Engine.IO open packet, sends namespace connect,
167 /// and verifies the server acknowledges it.
168 pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
169 let stream = self.client.wss_connect(None, false, None).await?;
170 let mut ws_client = WsClient::new(stream);
171
172 // ── Phase 1: Engine.IO open ──────────────────────────────
173 let open_text = Self::read_text_message(ws_client.stream()).await?;
174 if !is_eio_open(&open_text) {
175 return Err(LimitlessError::Base(format!(
176 "Expected Engine.IO open packet (0{{...}}), got: {}",
177 &open_text[..open_text.len().min(80)]
178 )));
179 }
180 trace!("Engine.IO open received: {}", open_text);
181
182 // ── Phase 2: Socket.IO namespace connect ─────────────────
183 let connect_frame = frame_socketio_connect();
184 ws_client
185 .stream()
186 .send(WsMessage::Text(connect_frame.into()))
187 .await
188 .map_err(|e| {
189 LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
190 })?;
191
192 let ack_text = Self::read_text_message(ws_client.stream()).await?;
193 if !is_namespace_connect_ack(&ack_text) {
194 return Err(LimitlessError::Base(format!(
195 "Expected namespace connect ack (40/markets,), got: {}",
196 &ack_text[..ack_text.len().min(80)]
197 )));
198 }
199 trace!("Namespace connected: {}", ack_text);
200
201 // ── Send proper close ────────────────────────────────────
202 let _ = ws_client
203 .stream()
204 .send(
205 WsMessage::Text(format!("41{namespace},", namespace = SOCKET_NAMESPACE).into())
206 .into(),
207 )
208 .await;
209 let _ = ws_client.disconnect().await;
210
211 Ok(())
212 }
213
214 /// Subscribe to a public data stream with an event handler callback.
215 ///
216 /// The `handler` receives a `Value` that is an array `[event_name, payload]`
217 /// for Socket.IO events, or the raw parsed JSON for other messages.
218 ///
219 /// # Example
220 ///
221 /// ```no_run
222 /// use limitless::prelude::*;
223 ///
224 /// #[tokio::main]
225 /// async fn main() {
226 /// let stream: Stream = Limitless::new(None, None);
227 /// stream
228 /// .ws_subscribe(|event| {
229 /// println!("Received: {:?}", event);
230 /// Ok(())
231 /// })
232 /// .await
233 /// .unwrap();
234 /// }
235 /// ```
236 pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
237 where
238 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
239 {
240 let stream = self.client.wss_connect(None, false, None).await?;
241 let mut ws_client = WsClient::new(stream);
242 Self::event_loop(&mut ws_client, handler, None).await?;
243 Ok(())
244 }
245
246 /// Subscribe to a stream with dynamic command support.
247 ///
248 /// Allows emitting subscription commands (subscribe/unsubscribe) after
249 /// the connection is established. Commands should be complete Socket.IO
250 /// frames (e.g., `42/markets,["subscribe_market_prices",{...}]`).
251 ///
252 /// Use [`frame_socketio_event`] to build properly framed commands.
253 pub async fn ws_subscribe_with_commands<F>(
254 &self,
255 cmd_receiver: mpsc::UnboundedReceiver<String>,
256 handler: F,
257 ) -> Result<(), LimitlessError>
258 where
259 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
260 {
261 let stream = self.client.wss_connect(None, false, None).await?;
262 let mut ws_client = WsClient::new(stream);
263 Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
264 Ok(())
265 }
266
267 /// Subscribe to a stream with dynamic command support **and authentication**.
268 ///
269 /// Like [`ws_subscribe_with_commands`](Self::ws_subscribe_with_commands) but
270 /// sends the `X-API-Key` header on the WebSocket upgrade request, enabling
271 /// private channels:
272 ///
273 /// - `subscribe_positions` — real-time position balance updates
274 /// - `subscribe_order_events` — OME state changes + settlement results
275 ///
276 /// # Requirements
277 ///
278 /// The [`Stream`] must have been constructed with an API key (via
279 /// [`Limitless::new`] or [`LimitlessClient::builder().set_credentials()`]).
280 /// Without a key the connection is still established but private
281 /// subscriptions will fail with an `exception` event.
282 ///
283 /// # Example
284 ///
285 /// ```no_run
286 /// use limitless::prelude::*;
287 /// use tokio::sync::mpsc;
288 ///
289 /// #[tokio::main]
290 /// async fn main() {
291 /// let ws: Stream = Limitless::new(
292 /// Some("lmts_sk_...".into()),
293 /// Some("base64_secret".into()),
294 /// );
295 /// let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
296 ///
297 /// // Send subscription commands after connecting
298 /// tokio::spawn(async move {
299 /// let sub = frame_socketio_event("subscribe_order_events", &serde_json::json!({}));
300 /// let _ = cmd_tx.send(sub);
301 /// });
302 ///
303 /// ws.ws_subscribe_authenticated_with_commands(cmd_rx, |event| {
304 /// println!("Private event: {event}");
305 /// Ok(())
306 /// }).await.unwrap();
307 /// }
308 /// ```
309 pub async fn ws_subscribe_authenticated_with_commands<F>(
310 &self,
311 cmd_receiver: mpsc::UnboundedReceiver<String>,
312 handler: F,
313 ) -> Result<(), LimitlessError>
314 where
315 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
316 {
317 let stream = self.client.wss_connect(None, true, None).await?;
318 let mut ws_client = WsClient::new(stream);
319 Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
320 Ok(())
321 }
322
323 /// Subscribe to market updates for a specific slug.
324 ///
325 /// Handles the full lifecycle: connect, handshake, subscribe, and
326 /// event dispatch. The handler receives `[event_name, payload]` arrays.
327 ///
328 /// # Example
329 ///
330 /// ```no_run
331 /// use limitless::prelude::*;
332 ///
333 /// #[tokio::main]
334 /// async fn main() {
335 /// let ws: Stream = Limitless::new(None, None);
336 /// ws.ws_subscribe_market("btc-above-100k", |event_name, payload| {
337 /// println!("{event_name}: {payload}");
338 /// Ok(())
339 /// }).await.unwrap();
340 /// }
341 /// ```
342 pub async fn ws_subscribe_market<F>(
343 &self,
344 market_slug: &str,
345 mut handler: F,
346 ) -> Result<(), LimitlessError>
347 where
348 F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
349 {
350 let stream = self.client.wss_connect(None, false, None).await?;
351 let mut ws_client = WsClient::new(stream);
352
353 // ── Handshake ─────────────────────────────────────────────
354 Self::perform_handshake(&mut ws_client).await?;
355
356 // ── Subscribe ─────────────────────────────────────────────
357 let sub = frame_socketio_event(
358 "subscribe_market_prices",
359 &serde_json::json!({"marketSlugs": [market_slug]}),
360 );
361 ws_client
362 .stream()
363 .send(WsMessage::Text(sub.into()))
364 .await
365 .map_err(|e| LimitlessError::Base(format!("Failed to send subscription: {}", e)))?;
366 debug!("Subscribed to market prices for: {}", market_slug);
367
368 // ── Event loop with typed dispatch ────────────────────────
369 Self::typed_event_loop(&mut ws_client, &mut handler, None).await?;
370 Ok(())
371 }
372
373 /// Subscribe to the WebSocket event stream and receive typed [`WsEventKind`] events.
374 ///
375 /// Connects, performs the Socket.IO handshake, then enters an event
376 /// loop that parses every incoming server event through
377 /// [`deserialize_event`] before passing the resulting [`WsEventKind`]
378 /// to `handler`.
379 ///
380 /// # Example
381 ///
382 /// ```no_run
383 /// use limitless::prelude::*;
384 ///
385 /// #[tokio::main]
386 /// async fn main() {
387 /// let ws: Stream = Limitless::new(None, None);
388 /// ws.ws_subscribe_events(|event| {
389 /// match event {
390 /// WsEventKind::NewPriceData(p) => println!("AMM prices for {}", p.market_address),
391 /// WsEventKind::TradeEvent(t) => println!("Trade: {} @ {}", t.size, t.price),
392 /// WsEventKind::Unknown(payload) => println!("Unknown: {payload:?}"),
393 /// other => println!("Event: {other:?}"),
394 /// }
395 /// Ok(())
396 /// }).await.unwrap();
397 /// }
398 /// ```
399 pub async fn ws_subscribe_events<F>(&self, mut handler: F) -> Result<(), LimitlessError>
400 where
401 F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
402 {
403 let stream = self.client.wss_connect(None, false, None).await?;
404 let mut ws_client = WsClient::new(stream);
405
406 // ── Handshake ─────────────────────────────────────────────
407 Self::perform_handshake(&mut ws_client).await?;
408
409 // ── Typed dispatch wrapper ────────────────────────────────
410 let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
411 match deserialize_event(event_name, payload) {
412 Some(kind) => handler(kind),
413 None => {
414 debug!("Failed to deserialize event '{}', skipping", event_name);
415 Ok(())
416 }
417 }
418 };
419
420 // ── Event loop with typed dispatch ────────────────────────
421 Self::typed_event_loop(&mut ws_client, &mut adapter, None).await?;
422 Ok(())
423 }
424
425 /// Subscribe to typed WebSocket events **with authentication**.
426 ///
427 /// Like [`ws_subscribe_events`](Self::ws_subscribe_events) but sends the
428 /// `X-API-Key` header on the WebSocket upgrade request, enabling private
429 /// channels such as `positions` and `orderEvent`.
430 ///
431 /// # Example
432 ///
433 /// ```no_run
434 /// use limitless::prelude::*;
435 ///
436 /// #[tokio::main]
437 /// async fn main() {
438 /// let ws: Stream = Limitless::new(
439 /// Some("lmts_sk_...".into()),
440 /// Some("base64_secret".into()),
441 /// );
442 /// ws.ws_subscribe_authenticated_events(|event| {
443 /// match event {
444 /// WsEventKind::Positions(p) => println!("Position update: {p:?}"),
445 /// WsEventKind::OrderEvent(o) => println!("Order event: {o:?}"),
446 /// other => println!("Other: {other:?}"),
447 /// }
448 /// Ok(())
449 /// }).await.unwrap();
450 /// }
451 /// ```
452 pub async fn ws_subscribe_authenticated_events<F>(
453 &self,
454 mut handler: F,
455 ) -> Result<(), LimitlessError>
456 where
457 F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
458 {
459 let stream = self.client.wss_connect(None, true, None).await?;
460 let mut ws_client = WsClient::new(stream);
461
462 Self::perform_handshake(&mut ws_client).await?;
463
464 let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
465 match deserialize_event(event_name, payload) {
466 Some(kind) => handler(kind),
467 None => {
468 debug!("Failed to deserialize event '{}', skipping", event_name);
469 Ok(())
470 }
471 }
472 };
473
474 Self::typed_event_loop(&mut ws_client, &mut adapter, None).await?;
475 Ok(())
476 }
477
478 /// Perform the Socket.IO handshake: read Engine.IO open, send namespace
479 /// connect, wait for ack.
480 async fn perform_handshake(ws_client: &mut WsClient) -> Result<(), LimitlessError> {
481 // Read Engine.IO open packet
482 let open_text = Self::read_text_message(ws_client.stream()).await?;
483 if !is_eio_open(&open_text) {
484 return Err(LimitlessError::Base(format!(
485 "Expected Engine.IO open packet (0{{...}}), got: {}",
486 &open_text[..open_text.len().min(120)]
487 )));
488 }
489 debug!("Engine.IO open: {}", open_text);
490
491 // Send namespace connect
492 let connect_frame = frame_socketio_connect();
493 ws_client
494 .stream()
495 .send(WsMessage::Text(connect_frame.into()))
496 .await
497 .map_err(|e| {
498 LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
499 })?;
500
501 // Read namespace connect ack
502 let ack_text = Self::read_text_message(ws_client.stream()).await?;
503 if !is_namespace_connect_ack(&ack_text) {
504 return Err(LimitlessError::Base(format!(
505 "Expected namespace connect ack (40/markets,), got: {}",
506 &ack_text[..ack_text.len().min(120)]
507 )));
508 }
509 debug!("Namespace connected: {}", ack_text);
510
511 Ok(())
512 }
513
514 /// Read the next text (or binary → text) message from the stream.
515 async fn read_text_message(
516 stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
517 ) -> Result<String, LimitlessError> {
518 match stream.next().await {
519 Some(Ok(WsMessage::Text(text))) => Ok(text.to_string()),
520 Some(Ok(WsMessage::Binary(data))) => String::from_utf8(data.to_vec())
521 .map_err(|e| LimitlessError::Base(format!("Invalid UTF-8 in binary frame: {}", e))),
522 Some(Ok(other)) => Err(LimitlessError::Base(format!(
523 "Expected text frame, got: {:?}",
524 other
525 ))),
526 Some(Err(e)) => Err(LimitlessError::Tungstenite(e)),
527 None => Err(LimitlessError::Base(
528 "WebSocket connection closed during handshake".to_string(),
529 )),
530 }
531 }
532
533 /// Core event loop: reads WebSocket messages, dispatches to handler,
534 /// sends periodic pings, and processes outgoing subscription commands.
535 ///
536 /// Performs the Socket.IO handshake before entering the main loop.
537 pub(crate) async fn event_loop<F>(
538 ws_client: &mut WsClient,
539 mut handler: F,
540 mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
541 ) -> Result<(), LimitlessError>
542 where
543 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
544 {
545 // ── Handshake phase ────────────────────────────────────────────
546 Self::perform_handshake(ws_client).await?;
547
548 // ── Main event loop ────────────────────────────────────────────
549 let mut last_ping = Instant::now();
550
551 loop {
552 tokio::select! {
553 // ── Incoming WebSocket message ─────────────────────────
554 msg = ws_client.stream().next() => {
555 match msg {
556 Some(Ok(WsMessage::Text(text))) => {
557 Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
558 }
559 Some(Ok(WsMessage::Binary(data))) => {
560 if let Ok(text) = String::from_utf8(data.to_vec()) {
561 Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
562 }
563 }
564 Some(Ok(WsMessage::Ping(data))) => {
565 let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
566 }
567 Some(Ok(WsMessage::Close(_))) => {
568 trace!("WebSocket closed by server");
569 return Ok(());
570 }
571 Some(Err(e)) => {
572 return Err(LimitlessError::Tungstenite(e));
573 }
574 None => {
575 trace!("WebSocket stream ended");
576 return Ok(());
577 }
578 _ => {}
579 }
580 }
581
582 // ── Outgoing command ───────────────────────────────────
583 cmd = async {
584 match cmd_receiver.as_mut() {
585 Some(rx) => rx.recv().await,
586 None => std::future::pending().await,
587 }
588 } => {
589 if let Some(cmd) = cmd {
590 debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
591 if let Err(e) = ws_client
592 .stream()
593 .send(WsMessage::Text(cmd.into()))
594 .await
595 {
596 error!("Failed to send command: {}", e);
597 }
598 }
599 }
600
601 // ── Periodic Engine.IO ping ────────────────────────────
602 _ = tokio::time::sleep(PING_INTERVAL) => {
603 let now = Instant::now();
604 if now.duration_since(last_ping) >= PING_INTERVAL {
605 // Send Engine.IO ping (the string "2")
606 let _ = ws_client
607 .stream()
608 .send(WsMessage::Text(String::from("2").into()))
609 .await;
610 last_ping = now;
611 }
612 }
613 }
614 }
615 }
616
617 /// Typed event loop: like `event_loop` but calls a `FnMut(&str, &Value)`
618 /// handler instead of `FnMut(Value)`.
619 pub(crate) async fn typed_event_loop<F>(
620 ws_client: &mut WsClient,
621 handler: &mut F,
622 mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
623 ) -> Result<(), LimitlessError>
624 where
625 F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
626 {
627 let mut last_ping = Instant::now();
628
629 loop {
630 tokio::select! {
631 // ── Incoming WebSocket message ─────────────────────────
632 msg = ws_client.stream().next() => {
633 match msg {
634 Some(Ok(WsMessage::Text(text))) => {
635 if let Some((event_name, payload)) = parse_socketio_message(&text) {
636 if let Err(e) = handler(&event_name, &payload) {
637 error!("WS handler error on '{event_name}': {}", e);
638 }
639 } else if is_eio_ping(&text) {
640 let _ = ws_client.stream()
641 .send(WsMessage::Text(String::from("3").into()))
642 .await;
643 } else if is_eio_close(&text) || is_namespace_disconnect(&text) {
644 trace!("Socket.IO close/disconnect received");
645 return Ok(());
646 }
647 // Ignore other messages (open, connect ack, pong, etc.)
648 }
649 Some(Ok(WsMessage::Binary(data))) => {
650 if let Ok(text) = String::from_utf8(data.to_vec()) {
651 if let Some((event_name, payload)) = parse_socketio_message(&text) {
652 if let Err(e) = handler(&event_name, &payload) {
653 error!("WS handler error on '{event_name}': {}", e);
654 }
655 }
656 }
657 }
658 Some(Ok(WsMessage::Ping(data))) => {
659 let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
660 }
661 Some(Ok(WsMessage::Close(_))) => {
662 trace!("WebSocket closed by server");
663 return Ok(());
664 }
665 Some(Err(e)) => {
666 return Err(LimitlessError::Tungstenite(e));
667 }
668 None => {
669 trace!("WebSocket stream ended");
670 return Ok(());
671 }
672 _ => {}
673 }
674 }
675
676 // ── Outgoing command ───────────────────────────────────
677 cmd = async {
678 match cmd_receiver.as_mut() {
679 Some(rx) => rx.recv().await,
680 None => std::future::pending().await,
681 }
682 } => {
683 if let Some(cmd) = cmd {
684 debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
685 if let Err(e) = ws_client
686 .stream()
687 .send(WsMessage::Text(cmd.into()))
688 .await
689 {
690 error!("Failed to send command: {}", e);
691 }
692 }
693 }
694
695 // ── Periodic Engine.IO ping ────────────────────────────
696 _ = tokio::time::sleep(PING_INTERVAL) => {
697 let now = Instant::now();
698 if now.duration_since(last_ping) >= PING_INTERVAL {
699 let _ = ws_client
700 .stream()
701 .send(WsMessage::Text(String::from("2").into()))
702 .await;
703 last_ping = now;
704 }
705 }
706 }
707 }
708 }
709
710 /// Handle an incoming text message from the WebSocket.
711 ///
712 /// Dispatches Engine.IO control frames and Socket.IO events.
713 async fn handle_incoming_text<F>(
714 text: &str,
715 handler: &mut F,
716 ws_client: &mut WsClient,
717 ) -> Result<(), LimitlessError>
718 where
719 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
720 {
721 // Engine.IO ping → reply pong
722 if is_eio_ping(text) {
723 let _ = ws_client
724 .stream()
725 .send(WsMessage::Text(String::from("3").into()))
726 .await;
727 return Ok(());
728 }
729
730 // Engine.IO close or Socket.IO namespace disconnect
731 if is_eio_close(text) || is_namespace_disconnect(text) {
732 trace!("Socket.IO close/disconnect");
733 return Err(LimitlessError::Base(
734 "Server closed the Socket.IO connection".to_string(),
735 ));
736 }
737
738 // Engine.IO pong — ignore
739 if text.as_bytes() == [EIO_PONG] {
740 return Ok(());
741 }
742
743 // Try to parse as a Socket.IO event (42/markets,[...])
744 if let Some((event_name, payload)) = parse_socketio_message(text) {
745 // Pass as [event_name, payload] array for backward compat
746 let event_array = serde_json::json!([event_name, payload]);
747 if let Err(e) = handler(event_array) {
748 error!("WS handler error on '{event_name}': {}", e);
749 }
750 return Ok(());
751 }
752
753 // Socket.IO connect ack, plain open, etc. — ignore
754 if is_namespace_connect_ack(text) || is_eio_open(text) {
755 return Ok(());
756 }
757
758 // Unknown message — log and try to parse as raw JSON
759 warn!("Unhandled WS message: {}", &text[..text.len().min(200)]);
760 if let Ok(value) = serde_json::from_str::<Value>(text) {
761 let _ = handler(value);
762 }
763
764 Ok(())
765 }
766}
767
768impl Limitless for Stream {
769 fn new(api_key: Option<String>, secret: Option<String>) -> Self {
770 Self::new_with_config(&Config::default(), api_key, secret)
771 }
772
773 fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
774 Self {
775 client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
776 }
777 }
778}