Skip to main content

limitless/ws/
stream.rs

1use crate::prelude::*;
2use crate::ws::client::WsClient;
3use crate::ws::PING_INTERVAL;
4
5use futures::{SinkExt, StreamExt};
6use log::{debug, error, trace, warn};
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10use tokio_tungstenite::tungstenite::Message as WsMessage;
11
12// ── Socket.IO / Engine.IO protocol constants ─────────────────────────────
13
14/// The Socket.IO namespace used by the Limitless Exchange.
15pub const SOCKET_NAMESPACE: &str = "/markets";
16
17/// Engine.IO open packet prefix (server → client handshake).
18const EIO_OPEN: u8 = b'0';
19
20/// Engine.IO close packet.
21const EIO_CLOSE: u8 = b'1';
22
23/// Engine.IO ping packet (server → client).
24const EIO_PING: u8 = b'2';
25
26/// Engine.IO pong packet (client → server).
27const EIO_PONG: u8 = b'3';
28
29/// Engine.IO message packet (carries Socket.IO payload).
30const EIO_MESSAGE: u8 = b'4';
31
32/// Socket.IO connect packet type.
33/// Socket.IO disconnect packet type.
34/// Socket.IO event packet type.
35const SIO_EVENT: u8 = b'2';
36
37/// Socket.IO error packet type.
38// ── Helpers ──────────────────────────────────────────────────────────────
39
40/// Build a Socket.IO event frame for emission to the server.
41///
42/// Format: `42{namespace},["{event}",{data}]`
43///
44/// Returns a complete frame ready to send as a WebSocket text message.
45pub fn frame_socketio_event(event: &str, data: &Value) -> String {
46    let payload = serde_json::to_string(&serde_json::json!([event, data]))
47        .unwrap_or_else(|_| format!(r#"["{}",null]"#, event));
48    format!("42{namespace},{payload}", namespace = SOCKET_NAMESPACE)
49}
50
51/// Build a Socket.IO namespace connect frame.
52///
53/// Format: `40{namespace},`
54pub fn frame_socketio_connect() -> String {
55    format!("40{namespace},", namespace = SOCKET_NAMESPACE)
56}
57
58/// Parse an Engine.IO text message and extract the Socket.IO event name
59/// and payload if this is an event (`42{namespace},[...]`).
60///
61/// Returns `Some((event_name, payload_value))` for events, `None` for
62/// non-event messages (open, close, ping/pong, connect ack, etc.).
63pub fn parse_socketio_message(text: &str) -> Option<(String, Value)> {
64    let bytes = text.as_bytes();
65
66    // Must start with '4' (Engine.IO message)
67    if bytes.is_empty() || bytes[0] != EIO_MESSAGE {
68        return None;
69    }
70
71    let after_eio = &text[1..];
72
73    // Must start with a digit (Socket.IO packet type)
74    let first_sio = after_eio.as_bytes().first()?;
75    if *first_sio != SIO_EVENT {
76        // '40' = connect, '41' = disconnect, '43' = ack, '44' = error
77        // None of these carry user events
78        return None;
79    }
80
81    // Skip the Socket.IO type digit: now we have "2{namespace},[...]"
82    let after_sio_type = &after_eio[1..];
83
84    // Strip namespace prefix: "{namespace},"
85    let event_payload = if let Some(rest) = after_sio_type.strip_prefix(SOCKET_NAMESPACE) {
86        // Strip the comma after namespace
87        rest.strip_prefix(',')?
88    } else {
89        // No namespace prefix — payload starts right after the type digit
90        // (e.g., "2[...]")
91        after_sio_type
92    };
93
94    // Parse the JSON array: ["eventName", {...}]
95    let values: Vec<Value> = serde_json::from_str(event_payload).ok()?;
96    if values.is_empty() {
97        return None;
98    }
99    let event_name = values[0].as_str()?.to_string();
100    let payload = values.get(1).cloned().unwrap_or(Value::Null);
101
102    Some((event_name, payload))
103}
104
105/// Check if a text message is an Engine.IO ping (just the character '2').
106fn is_eio_ping(text: &str) -> bool {
107    text.as_bytes() == [EIO_PING]
108}
109
110/// Check if a text message is an Engine.IO close (just the character '1').
111fn is_eio_close(text: &str) -> bool {
112    text.as_bytes() == [EIO_CLOSE]
113}
114
115/// Check if a text message is the Engine.IO open packet (starts with '0').
116fn is_eio_open(text: &str) -> bool {
117    text.as_bytes().first() == Some(&EIO_OPEN)
118}
119
120/// Check if a text message is a Socket.IO namespace connect ack ('40{namespace},').
121fn is_namespace_connect_ack(text: &str) -> bool {
122    text.starts_with(&format!("40{namespace},", namespace = SOCKET_NAMESPACE))
123}
124
125/// Check if a text message is a Socket.IO namespace disconnect.
126fn is_namespace_disconnect(text: &str) -> bool {
127    text.starts_with(&format!("41{namespace}", namespace = SOCKET_NAMESPACE))
128}
129
130// ── Stream ───────────────────────────────────────────────────────────────
131
132/// Manages WebSocket streaming connections to the Limitless Exchange.
133///
134/// Connects to `wss://ws.limitless.exchange/socket.io/?EIO=4&transport=websocket`
135/// and handles the Socket.IO protocol (Engine.IO v4 + Socket.IO) over the
136/// raw WebSocket transport.
137///
138/// # Event Reference
139///
140/// | Client → Server (emit)         | Auth | Description                        |
141/// |-------------------------------|------|------------------------------------|
142/// | `subscribe_market_prices`     | No   | AMM prices + CLOB orderbook       |
143/// | `subscribe_positions`         | Yes  | Portfolio position updates         |
144/// | `subscribe_order_events`      | Yes  | OME + settlement lifecycle        |
145/// | `subscribe_market_lifecycle`  | No   | Market creation / resolution       |
146///
147/// | Server → Client (on)  | Auth | Description                          |
148/// |-----------------------|------|--------------------------------------|
149/// | `newPriceData`        | No   | AMM price update                     |
150/// | `orderbookUpdate`     | No   | CLOB orderbook snapshot              |
151/// | `positions`           | Yes  | Position balance change              |
152/// | `orderEvent`          | Yes  | OME state or settlement result       |
153/// | `marketCreated`       | No   | New market funded and visible        |
154/// | `marketResolved`      | No   | Market resolved with winning outcome |
155/// | `system`              | —    | System notifications                 |
156/// | `authenticated`       | Yes  | Auth confirmation                    |
157/// | `exception`           | —    | Error notifications                  |
158#[derive(Clone)]
159pub struct Stream {
160    pub client: Client,
161}
162
163impl Stream {
164    /// Tests connectivity by performing the full Socket.IO handshake.
165    ///
166    /// Connects, reads the Engine.IO open packet, sends namespace connect,
167    /// and verifies the server acknowledges it.
168    pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
169        let stream = self.client.wss_connect(None, false, None).await?;
170        let mut ws_client = WsClient::new(stream);
171
172        // ── Phase 1: Engine.IO open ──────────────────────────────
173        let open_text = Self::read_text_message(ws_client.stream()).await?;
174        if !is_eio_open(&open_text) {
175            return Err(LimitlessError::Base(format!(
176                "Expected Engine.IO open packet (0{{...}}), got: {}",
177                &open_text[..open_text.len().min(80)]
178            )));
179        }
180        trace!("Engine.IO open received: {}", open_text);
181
182        // ── Phase 2: Socket.IO namespace connect ─────────────────
183        let connect_frame = frame_socketio_connect();
184        ws_client
185            .stream()
186            .send(WsMessage::Text(connect_frame.into()))
187            .await
188            .map_err(|e| {
189                LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
190            })?;
191
192        let ack_text = Self::read_text_message(ws_client.stream()).await?;
193        if !is_namespace_connect_ack(&ack_text) {
194            return Err(LimitlessError::Base(format!(
195                "Expected namespace connect ack (40/markets,), got: {}",
196                &ack_text[..ack_text.len().min(80)]
197            )));
198        }
199        trace!("Namespace connected: {}", ack_text);
200
201        // ── Send proper close ────────────────────────────────────
202        let _ = ws_client
203            .stream()
204            .send(
205                WsMessage::Text(format!("41{namespace},", namespace = SOCKET_NAMESPACE).into())
206                    .into(),
207            )
208            .await;
209        let _ = ws_client.disconnect().await;
210
211        Ok(())
212    }
213
214    /// Subscribe to a public data stream with an event handler callback.
215    ///
216    /// The `handler` receives a `Value` that is an array `[event_name, payload]`
217    /// for Socket.IO events, or the raw parsed JSON for other messages.
218    ///
219    /// # Example
220    ///
221    /// ```no_run
222    /// use limitless::prelude::*;
223    ///
224    /// #[tokio::main]
225    /// async fn main() {
226    ///     let stream: Stream = Limitless::new(None, None);
227    ///     stream
228    ///         .ws_subscribe(|event| {
229    ///             println!("Received: {:?}", event);
230    ///             Ok(())
231    ///         })
232    ///         .await
233    ///         .unwrap();
234    /// }
235    /// ```
236    pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
237    where
238        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
239    {
240        let stream = self.client.wss_connect(None, false, None).await?;
241        let mut ws_client = WsClient::new(stream);
242        Self::event_loop(&mut ws_client, handler, None).await?;
243        Ok(())
244    }
245
246    /// Subscribe to a stream with dynamic command support.
247    ///
248    /// Allows emitting subscription commands (subscribe/unsubscribe) after
249    /// the connection is established. Commands should be complete Socket.IO
250    /// frames (e.g., `42/markets,["subscribe_market_prices",{...}]`).
251    ///
252    /// Use [`frame_socketio_event`] to build properly framed commands.
253    pub async fn ws_subscribe_with_commands<F>(
254        &self,
255        cmd_receiver: mpsc::UnboundedReceiver<String>,
256        handler: F,
257    ) -> Result<(), LimitlessError>
258    where
259        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
260    {
261        let stream = self.client.wss_connect(None, false, None).await?;
262        let mut ws_client = WsClient::new(stream);
263        Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
264        Ok(())
265    }
266
267    /// Subscribe to market updates for a specific slug.
268    ///
269    /// Handles the full lifecycle: connect, handshake, subscribe, and
270    /// event dispatch. The handler receives `[event_name, payload]` arrays.
271    ///
272    /// # Example
273    ///
274    /// ```no_run
275    /// use limitless::prelude::*;
276    ///
277    /// #[tokio::main]
278    /// async fn main() {
279    ///     let ws: Stream = Limitless::new(None, None);
280    ///     ws.ws_subscribe_market("btc-above-100k", |event_name, payload| {
281    ///         println!("{event_name}: {payload}");
282    ///         Ok(())
283    ///     }).await.unwrap();
284    /// }
285    /// ```
286    pub async fn ws_subscribe_market<F>(
287        &self,
288        market_slug: &str,
289        mut handler: F,
290    ) -> Result<(), LimitlessError>
291    where
292        F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
293    {
294        let stream = self.client.wss_connect(None, false, None).await?;
295        let mut ws_client = WsClient::new(stream);
296
297        // ── Handshake ─────────────────────────────────────────────
298        Self::perform_handshake(&mut ws_client).await?;
299
300        // ── Subscribe ─────────────────────────────────────────────
301        let sub = frame_socketio_event(
302            "subscribe_market_prices",
303            &serde_json::json!({"marketSlugs": [market_slug]}),
304        );
305        ws_client
306            .stream()
307            .send(WsMessage::Text(sub.into()))
308            .await
309            .map_err(|e| LimitlessError::Base(format!("Failed to send subscription: {}", e)))?;
310        debug!("Subscribed to market prices for: {}", market_slug);
311
312        // ── Event loop with typed dispatch ────────────────────────
313        Self::typed_event_loop(&mut ws_client, &mut handler, None).await?;
314        Ok(())
315    }
316
317    /// Subscribe to the WebSocket event stream and receive typed [`WsEventKind`] events.
318    ///
319    /// Connects, performs the Socket.IO handshake, then enters an event
320    /// loop that parses every incoming server event through
321    /// [`deserialize_event`] before passing the resulting [`WsEventKind`]
322    /// to `handler`.
323    ///
324    /// # Example
325    ///
326    /// ```no_run
327    /// use limitless::prelude::*;
328    ///
329    /// #[tokio::main]
330    /// async fn main() {
331    ///     let ws: Stream = Limitless::new(None, None);
332    ///     ws.ws_subscribe_events(|event| {
333    ///         match event {
334    ///             WsEventKind::NewPriceData(p) => println!("AMM prices for {}", p.market_address),
335    ///             WsEventKind::TradeEvent(t) => println!("Trade: {} @ {}", t.size, t.price),
336    ///             WsEventKind::Unknown(payload) => println!("Unknown: {payload:?}"),
337    ///             other => println!("Event: {other:?}"),
338    ///         }
339    ///         Ok(())
340    ///     }).await.unwrap();
341    /// }
342    /// ```
343    pub async fn ws_subscribe_events<F>(&self, mut handler: F) -> Result<(), LimitlessError>
344    where
345        F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
346    {
347        let stream = self.client.wss_connect(None, false, None).await?;
348        let mut ws_client = WsClient::new(stream);
349
350        // ── Handshake ─────────────────────────────────────────────
351        Self::perform_handshake(&mut ws_client).await?;
352
353        // ── Typed dispatch wrapper ────────────────────────────────
354        let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
355            match deserialize_event(event_name, payload) {
356                Some(kind) => handler(kind),
357                None => {
358                    debug!("Failed to deserialize event '{}', skipping", event_name);
359                    Ok(())
360                }
361            }
362        };
363
364        // ── Event loop with typed dispatch ────────────────────────
365        Self::typed_event_loop(&mut ws_client, &mut adapter, None).await?;
366        Ok(())
367    }
368
369    /// Perform the Socket.IO handshake: read Engine.IO open, send namespace
370    /// connect, wait for ack.
371    async fn perform_handshake(ws_client: &mut WsClient) -> Result<(), LimitlessError> {
372        // Read Engine.IO open packet
373        let open_text = Self::read_text_message(ws_client.stream()).await?;
374        if !is_eio_open(&open_text) {
375            return Err(LimitlessError::Base(format!(
376                "Expected Engine.IO open packet (0{{...}}), got: {}",
377                &open_text[..open_text.len().min(120)]
378            )));
379        }
380        debug!("Engine.IO open: {}", open_text);
381
382        // Send namespace connect
383        let connect_frame = frame_socketio_connect();
384        ws_client
385            .stream()
386            .send(WsMessage::Text(connect_frame.into()))
387            .await
388            .map_err(|e| {
389                LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
390            })?;
391
392        // Read namespace connect ack
393        let ack_text = Self::read_text_message(ws_client.stream()).await?;
394        if !is_namespace_connect_ack(&ack_text) {
395            return Err(LimitlessError::Base(format!(
396                "Expected namespace connect ack (40/markets,), got: {}",
397                &ack_text[..ack_text.len().min(120)]
398            )));
399        }
400        debug!("Namespace connected: {}", ack_text);
401
402        Ok(())
403    }
404
405    /// Read the next text (or binary → text) message from the stream.
406    async fn read_text_message(
407        stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
408    ) -> Result<String, LimitlessError> {
409        match stream.next().await {
410            Some(Ok(WsMessage::Text(text))) => Ok(text.to_string()),
411            Some(Ok(WsMessage::Binary(data))) => String::from_utf8(data.to_vec())
412                .map_err(|e| LimitlessError::Base(format!("Invalid UTF-8 in binary frame: {}", e))),
413            Some(Ok(other)) => Err(LimitlessError::Base(format!(
414                "Expected text frame, got: {:?}",
415                other
416            ))),
417            Some(Err(e)) => Err(LimitlessError::Tungstenite(e)),
418            None => Err(LimitlessError::Base(
419                "WebSocket connection closed during handshake".to_string(),
420            )),
421        }
422    }
423
424    /// Core event loop: reads WebSocket messages, dispatches to handler,
425    /// sends periodic pings, and processes outgoing subscription commands.
426    ///
427    /// Performs the Socket.IO handshake before entering the main loop.
428    pub(crate) async fn event_loop<F>(
429        ws_client: &mut WsClient,
430        mut handler: F,
431        mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
432    ) -> Result<(), LimitlessError>
433    where
434        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
435    {
436        // ── Handshake phase ────────────────────────────────────────────
437        Self::perform_handshake(ws_client).await?;
438
439        // ── Main event loop ────────────────────────────────────────────
440        let mut last_ping = Instant::now();
441
442        loop {
443            tokio::select! {
444                // ── Incoming WebSocket message ─────────────────────────
445                msg = ws_client.stream().next() => {
446                    match msg {
447                        Some(Ok(WsMessage::Text(text))) => {
448                            Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
449                        }
450                        Some(Ok(WsMessage::Binary(data))) => {
451                            if let Ok(text) = String::from_utf8(data.to_vec()) {
452                                Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
453                            }
454                        }
455                        Some(Ok(WsMessage::Ping(data))) => {
456                            let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
457                        }
458                        Some(Ok(WsMessage::Close(_))) => {
459                            trace!("WebSocket closed by server");
460                            return Ok(());
461                        }
462                        Some(Err(e)) => {
463                            return Err(LimitlessError::Tungstenite(e));
464                        }
465                        None => {
466                            trace!("WebSocket stream ended");
467                            return Ok(());
468                        }
469                        _ => {}
470                    }
471                }
472
473                // ── Outgoing command ───────────────────────────────────
474                cmd = async {
475                    match cmd_receiver.as_mut() {
476                        Some(rx) => rx.recv().await,
477                        None => std::future::pending().await,
478                    }
479                } => {
480                    if let Some(cmd) = cmd {
481                        debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
482                        if let Err(e) = ws_client
483                            .stream()
484                            .send(WsMessage::Text(cmd.into()))
485                            .await
486                        {
487                            error!("Failed to send command: {}", e);
488                        }
489                    }
490                }
491
492                // ── Periodic Engine.IO ping ────────────────────────────
493                _ = tokio::time::sleep(PING_INTERVAL) => {
494                    let now = Instant::now();
495                    if now.duration_since(last_ping) >= PING_INTERVAL {
496                        // Send Engine.IO ping (the string "2")
497                        let _ = ws_client
498                            .stream()
499                            .send(WsMessage::Text(String::from("2").into()))
500                            .await;
501                        last_ping = now;
502                    }
503                }
504            }
505        }
506    }
507
508    /// Typed event loop: like `event_loop` but calls a `FnMut(&str, &Value)`
509    /// handler instead of `FnMut(Value)`.
510    pub(crate) async fn typed_event_loop<F>(
511        ws_client: &mut WsClient,
512        handler: &mut F,
513        mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
514    ) -> Result<(), LimitlessError>
515    where
516        F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
517    {
518        let mut last_ping = Instant::now();
519
520        loop {
521            tokio::select! {
522                // ── Incoming WebSocket message ─────────────────────────
523                msg = ws_client.stream().next() => {
524                    match msg {
525                        Some(Ok(WsMessage::Text(text))) => {
526                            if let Some((event_name, payload)) = parse_socketio_message(&text) {
527                                if let Err(e) = handler(&event_name, &payload) {
528                                    error!("WS handler error on '{event_name}': {}", e);
529                                }
530                            } else if is_eio_ping(&text) {
531                                let _ = ws_client.stream()
532                                    .send(WsMessage::Text(String::from("3").into()))
533                                    .await;
534                            } else if is_eio_close(&text) || is_namespace_disconnect(&text) {
535                                trace!("Socket.IO close/disconnect received");
536                                return Ok(());
537                            }
538                            // Ignore other messages (open, connect ack, pong, etc.)
539                        }
540                        Some(Ok(WsMessage::Binary(data))) => {
541                            if let Ok(text) = String::from_utf8(data.to_vec()) {
542                                if let Some((event_name, payload)) = parse_socketio_message(&text) {
543                                    if let Err(e) = handler(&event_name, &payload) {
544                                        error!("WS handler error on '{event_name}': {}", e);
545                                    }
546                                }
547                            }
548                        }
549                        Some(Ok(WsMessage::Ping(data))) => {
550                            let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
551                        }
552                        Some(Ok(WsMessage::Close(_))) => {
553                            trace!("WebSocket closed by server");
554                            return Ok(());
555                        }
556                        Some(Err(e)) => {
557                            return Err(LimitlessError::Tungstenite(e));
558                        }
559                        None => {
560                            trace!("WebSocket stream ended");
561                            return Ok(());
562                        }
563                        _ => {}
564                    }
565                }
566
567                // ── Outgoing command ───────────────────────────────────
568                cmd = async {
569                    match cmd_receiver.as_mut() {
570                        Some(rx) => rx.recv().await,
571                        None => std::future::pending().await,
572                    }
573                } => {
574                    if let Some(cmd) = cmd {
575                        debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
576                        if let Err(e) = ws_client
577                            .stream()
578                            .send(WsMessage::Text(cmd.into()))
579                            .await
580                        {
581                            error!("Failed to send command: {}", e);
582                        }
583                    }
584                }
585
586                // ── Periodic Engine.IO ping ────────────────────────────
587                _ = tokio::time::sleep(PING_INTERVAL) => {
588                    let now = Instant::now();
589                    if now.duration_since(last_ping) >= PING_INTERVAL {
590                        let _ = ws_client
591                            .stream()
592                            .send(WsMessage::Text(String::from("2").into()))
593                            .await;
594                        last_ping = now;
595                    }
596                }
597            }
598        }
599    }
600
601    /// Handle an incoming text message from the WebSocket.
602    ///
603    /// Dispatches Engine.IO control frames and Socket.IO events.
604    async fn handle_incoming_text<F>(
605        text: &str,
606        handler: &mut F,
607        ws_client: &mut WsClient,
608    ) -> Result<(), LimitlessError>
609    where
610        F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
611    {
612        // Engine.IO ping → reply pong
613        if is_eio_ping(text) {
614            let _ = ws_client
615                .stream()
616                .send(WsMessage::Text(String::from("3").into()))
617                .await;
618            return Ok(());
619        }
620
621        // Engine.IO close or Socket.IO namespace disconnect
622        if is_eio_close(text) || is_namespace_disconnect(text) {
623            trace!("Socket.IO close/disconnect");
624            return Err(LimitlessError::Base(
625                "Server closed the Socket.IO connection".to_string(),
626            ));
627        }
628
629        // Engine.IO pong — ignore
630        if text.as_bytes() == [EIO_PONG] {
631            return Ok(());
632        }
633
634        // Try to parse as a Socket.IO event (42/markets,[...])
635        if let Some((event_name, payload)) = parse_socketio_message(text) {
636            // Pass as [event_name, payload] array for backward compat
637            let event_array = serde_json::json!([event_name, payload]);
638            if let Err(e) = handler(event_array) {
639                error!("WS handler error on '{event_name}': {}", e);
640            }
641            return Ok(());
642        }
643
644        // Socket.IO connect ack, plain open, etc. — ignore
645        if is_namespace_connect_ack(text) || is_eio_open(text) {
646            return Ok(());
647        }
648
649        // Unknown message — log and try to parse as raw JSON
650        warn!("Unhandled WS message: {}", &text[..text.len().min(200)]);
651        if let Ok(value) = serde_json::from_str::<Value>(text) {
652            let _ = handler(value);
653        }
654
655        Ok(())
656    }
657}
658
659impl Limitless for Stream {
660    fn new(api_key: Option<String>, secret: Option<String>) -> Self {
661        Self::new_with_config(&Config::default(), api_key, secret)
662    }
663
664    fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
665        Self {
666            client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
667        }
668    }
669}