use std::sync::OnceLock;
use std::time::Duration;
use serde_json::{json, Value};
use url::Url;
use crate::core::rt::WsFrame;
use crate::core::traits::Credentials;
use crate::core::types::{AccountType, WebSocketError};
use crate::core::websocket::{
KlineInterval, StreamKind, StreamSpec,
TopicKey, TopicRegistry,
WsProtocol,
};
use super::parser as kraken_parser;
static REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
pub struct KrakenProtocol;
impl KrakenProtocol {
pub(crate) fn channel_name(kind: &StreamKind) -> Option<&'static str> {
match kind {
StreamKind::Ticker => Some("ticker"),
StreamKind::Trade => Some("trade"),
StreamKind::Orderbook | StreamKind::OrderbookDelta => Some("book"),
StreamKind::Kline { .. } => Some("ohlc"),
StreamKind::MarketWarning => Some("instrument"),
_ => None,
}
}
fn format_symbol(spec: &StreamSpec) -> Result<String, WebSocketError> {
let resolved = spec
.symbol
.resolve(crate::core::types::ExchangeId::Kraken, spec.account_type)
.map_err(|e| {
WebSocketError::NotSupported(format!(
"kraken: symbol normalization failed: {}",
e
))
})?;
Ok(to_kraken_ws_symbol(&resolved.to_string()))
}
fn build_frame(op: &str, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let channel = Self::channel_name(&spec.kind).ok_or_else(|| {
WebSocketError::NotSupported(format!(
"Kraken v2 has no public WS channel for {:?} — use REST for this data kind",
spec.kind
))
})?;
let symbol = Self::format_symbol(spec)?;
let mut params = json!({
"channel": channel,
"symbol": [symbol],
});
match &spec.kind {
StreamKind::Orderbook | StreamKind::OrderbookDelta => {
let depth = spec.depth.unwrap_or(10) as u64;
params["depth"] = json!(depth);
}
StreamKind::Kline { interval } => {
let minutes = kline_interval_to_minutes(interval);
params["interval"] = json!(minutes);
}
_ => {}
}
let frame = json!({
"method": op,
"params": params,
});
Ok(WsFrame::Text(frame.to_string()))
}
}
impl WsProtocol for KrakenProtocol {
fn name(&self) -> &'static str {
"kraken"
}
fn endpoint(&self, _account_type: AccountType, _testnet: bool) -> Url {
Url::parse("wss://ws.kraken.com/v2").expect("kraken ws endpoint is valid")
}
fn ping_frame(&self) -> Option<WsFrame> {
Some(WsFrame::Text(
r#"{"method":"ping"}"#.to_string(),
))
}
fn ping_interval(&self) -> Duration {
Duration::from_secs(30)
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
Self::build_frame("subscribe", spec)
}
fn unsubscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
Self::build_frame("unsubscribe", spec)
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<WsFrame, WebSocketError>> {
None
}
fn is_pong(&self, raw: &Value) -> bool {
raw.get("method").and_then(|v| v.as_str()) == Some("pong")
}
fn is_subscribe_ack(&self, raw: &Value) -> bool {
let method = raw.get("method").and_then(|v| v.as_str());
matches!(method, Some("subscribe") | Some("unsubscribe"))
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
if raw.get("method").is_some() {
return None;
}
let channel = raw.get("channel").and_then(|v| v.as_str())?;
match channel {
"heartbeat" | "status" => None,
_ => Some(TopicKey::new(channel)),
}
}
fn topic_registry(&self, _account_type: AccountType) -> &TopicRegistry {
REGISTRY.get_or_init(build_registry)
}
}
fn build_registry() -> TopicRegistry {
let at = AccountType::Spot;
TopicRegistry::builder()
.register(StreamKind::Ticker, at, "ticker", kraken_parser::parse_ws_ticker)
.register(StreamKind::Trade, at, "trade", kraken_parser::parse_ws_trade)
.register(StreamKind::Orderbook, at, "book", kraken_parser::parse_ws_book)
.register(StreamKind::OrderbookDelta, at, "book", kraken_parser::parse_ws_book)
.register(
StreamKind::Kline { interval: KlineInterval::new("") },
at,
"ohlc",
kraken_parser::parse_ws_ohlc,
)
.register(StreamKind::MarketWarning, at, "instrument", kraken_parser::parse_ws_instrument)
.build()
}
pub(crate) fn to_kraken_ws_symbol(sym: &str) -> String {
if sym.contains('/') {
return sym.to_string();
}
const KNOWN_QUOTES: &[&str] = &["USDT", "USDC", "EUR", "USD", "GBP", "AUD", "CHF", "JPY"];
for q in KNOWN_QUOTES {
if let Some(base) = sym.to_uppercase().strip_suffix(*q) {
if !base.is_empty() {
let ws_base = if base == "XBT" { "BTC" } else { base };
return format!("{}/{}", ws_base, q);
}
}
}
sym.to_string()
}
fn kline_interval_to_minutes(interval: &KlineInterval) -> u32 {
match interval.as_str() {
"1m" => 1,
"5m" => 5,
"15m" => 15,
"30m" => 30,
"1h" => 60,
"4h" => 240,
"1d" => 1440,
"1w" => 10080,
_ => 1,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::types::OwnedSymbolInput;
use crate::core::websocket::StreamSpec;
fn make_spec(kind: StreamKind, sym: &str) -> StreamSpec {
StreamSpec {
kind,
symbol: OwnedSymbolInput::Raw(sym.to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
}
}
#[test]
fn subscribe_frame_ticker() {
let proto = KrakenProtocol;
let spec = make_spec(StreamKind::Ticker, "BTC/USD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["method"], "subscribe");
assert_eq!(v["params"]["channel"], "ticker");
assert_eq!(v["params"]["symbol"][0], "BTC/USD");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_trade() {
let proto = KrakenProtocol;
let spec = make_spec(StreamKind::Trade, "BTC/USD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["params"]["channel"], "trade");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_book_includes_depth() {
let proto = KrakenProtocol;
let mut spec = make_spec(StreamKind::Orderbook, "BTC/USD");
spec.depth = Some(25);
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["params"]["channel"], "book");
assert_eq!(v["params"]["depth"], 25);
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_ohlc_includes_interval() {
let proto = KrakenProtocol;
let spec = make_spec(StreamKind::Kline { interval: KlineInterval::new("1h") }, "BTC/USD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["params"]["channel"], "ohlc");
assert_eq!(v["params"]["interval"], 60);
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_unsupported_returns_not_supported() {
let proto = KrakenProtocol;
let spec = make_spec(StreamKind::Liquidation, "BTC/USD");
let result = proto.subscribe_frame(&spec);
assert!(
matches!(result, Err(WebSocketError::NotSupported(_))),
"Liquidation must return NotSupported, got {:?}",
result
);
}
#[test]
fn extract_topic_ticker_data_frame() {
let proto = KrakenProtocol;
let raw = serde_json::json!({
"channel": "ticker",
"type": "snapshot",
"data": []
});
assert_eq!(proto.extract_topic(&raw), Some(TopicKey::new("ticker")));
}
#[test]
fn extract_topic_pong_returns_none() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"method": "pong", "time_in": "...", "time_out": "..."});
assert_eq!(proto.extract_topic(&raw), None);
}
#[test]
fn extract_topic_subscribe_ack_returns_none() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"method": "subscribe", "success": true});
assert_eq!(proto.extract_topic(&raw), None);
}
#[test]
fn extract_topic_heartbeat_returns_none() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"channel": "heartbeat"});
assert_eq!(proto.extract_topic(&raw), None);
}
#[test]
fn extract_topic_status_returns_none() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"channel": "status", "data": [{"api_version": "v2"}]});
assert_eq!(proto.extract_topic(&raw), None);
}
#[test]
fn is_pong_true_for_method_pong() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"method": "pong"});
assert!(proto.is_pong(&raw));
}
#[test]
fn is_pong_false_for_data_frame() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"channel": "ticker", "data": []});
assert!(!proto.is_pong(&raw));
}
#[test]
fn is_subscribe_ack_true_for_subscribe() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"method": "subscribe", "success": true});
assert!(proto.is_subscribe_ack(&raw));
}
#[test]
fn is_subscribe_ack_true_for_unsubscribe() {
let proto = KrakenProtocol;
let raw = serde_json::json!({"method": "unsubscribe", "success": true});
assert!(proto.is_subscribe_ack(&raw));
}
#[test]
fn ping_frame_is_json_ping() {
let proto = KrakenProtocol;
let frame = proto.ping_frame();
assert!(frame.is_some());
if let Some(WsFrame::Text(s)) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["method"], "ping");
} else {
panic!("expected Some(Text)");
}
}
#[test]
fn topic_registry_covers_public_channels() {
let proto = KrakenProtocol;
let reg = proto.topic_registry(AccountType::Spot);
let at = AccountType::Spot;
assert!(reg.supports(&StreamKind::Ticker, at), "Ticker");
assert!(reg.supports(&StreamKind::Trade, at), "Trade");
assert!(reg.supports(&StreamKind::Orderbook, at), "Orderbook");
assert!(reg.supports(&StreamKind::OrderbookDelta, at), "OrderbookDelta");
assert!(
reg.supports(&StreamKind::Kline { interval: KlineInterval::new("") }, at),
"Kline"
);
assert!(reg.supports(&StreamKind::MarketWarning, at), "MarketWarning");
}
#[test]
fn book_channel_has_two_parsers() {
let proto = KrakenProtocol;
let reg = proto.topic_registry(AccountType::Spot);
let key = TopicKey::new("book");
let parsers = reg.dispatch_all(&key);
assert_eq!(parsers.len(), 1, "book channel must have 1 unique parser (de-duped)");
}
#[test]
fn ws_symbol_xbtusd_to_btc_usd() {
assert_eq!(to_kraken_ws_symbol("XBTUSD"), "BTC/USD");
}
#[test]
fn ws_symbol_btcusd_to_btc_usd() {
assert_eq!(to_kraken_ws_symbol("BTCUSD"), "BTC/USD");
}
#[test]
fn ws_symbol_btc_usd_idempotent() {
assert_eq!(to_kraken_ws_symbol("BTC/USD"), "BTC/USD");
}
#[test]
fn ws_symbol_xbtusdt_to_btc_usdt() {
assert_eq!(to_kraken_ws_symbol("XBTUSDT"), "BTC/USDT");
}
#[test]
fn ws_symbol_ethusdt_to_eth_usdt() {
assert_eq!(to_kraken_ws_symbol("ETHUSDT"), "ETH/USDT");
}
#[test]
fn ws_symbol_ethusd_to_eth_usd() {
assert_eq!(to_kraken_ws_symbol("ETHUSD"), "ETH/USD");
}
#[test]
fn ws_symbol_unknown_passthrough() {
assert_eq!(to_kraken_ws_symbol("BTCXXX"), "BTCXXX");
}
#[test]
fn subscribe_frame_ticker_xbtusd_normalised() {
let proto = KrakenProtocol;
let spec = make_spec(StreamKind::Ticker, "XBTUSD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(
v["params"]["symbol"][0], "BTC/USD",
"XBTUSD must be normalised to BTC/USD for Kraken WS v2"
);
} else {
panic!("expected Text frame");
}
}
#[test]
fn kline_interval_to_minutes_coverage() {
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("1m")), 1);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("5m")), 5);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("15m")), 15);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("30m")), 30);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("1h")), 60);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("4h")), 240);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("1d")), 1440);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("1w")), 10080);
assert_eq!(kline_interval_to_minutes(&KlineInterval::new("2m")), 1);
}
}