Skip to main content

wickra_data/live/
binance.rs

1//! Binance spot WebSocket kline feed.
2//!
3//! Subscribes to Binance's `<symbol>@kline_<interval>` stream and emits a
4//! [`KlineEvent`] every time the server pushes a new tick. The event tells you
5//! whether the current candle is still open or has just closed.
6//!
7//! Example (requires the `live-binance` feature):
8//!
9//! ```no_run
10//! use wickra_data::live::binance::{BinanceKlineStream, Interval};
11//! # async fn run() -> wickra_data::Result<()> {
12//! let mut stream = BinanceKlineStream::connect(&["BTCUSDT".to_string()], Interval::OneMinute).await?;
13//! while let Some(event) = stream.next_event().await? {
14//!     if event.is_closed {
15//!         println!("closed {} @ {}", event.symbol, event.candle.close);
16//!     }
17//! }
18//! # Ok(()) }
19//! ```
20
21use std::time::Duration;
22
23use futures_util::SinkExt;
24use futures_util::StreamExt;
25use serde::Deserialize;
26use tokio::net::TcpStream;
27use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
28use tokio_tungstenite::tungstenite::Message;
29use tokio_tungstenite::MaybeTlsStream;
30use tokio_tungstenite::WebSocketStream;
31
32use crate::error::{Error, Result};
33use wickra_core::Candle;
34
/// Connection settings for a [`BinanceKlineStream`].
///
/// [`Default`] points at Binance's public production endpoint, which suits
/// almost every caller. The fields are public so that an integration test or
/// a Binance Testnet user can swap the base URL and shorten the reconnect
/// timing.
#[derive(Debug, Clone)]
pub struct BinanceConfig {
    /// WebSocket endpoint **without** a path, e.g.
    /// `"wss://stream.binance.com:9443"`. The combined-stream path
    /// `/stream?streams=…` is appended internally.
    pub base_url: String,
    /// Longest wait for the next inbound frame before the connection is
    /// considered stalled. Binance pings roughly every 3 minutes, so a quiet
    /// but healthy stream stays well inside the 300 s default.
    pub read_timeout: Duration,
    /// Pause before the first reconnect attempt. The pause doubles after
    /// every failed attempt, up to [`Self::max_reconnect_backoff`].
    pub initial_reconnect_delay: Duration,
    /// Ceiling for the exponential reconnect backoff.
    pub max_reconnect_backoff: Duration,
    /// Number of times [`BinanceKlineStream::next_event`] retries a dropped
    /// connection before it surfaces the last error. Must be `>= 1`.
    pub max_reconnect_attempts: u32,
    /// Largest inbound WebSocket message accepted. Kline frames are tiny;
    /// the cap only stops a pathological or hostile server from forcing an
    /// unbounded allocation.
    pub max_message_size: usize,
    /// Largest single inbound WebSocket frame accepted.
    pub max_frame_size: usize,
}
63
64impl Default for BinanceConfig {
65    fn default() -> Self {
66        Self {
67            base_url: "wss://stream.binance.com:9443".to_string(),
68            read_timeout: Duration::from_secs(300),
69            initial_reconnect_delay: Duration::from_secs(1),
70            max_reconnect_backoff: Duration::from_secs(30),
71            max_reconnect_attempts: 6,
72            max_message_size: 8 << 20,
73            max_frame_size: 2 << 20,
74        }
75    }
76}
77
/// Kline intervals this feed can subscribe to. [`Interval::as_str`] yields
/// the matching Binance wire-format string (`"1m"`, `"5m"`, `"1h"`, …).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interval {
    /// `"1s"`
    OneSecond,
    /// `"1m"`
    OneMinute,
    /// `"3m"`
    ThreeMinutes,
    /// `"5m"`
    FiveMinutes,
    /// `"15m"`
    FifteenMinutes,
    /// `"30m"`
    ThirtyMinutes,
    /// `"1h"`
    OneHour,
    /// `"2h"`
    TwoHours,
    /// `"4h"`
    FourHours,
    /// `"6h"`
    SixHours,
    /// `"8h"`
    EightHours,
    /// `"12h"`
    TwelveHours,
    /// `"1d"`
    OneDay,
    /// `"1w"`
    OneWeek,
}
97
98impl Interval {
99    /// Wire-format string used in the stream name.
100    pub fn as_str(self) -> &'static str {
101        match self {
102            Self::OneSecond => "1s",
103            Self::OneMinute => "1m",
104            Self::ThreeMinutes => "3m",
105            Self::FiveMinutes => "5m",
106            Self::FifteenMinutes => "15m",
107            Self::ThirtyMinutes => "30m",
108            Self::OneHour => "1h",
109            Self::TwoHours => "2h",
110            Self::FourHours => "4h",
111            Self::SixHours => "6h",
112            Self::EightHours => "8h",
113            Self::TwelveHours => "12h",
114            Self::OneDay => "1d",
115            Self::OneWeek => "1w",
116        }
117    }
118}
119
120/// One push from the Binance kline stream.
121#[derive(Debug, Clone)]
122pub struct KlineEvent {
123    /// Symbol in lowercase form as sent by Binance (e.g. `"btcusdt"`).
124    pub symbol: String,
125    /// Interval the candle belongs to.
126    pub interval: Interval,
127    /// Candle in its current state (may still be open).
128    pub candle: Candle,
129    /// Whether the candle has been closed by the server. Closed events are the
130    /// only ones safe to use for bar-completion logic.
131    pub is_closed: bool,
132}
133
134/// A live Binance kline stream.
135#[derive(Debug)]
136pub struct BinanceKlineStream {
137    socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
138    /// Lowercased symbols the stream is subscribed to. Retained so the
139    /// connection can be rebuilt on a reconnect.
140    symbols: Vec<String>,
141    /// Interval requested at connect time. Used to tag every event.
142    interval: Interval,
143    /// `true` once the caller invoked [`close`](Self::close). A closed stream
144    /// is never polled or reconnected again.
145    closed: bool,
146    /// Timing / sizing knobs. Retained so reconnects honour the same config.
147    config: BinanceConfig,
148}
149
150/// Wire-format representation of an incoming Binance kline tick. Public so callers
151/// can deserialize it themselves if they prefer.
152#[derive(Debug, Clone, Deserialize)]
153pub struct RawWsEnvelope {
154    /// Stream name, e.g. `"btcusdt@kline_1m"`.
155    pub stream: String,
156    pub data: RawKlinePayload,
157}
158
159#[derive(Debug, Clone, Deserialize)]
160pub struct RawKlinePayload {
161    #[serde(rename = "e")]
162    pub event_type: String,
163    #[serde(rename = "E")]
164    pub event_time: i64,
165    #[serde(rename = "s")]
166    pub symbol: String,
167    #[serde(rename = "k")]
168    pub kline: RawKline,
169}
170
171#[derive(Debug, Clone, Deserialize)]
172pub struct RawKline {
173    #[serde(rename = "t")]
174    pub open_time: i64,
175    #[serde(rename = "T")]
176    pub close_time: i64,
177    #[serde(rename = "s")]
178    pub symbol: String,
179    #[serde(rename = "i")]
180    pub interval: String,
181    #[serde(rename = "o")]
182    pub open: String,
183    #[serde(rename = "c")]
184    pub close: String,
185    #[serde(rename = "h")]
186    pub high: String,
187    #[serde(rename = "l")]
188    pub low: String,
189    #[serde(rename = "v")]
190    pub volume: String,
191    #[serde(rename = "x")]
192    pub is_closed: bool,
193}
194
195impl BinanceKlineStream {
196    /// Open a raw combined-stream WebSocket for the given (already-lowercased)
197    /// symbols.
198    async fn open(
199        symbols: &[String],
200        interval: Interval,
201        config: &BinanceConfig,
202    ) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
203        let streams: Vec<String> = symbols
204            .iter()
205            .map(|s| format!("{}@kline_{}", s, interval.as_str()))
206            .collect();
207        let url = format!("{}/stream?streams={}", config.base_url, streams.join("/"));
208        let url = url::Url::parse(&url).map_err(|e| Error::Malformed(e.to_string()))?;
209        // tokio-tungstenite 0.29's WebSocketConfig is #[non_exhaustive],
210        // so the only way to set fields is via the builder-style methods on
211        // a fresh `default()` value.
212        let ws_config = WebSocketConfig::default()
213            .max_message_size(Some(config.max_message_size))
214            .max_frame_size(Some(config.max_frame_size));
215        let (socket, _) =
216            tokio_tungstenite::connect_async_with_config(url.as_str(), Some(ws_config), false)
217                .await?;
218        Ok(socket)
219    }
220
221    /// Connect to Binance's combined-stream endpoint for one or more symbols.
222    ///
223    /// Symbols may be passed in either case; they are lowercased to match
224    /// Binance's stream-name conventions. A dropped or stalled connection is
225    /// re-established transparently by [`next_event`](Self::next_event).
226    pub async fn connect(symbols: &[String], interval: Interval) -> Result<Self> {
227        Self::connect_with_config(symbols, interval, BinanceConfig::default()).await
228    }
229
230    /// Connect with a custom [`BinanceConfig`]. Useful for Binance Testnet
231    /// (`"wss://testnet.binance.vision"`) or for shrinking the reconnect
232    /// timing in integration tests.
233    pub async fn connect_with_config(
234        symbols: &[String],
235        interval: Interval,
236        config: BinanceConfig,
237    ) -> Result<Self> {
238        if symbols.is_empty() {
239            return Err(Error::Malformed(
240                "BinanceKlineStream requires at least one symbol".into(),
241            ));
242        }
243        let symbols: Vec<String> = symbols.iter().map(|s| s.to_lowercase()).collect();
244        let socket = Self::open(&symbols, interval, &config).await?;
245        Ok(Self {
246            socket,
247            symbols,
248            interval,
249            closed: false,
250            config,
251        })
252    }
253
254    /// Whether the caller has closed the stream. Once closed, every further
255    /// [`next_event`](Self::next_event) call yields `Ok(None)` immediately.
256    pub fn is_closed(&self) -> bool {
257        self.closed
258    }
259
260    /// Re-establish a dropped connection with exponential backoff. Returns the
261    /// last error if every attempt fails.
262    async fn reconnect(&mut self) -> Result<()> {
263        let mut delay = self.config.initial_reconnect_delay;
264        let mut last_err = None;
265        for _ in 0..self.config.max_reconnect_attempts {
266            tokio::time::sleep(delay).await;
267            match Self::open(&self.symbols, self.interval, &self.config).await {
268                Ok(socket) => {
269                    self.socket = socket;
270                    return Ok(());
271                }
272                Err(e) => {
273                    last_err = Some(e);
274                    delay = delay
275                        .saturating_mul(2)
276                        .min(self.config.max_reconnect_backoff);
277                }
278            }
279        }
280        Err(last_err.expect("max_reconnect_attempts is non-zero"))
281    }
282
283    /// Receive the next kline event. A dropped, errored or stalled connection
284    /// is re-established transparently (exponential backoff, up to
285    /// [`BinanceConfig::max_reconnect_attempts`]); an exhausted reconnect
286    /// surfaces as `Err`. `Ok(None)` is returned only after the caller has
287    /// [`close`](Self::close)d the stream.
288    pub async fn next_event(&mut self) -> Result<Option<KlineEvent>> {
289        if self.closed {
290            return Ok(None);
291        }
292        loop {
293            // A protocol error, a clean server close, or a read stall are all
294            // transient: reconnect with backoff and resume reading.
295            let Ok(Some(Ok(msg))) =
296                tokio::time::timeout(self.config.read_timeout, self.socket.next()).await
297            else {
298                self.reconnect().await?;
299                continue;
300            };
301            match msg {
302                Message::Text(text) => {
303                    if let Some(event) = Self::parse_frame(&text, self.interval)? {
304                        return Ok(Some(event));
305                    }
306                    // Non-kline frame (subscription ack / heartbeat / error):
307                    // skip it and keep reading.
308                }
309                Message::Binary(bytes) => {
310                    let text = String::from_utf8_lossy(&bytes);
311                    if let Some(event) = Self::parse_frame(&text, self.interval)? {
312                        return Ok(Some(event));
313                    }
314                }
315                Message::Ping(payload) => {
316                    // Best-effort Pong reply. If the write fails the
317                    // connection is already dead — the next read will
318                    // surface the error and trigger reconnect through
319                    // the timeout/err arm above.
320                    let _ = self.socket.send(Message::Pong(payload)).await;
321                }
322                Message::Pong(_) | Message::Frame(_) => {}
323                Message::Close(_) => {
324                    self.reconnect().await?;
325                }
326            }
327        }
328    }
329
330    /// Parse one raw WebSocket text frame.
331    ///
332    /// Returns `Ok(Some(event))` for a kline frame, `Ok(None)` for any other
333    /// frame (subscription acknowledgements, error objects, heartbeats), and
334    /// `Err` only when a frame that *is* a kline fails to decode.
335    fn parse_frame(text: &str, interval: Interval) -> Result<Option<KlineEvent>> {
336        let value: serde_json::Value = serde_json::from_str(text)?;
337        // Combined-stream kline frames carry `data.e == "kline"`. Everything
338        // else on the socket is control traffic that must not abort the feed.
339        let is_kline = value
340            .get("data")
341            .and_then(|d| d.get("e"))
342            .and_then(serde_json::Value::as_str)
343            == Some("kline");
344        if !is_kline {
345            return Ok(None);
346        }
347        let envelope: RawWsEnvelope = serde_json::from_value(value)?;
348        Ok(Some(envelope.into_event(interval)?))
349    }
350
351    /// Close the underlying socket cleanly and mark the stream closed. After
352    /// this, [`next_event`](Self::next_event) yields `Ok(None)` and never
353    /// reconnects.
354    pub async fn close(&mut self) -> Result<()> {
355        self.closed = true;
356        self.socket.close(None).await?;
357        Ok(())
358    }
359}
360
361impl RawWsEnvelope {
362    fn into_event(self, interval: Interval) -> Result<KlineEvent> {
363        let k = self.data.kline;
364        let open: f64 = k
365            .open
366            .parse()
367            .map_err(|_| Error::Malformed(format!("bad open '{}'", k.open)))?;
368        let high: f64 = k
369            .high
370            .parse()
371            .map_err(|_| Error::Malformed(format!("bad high '{}'", k.high)))?;
372        let low: f64 = k
373            .low
374            .parse()
375            .map_err(|_| Error::Malformed(format!("bad low '{}'", k.low)))?;
376        let close: f64 = k
377            .close
378            .parse()
379            .map_err(|_| Error::Malformed(format!("bad close '{}'", k.close)))?;
380        let volume: f64 = k
381            .volume
382            .parse()
383            .map_err(|_| Error::Malformed(format!("bad volume '{}'", k.volume)))?;
384        let candle = Candle::new(open, high, low, close, volume, k.open_time)?;
385        Ok(KlineEvent {
386            symbol: self.data.symbol.to_lowercase(),
387            interval,
388            candle,
389            is_closed: k.is_closed,
390        })
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397
398    #[test]
399    fn interval_as_str_covers_every_variant() {
400        // Wire-format strings are part of Binance's public protocol — pin
401        // every mapping so a typo here is caught immediately.
402        let pairs: &[(Interval, &str)] = &[
403            (Interval::OneSecond, "1s"),
404            (Interval::OneMinute, "1m"),
405            (Interval::ThreeMinutes, "3m"),
406            (Interval::FiveMinutes, "5m"),
407            (Interval::FifteenMinutes, "15m"),
408            (Interval::ThirtyMinutes, "30m"),
409            (Interval::OneHour, "1h"),
410            (Interval::TwoHours, "2h"),
411            (Interval::FourHours, "4h"),
412            (Interval::SixHours, "6h"),
413            (Interval::EightHours, "8h"),
414            (Interval::TwelveHours, "12h"),
415            (Interval::OneDay, "1d"),
416            (Interval::OneWeek, "1w"),
417        ];
418        for (iv, expected) in pairs {
419            assert_eq!(iv.as_str(), *expected);
420        }
421    }
422
423    #[test]
424    fn binance_config_default_matches_production_endpoint() {
425        let cfg = BinanceConfig::default();
426        assert_eq!(cfg.base_url, "wss://stream.binance.com:9443");
427        assert_eq!(cfg.read_timeout, Duration::from_secs(300));
428        assert_eq!(cfg.initial_reconnect_delay, Duration::from_secs(1));
429        assert_eq!(cfg.max_reconnect_backoff, Duration::from_secs(30));
430        assert_eq!(cfg.max_reconnect_attempts, 6);
431        assert_eq!(cfg.max_message_size, 8 << 20);
432        assert_eq!(cfg.max_frame_size, 2 << 20);
433    }
434
435    #[tokio::test]
436    async fn connect_rejects_an_empty_symbol_list() {
437        let err = BinanceKlineStream::connect(&[], Interval::OneMinute)
438            .await
439            .unwrap_err();
440        assert!(matches!(err, Error::Malformed(_)));
441    }
442
443    #[test]
444    fn parses_real_binance_payload() {
445        // Sample event format from Binance's public docs (truncated).
446        let json = r#"{
447            "stream": "btcusdt@kline_1m",
448            "data": {
449              "e": "kline",
450              "E": 1700000000000,
451              "s": "BTCUSDT",
452              "k": {
453                "t": 1700000000000,
454                "T": 1700000059999,
455                "s": "BTCUSDT",
456                "i": "1m",
457                "f": 1,
458                "L": 100,
459                "o": "30000.0",
460                "c": "30050.0",
461                "h": "30100.0",
462                "l": "29950.0",
463                "v": "12.5",
464                "n": 50,
465                "x": false,
466                "q": "375000.0",
467                "V": "6.25",
468                "Q": "187500.0",
469                "B": "0"
470              }
471            }
472        }"#;
473        let env: RawWsEnvelope = serde_json::from_str(json).unwrap();
474        let evt = env.into_event(Interval::OneMinute).unwrap();
475        assert_eq!(evt.symbol, "btcusdt");
476        assert_eq!(evt.candle.open, 30_000.0);
477        assert_eq!(evt.candle.close, 30_050.0);
478        assert!(!evt.is_closed);
479        assert_eq!(evt.interval, Interval::OneMinute);
480    }
481
482    #[test]
483    fn rejects_non_parsable_numbers() {
484        let json = r#"{
485            "stream": "btcusdt@kline_1m",
486            "data": {
487              "e": "kline", "E": 0, "s": "BTCUSDT",
488              "k": {
489                "t": 0, "T": 0, "s": "BTCUSDT", "i": "1m",
490                "f": 0, "L": 0,
491                "o": "not-a-number", "c": "0", "h": "0", "l": "0",
492                "v": "0", "n": 0, "x": false, "q": "0", "V": "0", "Q": "0", "B": "0"
493              }
494            }
495        }"#;
496        let env: RawWsEnvelope = serde_json::from_str(json).unwrap();
497        let err = env.into_event(Interval::OneMinute).unwrap_err();
498        assert!(matches!(err, Error::Malformed(_)));
499    }
500
501    #[test]
502    fn skips_non_kline_frames() {
503        // Subscription acknowledgement: skipped, never an error.
504        let ack = r#"{"result":null,"id":1}"#;
505        assert!(BinanceKlineStream::parse_frame(ack, Interval::OneMinute)
506            .unwrap()
507            .is_none());
508        // Error object: also skipped.
509        let err = r#"{"error":{"code":2,"msg":"Invalid request"}}"#;
510        assert!(BinanceKlineStream::parse_frame(err, Interval::OneMinute)
511            .unwrap()
512            .is_none());
513    }
514
515    #[test]
516    fn parse_frame_decodes_a_kline() {
517        let json = r#"{
518            "stream": "btcusdt@kline_1m",
519            "data": {
520              "e": "kline", "E": 1700000000000, "s": "BTCUSDT",
521              "k": {
522                "t": 1700000000000, "T": 1700000059999, "s": "BTCUSDT", "i": "1m",
523                "f": 1, "L": 100, "o": "30000.0", "c": "30050.0", "h": "30100.0",
524                "l": "29950.0", "v": "12.5", "n": 50, "x": true,
525                "q": "375000.0", "V": "6.25", "Q": "187500.0", "B": "0"
526              }
527            }
528        }"#;
529        let event = BinanceKlineStream::parse_frame(json, Interval::OneMinute)
530            .unwrap()
531            .expect("a kline frame yields an event");
532        assert_eq!(event.symbol, "btcusdt");
533        assert!(event.is_closed);
534    }
535
536    // ====================================================================
537    // Mock WebSocket server: drives the async / reconnect / control-frame
538    // paths against a `127.0.0.1` listener instead of the real Binance
539    // endpoint. Each test gets its own port (`bind("127.0.0.1:0")`).
540    // ====================================================================
541
542    use std::sync::atomic::{AtomicU32, Ordering};
543    use std::sync::Arc;
544    use tokio::net::TcpListener;
545
546    /// A kline JSON frame matching Binance's combined-stream envelope. Always
547    /// reports `is_closed = true` so the test can assert on the flag.
548    fn sample_kline_text() -> String {
549        r#"{"stream":"btcusdt@kline_1m","data":{"e":"kline","E":1700000000000,"s":"BTCUSDT","k":{"t":1700000000000,"T":1700000059999,"s":"BTCUSDT","i":"1m","f":1,"L":100,"o":"30000.0","c":"30050.0","h":"30100.0","l":"29950.0","v":"12.5","n":50,"x":true,"q":"375000.0","V":"6.25","Q":"187500.0","B":"0"}}}"#.to_string()
550    }
551
552    /// Test-tuned [`BinanceConfig`]: aim at the given mock and shrink every
553    /// timer so a failing reconnect loop completes in milliseconds.
554    fn test_config(base_url: String) -> BinanceConfig {
555        BinanceConfig {
556            base_url,
557            read_timeout: Duration::from_millis(200),
558            initial_reconnect_delay: Duration::from_millis(5),
559            max_reconnect_backoff: Duration::from_millis(10),
560            max_reconnect_attempts: 3,
561            ..BinanceConfig::default()
562        }
563    }
564
565    /// Spawn a mock WS server that accepts one connection, drops the
566    /// listener (so any reconnect lands on a refused port), and then hands
567    /// the upgraded socket to `handler`. Every step `.unwrap()`s — a failure
568    /// here is a bug in the test scaffolding, not in the production code.
569    async fn one_shot_server<F, Fut>(handler: F) -> String
570    where
571        F: FnOnce(WebSocketStream<TcpStream>) -> Fut + Send + 'static,
572        Fut: std::future::Future<Output = ()> + Send + 'static,
573    {
574        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
575        let base_url = format!("ws://{}", listener.local_addr().unwrap());
576        tokio::spawn(async move {
577            let (stream, _) = listener.accept().await.unwrap();
578            drop(listener);
579            let ws = tokio_tungstenite::accept_async(stream).await.unwrap();
580            handler(ws).await;
581        });
582        base_url
583    }
584
585    /// Spawn a mock WS server that accepts exactly `n_accepts` connections
586    /// and invokes `handler` for each (with a zero-based index). Returns a
587    /// [`JoinHandle`](tokio::task::JoinHandle) the test can await so every
588    /// spawned handler is guaranteed to reach its closing brace before the
589    /// runtime is torn down.
590    async fn multi_shot_server<F, Fut>(
591        n_accepts: u32,
592        handler: F,
593    ) -> (String, tokio::task::JoinHandle<()>)
594    where
595        F: Fn(u32, WebSocketStream<TcpStream>) -> Fut + Send + Sync + 'static,
596        Fut: std::future::Future<Output = ()> + Send + 'static,
597    {
598        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
599        let base_url = format!("ws://{}", listener.local_addr().unwrap());
600        let handler = Arc::new(handler);
601        let h = tokio::spawn(async move {
602            let mut joins = Vec::with_capacity(n_accepts as usize);
603            for index in 0..n_accepts {
604                let (stream, _) = listener.accept().await.unwrap();
605                let handler = handler.clone();
606                joins.push(tokio::spawn(async move {
607                    let ws = tokio_tungstenite::accept_async(stream).await.unwrap();
608                    handler(index, ws).await;
609                }));
610            }
611            for j in joins {
612                j.await.unwrap();
613            }
614        });
615        (base_url, h)
616    }
617
618    #[tokio::test]
619    async fn next_event_decodes_a_text_kline_frame() {
620        let kline = sample_kline_text();
621        let base = one_shot_server(move |mut ws| async move {
622            let _ = ws.send(Message::Text(kline.into())).await;
623            while let Some(Ok(_)) = ws.next().await {}
624        })
625        .await;
626        let mut stream = BinanceKlineStream::connect_with_config(
627            &["BTCUSDT".to_string()],
628            Interval::OneMinute,
629            test_config(base),
630        )
631        .await
632        .unwrap();
633        assert!(!stream.is_closed());
634        let event = stream
635            .next_event()
636            .await
637            .unwrap()
638            .expect("server pushes a kline");
639        assert_eq!(event.symbol, "btcusdt");
640        assert!(event.is_closed);
641    }
642
643    #[tokio::test]
644    async fn next_event_decodes_a_binary_kline_frame() {
645        let kline = sample_kline_text();
646        let base = one_shot_server(move |mut ws| async move {
647            let bytes: Vec<u8> = kline.into_bytes();
648            let _ = ws.send(Message::Binary(bytes.into())).await;
649            while let Some(Ok(_)) = ws.next().await {}
650        })
651        .await;
652        let mut stream = BinanceKlineStream::connect_with_config(
653            &["BTCUSDT".to_string()],
654            Interval::OneMinute,
655            test_config(base),
656        )
657        .await
658        .unwrap();
659        let event = stream
660            .next_event()
661            .await
662            .unwrap()
663            .expect("server pushes a kline as Binary");
664        assert_eq!(event.symbol, "btcusdt");
665    }
666
667    #[tokio::test]
668    async fn next_event_replies_to_a_ping_with_a_pong() {
669        let kline = sample_kline_text();
670        let base = one_shot_server(move |mut ws| async move {
671            let _ = ws
672                .send(Message::Ping(b"binance-ping".as_slice().into()))
673                .await;
674            let _ = ws.send(Message::Text(kline.into())).await;
675            while let Some(Ok(_)) = ws.next().await {}
676        })
677        .await;
678        let mut stream = BinanceKlineStream::connect_with_config(
679            &["BTCUSDT".to_string()],
680            Interval::OneMinute,
681            test_config(base),
682        )
683        .await
684        .unwrap();
685        // If the client never replied to the Ping the server's drain would
686        // observe nothing — but for line coverage it's enough that the
687        // client received the Ping, sent a Pong, then read the next frame.
688        let event = stream
689            .next_event()
690            .await
691            .unwrap()
692            .expect("kline arrives right after the ping");
693        assert_eq!(event.symbol, "btcusdt");
694    }
695
696    #[tokio::test]
697    async fn next_event_skips_inbound_pong_frames() {
698        let kline = sample_kline_text();
699        let base = one_shot_server(move |mut ws| async move {
700            let _ = ws
701                .send(Message::Pong(b"unsolicited".as_slice().into()))
702                .await;
703            let _ = ws.send(Message::Text(kline.into())).await;
704            while let Some(Ok(_)) = ws.next().await {}
705        })
706        .await;
707        let mut stream = BinanceKlineStream::connect_with_config(
708            &["BTCUSDT".to_string()],
709            Interval::OneMinute,
710            test_config(base),
711        )
712        .await
713        .unwrap();
714        let event = stream
715            .next_event()
716            .await
717            .unwrap()
718            .expect("kline follows the ignored Pong");
719        assert_eq!(event.symbol, "btcusdt");
720    }
721
722    #[tokio::test]
723    async fn next_event_reconnects_after_a_server_close_frame() {
724        let kline = sample_kline_text();
725        let (base, server_done) = multi_shot_server(2, move |index, mut ws| {
726            let kline = kline.clone();
727            async move {
728                let msg = if index == 0 {
729                    // First connection: send a clean Close so the client
730                    // exercises the Message::Close reconnect path.
731                    Message::Close(None)
732                } else {
733                    Message::Text(kline.into())
734                };
735                let _ = ws.send(msg).await;
736            }
737        })
738        .await;
739        let mut stream = BinanceKlineStream::connect_with_config(
740            &["BTCUSDT".to_string()],
741            Interval::OneMinute,
742            test_config(base),
743        )
744        .await
745        .unwrap();
746        let event = stream
747            .next_event()
748            .await
749            .unwrap()
750            .expect("reconnect succeeds and the second connection serves a kline");
751        assert_eq!(event.symbol, "btcusdt");
752        // Wait for every spawned handler to reach its final state.
753        tokio::time::timeout(Duration::from_secs(1), server_done)
754            .await
755            .unwrap()
756            .unwrap();
757    }
758
759    #[tokio::test]
760    async fn next_event_reconnects_after_a_read_timeout() {
761        let kline = sample_kline_text();
762        let stall_token = Arc::new(AtomicU32::new(0));
763        let stall_token_h = stall_token.clone();
764        let (base, server_done) = multi_shot_server(2, move |index, mut ws| {
765            let kline = kline.clone();
766            let stall_token = stall_token_h.clone();
767            async move {
768                if index == 0 {
769                    // First connection: never write anything. Outlast the
770                    // client's 80 ms read_timeout but bounded so the
771                    // handler still completes for the coverage check.
772                    stall_token.fetch_add(1, Ordering::SeqCst);
773                    tokio::time::sleep(Duration::from_millis(250)).await;
774                } else {
775                    let _ = ws.send(Message::Text(kline.into())).await;
776                }
777            }
778        })
779        .await;
780        let cfg = BinanceConfig {
781            read_timeout: Duration::from_millis(80),
782            ..test_config(base)
783        };
784        let mut stream = BinanceKlineStream::connect_with_config(
785            &["BTCUSDT".to_string()],
786            Interval::OneMinute,
787            cfg,
788        )
789        .await
790        .unwrap();
791        let event = stream
792            .next_event()
793            .await
794            .unwrap()
795            .expect("client times out, reconnects, and reads the kline");
796        assert_eq!(event.symbol, "btcusdt");
797        assert!(stall_token.load(Ordering::SeqCst) >= 1);
798        tokio::time::timeout(Duration::from_secs(1), server_done)
799            .await
800            .unwrap()
801            .unwrap();
802    }
803
804    #[tokio::test]
805    async fn next_event_yields_none_after_close() {
806        let base = one_shot_server(|mut ws| async move {
807            // Stay open until the client closes; this lets close() complete
808            // its handshake cleanly.
809            while let Some(Ok(_)) = ws.next().await {}
810        })
811        .await;
812        let mut stream = BinanceKlineStream::connect_with_config(
813            &["BTCUSDT".to_string()],
814            Interval::OneMinute,
815            test_config(base),
816        )
817        .await
818        .unwrap();
819        stream.close().await.unwrap();
820        assert!(stream.is_closed());
821        assert!(stream.next_event().await.unwrap().is_none());
822    }
823
824    #[tokio::test]
825    async fn next_event_surfaces_an_error_when_reconnect_attempts_are_exhausted() {
826        // After the first accept the listener is dropped (one_shot_server
827        // does this), so every reconnect attempt lands on a closed port.
828        let base = one_shot_server(|mut ws| async move {
829            let _ = ws.send(Message::Close(None)).await;
830            // Returning here also drops the socket, but the listener has
831            // already been released — the client's subsequent connects
832            // will be refused.
833        })
834        .await;
835        let cfg = BinanceConfig {
836            max_reconnect_attempts: 2,
837            initial_reconnect_delay: Duration::from_millis(1),
838            max_reconnect_backoff: Duration::from_millis(2),
839            ..test_config(base)
840        };
841        let mut stream = BinanceKlineStream::connect_with_config(
842            &["BTCUSDT".to_string()],
843            Interval::OneMinute,
844            cfg,
845        )
846        .await
847        .unwrap();
848        let err = stream
849            .next_event()
850            .await
851            .expect_err("reconnect attempts are exhausted");
852        // Either a WS-layer error or a Malformed error from URL parsing —
853        // we only care that the call surfaced as Err rather than panicked.
854        let _ = err;
855    }
856
857    #[tokio::test]
858    async fn next_event_skips_non_kline_frames_and_returns_the_next_kline() {
859        // Drives the loop through the Text- *and* Binary-arm "frame was
860        // not a kline, keep reading" fall-throughs before serving the
861        // actual kline.
862        let kline = sample_kline_text();
863        let base = one_shot_server(move |mut ws| async move {
864            let _ = ws
865                .send(Message::Text(r#"{"result":null,"id":1}"#.into()))
866                .await;
867            let _ = ws
868                .send(Message::Binary(b"{\"id\":2}".to_vec().into()))
869                .await;
870            let _ = ws.send(Message::Text(kline.into())).await;
871        })
872        .await;
873        let mut stream = BinanceKlineStream::connect_with_config(
874            &["BTCUSDT".to_string()],
875            Interval::OneMinute,
876            test_config(base),
877        )
878        .await
879        .unwrap();
880        let event = stream
881            .next_event()
882            .await
883            .unwrap()
884            .expect("kline arrives after the two skipped control frames");
885        assert_eq!(event.symbol, "btcusdt");
886    }
887
888    #[tokio::test]
889    async fn next_event_propagates_a_parse_error_from_a_malformed_kline() {
890        // A "kline" envelope whose open field is not a number — parse_frame
891        // identifies it as a kline, into_event then fails, and next_event
892        // bubbles the error rather than skipping the frame.
893        let bad = r#"{"stream":"btcusdt@kline_1m","data":{"e":"kline","E":0,"s":"BTCUSDT","k":{"t":0,"T":0,"s":"BTCUSDT","i":"1m","o":"not-a-number","c":"0","h":"0","l":"0","v":"0","x":false}}}"#.to_string();
894        let base = one_shot_server(move |mut ws| async move {
895            let _ = ws.send(Message::Text(bad.into())).await;
896            while let Some(Ok(_)) = ws.next().await {}
897        })
898        .await;
899        let mut stream = BinanceKlineStream::connect_with_config(
900            &["BTCUSDT".to_string()],
901            Interval::OneMinute,
902            test_config(base),
903        )
904        .await
905        .unwrap();
906        let err = stream.next_event().await.unwrap_err();
907        assert!(matches!(err, Error::Malformed(_)));
908    }
909}