termichart_data/
ws_stream.rs1use std::sync::mpsc;
6use std::thread;
7use termichart_core::Candle;
8
/// Describes a kline WebSocket subscription: a trading symbol plus a candle
/// interval.
///
/// Derives `Debug` and `Clone` so the handle can be logged and duplicated;
/// both fields are plain `String`s, so the derives are always available.
#[derive(Debug, Clone)]
pub struct WsStream {
    /// Trading pair symbol; `WsStream::new` stores it lower-cased.
    symbol: String,
    /// Candle interval, inserted verbatim into the stream name (e.g. `"1m"`).
    interval: String,
}
14
15impl WsStream {
16 pub fn new(symbol: impl Into<String>, interval: impl Into<String>) -> Self {
18 Self {
19 symbol: symbol.into().to_lowercase(),
20 interval: interval.into(),
21 }
22 }
23
24 pub fn start(&self) -> mpsc::Receiver<Candle> {
29 let (tx, rx) = mpsc::channel();
30 let url = format!(
31 "wss://stream.binance.com:9443/ws/{}@kline_{}",
32 self.symbol, self.interval
33 );
34
35 let url_clone = url.clone();
36 thread::spawn(move || {
37 if let Err(e) = run_ws_loop(&url_clone, &tx) {
38 eprintln!("WebSocket error: {}", e);
39 }
40 });
41
42 rx
43 }
44}
45
46fn run_ws_loop(
47 url: &str,
48 tx: &mpsc::Sender<Candle>,
49) -> std::result::Result<(), Box<dyn std::error::Error>> {
50 use tungstenite::connect;
51
52 let parsed_url = url::Url::parse(url)?;
53 let (mut socket, _response) = connect(parsed_url)?;
54
55 loop {
56 let msg = socket.read()?;
57
58 match msg {
59 tungstenite::Message::Text(text) => {
60 if let Some(candle) = parse_kline_message(&text) {
61 if tx.send(candle).is_err() {
62 break;
64 }
65 }
66 }
67 tungstenite::Message::Close(_) => break,
68 tungstenite::Message::Ping(data) => {
69 let _ = socket.write(tungstenite::Message::Pong(data));
70 }
71 _ => {}
72 }
73 }
74
75 Ok(())
76}
77
78fn parse_kline_message(text: &str) -> Option<Candle> {
95 let v: serde_json::Value = serde_json::from_str(text).ok()?;
96 let k = v.get("k")?;
97
98 let time = k.get("t")?.as_f64()? / 1000.0;
99 let open = k.get("o")?.as_str()?.parse::<f64>().ok()?;
100 let high = k.get("h")?.as_str()?.parse::<f64>().ok()?;
101 let low = k.get("l")?.as_str()?.parse::<f64>().ok()?;
102 let close = k.get("c")?.as_str()?.parse::<f64>().ok()?;
103 let volume = k.get("v")?.as_str()?.parse::<f64>().ok()?;
104
105 Some(Candle {
106 time,
107 open,
108 high,
109 low,
110 close,
111 volume,
112 })
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing parsed floating-point fields.
    const TOLERANCE: f64 = 0.01;

    /// Asserts that `actual` lies within [`TOLERANCE`] of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < TOLERANCE);
    }

    /// A complete kline message yields a candle with every field populated.
    #[test]
    fn parse_valid_kline() {
        let msg = r#"{"e":"kline","E":1234567890123,"s":"BTCUSDT","k":{"t":1234567890000,"T":1234567949999,"s":"BTCUSDT","i":"1m","f":100,"L":200,"o":"50000.00","c":"50100.00","h":"50200.00","l":"49900.00","v":"10.50","n":300,"x":false,"q":"525000.00","V":"5.00","Q":"250000.00","B":"0"}}"#;
        let parsed = parse_kline_message(msg).unwrap();

        assert_close(parsed.open, 50000.0);
        assert_close(parsed.close, 50100.0);
        assert_close(parsed.high, 50200.0);
        assert_close(parsed.low, 49900.0);
        assert_close(parsed.volume, 10.5);
        assert_close(parsed.time, 1234567890.0);
    }

    /// Input that is not JSON at all is rejected.
    #[test]
    fn parse_invalid_json() {
        let outcome = parse_kline_message("not json");
        assert!(outcome.is_none());
    }

    /// A kline object lacking the price/volume fields is rejected.
    #[test]
    fn parse_missing_fields() {
        let msg = r#"{"e":"kline","k":{"t":1000}}"#;
        let outcome = parse_kline_message(msg);
        assert!(outcome.is_none());
    }

    /// The constructor stores the symbol and interval it was given.
    #[test]
    fn ws_stream_creation() {
        let stream = WsStream::new("btcusdt", "1m");
        assert_eq!(stream.symbol, "btcusdt");
        assert_eq!(stream.interval, "1m");
    }
}