1use std::time::Duration;
22
23use futures_util::SinkExt;
24use futures_util::StreamExt;
25use serde::Deserialize;
26use tokio::net::TcpStream;
27use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
28use tokio_tungstenite::tungstenite::Message;
29use tokio_tungstenite::MaybeTlsStream;
30use tokio_tungstenite::WebSocketStream;
31
32use crate::error::{Error, Result};
33use wickra_core::Candle;
34
35#[derive(Debug, Clone)]
40pub struct BinanceConfig {
41 pub base_url: String,
44 pub read_timeout: Duration,
48 pub initial_reconnect_delay: Duration,
51 pub max_reconnect_backoff: Duration,
53 pub max_reconnect_attempts: u32,
56 pub max_message_size: usize,
60 pub max_frame_size: usize,
62}
63
64impl Default for BinanceConfig {
65 fn default() -> Self {
66 Self {
67 base_url: "wss://stream.binance.com:9443".to_string(),
68 read_timeout: Duration::from_secs(300),
69 initial_reconnect_delay: Duration::from_secs(1),
70 max_reconnect_backoff: Duration::from_secs(30),
71 max_reconnect_attempts: 6,
72 max_message_size: 8 << 20,
73 max_frame_size: 2 << 20,
74 }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum Interval {
82 OneSecond,
83 OneMinute,
84 ThreeMinutes,
85 FiveMinutes,
86 FifteenMinutes,
87 ThirtyMinutes,
88 OneHour,
89 TwoHours,
90 FourHours,
91 SixHours,
92 EightHours,
93 TwelveHours,
94 OneDay,
95 OneWeek,
96}
97
98impl Interval {
99 pub fn as_str(self) -> &'static str {
101 match self {
102 Self::OneSecond => "1s",
103 Self::OneMinute => "1m",
104 Self::ThreeMinutes => "3m",
105 Self::FiveMinutes => "5m",
106 Self::FifteenMinutes => "15m",
107 Self::ThirtyMinutes => "30m",
108 Self::OneHour => "1h",
109 Self::TwoHours => "2h",
110 Self::FourHours => "4h",
111 Self::SixHours => "6h",
112 Self::EightHours => "8h",
113 Self::TwelveHours => "12h",
114 Self::OneDay => "1d",
115 Self::OneWeek => "1w",
116 }
117 }
118}
119
120#[derive(Debug, Clone)]
122pub struct KlineEvent {
123 pub symbol: String,
125 pub interval: Interval,
127 pub candle: Candle,
129 pub is_closed: bool,
132}
133
134#[derive(Debug)]
136pub struct BinanceKlineStream {
137 socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
138 symbols: Vec<String>,
141 interval: Interval,
143 closed: bool,
146 config: BinanceConfig,
148}
149
150#[derive(Debug, Clone, Deserialize)]
153pub struct RawWsEnvelope {
154 pub stream: String,
156 pub data: RawKlinePayload,
157}
158
159#[derive(Debug, Clone, Deserialize)]
160pub struct RawKlinePayload {
161 #[serde(rename = "e")]
162 pub event_type: String,
163 #[serde(rename = "E")]
164 pub event_time: i64,
165 #[serde(rename = "s")]
166 pub symbol: String,
167 #[serde(rename = "k")]
168 pub kline: RawKline,
169}
170
171#[derive(Debug, Clone, Deserialize)]
172pub struct RawKline {
173 #[serde(rename = "t")]
174 pub open_time: i64,
175 #[serde(rename = "T")]
176 pub close_time: i64,
177 #[serde(rename = "s")]
178 pub symbol: String,
179 #[serde(rename = "i")]
180 pub interval: String,
181 #[serde(rename = "o")]
182 pub open: String,
183 #[serde(rename = "c")]
184 pub close: String,
185 #[serde(rename = "h")]
186 pub high: String,
187 #[serde(rename = "l")]
188 pub low: String,
189 #[serde(rename = "v")]
190 pub volume: String,
191 #[serde(rename = "x")]
192 pub is_closed: bool,
193}
194
195impl BinanceKlineStream {
196 async fn open(
199 symbols: &[String],
200 interval: Interval,
201 config: &BinanceConfig,
202 ) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
203 let streams: Vec<String> = symbols
204 .iter()
205 .map(|s| format!("{}@kline_{}", s, interval.as_str()))
206 .collect();
207 let url = format!("{}/stream?streams={}", config.base_url, streams.join("/"));
208 let url = url::Url::parse(&url).map_err(|e| Error::Malformed(e.to_string()))?;
209 let ws_config = WebSocketConfig::default()
213 .max_message_size(Some(config.max_message_size))
214 .max_frame_size(Some(config.max_frame_size));
215 let (socket, _) =
216 tokio_tungstenite::connect_async_with_config(url.as_str(), Some(ws_config), false)
217 .await?;
218 Ok(socket)
219 }
220
221 pub async fn connect(symbols: &[String], interval: Interval) -> Result<Self> {
227 Self::connect_with_config(symbols, interval, BinanceConfig::default()).await
228 }
229
230 pub async fn connect_with_config(
234 symbols: &[String],
235 interval: Interval,
236 config: BinanceConfig,
237 ) -> Result<Self> {
238 if symbols.is_empty() {
239 return Err(Error::Malformed(
240 "BinanceKlineStream requires at least one symbol".into(),
241 ));
242 }
243 let symbols: Vec<String> = symbols.iter().map(|s| s.to_lowercase()).collect();
244 let socket = Self::open(&symbols, interval, &config).await?;
245 Ok(Self {
246 socket,
247 symbols,
248 interval,
249 closed: false,
250 config,
251 })
252 }
253
254 pub fn is_closed(&self) -> bool {
257 self.closed
258 }
259
260 async fn reconnect(&mut self) -> Result<()> {
263 let mut delay = self.config.initial_reconnect_delay;
264 let mut last_err = None;
265 for _ in 0..self.config.max_reconnect_attempts {
266 tokio::time::sleep(delay).await;
267 match Self::open(&self.symbols, self.interval, &self.config).await {
268 Ok(socket) => {
269 self.socket = socket;
270 return Ok(());
271 }
272 Err(e) => {
273 last_err = Some(e);
274 delay = delay
275 .saturating_mul(2)
276 .min(self.config.max_reconnect_backoff);
277 }
278 }
279 }
280 Err(last_err.expect("max_reconnect_attempts is non-zero"))
281 }
282
283 pub async fn next_event(&mut self) -> Result<Option<KlineEvent>> {
289 if self.closed {
290 return Ok(None);
291 }
292 loop {
293 let Ok(Some(Ok(msg))) =
296 tokio::time::timeout(self.config.read_timeout, self.socket.next()).await
297 else {
298 self.reconnect().await?;
299 continue;
300 };
301 match msg {
302 Message::Text(text) => {
303 if let Some(event) = Self::parse_frame(&text, self.interval)? {
304 return Ok(Some(event));
305 }
306 }
309 Message::Binary(bytes) => {
310 let text = String::from_utf8_lossy(&bytes);
311 if let Some(event) = Self::parse_frame(&text, self.interval)? {
312 return Ok(Some(event));
313 }
314 }
315 Message::Ping(payload) => {
316 let _ = self.socket.send(Message::Pong(payload)).await;
321 }
322 Message::Pong(_) | Message::Frame(_) => {}
323 Message::Close(_) => {
324 self.reconnect().await?;
325 }
326 }
327 }
328 }
329
330 fn parse_frame(text: &str, interval: Interval) -> Result<Option<KlineEvent>> {
336 let value: serde_json::Value = serde_json::from_str(text)?;
337 let is_kline = value
340 .get("data")
341 .and_then(|d| d.get("e"))
342 .and_then(serde_json::Value::as_str)
343 == Some("kline");
344 if !is_kline {
345 return Ok(None);
346 }
347 let envelope: RawWsEnvelope = serde_json::from_value(value)?;
348 Ok(Some(envelope.into_event(interval)?))
349 }
350
351 pub async fn close(&mut self) -> Result<()> {
355 self.closed = true;
356 self.socket.close(None).await?;
357 Ok(())
358 }
359}
360
361impl RawWsEnvelope {
362 fn into_event(self, interval: Interval) -> Result<KlineEvent> {
363 let k = self.data.kline;
364 let open: f64 = k
365 .open
366 .parse()
367 .map_err(|_| Error::Malformed(format!("bad open '{}'", k.open)))?;
368 let high: f64 = k
369 .high
370 .parse()
371 .map_err(|_| Error::Malformed(format!("bad high '{}'", k.high)))?;
372 let low: f64 = k
373 .low
374 .parse()
375 .map_err(|_| Error::Malformed(format!("bad low '{}'", k.low)))?;
376 let close: f64 = k
377 .close
378 .parse()
379 .map_err(|_| Error::Malformed(format!("bad close '{}'", k.close)))?;
380 let volume: f64 = k
381 .volume
382 .parse()
383 .map_err(|_| Error::Malformed(format!("bad volume '{}'", k.volume)))?;
384 let candle = Candle::new(open, high, low, close, volume, k.open_time)?;
385 Ok(KlineEvent {
386 symbol: self.data.symbol.to_lowercase(),
387 interval,
388 candle,
389 is_closed: k.is_closed,
390 })
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397
398 #[test]
399 fn interval_as_str_covers_every_variant() {
400 let pairs: &[(Interval, &str)] = &[
403 (Interval::OneSecond, "1s"),
404 (Interval::OneMinute, "1m"),
405 (Interval::ThreeMinutes, "3m"),
406 (Interval::FiveMinutes, "5m"),
407 (Interval::FifteenMinutes, "15m"),
408 (Interval::ThirtyMinutes, "30m"),
409 (Interval::OneHour, "1h"),
410 (Interval::TwoHours, "2h"),
411 (Interval::FourHours, "4h"),
412 (Interval::SixHours, "6h"),
413 (Interval::EightHours, "8h"),
414 (Interval::TwelveHours, "12h"),
415 (Interval::OneDay, "1d"),
416 (Interval::OneWeek, "1w"),
417 ];
418 for (iv, expected) in pairs {
419 assert_eq!(iv.as_str(), *expected);
420 }
421 }
422
423 #[test]
424 fn binance_config_default_matches_production_endpoint() {
425 let cfg = BinanceConfig::default();
426 assert_eq!(cfg.base_url, "wss://stream.binance.com:9443");
427 assert_eq!(cfg.read_timeout, Duration::from_secs(300));
428 assert_eq!(cfg.initial_reconnect_delay, Duration::from_secs(1));
429 assert_eq!(cfg.max_reconnect_backoff, Duration::from_secs(30));
430 assert_eq!(cfg.max_reconnect_attempts, 6);
431 assert_eq!(cfg.max_message_size, 8 << 20);
432 assert_eq!(cfg.max_frame_size, 2 << 20);
433 }
434
435 #[tokio::test]
436 async fn connect_rejects_an_empty_symbol_list() {
437 let err = BinanceKlineStream::connect(&[], Interval::OneMinute)
438 .await
439 .unwrap_err();
440 assert!(matches!(err, Error::Malformed(_)));
441 }
442
443 #[test]
444 fn parses_real_binance_payload() {
445 let json = r#"{
447 "stream": "btcusdt@kline_1m",
448 "data": {
449 "e": "kline",
450 "E": 1700000000000,
451 "s": "BTCUSDT",
452 "k": {
453 "t": 1700000000000,
454 "T": 1700000059999,
455 "s": "BTCUSDT",
456 "i": "1m",
457 "f": 1,
458 "L": 100,
459 "o": "30000.0",
460 "c": "30050.0",
461 "h": "30100.0",
462 "l": "29950.0",
463 "v": "12.5",
464 "n": 50,
465 "x": false,
466 "q": "375000.0",
467 "V": "6.25",
468 "Q": "187500.0",
469 "B": "0"
470 }
471 }
472 }"#;
473 let env: RawWsEnvelope = serde_json::from_str(json).unwrap();
474 let evt = env.into_event(Interval::OneMinute).unwrap();
475 assert_eq!(evt.symbol, "btcusdt");
476 assert_eq!(evt.candle.open, 30_000.0);
477 assert_eq!(evt.candle.close, 30_050.0);
478 assert!(!evt.is_closed);
479 assert_eq!(evt.interval, Interval::OneMinute);
480 }
481
482 #[test]
483 fn rejects_non_parsable_numbers() {
484 let json = r#"{
485 "stream": "btcusdt@kline_1m",
486 "data": {
487 "e": "kline", "E": 0, "s": "BTCUSDT",
488 "k": {
489 "t": 0, "T": 0, "s": "BTCUSDT", "i": "1m",
490 "f": 0, "L": 0,
491 "o": "not-a-number", "c": "0", "h": "0", "l": "0",
492 "v": "0", "n": 0, "x": false, "q": "0", "V": "0", "Q": "0", "B": "0"
493 }
494 }
495 }"#;
496 let env: RawWsEnvelope = serde_json::from_str(json).unwrap();
497 let err = env.into_event(Interval::OneMinute).unwrap_err();
498 assert!(matches!(err, Error::Malformed(_)));
499 }
500
501 #[test]
502 fn skips_non_kline_frames() {
503 let ack = r#"{"result":null,"id":1}"#;
505 assert!(BinanceKlineStream::parse_frame(ack, Interval::OneMinute)
506 .unwrap()
507 .is_none());
508 let err = r#"{"error":{"code":2,"msg":"Invalid request"}}"#;
510 assert!(BinanceKlineStream::parse_frame(err, Interval::OneMinute)
511 .unwrap()
512 .is_none());
513 }
514
515 #[test]
516 fn parse_frame_decodes_a_kline() {
517 let json = r#"{
518 "stream": "btcusdt@kline_1m",
519 "data": {
520 "e": "kline", "E": 1700000000000, "s": "BTCUSDT",
521 "k": {
522 "t": 1700000000000, "T": 1700000059999, "s": "BTCUSDT", "i": "1m",
523 "f": 1, "L": 100, "o": "30000.0", "c": "30050.0", "h": "30100.0",
524 "l": "29950.0", "v": "12.5", "n": 50, "x": true,
525 "q": "375000.0", "V": "6.25", "Q": "187500.0", "B": "0"
526 }
527 }
528 }"#;
529 let event = BinanceKlineStream::parse_frame(json, Interval::OneMinute)
530 .unwrap()
531 .expect("a kline frame yields an event");
532 assert_eq!(event.symbol, "btcusdt");
533 assert!(event.is_closed);
534 }
535
536 use std::sync::atomic::{AtomicU32, Ordering};
543 use std::sync::Arc;
544 use tokio::net::TcpListener;
545
546 fn sample_kline_text() -> String {
549 r#"{"stream":"btcusdt@kline_1m","data":{"e":"kline","E":1700000000000,"s":"BTCUSDT","k":{"t":1700000000000,"T":1700000059999,"s":"BTCUSDT","i":"1m","f":1,"L":100,"o":"30000.0","c":"30050.0","h":"30100.0","l":"29950.0","v":"12.5","n":50,"x":true,"q":"375000.0","V":"6.25","Q":"187500.0","B":"0"}}}"#.to_string()
550 }
551
552 fn test_config(base_url: String) -> BinanceConfig {
555 BinanceConfig {
556 base_url,
557 read_timeout: Duration::from_millis(200),
558 initial_reconnect_delay: Duration::from_millis(5),
559 max_reconnect_backoff: Duration::from_millis(10),
560 max_reconnect_attempts: 3,
561 ..BinanceConfig::default()
562 }
563 }
564
565 async fn one_shot_server<F, Fut>(handler: F) -> String
570 where
571 F: FnOnce(WebSocketStream<TcpStream>) -> Fut + Send + 'static,
572 Fut: std::future::Future<Output = ()> + Send + 'static,
573 {
574 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
575 let base_url = format!("ws://{}", listener.local_addr().unwrap());
576 tokio::spawn(async move {
577 let (stream, _) = listener.accept().await.unwrap();
578 drop(listener);
579 let ws = tokio_tungstenite::accept_async(stream).await.unwrap();
580 handler(ws).await;
581 });
582 base_url
583 }
584
585 async fn multi_shot_server<F, Fut>(
591 n_accepts: u32,
592 handler: F,
593 ) -> (String, tokio::task::JoinHandle<()>)
594 where
595 F: Fn(u32, WebSocketStream<TcpStream>) -> Fut + Send + Sync + 'static,
596 Fut: std::future::Future<Output = ()> + Send + 'static,
597 {
598 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
599 let base_url = format!("ws://{}", listener.local_addr().unwrap());
600 let handler = Arc::new(handler);
601 let h = tokio::spawn(async move {
602 let mut joins = Vec::with_capacity(n_accepts as usize);
603 for index in 0..n_accepts {
604 let (stream, _) = listener.accept().await.unwrap();
605 let handler = handler.clone();
606 joins.push(tokio::spawn(async move {
607 let ws = tokio_tungstenite::accept_async(stream).await.unwrap();
608 handler(index, ws).await;
609 }));
610 }
611 for j in joins {
612 j.await.unwrap();
613 }
614 });
615 (base_url, h)
616 }
617
618 #[tokio::test]
619 async fn next_event_decodes_a_text_kline_frame() {
620 let kline = sample_kline_text();
621 let base = one_shot_server(move |mut ws| async move {
622 let _ = ws.send(Message::Text(kline.into())).await;
623 while let Some(Ok(_)) = ws.next().await {}
624 })
625 .await;
626 let mut stream = BinanceKlineStream::connect_with_config(
627 &["BTCUSDT".to_string()],
628 Interval::OneMinute,
629 test_config(base),
630 )
631 .await
632 .unwrap();
633 assert!(!stream.is_closed());
634 let event = stream
635 .next_event()
636 .await
637 .unwrap()
638 .expect("server pushes a kline");
639 assert_eq!(event.symbol, "btcusdt");
640 assert!(event.is_closed);
641 }
642
643 #[tokio::test]
644 async fn next_event_decodes_a_binary_kline_frame() {
645 let kline = sample_kline_text();
646 let base = one_shot_server(move |mut ws| async move {
647 let bytes: Vec<u8> = kline.into_bytes();
648 let _ = ws.send(Message::Binary(bytes.into())).await;
649 while let Some(Ok(_)) = ws.next().await {}
650 })
651 .await;
652 let mut stream = BinanceKlineStream::connect_with_config(
653 &["BTCUSDT".to_string()],
654 Interval::OneMinute,
655 test_config(base),
656 )
657 .await
658 .unwrap();
659 let event = stream
660 .next_event()
661 .await
662 .unwrap()
663 .expect("server pushes a kline as Binary");
664 assert_eq!(event.symbol, "btcusdt");
665 }
666
667 #[tokio::test]
668 async fn next_event_replies_to_a_ping_with_a_pong() {
669 let kline = sample_kline_text();
670 let base = one_shot_server(move |mut ws| async move {
671 let _ = ws
672 .send(Message::Ping(b"binance-ping".as_slice().into()))
673 .await;
674 let _ = ws.send(Message::Text(kline.into())).await;
675 while let Some(Ok(_)) = ws.next().await {}
676 })
677 .await;
678 let mut stream = BinanceKlineStream::connect_with_config(
679 &["BTCUSDT".to_string()],
680 Interval::OneMinute,
681 test_config(base),
682 )
683 .await
684 .unwrap();
685 let event = stream
689 .next_event()
690 .await
691 .unwrap()
692 .expect("kline arrives right after the ping");
693 assert_eq!(event.symbol, "btcusdt");
694 }
695
696 #[tokio::test]
697 async fn next_event_skips_inbound_pong_frames() {
698 let kline = sample_kline_text();
699 let base = one_shot_server(move |mut ws| async move {
700 let _ = ws
701 .send(Message::Pong(b"unsolicited".as_slice().into()))
702 .await;
703 let _ = ws.send(Message::Text(kline.into())).await;
704 while let Some(Ok(_)) = ws.next().await {}
705 })
706 .await;
707 let mut stream = BinanceKlineStream::connect_with_config(
708 &["BTCUSDT".to_string()],
709 Interval::OneMinute,
710 test_config(base),
711 )
712 .await
713 .unwrap();
714 let event = stream
715 .next_event()
716 .await
717 .unwrap()
718 .expect("kline follows the ignored Pong");
719 assert_eq!(event.symbol, "btcusdt");
720 }
721
722 #[tokio::test]
723 async fn next_event_reconnects_after_a_server_close_frame() {
724 let kline = sample_kline_text();
725 let (base, server_done) = multi_shot_server(2, move |index, mut ws| {
726 let kline = kline.clone();
727 async move {
728 let msg = if index == 0 {
729 Message::Close(None)
732 } else {
733 Message::Text(kline.into())
734 };
735 let _ = ws.send(msg).await;
736 }
737 })
738 .await;
739 let mut stream = BinanceKlineStream::connect_with_config(
740 &["BTCUSDT".to_string()],
741 Interval::OneMinute,
742 test_config(base),
743 )
744 .await
745 .unwrap();
746 let event = stream
747 .next_event()
748 .await
749 .unwrap()
750 .expect("reconnect succeeds and the second connection serves a kline");
751 assert_eq!(event.symbol, "btcusdt");
752 tokio::time::timeout(Duration::from_secs(1), server_done)
754 .await
755 .unwrap()
756 .unwrap();
757 }
758
759 #[tokio::test]
760 async fn next_event_reconnects_after_a_read_timeout() {
761 let kline = sample_kline_text();
762 let stall_token = Arc::new(AtomicU32::new(0));
763 let stall_token_h = stall_token.clone();
764 let (base, server_done) = multi_shot_server(2, move |index, mut ws| {
765 let kline = kline.clone();
766 let stall_token = stall_token_h.clone();
767 async move {
768 if index == 0 {
769 stall_token.fetch_add(1, Ordering::SeqCst);
773 tokio::time::sleep(Duration::from_millis(250)).await;
774 } else {
775 let _ = ws.send(Message::Text(kline.into())).await;
776 }
777 }
778 })
779 .await;
780 let cfg = BinanceConfig {
781 read_timeout: Duration::from_millis(80),
782 ..test_config(base)
783 };
784 let mut stream = BinanceKlineStream::connect_with_config(
785 &["BTCUSDT".to_string()],
786 Interval::OneMinute,
787 cfg,
788 )
789 .await
790 .unwrap();
791 let event = stream
792 .next_event()
793 .await
794 .unwrap()
795 .expect("client times out, reconnects, and reads the kline");
796 assert_eq!(event.symbol, "btcusdt");
797 assert!(stall_token.load(Ordering::SeqCst) >= 1);
798 tokio::time::timeout(Duration::from_secs(1), server_done)
799 .await
800 .unwrap()
801 .unwrap();
802 }
803
804 #[tokio::test]
805 async fn next_event_yields_none_after_close() {
806 let base = one_shot_server(|mut ws| async move {
807 while let Some(Ok(_)) = ws.next().await {}
810 })
811 .await;
812 let mut stream = BinanceKlineStream::connect_with_config(
813 &["BTCUSDT".to_string()],
814 Interval::OneMinute,
815 test_config(base),
816 )
817 .await
818 .unwrap();
819 stream.close().await.unwrap();
820 assert!(stream.is_closed());
821 assert!(stream.next_event().await.unwrap().is_none());
822 }
823
824 #[tokio::test]
825 async fn next_event_surfaces_an_error_when_reconnect_attempts_are_exhausted() {
826 let base = one_shot_server(|mut ws| async move {
829 let _ = ws.send(Message::Close(None)).await;
830 })
834 .await;
835 let cfg = BinanceConfig {
836 max_reconnect_attempts: 2,
837 initial_reconnect_delay: Duration::from_millis(1),
838 max_reconnect_backoff: Duration::from_millis(2),
839 ..test_config(base)
840 };
841 let mut stream = BinanceKlineStream::connect_with_config(
842 &["BTCUSDT".to_string()],
843 Interval::OneMinute,
844 cfg,
845 )
846 .await
847 .unwrap();
848 let err = stream
849 .next_event()
850 .await
851 .expect_err("reconnect attempts are exhausted");
852 let _ = err;
855 }
856
857 #[tokio::test]
858 async fn next_event_skips_non_kline_frames_and_returns_the_next_kline() {
859 let kline = sample_kline_text();
863 let base = one_shot_server(move |mut ws| async move {
864 let _ = ws
865 .send(Message::Text(r#"{"result":null,"id":1}"#.into()))
866 .await;
867 let _ = ws
868 .send(Message::Binary(b"{\"id\":2}".to_vec().into()))
869 .await;
870 let _ = ws.send(Message::Text(kline.into())).await;
871 })
872 .await;
873 let mut stream = BinanceKlineStream::connect_with_config(
874 &["BTCUSDT".to_string()],
875 Interval::OneMinute,
876 test_config(base),
877 )
878 .await
879 .unwrap();
880 let event = stream
881 .next_event()
882 .await
883 .unwrap()
884 .expect("kline arrives after the two skipped control frames");
885 assert_eq!(event.symbol, "btcusdt");
886 }
887
888 #[tokio::test]
889 async fn next_event_propagates_a_parse_error_from_a_malformed_kline() {
890 let bad = r#"{"stream":"btcusdt@kline_1m","data":{"e":"kline","E":0,"s":"BTCUSDT","k":{"t":0,"T":0,"s":"BTCUSDT","i":"1m","o":"not-a-number","c":"0","h":"0","l":"0","v":"0","x":false}}}"#.to_string();
894 let base = one_shot_server(move |mut ws| async move {
895 let _ = ws.send(Message::Text(bad.into())).await;
896 while let Some(Ok(_)) = ws.next().await {}
897 })
898 .await;
899 let mut stream = BinanceKlineStream::connect_with_config(
900 &["BTCUSDT".to_string()],
901 Interval::OneMinute,
902 test_config(base),
903 )
904 .await
905 .unwrap();
906 let err = stream.next_event().await.unwrap_err();
907 assert!(matches!(err, Error::Malformed(_)));
908 }
909}