Skip to main content

termichart_data/
ws_stream.rs

1//! WebSocket streaming adapter for real-time market data.
2//!
3//! Connects to Binance WebSocket API and pushes candle updates through an mpsc channel.
4
5use std::sync::mpsc;
6use std::thread;
7use termichart_core::Candle;
8
9/// A WebSocket stream that connects to Binance and receives real-time kline updates.
10pub struct WsStream {
11    symbol: String,
12    interval: String,
13}
14
15impl WsStream {
16    /// Create a new WebSocket stream for the given symbol and interval.
17    pub fn new(symbol: impl Into<String>, interval: impl Into<String>) -> Self {
18        Self {
19            symbol: symbol.into().to_lowercase(),
20            interval: interval.into(),
21        }
22    }
23
24    /// Start streaming and return a receiver for candle updates.
25    ///
26    /// Spawns a background thread that maintains the WebSocket connection.
27    /// Returns a receiver that yields `Candle` values as they arrive.
28    pub fn start(&self) -> mpsc::Receiver<Candle> {
29        let (tx, rx) = mpsc::channel();
30        let url = format!(
31            "wss://stream.binance.com:9443/ws/{}@kline_{}",
32            self.symbol, self.interval
33        );
34
35        let url_clone = url.clone();
36        thread::spawn(move || {
37            if let Err(e) = run_ws_loop(&url_clone, &tx) {
38                eprintln!("WebSocket error: {}", e);
39            }
40        });
41
42        rx
43    }
44}
45
46fn run_ws_loop(
47    url: &str,
48    tx: &mpsc::Sender<Candle>,
49) -> std::result::Result<(), Box<dyn std::error::Error>> {
50    use tungstenite::connect;
51
52    let parsed_url = url::Url::parse(url)?;
53    let (mut socket, _response) = connect(parsed_url)?;
54
55    loop {
56        let msg = socket.read()?;
57
58        match msg {
59            tungstenite::Message::Text(text) => {
60                if let Some(candle) = parse_kline_message(&text) {
61                    if tx.send(candle).is_err() {
62                        // Receiver dropped, exit
63                        break;
64                    }
65                }
66            }
67            tungstenite::Message::Close(_) => break,
68            tungstenite::Message::Ping(data) => {
69                let _ = socket.write(tungstenite::Message::Pong(data));
70            }
71            _ => {}
72        }
73    }
74
75    Ok(())
76}
77
78/// Parse a Binance kline WebSocket message into a Candle.
79///
80/// Message format:
81/// ```json
82/// {
83///   "e": "kline",
84///   "k": {
85///     "t": 1234567890000,
86///     "o": "100.00",
87///     "h": "105.00",
88///     "l": "95.00",
89///     "c": "102.00",
90///     "v": "1000.00"
91///   }
92/// }
93/// ```
94fn parse_kline_message(text: &str) -> Option<Candle> {
95    let v: serde_json::Value = serde_json::from_str(text).ok()?;
96    let k = v.get("k")?;
97
98    let time = k.get("t")?.as_f64()? / 1000.0;
99    let open = k.get("o")?.as_str()?.parse::<f64>().ok()?;
100    let high = k.get("h")?.as_str()?.parse::<f64>().ok()?;
101    let low = k.get("l")?.as_str()?.parse::<f64>().ok()?;
102    let close = k.get("c")?.as_str()?.parse::<f64>().ok()?;
103    let volume = k.get("v")?.as_str()?.parse::<f64>().ok()?;
104
105    Some(Candle {
106        time,
107        open,
108        high,
109        low,
110        close,
111        volume,
112    })
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118
119    #[test]
120    fn parse_valid_kline() {
121        let msg = r#"{"e":"kline","E":1234567890123,"s":"BTCUSDT","k":{"t":1234567890000,"T":1234567949999,"s":"BTCUSDT","i":"1m","f":100,"L":200,"o":"50000.00","c":"50100.00","h":"50200.00","l":"49900.00","v":"10.50","n":300,"x":false,"q":"525000.00","V":"5.00","Q":"250000.00","B":"0"}}"#;
122        let candle = parse_kline_message(msg).unwrap();
123        assert!((candle.open - 50000.0).abs() < 0.01);
124        assert!((candle.close - 50100.0).abs() < 0.01);
125        assert!((candle.high - 50200.0).abs() < 0.01);
126        assert!((candle.low - 49900.0).abs() < 0.01);
127        assert!((candle.volume - 10.5).abs() < 0.01);
128        assert!((candle.time - 1234567890.0).abs() < 0.01);
129    }
130
131    #[test]
132    fn parse_invalid_json() {
133        assert!(parse_kline_message("not json").is_none());
134    }
135
136    #[test]
137    fn parse_missing_fields() {
138        let msg = r#"{"e":"kline","k":{"t":1000}}"#;
139        assert!(parse_kline_message(msg).is_none());
140    }
141
142    #[test]
143    fn ws_stream_creation() {
144        let ws = WsStream::new("btcusdt", "1m");
145        assert_eq!(ws.symbol, "btcusdt");
146        assert_eq!(ws.interval, "1m");
147    }
148}