use futures_util::SinkExt;
use futures_util::StreamExt;
use serde::Deserialize;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
use crate::error::{Error, Result};
use wickra_core::Candle;
/// Candlestick interval that can be requested for a kline subscription.
///
/// Every variant maps to one short label (see [`Interval::as_str`]); that label is
/// what gets embedded in the `<symbol>@kline_<label>` stream name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interval {
    /// One second, label `"1s"`.
    OneSecond,
    /// One minute, label `"1m"`.
    OneMinute,
    /// Three minutes, label `"3m"`.
    ThreeMinutes,
    /// Five minutes, label `"5m"`.
    FiveMinutes,
    /// Fifteen minutes, label `"15m"`.
    FifteenMinutes,
    /// Thirty minutes, label `"30m"`.
    ThirtyMinutes,
    /// One hour, label `"1h"`.
    OneHour,
    /// Two hours, label `"2h"`.
    TwoHours,
    /// Four hours, label `"4h"`.
    FourHours,
    /// Six hours, label `"6h"`.
    SixHours,
    /// Eight hours, label `"8h"`.
    EightHours,
    /// Twelve hours, label `"12h"`.
    TwelveHours,
    /// One day, label `"1d"`.
    OneDay,
    /// One week, label `"1w"`.
    OneWeek,
}

impl Interval {
    /// Returns the short label for this interval, e.g. `"1m"` or `"12h"`.
    ///
    /// The function is `const`, so the label can also be used when building
    /// constants and statics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must fail to compile here until
        // its label is supplied.
        match self {
            Self::OneSecond => "1s",
            Self::OneMinute => "1m",
            Self::ThreeMinutes => "3m",
            Self::FiveMinutes => "5m",
            Self::FifteenMinutes => "15m",
            Self::ThirtyMinutes => "30m",
            Self::OneHour => "1h",
            Self::TwoHours => "2h",
            Self::FourHours => "4h",
            Self::SixHours => "6h",
            Self::EightHours => "8h",
            Self::TwelveHours => "12h",
            Self::OneDay => "1d",
            Self::OneWeek => "1w",
        }
    }
}
/// One kline update, as returned by [`BinanceKlineStream::next_event`].
#[derive(Debug, Clone)]
pub struct KlineEvent {
/// Symbol taken from the payload's `s` field, converted to lowercase.
pub symbol: String,
/// Interval the stream was opened with; it is not re-parsed from the payload.
pub interval: Interval,
/// Candle built from the payload's open/high/low/close/volume strings and its open time.
pub candle: Candle,
/// The payload's `x` flag, copied as-is.
/// NOTE(review): presumably `true` once the candle is final — confirm against the Binance docs.
pub is_closed: bool,
}
/// Kline stream backed by a single WebSocket connection.
///
/// Created with [`BinanceKlineStream::connect`]; events are pulled one at a time with
/// [`BinanceKlineStream::next_event`].
#[derive(Debug)]
pub struct BinanceKlineStream {
/// Underlying WebSocket connection: frames are read from it and pongs are written to it.
socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
/// Interval requested at connect time; stamped onto every emitted [`KlineEvent`].
interval: Interval,
}
/// Outer JSON object of a received frame: a stream name plus the kline payload.
#[derive(Debug, Clone, Deserialize)]
pub struct RawWsEnvelope {
/// Stream name, e.g. `btcusdt@kline_1m`. Deserialized but not consulted when the
/// event is built.
pub stream: String,
/// The kline event carried by this frame.
pub data: RawKlinePayload,
}
/// The `data` object of a frame, using the wire format's single-letter keys.
#[derive(Debug, Clone, Deserialize)]
pub struct RawKlinePayload {
/// Wire key `e`: event type string (`"kline"` in the test fixtures).
#[serde(rename = "e")]
pub event_type: String,
/// Wire key `E`: event time.
/// NOTE(review): presumably Unix milliseconds, judging by the fixture values — confirm.
#[serde(rename = "E")]
pub event_time: i64,
/// Wire key `s`: symbol as sent by the server; lowercased when the event is built.
#[serde(rename = "s")]
pub symbol: String,
/// Wire key `k`: the candle fields themselves.
#[serde(rename = "k")]
pub kline: RawKline,
}
/// The `k` object of a kline payload.
///
/// Prices and volume arrive as strings and are parsed to `f64` only when the event is
/// built. No `deny_unknown_fields` is set, so keys not listed here are ignored.
#[derive(Debug, Clone, Deserialize)]
pub struct RawKline {
/// Wire key `t`: candle open time; passed to `Candle::new` as the candle's timestamp.
/// NOTE(review): presumably Unix milliseconds — confirm.
#[serde(rename = "t")]
pub open_time: i64,
/// Wire key `T`: candle close time. Deserialized but not used when the event is built.
#[serde(rename = "T")]
pub close_time: i64,
/// Wire key `s`: symbol. Deserialized but not used; the event takes the payload-level symbol.
#[serde(rename = "s")]
pub symbol: String,
/// Wire key `i`: interval label such as `"1m"`. Deserialized but not used; the event
/// carries the interval the stream was opened with.
#[serde(rename = "i")]
pub interval: String,
/// Wire key `o`: open price as a decimal string.
#[serde(rename = "o")]
pub open: String,
/// Wire key `c`: close price as a decimal string.
#[serde(rename = "c")]
pub close: String,
/// Wire key `h`: high price as a decimal string.
#[serde(rename = "h")]
pub high: String,
/// Wire key `l`: low price as a decimal string.
#[serde(rename = "l")]
pub low: String,
/// Wire key `v`: volume as a decimal string.
#[serde(rename = "v")]
pub volume: String,
/// Wire key `x`: copied into `KlineEvent::is_closed`.
#[serde(rename = "x")]
pub is_closed: bool,
}
impl BinanceKlineStream {
pub async fn connect(symbols: &[String], interval: Interval) -> Result<Self> {
if symbols.is_empty() {
return Err(Error::Malformed(
"BinanceKlineStream requires at least one symbol".into(),
));
}
let streams: Vec<String> = symbols
.iter()
.map(|s| format!("{}@kline_{}", s.to_lowercase(), interval.as_str()))
.collect();
let url = format!(
"wss://stream.binance.com:9443/stream?streams={}",
streams.join("/")
);
let url = url::Url::parse(&url).map_err(|e| Error::Malformed(e.to_string()))?;
let (socket, _) = tokio_tungstenite::connect_async(url.as_str()).await?;
Ok(Self { socket, interval })
}
pub async fn next_event(&mut self) -> Result<Option<KlineEvent>> {
loop {
let msg = match self.socket.next().await {
Some(Ok(m)) => m,
Some(Err(e)) => return Err(Error::from(e)),
None => return Ok(None),
};
match msg {
Message::Text(text) => {
let envelope: RawWsEnvelope = serde_json::from_str(&text)?;
return Ok(Some(envelope.into_event(self.interval)?));
}
Message::Binary(bytes) => {
let envelope: RawWsEnvelope = serde_json::from_slice(&bytes)?;
return Ok(Some(envelope.into_event(self.interval)?));
}
Message::Ping(payload) => {
self.socket.send(Message::Pong(payload)).await?;
}
Message::Pong(_) | Message::Frame(_) => {}
Message::Close(_) => return Ok(None),
}
}
}
pub async fn close(mut self) -> Result<()> {
self.socket.close(None).await?;
Ok(())
}
}
impl RawWsEnvelope {
fn into_event(self, interval: Interval) -> Result<KlineEvent> {
let k = self.data.kline;
let open: f64 = k
.open
.parse()
.map_err(|_| Error::Malformed(format!("bad open '{}'", k.open)))?;
let high: f64 = k
.high
.parse()
.map_err(|_| Error::Malformed(format!("bad high '{}'", k.high)))?;
let low: f64 = k
.low
.parse()
.map_err(|_| Error::Malformed(format!("bad low '{}'", k.low)))?;
let close: f64 = k
.close
.parse()
.map_err(|_| Error::Malformed(format!("bad close '{}'", k.close)))?;
let volume: f64 = k
.volume
.parse()
.map_err(|_| Error::Malformed(format!("bad volume '{}'", k.volume)))?;
let candle = Candle::new(open, high, low, close, volume, k.open_time)?;
Ok(KlineEvent {
symbol: self.data.symbol.to_lowercase(),
interval,
candle,
is_closed: k.is_closed,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // The fixture bodies are kept flush-left so the literals stay exactly as written.

    /// A complete frame for an in-progress one-minute BTCUSDT candle.
    const OPEN_CANDLE_FRAME: &str = r#"{
"stream": "btcusdt@kline_1m",
"data": {
"e": "kline",
"E": 1700000000000,
"s": "BTCUSDT",
"k": {
"t": 1700000000000,
"T": 1700000059999,
"s": "BTCUSDT",
"i": "1m",
"f": 1,
"L": 100,
"o": "30000.0",
"c": "30050.0",
"h": "30100.0",
"l": "29950.0",
"v": "12.5",
"n": 50,
"x": false,
"q": "375000.0",
"V": "6.25",
"Q": "187500.0",
"B": "0"
}
}
}"#;

    /// A frame that is valid JSON but whose open price is not a number.
    const BAD_OPEN_FRAME: &str = r#"{
"stream": "btcusdt@kline_1m",
"data": {
"e": "kline", "E": 0, "s": "BTCUSDT",
"k": {
"t": 0, "T": 0, "s": "BTCUSDT", "i": "1m",
"f": 0, "L": 0,
"o": "not-a-number", "c": "0", "h": "0", "l": "0",
"v": "0", "n": 0, "x": false, "q": "0", "V": "0", "Q": "0", "B": "0"
}
}
}"#;

    /// Deserializes `frame` (panicking if it is not a valid envelope) and converts it
    /// with a one-minute interval.
    fn convert(frame: &str) -> Result<KlineEvent> {
        let envelope: RawWsEnvelope = serde_json::from_str(frame).unwrap();
        envelope.into_event(Interval::OneMinute)
    }

    #[test]
    fn parses_real_binance_payload() {
        let event = convert(OPEN_CANDLE_FRAME).unwrap();

        assert_eq!(event.symbol, "btcusdt");
        assert_eq!(event.interval, Interval::OneMinute);
        assert!(!event.is_closed);
        assert_eq!(event.candle.open, 30_000.0);
        assert_eq!(event.candle.close, 30_050.0);
    }

    #[test]
    fn rejects_non_parsable_numbers() {
        let outcome = convert(BAD_OPEN_FRAME);

        assert!(matches!(outcome, Err(Error::Malformed(_))));
    }
}