Skip to main content

limitless/ws/
stream.rs

1use crate::prelude::*;
2use crate::ws::client::WsClient;
3use crate::ws::PING_INTERVAL;
4
5use futures::{SinkExt, StreamExt};
6use log::{debug, error, trace, warn};
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10use tokio_tungstenite::tungstenite::Message as WsMessage;
11
12// ── Socket.IO / Engine.IO protocol constants ─────────────────────────────
13
/// The Socket.IO namespace used by the Limitless Exchange.
pub const SOCKET_NAMESPACE: &str = "/markets";

/// Engine.IO open packet prefix (server → client handshake).
const EIO_OPEN: u8 = b'0';

/// Engine.IO close packet.
const EIO_CLOSE: u8 = b'1';

/// Engine.IO ping packet (server → client).
const EIO_PING: u8 = b'2';

/// Engine.IO pong packet (client → server).
const EIO_PONG: u8 = b'3';

/// Engine.IO message packet (carries Socket.IO payload).
const EIO_MESSAGE: u8 = b'4';

/// Socket.IO event packet type.
///
/// This is the only Socket.IO packet type that needs a constant here: the
/// namespace connect (`40…`) and disconnect (`41…`) frames are matched as
/// literal prefixes by the helper predicates in this file.
const SIO_EVENT: u8 = b'2';

38// ── Helpers ──────────────────────────────────────────────────────────────
39
40/// Build a Socket.IO event frame for emission to the server.
41///
42/// Format: `42{namespace},["{event}",{data}]`
43///
44/// Returns a complete frame ready to send as a WebSocket text message.
45pub fn frame_socketio_event(event: &str, data: &Value) -> String {
46    let payload = serde_json::to_string(&serde_json::json!([event, data]))
47        .unwrap_or_else(|_| format!(r#"["{}",null]"#, event));
48    format!("42{namespace},{payload}", namespace = SOCKET_NAMESPACE)
49}
50
51/// Build a Socket.IO namespace connect frame.
52///
53/// Format: `40{namespace},`
54pub fn frame_socketio_connect() -> String {
55    format!("40{namespace},", namespace = SOCKET_NAMESPACE)
56}
57
58/// Parse an Engine.IO text message and extract the Socket.IO event name
59/// and payload if this is an event (`42{namespace},[...]`).
60///
61/// Returns `Some((event_name, payload_value))` for events, `None` for
62/// non-event messages (open, close, ping/pong, connect ack, etc.).
63pub fn parse_socketio_message(text: &str) -> Option<(String, Value)> {
64    let bytes = text.as_bytes();
65
66    // Must start with '4' (Engine.IO message)
67    if bytes.is_empty() || bytes[0] != EIO_MESSAGE {
68        return None;
69    }
70
71    let after_eio = &text[1..];
72
73    // Must start with a digit (Socket.IO packet type)
74    let first_sio = after_eio.as_bytes().first()?;
75    if *first_sio != SIO_EVENT {
76        // '40' = connect, '41' = disconnect, '43' = ack, '44' = error
77        // None of these carry user events
78        return None;
79    }
80
81    // Skip the Socket.IO type digit: now we have "2{namespace},[...]"
82    let after_sio_type = &after_eio[1..];
83
84    // Strip namespace prefix: "{namespace},"
85    let event_payload = if let Some(rest) = after_sio_type.strip_prefix(SOCKET_NAMESPACE) {
86        // Strip the comma after namespace
87        rest.strip_prefix(',')?
88    } else {
89        // No namespace prefix — payload starts right after the type digit
90        // (e.g., "2[...]")
91        after_sio_type
92    };
93
94    // Parse the JSON array: ["eventName", {...}]
95    let values: Vec<Value> = serde_json::from_str(event_payload).ok()?;
96    if values.is_empty() {
97        return None;
98    }
99    let event_name = values[0].as_str()?.to_string();
100    let payload = values.get(1).cloned().unwrap_or(Value::Null);
101
102    Some((event_name, payload))
103}
104
/// Report whether `text` is exactly the one-byte Engine.IO ping packet (`2`).
fn is_eio_ping(text: &str) -> bool {
    matches!(text.as_bytes(), [EIO_PING])
}
109
/// Report whether `text` is exactly the one-byte Engine.IO close packet (`1`).
fn is_eio_close(text: &str) -> bool {
    matches!(text.as_bytes(), [EIO_CLOSE])
}
114
/// Report whether `text` begins with the Engine.IO open packet marker (`0`).
///
/// Anything may follow the marker; only the first byte is inspected.
fn is_eio_open(text: &str) -> bool {
    matches!(text.as_bytes(), [EIO_OPEN, ..])
}
119
120/// Check if a text message is a Socket.IO namespace connect ack ('40{namespace},').
121fn is_namespace_connect_ack(text: &str) -> bool {
122    text.starts_with(&format!("40{namespace},", namespace = SOCKET_NAMESPACE))
123}
124
125/// Check if a text message is a Socket.IO namespace disconnect.
126fn is_namespace_disconnect(text: &str) -> bool {
127    text.starts_with(&format!("41{namespace}", namespace = SOCKET_NAMESPACE))
128}
129
130// ── Stream ───────────────────────────────────────────────────────────────
131
/// Manages WebSocket streaming connections to the Limitless Exchange.
///
/// Connects to `wss://ws.limitless.exchange/socket.io/?EIO=4&transport=websocket`
/// and handles the Socket.IO protocol (Engine.IO v4 + Socket.IO) over the
/// raw WebSocket transport.
///
/// # Event Reference
///
/// | Client → Server (emit)         | Auth | Description                        |
/// |-------------------------------|------|------------------------------------|
/// | `subscribe_market_prices`     | No   | AMM prices + CLOB orderbook       |
/// | `subscribe_positions`         | Yes  | Portfolio position updates         |
/// | `subscribe_order_events`      | Yes  | OME + settlement lifecycle        |
/// | `subscribe_market_lifecycle`  | No   | Market creation / resolution       |
///
/// | Server → Client (on)  | Auth | Description                          |
/// |-----------------------|------|--------------------------------------|
/// | `newPriceData`        | No   | AMM price update                     |
/// | `orderbookUpdate`     | No   | CLOB orderbook snapshot              |
/// | `positions`           | Yes  | Position balance change              |
/// | `orderEvent`          | Yes  | OME state or settlement result       |
/// | `marketCreated`       | No   | New market funded and visible        |
/// | `marketResolved`      | No   | Market resolved with winning outcome |
/// | `system`              | —    | System notifications                 |
/// | `authenticated`       | Yes  | Auth confirmation                    |
/// | `exception`           | —    | Error notifications                  |
#[derive(Clone)]
pub struct Stream {
    /// Client whose `wss_connect` is used to open each WebSocket connection.
    pub client: Client,
}
162
163impl Stream {
164    /// Tests connectivity by performing the full Socket.IO handshake.
165    ///
166    /// Connects, reads the Engine.IO open packet, sends namespace connect,
167    /// and verifies the server acknowledges it.
168    pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
169        let stream = self.client.wss_connect(None, false, None).await?;
170        let mut ws_client = WsClient::new(stream);
171
172        // ── Phase 1: Engine.IO open ──────────────────────────────
173        let open_text = Self::read_text_message(ws_client.stream()).await?;
174        if !is_eio_open(&open_text) {
175            return Err(LimitlessError::Base(format!(
176                "Expected Engine.IO open packet (0{{...}}), got: {}",
177                &open_text[..open_text.len().min(80)]
178            )));
179        }
180        trace!("Engine.IO open received: {}", open_text);
181
182        // ── Phase 2: Socket.IO namespace connect ─────────────────
183        let connect_frame = frame_socketio_connect();
184        ws_client
185            .stream()
186            .send(WsMessage::Text(connect_frame.into()))
187            .await
188            .map_err(|e| {
189                LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
190            })?;
191
192        let ack_text = Self::read_text_message(ws_client.stream()).await?;
193        if !is_namespace_connect_ack(&ack_text) {
194            return Err(LimitlessError::Base(format!(
195                "Expected namespace connect ack (40/markets,), got: {}",
196                &ack_text[..ack_text.len().min(80)]
197            )));
198        }
199        trace!("Namespace connected: {}", ack_text);
200
201        // ── Send proper close ────────────────────────────────────
202        let _ = ws_client
203            .stream()
204            .send(
205                WsMessage::Text(format!("41{namespace},", namespace = SOCKET_NAMESPACE).into())
206                    .into(),
207            )
208            .await;
209        let _ = ws_client.disconnect().await;
210
211        Ok(())
212    }
213
214    /// Subscribe to a public data stream with an event handler callback.
215    ///
216    /// The `handler` receives a `Value` that is an array `[event_name, payload]`
217    /// for Socket.IO events, or the raw parsed JSON for other messages.
218    ///
219    /// # Example
220    ///
221    /// ```no_run
222    /// use limitless::prelude::*;
223    ///
224    /// #[tokio::main]
225    /// async fn main() {
226    ///     let stream: Stream = Limitless::new(None, None);
227    ///     stream
228    ///         .ws_subscribe(|event| {
229    ///             println!("Received: {:?}", event);
230    ///             Ok(())
231    ///         })
232    ///         .await
233    ///         .unwrap();
234    /// }
235    /// ```
236    pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
237    where
238        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
239    {
240        let stream = self.client.wss_connect(None, false, None).await?;
241        let mut ws_client = WsClient::new(stream);
242        Self::event_loop(&mut ws_client, handler, None).await?;
243        Ok(())
244    }
245
246    /// Subscribe to a stream with dynamic command support.
247    ///
248    /// Allows emitting subscription commands (subscribe/unsubscribe) after
249    /// the connection is established. Commands should be complete Socket.IO
250    /// frames (e.g., `42/markets,["subscribe_market_prices",{...}]`).
251    ///
252    /// Use [`frame_socketio_event`] to build properly framed commands.
253    pub async fn ws_subscribe_with_commands<F>(
254        &self,
255        cmd_receiver: mpsc::UnboundedReceiver<String>,
256        handler: F,
257    ) -> Result<(), LimitlessError>
258    where
259        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
260    {
261        let stream = self.client.wss_connect(None, false, None).await?;
262        let mut ws_client = WsClient::new(stream);
263        Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
264        Ok(())
265    }
266
267    /// Subscribe to a stream with dynamic command support **and authentication**.
268    ///
269    /// Like [`ws_subscribe_with_commands`](Self::ws_subscribe_with_commands) but
270    /// sends the `X-API-Key` header on the WebSocket upgrade request, enabling
271    /// private channels:
272    ///
273    /// - `subscribe_positions` — real-time position balance updates
274    /// - `subscribe_order_events` — OME state changes + settlement results
275    ///
276    /// # Requirements
277    ///
278    /// The [`Stream`] must have been constructed with an API key (via
279    /// [`Limitless::new`] or [`LimitlessClient::builder().set_credentials()`]).
280    /// Without a key the connection is still established but private
281    /// subscriptions will fail with an `exception` event.
282    ///
283    /// # Example
284    ///
285    /// ```no_run
286    /// use limitless::prelude::*;
287    /// use tokio::sync::mpsc;
288    ///
289    /// #[tokio::main]
290    /// async fn main() {
291    ///     let ws: Stream = Limitless::new(
292    ///         Some("lmts_sk_...".into()),
293    ///         Some("base64_secret".into()),
294    ///     );
295    ///     let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
296    ///
297    ///     // Send subscription commands after connecting
298    ///     tokio::spawn(async move {
299    ///         let sub = frame_socketio_event("subscribe_order_events", &serde_json::json!({}));
300    ///         let _ = cmd_tx.send(sub);
301    ///     });
302    ///
303    ///     ws.ws_subscribe_authenticated_with_commands(cmd_rx, |event| {
304    ///         println!("Private event: {event}");
305    ///         Ok(())
306    ///     }).await.unwrap();
307    /// }
308    /// ```
309    pub async fn ws_subscribe_authenticated_with_commands<F>(
310        &self,
311        cmd_receiver: mpsc::UnboundedReceiver<String>,
312        handler: F,
313    ) -> Result<(), LimitlessError>
314    where
315        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
316    {
317        let stream = self.client.wss_connect(None, true, None).await?;
318        let mut ws_client = WsClient::new(stream);
319        Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
320        Ok(())
321    }
322
323    /// Subscribe to market updates for a specific slug.
324    ///
325    /// Handles the full lifecycle: connect, handshake, subscribe, and
326    /// event dispatch. The handler receives `[event_name, payload]` arrays.
327    ///
328    /// # Example
329    ///
330    /// ```no_run
331    /// use limitless::prelude::*;
332    ///
333    /// #[tokio::main]
334    /// async fn main() {
335    ///     let ws: Stream = Limitless::new(None, None);
336    ///     ws.ws_subscribe_market("btc-above-100k", |event_name, payload| {
337    ///         println!("{event_name}: {payload}");
338    ///         Ok(())
339    ///     }).await.unwrap();
340    /// }
341    /// ```
342    pub async fn ws_subscribe_market<F>(
343        &self,
344        market_slug: &str,
345        mut handler: F,
346    ) -> Result<(), LimitlessError>
347    where
348        F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
349    {
350        let stream = self.client.wss_connect(None, false, None).await?;
351        let mut ws_client = WsClient::new(stream);
352
353        // ── Handshake ─────────────────────────────────────────────
354        Self::perform_handshake(&mut ws_client).await?;
355
356        // ── Subscribe ─────────────────────────────────────────────
357        let sub = frame_socketio_event(
358            "subscribe_market_prices",
359            &serde_json::json!({"marketSlugs": [market_slug]}),
360        );
361        ws_client
362            .stream()
363            .send(WsMessage::Text(sub.into()))
364            .await
365            .map_err(|e| LimitlessError::Base(format!("Failed to send subscription: {}", e)))?;
366        debug!("Subscribed to market prices for: {}", market_slug);
367
368        // ── Event loop with typed dispatch ────────────────────────
369        Self::typed_event_loop(&mut ws_client, &mut handler, None).await?;
370        Ok(())
371    }
372
373    /// Subscribe to the WebSocket event stream and receive typed [`WsEventKind`] events.
374    ///
375    /// Connects, performs the Socket.IO handshake, then enters an event
376    /// loop that parses every incoming server event through
377    /// [`deserialize_event`] before passing the resulting [`WsEventKind`]
378    /// to `handler`.
379    ///
380    /// # Example
381    ///
382    /// ```no_run
383    /// use limitless::prelude::*;
384    ///
385    /// #[tokio::main]
386    /// async fn main() {
387    ///     let ws: Stream = Limitless::new(None, None);
388    ///     ws.ws_subscribe_events(|event| {
389    ///         match event {
390    ///             WsEventKind::NewPriceData(p) => println!("AMM prices for {}", p.market_address),
391    ///             WsEventKind::TradeEvent(t) => println!("Trade: {} @ {}", t.size, t.price),
392    ///             WsEventKind::Unknown(payload) => println!("Unknown: {payload:?}"),
393    ///             other => println!("Event: {other:?}"),
394    ///         }
395    ///         Ok(())
396    ///     }).await.unwrap();
397    /// }
398    /// ```
399    pub async fn ws_subscribe_events<F>(&self, mut handler: F) -> Result<(), LimitlessError>
400    where
401        F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
402    {
403        let stream = self.client.wss_connect(None, false, None).await?;
404        let mut ws_client = WsClient::new(stream);
405
406        // ── Handshake ─────────────────────────────────────────────
407        Self::perform_handshake(&mut ws_client).await?;
408
409        // ── Typed dispatch wrapper ────────────────────────────────
410        let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
411            match deserialize_event(event_name, payload) {
412                Some(kind) => handler(kind),
413                None => {
414                    debug!("Failed to deserialize event '{}', skipping", event_name);
415                    Ok(())
416                }
417            }
418        };
419
420        // ── Event loop with typed dispatch ────────────────────────
421        Self::typed_event_loop(&mut ws_client, &mut adapter, None).await?;
422        Ok(())
423    }
424
425    /// Subscribe to typed WebSocket events **with authentication**.
426    ///
427    /// Like [`ws_subscribe_events`](Self::ws_subscribe_events) but sends the
428    /// `X-API-Key` header on the WebSocket upgrade request, enabling private
429    /// channels such as `positions` and `orderEvent`.
430    ///
431    /// # Example
432    ///
433    /// ```no_run
434    /// use limitless::prelude::*;
435    ///
436    /// #[tokio::main]
437    /// async fn main() {
438    ///     let ws: Stream = Limitless::new(
439    ///         Some("lmts_sk_...".into()),
440    ///         Some("base64_secret".into()),
441    ///     );
442    ///     ws.ws_subscribe_authenticated_events(|event| {
443    ///         match event {
444    ///             WsEventKind::Positions(p) => println!("Position update: {p:?}"),
445    ///             WsEventKind::OrderEvent(o) => println!("Order event: {o:?}"),
446    ///             other => println!("Other: {other:?}"),
447    ///         }
448    ///         Ok(())
449    ///     }).await.unwrap();
450    /// }
451    /// ```
452    pub async fn ws_subscribe_authenticated_events<F>(
453        &self,
454        mut handler: F,
455    ) -> Result<(), LimitlessError>
456    where
457        F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
458    {
459        let stream = self.client.wss_connect(None, true, None).await?;
460        let mut ws_client = WsClient::new(stream);
461
462        Self::perform_handshake(&mut ws_client).await?;
463
464        let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
465            match deserialize_event(event_name, payload) {
466                Some(kind) => handler(kind),
467                None => {
468                    debug!("Failed to deserialize event '{}', skipping", event_name);
469                    Ok(())
470                }
471            }
472        };
473
474        Self::typed_event_loop(&mut ws_client, &mut adapter, None).await?;
475        Ok(())
476    }
477
478    /// Perform the Socket.IO handshake: read Engine.IO open, send namespace
479    /// connect, wait for ack.
480    async fn perform_handshake(ws_client: &mut WsClient) -> Result<(), LimitlessError> {
481        // Read Engine.IO open packet
482        let open_text = Self::read_text_message(ws_client.stream()).await?;
483        if !is_eio_open(&open_text) {
484            return Err(LimitlessError::Base(format!(
485                "Expected Engine.IO open packet (0{{...}}), got: {}",
486                &open_text[..open_text.len().min(120)]
487            )));
488        }
489        debug!("Engine.IO open: {}", open_text);
490
491        // Send namespace connect
492        let connect_frame = frame_socketio_connect();
493        ws_client
494            .stream()
495            .send(WsMessage::Text(connect_frame.into()))
496            .await
497            .map_err(|e| {
498                LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
499            })?;
500
501        // Read namespace connect ack
502        let ack_text = Self::read_text_message(ws_client.stream()).await?;
503        if !is_namespace_connect_ack(&ack_text) {
504            return Err(LimitlessError::Base(format!(
505                "Expected namespace connect ack (40/markets,), got: {}",
506                &ack_text[..ack_text.len().min(120)]
507            )));
508        }
509        debug!("Namespace connected: {}", ack_text);
510
511        Ok(())
512    }
513
514    /// Read the next text (or binary → text) message from the stream.
515    async fn read_text_message(
516        stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
517    ) -> Result<String, LimitlessError> {
518        match stream.next().await {
519            Some(Ok(WsMessage::Text(text))) => Ok(text.to_string()),
520            Some(Ok(WsMessage::Binary(data))) => String::from_utf8(data.to_vec())
521                .map_err(|e| LimitlessError::Base(format!("Invalid UTF-8 in binary frame: {}", e))),
522            Some(Ok(other)) => Err(LimitlessError::Base(format!(
523                "Expected text frame, got: {:?}",
524                other
525            ))),
526            Some(Err(e)) => Err(LimitlessError::Tungstenite(e)),
527            None => Err(LimitlessError::Base(
528                "WebSocket connection closed during handshake".to_string(),
529            )),
530        }
531    }
532
533    /// Core event loop: reads WebSocket messages, dispatches to handler,
534    /// sends periodic pings, and processes outgoing subscription commands.
535    ///
536    /// Performs the Socket.IO handshake before entering the main loop.
537    pub(crate) async fn event_loop<F>(
538        ws_client: &mut WsClient,
539        mut handler: F,
540        mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
541    ) -> Result<(), LimitlessError>
542    where
543        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
544    {
545        // ── Handshake phase ────────────────────────────────────────────
546        Self::perform_handshake(ws_client).await?;
547
548        // ── Main event loop ────────────────────────────────────────────
549        let mut last_ping = Instant::now();
550
551        loop {
552            tokio::select! {
553                // ── Incoming WebSocket message ─────────────────────────
554                msg = ws_client.stream().next() => {
555                    match msg {
556                        Some(Ok(WsMessage::Text(text))) => {
557                            Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
558                        }
559                        Some(Ok(WsMessage::Binary(data))) => {
560                            if let Ok(text) = String::from_utf8(data.to_vec()) {
561                                Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
562                            }
563                        }
564                        Some(Ok(WsMessage::Ping(data))) => {
565                            let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
566                        }
567                        Some(Ok(WsMessage::Close(_))) => {
568                            trace!("WebSocket closed by server");
569                            return Ok(());
570                        }
571                        Some(Err(e)) => {
572                            return Err(LimitlessError::Tungstenite(e));
573                        }
574                        None => {
575                            trace!("WebSocket stream ended");
576                            return Ok(());
577                        }
578                        _ => {}
579                    }
580                }
581
582                // ── Outgoing command ───────────────────────────────────
583                cmd = async {
584                    match cmd_receiver.as_mut() {
585                        Some(rx) => rx.recv().await,
586                        None => std::future::pending().await,
587                    }
588                } => {
589                    if let Some(cmd) = cmd {
590                        debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
591                        if let Err(e) = ws_client
592                            .stream()
593                            .send(WsMessage::Text(cmd.into()))
594                            .await
595                        {
596                            error!("Failed to send command: {}", e);
597                        }
598                    }
599                }
600
601                // ── Periodic Engine.IO ping ────────────────────────────
602                _ = tokio::time::sleep(PING_INTERVAL) => {
603                    let now = Instant::now();
604                    if now.duration_since(last_ping) >= PING_INTERVAL {
605                        // Send Engine.IO ping (the string "2")
606                        let _ = ws_client
607                            .stream()
608                            .send(WsMessage::Text(String::from("2").into()))
609                            .await;
610                        last_ping = now;
611                    }
612                }
613            }
614        }
615    }
616
617    /// Typed event loop: like `event_loop` but calls a `FnMut(&str, &Value)`
618    /// handler instead of `FnMut(Value)`.
619    pub(crate) async fn typed_event_loop<F>(
620        ws_client: &mut WsClient,
621        handler: &mut F,
622        mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
623    ) -> Result<(), LimitlessError>
624    where
625        F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
626    {
627        let mut last_ping = Instant::now();
628
629        loop {
630            tokio::select! {
631                // ── Incoming WebSocket message ─────────────────────────
632                msg = ws_client.stream().next() => {
633                    match msg {
634                        Some(Ok(WsMessage::Text(text))) => {
635                            if let Some((event_name, payload)) = parse_socketio_message(&text) {
636                                if let Err(e) = handler(&event_name, &payload) {
637                                    error!("WS handler error on '{event_name}': {}", e);
638                                }
639                            } else if is_eio_ping(&text) {
640                                let _ = ws_client.stream()
641                                    .send(WsMessage::Text(String::from("3").into()))
642                                    .await;
643                            } else if is_eio_close(&text) || is_namespace_disconnect(&text) {
644                                trace!("Socket.IO close/disconnect received");
645                                return Ok(());
646                            }
647                            // Ignore other messages (open, connect ack, pong, etc.)
648                        }
649                        Some(Ok(WsMessage::Binary(data))) => {
650                            if let Ok(text) = String::from_utf8(data.to_vec()) {
651                                if let Some((event_name, payload)) = parse_socketio_message(&text) {
652                                    if let Err(e) = handler(&event_name, &payload) {
653                                        error!("WS handler error on '{event_name}': {}", e);
654                                    }
655                                }
656                            }
657                        }
658                        Some(Ok(WsMessage::Ping(data))) => {
659                            let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
660                        }
661                        Some(Ok(WsMessage::Close(_))) => {
662                            trace!("WebSocket closed by server");
663                            return Ok(());
664                        }
665                        Some(Err(e)) => {
666                            return Err(LimitlessError::Tungstenite(e));
667                        }
668                        None => {
669                            trace!("WebSocket stream ended");
670                            return Ok(());
671                        }
672                        _ => {}
673                    }
674                }
675
676                // ── Outgoing command ───────────────────────────────────
677                cmd = async {
678                    match cmd_receiver.as_mut() {
679                        Some(rx) => rx.recv().await,
680                        None => std::future::pending().await,
681                    }
682                } => {
683                    if let Some(cmd) = cmd {
684                        debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
685                        if let Err(e) = ws_client
686                            .stream()
687                            .send(WsMessage::Text(cmd.into()))
688                            .await
689                        {
690                            error!("Failed to send command: {}", e);
691                        }
692                    }
693                }
694
695                // ── Periodic Engine.IO ping ────────────────────────────
696                _ = tokio::time::sleep(PING_INTERVAL) => {
697                    let now = Instant::now();
698                    if now.duration_since(last_ping) >= PING_INTERVAL {
699                        let _ = ws_client
700                            .stream()
701                            .send(WsMessage::Text(String::from("2").into()))
702                            .await;
703                        last_ping = now;
704                    }
705                }
706            }
707        }
708    }
709
710    /// Handle an incoming text message from the WebSocket.
711    ///
712    /// Dispatches Engine.IO control frames and Socket.IO events.
713    async fn handle_incoming_text<F>(
714        text: &str,
715        handler: &mut F,
716        ws_client: &mut WsClient,
717    ) -> Result<(), LimitlessError>
718    where
719        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
720    {
721        // Engine.IO ping → reply pong
722        if is_eio_ping(text) {
723            let _ = ws_client
724                .stream()
725                .send(WsMessage::Text(String::from("3").into()))
726                .await;
727            return Ok(());
728        }
729
730        // Engine.IO close or Socket.IO namespace disconnect
731        if is_eio_close(text) || is_namespace_disconnect(text) {
732            trace!("Socket.IO close/disconnect");
733            return Err(LimitlessError::Base(
734                "Server closed the Socket.IO connection".to_string(),
735            ));
736        }
737
738        // Engine.IO pong — ignore
739        if text.as_bytes() == [EIO_PONG] {
740            return Ok(());
741        }
742
743        // Try to parse as a Socket.IO event (42/markets,[...])
744        if let Some((event_name, payload)) = parse_socketio_message(text) {
745            // Pass as [event_name, payload] array for backward compat
746            let event_array = serde_json::json!([event_name, payload]);
747            if let Err(e) = handler(event_array) {
748                error!("WS handler error on '{event_name}': {}", e);
749            }
750            return Ok(());
751        }
752
753        // Socket.IO connect ack, plain open, etc. — ignore
754        if is_namespace_connect_ack(text) || is_eio_open(text) {
755            return Ok(());
756        }
757
758        // Unknown message — log and try to parse as raw JSON
759        warn!("Unhandled WS message: {}", &text[..text.len().min(200)]);
760        if let Ok(value) = serde_json::from_str::<Value>(text) {
761            let _ = handler(value);
762        }
763
764        Ok(())
765    }
766}
767
768impl Limitless for Stream {
769    fn new(api_key: Option<String>, secret: Option<String>) -> Self {
770        Self::new_with_config(&Config::default(), api_key, secret)
771    }
772
773    fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
774        Self {
775            client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
776        }
777    }
778}