use serde_json::Value;
use uuid::Uuid;
use crate::actors::{
AdvancedOrderUpdate, BalanceUpdate, DataMessage, ExchangeConnector, InstrumentEvent,
OrderBookData, OrderUpdate, PositionChange, TickerData, TradeData, TradeSide, WebSocketConfig,
};
use crate::connectors::KucoinEnv;
use crate::error::Result;
use crate::ws::types::{WsMessage, WsToken};
#[derive(Debug)]
pub struct KucoinConnector {
pub negotiated_url: String,
pub ping_interval_secs: u64,
pub env: KucoinEnv,
}
impl KucoinConnector {
pub fn new(token_data: &WsToken, env: KucoinEnv) -> Result<Self> {
let server = token_data.instance_servers.first().ok_or_else(|| {
crate::error::ExchangeError::Config("KuCoin returned no instance servers".into())
})?;
let negotiated_url = format!(
"{}?token={}&connectId={}",
server.endpoint,
token_data.token,
Uuid::new_v4()
);
Ok(Self {
negotiated_url,
ping_interval_secs: server.ping_interval / 1000,
env,
})
}
pub fn trade_subscription(&self, symbol: &str) -> Option<String> {
let topic = match self.env {
KucoinEnv::LiveFutures => format!("/contractMarket/execution:{symbol}"),
_ => format!("/market/match:{symbol}"),
};
Self::build_sub(topic, false)
}
pub fn ticker_subscription(&self, symbol: &str) -> Option<String> {
let topic = match self.env {
KucoinEnv::LiveFutures => format!("/contractMarket/tickerV2:{symbol}"),
_ => format!("/market/ticker:{symbol}"),
};
Self::build_sub(topic, false)
}
pub fn orderbook_depth_subscription(&self, symbol: &str, depth: u8) -> Option<String> {
let d = if depth <= 5 { 5u8 } else { 50u8 };
let topic = match self.env {
KucoinEnv::LiveFutures => format!("/contractMarket/level2Depth{d}:{symbol}"),
_ => format!("/spotMarket/level2Depth{d}:{symbol}"),
};
Self::build_sub(topic, false)
}
pub fn orderbook_l2_subscription(&self, symbol: &str) -> Option<String> {
let topic = match self.env {
KucoinEnv::LiveFutures => format!("/contractMarket/level2:{symbol}"),
_ => format!("/market/level2:{symbol}"),
};
Self::build_sub(topic, false)
}
pub fn order_updates_subscription(&self) -> Option<String> {
Self::build_sub("/contractMarket/tradeOrders".to_string(), true)
}
pub fn position_subscription(&self, symbol: &str) -> Option<String> {
Self::build_sub(format!("/contract/position:{symbol}"), true)
}
pub fn balance_subscription(&self) -> Option<String> {
Self::build_sub("/contractAccount/wallet".to_string(), true)
}
pub fn instrument_subscription(&self, symbol: &str) -> Option<String> {
Self::build_sub(format!("/contract/instrument:{symbol}"), false)
}
pub fn stop_orders_subscription(&self) -> Option<String> {
Self::build_sub("/contractMarket/advancedOrders".to_string(), true)
}
fn build_sub(topic: String, private_channel: bool) -> Option<String> {
let msg = WsMessage {
id: Uuid::new_v4().to_string(),
msg_type: "subscribe".to_string(),
topic: Some(topic),
subject: None,
data: None,
private_channel: Some(private_channel),
response: Some(true),
};
serde_json::to_string(&msg).ok()
}
}
impl ExchangeConnector for KucoinConnector {
fn exchange_name(&self) -> &'static str {
"kucoin"
}
fn ws_url(&self) -> &str {
&self.negotiated_url
}
fn build_ws_config(&self, symbol: &str) -> WebSocketConfig {
WebSocketConfig {
url: self.negotiated_url.clone(),
exchange: self.exchange_name().to_string(),
symbol: symbol.to_string(),
subscription_msg: self.trade_subscription(symbol),
ping_interval_secs: self.ping_interval_secs,
reconnect_delay_secs: 5,
max_reconnect_attempts: 10,
}
}
fn subscription_message(&self, symbol: &str) -> Option<String> {
self.trade_subscription(symbol)
}
fn parse_message(&self, raw: &str) -> Result<Vec<DataMessage>> {
let msg: WsMessage = serde_json::from_str(raw)?;
match msg.msg_type.as_str() {
"message" => {
let topic = msg.topic.as_deref().unwrap_or("");
let Some(data) = msg.data else {
return Ok(vec![]);
};
let symbol = extract_symbol(topic);
let exchange = self.exchange_name();
if topic.contains("/contractMarket/execution") || topic.contains("/market/match") {
Ok(parse_trade(symbol, exchange, &data))
} else if topic.contains("/contractMarket/tickerV2")
|| topic.contains("/contractMarket/ticker")
|| topic.contains("/market/ticker")
{
Ok(parse_ticker(symbol, exchange, &data))
} else if topic.contains("level2Depth") {
Ok(parse_orderbook_depth(symbol, exchange, &data))
} else if topic.contains("level2") {
Ok(parse_level2_delta(symbol, exchange, &data))
} else if topic.contains("/contractMarket/tradeOrders") {
Ok(parse_order_update(exchange, &data))
} else if topic.contains("/contract/position") {
Ok(parse_position_change(symbol, exchange, &data))
} else if topic.contains("/contractAccount/wallet") {
Ok(parse_balance_update(exchange, &data))
} else if topic.contains("/contract/instrument") {
let subject = msg.subject.as_deref().unwrap_or("unknown");
Ok(parse_instrument_event(symbol, exchange, subject, &data))
} else if topic.contains("/contractMarket/advancedOrders") {
Ok(parse_advanced_order_update(exchange, &data))
} else {
Ok(vec![])
}
}
_ => Ok(vec![]),
}
}
}
fn extract_symbol(topic: &str) -> &str {
topic.split(':').next_back().unwrap_or("UNKNOWN")
}
fn str_f64(data: &Value, key: &str) -> f64 {
data.get(key)
.and_then(|v| {
if let Some(s) = v.as_str() {
s.parse().ok()
} else {
v.as_f64()
}
})
.unwrap_or(0.0)
}
fn str_u32(data: &Value, key: &str) -> u32 {
data.get(key)
.and_then(|v| {
if let Some(s) = v.as_str() {
s.parse().ok()
} else {
v.as_u64().map(|n| n as u32)
}
})
.unwrap_or(0)
}
fn first_f64(data: &Value, keys: &[&str]) -> f64 {
for key in keys {
let v = str_f64(data, key);
if v != 0.0 {
return v;
}
}
0.0
}
fn parse_trade(symbol: &str, exchange: &str, data: &Value) -> Vec<DataMessage> {
let side = match data["side"].as_str().unwrap_or("buy") {
s if s.eq_ignore_ascii_case("sell") => TradeSide::Sell,
_ => TradeSide::Buy,
};
let exchange_ts = data["ts"]
.as_i64()
.map(|ns| ns / 1_000_000)
.or_else(|| data["time"].as_str().and_then(|t| t.parse::<i64>().ok()))
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
let trade_id = data["tradeId"]
.as_str()
.or_else(|| data["makerOrderId"].as_str())
.unwrap_or("")
.to_string();
vec![DataMessage::Trade(TradeData {
symbol: symbol.to_string(),
exchange: exchange.to_string(),
side,
price: str_f64(data, "price"),
amount: str_f64(data, "size"),
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
trade_id,
})]
}
fn parse_ticker(symbol: &str, exchange: &str, data: &Value) -> Vec<DataMessage> {
let best_bid = first_f64(data, &["bestBidPrice", "bestBid"]);
let best_ask = first_f64(data, &["bestAskPrice", "bestAsk"]);
let exchange_ts = data["ts"]
.as_i64()
.map(|ts| {
if ts > 1_700_000_000_000_i64 * 1_000_000 {
ts / 1_000_000
} else {
ts
}
})
.or_else(|| data["time"].as_i64())
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
vec![DataMessage::Ticker(TickerData {
symbol: symbol.to_string(),
exchange: exchange.to_string(),
price: str_f64(data, "price"),
best_bid,
best_ask,
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
})]
}
fn parse_orderbook_depth(symbol: &str, exchange: &str, data: &Value) -> Vec<DataMessage> {
let parse_levels = |arr: &Value| -> Vec<[f64; 2]> {
arr.as_array()
.map(|rows| {
rows.iter()
.filter_map(|row| {
let price = row.get(0).and_then(|v| {
v.as_str()
.and_then(|s| s.parse().ok())
.or_else(|| v.as_f64())
})?;
let qty = row.get(1).and_then(|v| {
v.as_str()
.and_then(|s| s.parse().ok())
.or_else(|| v.as_f64())
})?;
Some([price, qty])
})
.collect()
})
.unwrap_or_default()
};
let exchange_ts = data["ts"]
.as_i64()
.or_else(|| data["timestamp"].as_i64())
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
vec![DataMessage::OrderBook(OrderBookData {
symbol: symbol.to_string(),
exchange: exchange.to_string(),
asks: parse_levels(&data["asks"]),
bids: parse_levels(&data["bids"]),
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
is_snapshot: true,
})]
}
fn parse_level2_delta(symbol: &str, exchange: &str, data: &Value) -> Vec<DataMessage> {
let change_str = data["change"].as_str().unwrap_or("");
let mut parts = change_str.splitn(3, ',');
let price: f64 = parts.next().and_then(|s| s.parse().ok()).unwrap_or(0.0);
let side = parts.next().unwrap_or("sell");
let qty: f64 = parts.next().and_then(|s| s.parse().ok()).unwrap_or(0.0);
if price == 0.0 {
return vec![];
}
let entry = [price, qty];
let (asks, bids) = if side.eq_ignore_ascii_case("sell") {
(vec![entry], vec![])
} else {
(vec![], vec![entry])
};
let exchange_ts = data["timestamp"]
.as_i64()
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
vec![DataMessage::OrderBook(OrderBookData {
symbol: symbol.to_string(),
exchange: exchange.to_string(),
asks,
bids,
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
is_snapshot: false,
})]
}
fn parse_order_update(exchange: &str, data: &Value) -> Vec<DataMessage> {
let side = match data["side"].as_str().unwrap_or("buy") {
s if s.eq_ignore_ascii_case("sell") => TradeSide::Sell,
_ => TradeSide::Buy,
};
let exchange_ts = data["ts"]
.as_i64()
.map(|ns| ns / 1_000_000)
.or_else(|| data["updatedAt"].as_i64())
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
vec![DataMessage::OrderUpdate(OrderUpdate {
symbol: data["symbol"].as_str().unwrap_or("").to_string(),
exchange: exchange.to_string(),
order_id: data["orderId"].as_str().unwrap_or("").to_string(),
client_oid: data["clientOid"].as_str().map(str::to_string),
side,
order_type: data["type"].as_str().unwrap_or("market").to_string(),
status: data["status"].as_str().unwrap_or("").to_string(),
price: str_f64(data, "price"),
size: str_u32(data, "size"),
filled_size: str_u32(data, "filledSize"),
remaining_size: str_u32(data, "remainSize"),
fee: str_f64(data, "fee"),
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
})]
}
fn parse_position_change(symbol: &str, exchange: &str, data: &Value) -> Vec<DataMessage> {
let exchange_ts = data["currentTimestamp"]
.as_i64()
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
vec![DataMessage::PositionChange(PositionChange {
symbol: symbol.to_string(),
exchange: exchange.to_string(),
current_qty: data["currentQty"].as_i64().unwrap_or(0) as i32,
avg_entry_price: str_f64(data, "avgEntryPrice"),
unrealised_pnl: str_f64(data, "unrealisedPnl"),
realised_pnl: str_f64(data, "realisedPnl"),
change_reason: data["changeReason"]
.as_str()
.unwrap_or("unknown")
.to_string(),
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
})]
}
fn parse_balance_update(exchange: &str, data: &Value) -> Vec<DataMessage> {
let exchange_ts = data["timestamp"]
.as_i64()
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
vec![DataMessage::BalanceUpdate(BalanceUpdate {
exchange: exchange.to_string(),
currency: data["currency"].as_str().unwrap_or("").to_string(),
available_balance: str_f64(data, "availableBalance"),
hold_balance: str_f64(data, "holdBalance"),
event: data["event"].as_str().unwrap_or("unknown").to_string(),
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
})]
}
fn parse_instrument_event(
symbol: &str,
exchange: &str,
subject: &str,
data: &Value,
) -> Vec<DataMessage> {
let exchange_ts = data["timestamp"]
.as_i64()
.or_else(|| data["ts"].as_i64())
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
let mark_price = if str_f64(data, "markPrice") != 0.0 {
Some(str_f64(data, "markPrice"))
} else {
None
};
let index_price = if str_f64(data, "indexPrice") != 0.0 {
Some(str_f64(data, "indexPrice"))
} else {
None
};
let funding_rate = if str_f64(data, "fundingRate") != 0.0 {
Some(str_f64(data, "fundingRate"))
} else {
None
};
let predicted_funding_rate = if str_f64(data, "predictedValue") != 0.0 {
Some(str_f64(data, "predictedValue"))
} else {
None
};
let premium_index = if str_f64(data, "premiumIndex") != 0.0 {
Some(str_f64(data, "premiumIndex"))
} else {
None
};
vec![DataMessage::InstrumentEvent(InstrumentEvent {
symbol: symbol.to_string(),
exchange: exchange.to_string(),
subject: subject.to_string(),
mark_price,
index_price,
funding_rate,
predicted_funding_rate,
premium_index,
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
})]
}
fn parse_advanced_order_update(exchange: &str, data: &Value) -> Vec<DataMessage> {
let side = match data["side"].as_str().unwrap_or("buy") {
s if s.eq_ignore_ascii_case("sell") => TradeSide::Sell,
_ => TradeSide::Buy,
};
let exchange_ts = data["ts"]
.as_i64()
.map(|ns| ns / 1_000_000)
.or_else(|| data["updatedAt"].as_i64())
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
let stop_price = {
let v = str_f64(data, "stopPrice");
if v != 0.0 { Some(v) } else { None }
};
let price = {
let v = str_f64(data, "price");
if v != 0.0 { Some(v) } else { None }
};
vec![DataMessage::AdvancedOrderUpdate(AdvancedOrderUpdate {
symbol: data["symbol"].as_str().unwrap_or("").to_string(),
exchange: exchange.to_string(),
order_id: data["orderId"].as_str().unwrap_or("").to_string(),
client_oid: data["clientOid"].as_str().map(str::to_string),
status: data["status"].as_str().unwrap_or("").to_string(),
side,
order_type: data["type"].as_str().unwrap_or("market").to_string(),
stop: data["stop"].as_str().map(str::to_string),
stop_price,
price,
size: str_u32(data, "size"),
exchange_ts,
receipt_ts: chrono::Utc::now().timestamp_millis(),
})]
}