use crate::{
clients::{disconnect::WssExitReason, wss::WssDecoder},
sources::kraken::decoder::KrakenDecoder,
};
use futures_util::{SinkExt, StreamExt};
use tokio::{
sync::mpsc,
time::{Duration, interval},
};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tracing::{error, info, warn};
use url::Url;
use crate::sources::kraken::events::KrakenWssEvent;
const KRAKEN_WSS_URL: &str = "wss://ws.kraken.com/v2";
const PING_INTERVAL_SECS: u64 = 30;
pub struct KrakenWssClient {
base_url: String,
channels: Vec<String>,
symbols: Vec<String>,
book_depth: usize,
}
impl KrakenWssClient {
pub fn new(channels: Vec<String>, symbols: Vec<String>, book_depth: usize) -> Self {
Self {
base_url: KRAKEN_WSS_URL.to_string(),
channels,
symbols,
book_depth,
}
}
pub fn with_url(
base_url: impl Into<String>,
channels: Vec<String>,
symbols: Vec<String>,
book_depth: usize,
) -> Self {
Self {
base_url: base_url.into(),
channels,
symbols,
book_depth,
}
}
pub async fn receive_data(
&self,
tx: mpsc::Sender<KrakenWssEvent>,
) -> WssExitReason {
let url = match Url::parse(&self.base_url) {
Ok(u) => u,
Err(e) => return WssExitReason::ConnectionFailed(format!("URL parse: {e}")),
};
let (ws_stream, _) = match connect_async(url).await {
Ok(s) => s,
Err(e) => return WssExitReason::Transport(e),
};
info!("WebSocket connected to Kraken");
let (mut writer, mut reader) = ws_stream.split();
for channel in &self.channels {
let mut params = serde_json::json!({
"channel": channel,
"symbol": self.symbols,
});
if channel == "book" {
params["depth"] = serde_json::json!(self.book_depth);
}
let sub_msg = serde_json::json!({
"method": "subscribe",
"params": params,
})
.to_string();
if let Err(e) = writer.send(Message::Text(sub_msg)).await {
return WssExitReason::Transport(e);
}
}
let (ping_tx, mut ping_rx) = mpsc::channel::<()>(1);
let ping_handle = tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(PING_INTERVAL_SECS));
loop {
ticker.tick().await;
if ping_tx.send(()).await.is_err() {
break;
}
}
});
let mut exit_reason = WssExitReason::StreamEnded;
loop {
tokio::select! {
msg = reader.next() => {
match msg {
Some(Ok(Message::Text(txt))) => {
match KrakenDecoder::decode(&txt) {
Ok(Some(event)) => {
if tx.send(event).await.is_err() {
exit_reason = WssExitReason::ReceiverDropped;
break;
}
}
Ok(None) => {}
Err(e) => warn!("Kraken decode error: {}", e),
}
}
Some(Ok(Message::Ping(p))) => {
let _ = writer.send(Message::Pong(p)).await;
}
Some(Ok(Message::Close(f))) => {
info!("Kraken server closed connection: {:?}", f);
exit_reason = WssExitReason::ServerClose(f);
break;
}
Some(Err(e)) => {
error!("Kraken ws error: {}", e);
exit_reason = WssExitReason::Transport(e);
break;
}
None => break, _ => {}
}
}
_ = ping_rx.recv() => {
let ping_msg = serde_json::json!({
"method": "ping",
}).to_string();
match writer.send(Message::Text(ping_msg)).await {
Ok(()) => {}
Err(e) => {
exit_reason = WssExitReason::HeartbeatWriteFailed(e);
break;
}
}
}
}
}
ping_handle.abort();
exit_reason
}
}