use std::sync::OnceLock;
use serde_json::{json, Value};
use url::Url;
use uuid::Uuid;
use crate::core::rt::WsFrame;
use crate::core::traits::Credentials;
use crate::core::types::{
AccountType, OrderBook, OrderBookLevel, StreamEvent, WebSocketError, WebSocketResult,
};
use crate::core::websocket::{StreamKind, StreamSpec, TopicKey, TopicRegistry, WsProtocol};
use super::parser::UpbitParser;
/// Process-wide cache for the Upbit topic registry.
///
/// Starts empty and is filled on first use by `UpbitProtocol::topic_registry`,
/// which initializes it with `build_registry`.
static REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
/// Upbit implementation of the `WsProtocol` trait.
///
/// This is a unit struct: it holds no fields, so all instances are
/// interchangeable.
pub struct UpbitProtocol;
impl UpbitProtocol {
pub fn new(_testnet: bool) -> Self {
Self
}
fn build_subscribe(upbit_type: &str, code: &str) -> WsFrame {
let frame = json!([
{"ticket": Uuid::new_v4().to_string()},
{"type": upbit_type, "codes": [code], "is_only_realtime": true},
{"format": "DEFAULT"}
]);
WsFrame::Text(frame.to_string())
}
fn resolve_code(spec: &StreamSpec) -> Result<String, WebSocketError> {
spec.symbol
.resolve(crate::core::types::ExchangeId::Upbit, spec.account_type)
.map(|s| s.to_ascii_uppercase())
.map_err(|e| {
WebSocketError::NotSupported(format!(
"upbit: symbol normalization failed: {}",
e
))
})
}
}
impl WsProtocol for UpbitProtocol {
fn name(&self) -> &'static str {
"upbit"
}
fn endpoint(&self, _account_type: AccountType, _testnet: bool) -> Url {
Url::parse("wss://api.upbit.com/websocket/v1")
.expect("upbit ws endpoint is valid")
}
fn ping_frame(&self) -> Option<WsFrame> {
None
}
fn uses_native_ping(&self) -> bool {
false
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let code = Self::resolve_code(spec)?;
match &spec.kind {
StreamKind::Trade => Ok(Self::build_subscribe("trade", &code)),
StreamKind::Orderbook => Ok(Self::build_subscribe("orderbook", &code)),
StreamKind::Ticker => Ok(Self::build_subscribe("ticker", &code)),
other => Err(WebSocketError::NotSupported(format!(
"Upbit WS has no public channel for {:?}",
other
))),
}
}
fn unsubscribe_frame(&self, _spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
Err(WebSocketError::NotSupported(
"Upbit does not support per-channel unsubscribe — reconnect required".into(),
))
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<WsFrame, WebSocketError>> {
None
}
fn is_pong(&self, raw: &Value) -> bool {
raw.get("status")
.and_then(|v| v.as_str())
.map(|s| s == "UP")
.unwrap_or(false)
}
fn is_subscribe_ack(&self, _raw: &Value) -> bool {
false
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
let t = raw
.get("type")
.or_else(|| raw.get("ty"))
.and_then(|v| v.as_str())?;
match t {
"trade" | "orderbook" | "ticker" => Some(TopicKey::new(t)),
_ => None,
}
}
fn topic_registry(&self, _account_type: AccountType) -> &TopicRegistry {
REGISTRY.get_or_init(build_registry)
}
}
/// Assembles the topic registry: trade, orderbook and ticker streams are
/// registered for the spot account type, each under its topic string and
/// paired with the matching parser function from this module.
fn build_registry() -> TopicRegistry {
    TopicRegistry::builder()
        .register(StreamKind::Trade, AccountType::Spot, "trade", parse_trade)
        .register(
            StreamKind::Orderbook,
            AccountType::Spot,
            "orderbook",
            parse_orderbook,
        )
        .register(StreamKind::Ticker, AccountType::Spot, "ticker", parse_ticker)
        .build()
}
pub(crate) fn parse_trade(raw: &Value) -> WebSocketResult<StreamEvent> {
let symbol = raw
.get("code")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let trade = UpbitParser::parse_ws_trade(raw)
.map_err(|e| WebSocketError::Parse(format!("upbit trade: {}", e)))?;
Ok(StreamEvent::Trade { symbol, trade })
}
pub(crate) fn parse_orderbook(raw: &Value) -> WebSocketResult<StreamEvent> {
let symbol = raw
.get("code")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let units = raw
.get("orderbook_units")
.and_then(|v| v.as_array())
.ok_or_else(|| WebSocketError::Parse("upbit orderbook: missing orderbook_units".into()))?;
let mut bids = Vec::new();
let mut asks = Vec::new();
for unit in units {
let bid_price = unit.get("bid_price").and_then(|v| v.as_f64());
let bid_size = unit.get("bid_size").and_then(|v| v.as_f64());
let ask_price = unit.get("ask_price").and_then(|v| v.as_f64());
let ask_size = unit.get("ask_size").and_then(|v| v.as_f64());
if let (Some(p), Some(q)) = (bid_price, bid_size) {
bids.push(OrderBookLevel::new(p, q));
}
if let (Some(p), Some(q)) = (ask_price, ask_size) {
asks.push(OrderBookLevel::new(p, q));
}
}
let timestamp = raw
.get("timestamp")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let book = OrderBook {
timestamp,
bids,
asks,
sequence: None,
last_update_id: None,
first_update_id: None,
prev_update_id: None,
event_time: None,
transaction_time: None,
checksum: None,
};
Ok(StreamEvent::OrderbookSnapshot { symbol, book })
}
pub(crate) fn parse_ticker(raw: &Value) -> WebSocketResult<StreamEvent> {
let symbol = raw
.get("code")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let ticker = UpbitParser::parse_ws_ticker(raw)
.map_err(|e| WebSocketError::Parse(format!("upbit ticker: {}", e)))?;
Ok(StreamEvent::Ticker { symbol, ticker })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::{AccountType, OwnedSymbolInput, TradeSide};
    use crate::core::websocket::{StreamSpec, WsProtocol};

    /// Spot-account spec for `sym` with no depth or speed settings.
    fn make_spec(kind: StreamKind, sym: &str) -> StreamSpec {
        let symbol = OwnedSymbolInput::Raw(sym.to_string());
        StreamSpec {
            kind,
            symbol,
            account_type: AccountType::Spot,
            depth: None,
            speed_ms: None,
        }
    }

    /// Protocol instance under test.
    fn proto() -> UpbitProtocol {
        UpbitProtocol::new(false)
    }

    /// Builds the subscribe frame for `kind` on `KRW-BTC` and returns the
    /// elements of its outer JSON array.
    fn subscribe_elements(kind: StreamKind) -> Vec<Value> {
        let spec = make_spec(kind, "KRW-BTC");
        let frame = proto().subscribe_frame(&spec).expect("subscribe frame");
        let text = match frame {
            WsFrame::Text(text) => text,
            _ => panic!("expected Text frame"),
        };
        let parsed: Value = serde_json::from_str(&text).expect("valid json");
        parsed.as_array().expect("outer array").clone()
    }

    /// Asserts the complete ticket / request / format layout of a subscribe frame.
    fn assert_full_subscribe_shape(elements: &[Value], channel: &str) {
        assert_eq!(elements.len(), 3, "subscribe frame must be 3-element array");
        assert!(elements[0].get("ticket").is_some(), "element 0 must have ticket");
        assert_eq!(
            elements[1].get("type").and_then(|v| v.as_str()),
            Some(channel)
        );
        assert_eq!(elements[1]["codes"][0], "KRW-BTC");
        assert_eq!(
            elements[2].get("format").and_then(|v| v.as_str()),
            Some("DEFAULT")
        );
    }

    #[test]
    fn endpoint_is_korea() {
        let endpoint = proto().endpoint(AccountType::Spot, false);
        assert_eq!(endpoint.as_str(), "wss://api.upbit.com/websocket/v1");
    }

    #[test]
    fn ping_frame_returns_none() {
        let ping = proto().ping_frame();
        assert!(ping.is_none());
    }

    #[test]
    fn subscribe_frame_trade_is_3_element_array() {
        let elements = subscribe_elements(StreamKind::Trade);
        assert_full_subscribe_shape(&elements, "trade");
    }

    #[test]
    fn subscribe_frame_orderbook_is_3_element_array() {
        let elements = subscribe_elements(StreamKind::Orderbook);
        assert_eq!(
            elements[1].get("type").and_then(|v| v.as_str()),
            Some("orderbook")
        );
    }

    #[test]
    fn subscribe_frame_ticker_is_3_element_array() {
        let elements = subscribe_elements(StreamKind::Ticker);
        assert_full_subscribe_shape(&elements, "ticker");
    }

    #[test]
    fn unsubscribe_frame_returns_not_supported() {
        let spec = make_spec(StreamKind::Trade, "KRW-BTC");
        let outcome = proto().unsubscribe_frame(&spec);
        assert!(
            matches!(outcome, Err(WebSocketError::NotSupported(_))),
            "unsubscribe must return NotSupported, got {:?}",
            outcome
        );
    }

    #[test]
    fn is_pong_matches_status_up() {
        let status = serde_json::json!({"status": "UP"});
        assert!(proto().is_pong(&status));
    }

    #[test]
    fn is_pong_false_for_data_frame() {
        let data = serde_json::json!({"type": "trade", "code": "KRW-BTC"});
        assert!(!proto().is_pong(&data));
    }

    #[test]
    fn is_subscribe_ack_always_false() {
        let status = serde_json::json!({"status": "UP"});
        assert!(!proto().is_subscribe_ack(&status));
    }

    #[test]
    fn extract_topic_trade() {
        let msg = serde_json::json!({"type": "trade", "code": "KRW-BTC"});
        let topic = proto().extract_topic(&msg);
        assert_eq!(topic, Some(TopicKey::new("trade")));
    }

    #[test]
    fn extract_topic_orderbook() {
        let msg = serde_json::json!({"type": "orderbook", "code": "KRW-BTC"});
        let topic = proto().extract_topic(&msg);
        assert_eq!(topic, Some(TopicKey::new("orderbook")));
    }

    #[test]
    fn extract_topic_ticker() {
        let msg = serde_json::json!({"type": "ticker", "code": "KRW-BTC"});
        let topic = proto().extract_topic(&msg);
        assert_eq!(topic, Some(TopicKey::new("ticker")));
    }

    #[test]
    fn extract_topic_status_up_returns_none() {
        let msg = serde_json::json!({"status": "UP"});
        let topic = proto().extract_topic(&msg);
        assert_eq!(topic, None);
    }

    #[test]
    fn topic_registry_covers_trade_and_orderbook_and_ticker() {
        let protocol = proto();
        let registry = protocol.topic_registry(AccountType::Spot);
        let expected = [
            (StreamKind::Trade, "Trade"),
            (StreamKind::Orderbook, "Orderbook"),
            (StreamKind::Ticker, "Ticker"),
        ];
        for (kind, label) in expected {
            assert!(registry.supports(&kind, AccountType::Spot), "{}", label);
        }
    }

    #[test]
    fn topic_registry_dispatches_by_topic_key() {
        let protocol = proto();
        let registry = protocol.topic_registry(AccountType::Spot);
        let expected = [
            ("trade", "trade"),
            ("orderbook", "orderbook"),
            ("ticker", "ticker registered"),
        ];
        for (topic, label) in expected {
            assert!(
                registry.dispatch(&TopicKey::new(topic)).is_some(),
                "{}",
                label
            );
        }
    }

    #[test]
    fn parse_trade_basic() {
        let raw = serde_json::json!({
            "type": "trade",
            "code": "KRW-BTC",
            "trade_price": 90000000.0,
            "trade_volume": 0.001,
            "trade_timestamp": 1700000000000i64,
            "sequential_id": 12345,
            "ask_bid": "ASK"
        });

        let (symbol, trade) = match parse_trade(&raw).expect("parse") {
            StreamEvent::Trade { symbol, trade } => (symbol, trade),
            other => panic!("expected Trade, got {:?}", other),
        };

        assert_eq!(symbol, "KRW-BTC");
        assert!((trade.price - 90_000_000.0).abs() < f64::EPSILON);
        assert_eq!(trade.side, TradeSide::Sell);
    }

    #[test]
    fn parse_ticker_basic() {
        let raw = serde_json::json!({
            "type": "ticker",
            "code": "KRW-BTC",
            "trade_price": 90500000.0,
            "high_price": 91000000.0,
            "low_price": 87500000.0,
            "acc_trade_volume_24h": 3200.0,
            "acc_trade_price_24h": 290000000000.0,
            "change_price": 2500000.0,
            "change_rate": 0.0284,
            "timestamp": 1718782303500i64
        });

        let (symbol, ticker) = match parse_ticker(&raw).expect("parse") {
            StreamEvent::Ticker { symbol, ticker } => (symbol, ticker),
            other => panic!("expected Ticker, got {:?}", other),
        };

        assert_eq!(symbol, "KRW-BTC");
        assert!(ticker.last_price > 0.0, "last_price > 0");
        assert_eq!(ticker.bid_price, None);
        assert_eq!(ticker.ask_price, None);
        assert!(ticker.high_24h.is_some());
    }

    #[test]
    fn parse_orderbook_basic() {
        let raw = serde_json::json!({
            "type": "orderbook",
            "code": "KRW-BTC",
            "timestamp": 1700000000000i64,
            "orderbook_units": [
                {"bid_price": 89990000.0, "bid_size": 0.5, "ask_price": 90010000.0, "ask_size": 0.3}
            ]
        });

        let (symbol, book) = match parse_orderbook(&raw).expect("parse") {
            StreamEvent::OrderbookSnapshot { symbol, book } => (symbol, book),
            other => panic!("expected OrderbookSnapshot, got {:?}", other),
        };

        assert_eq!(symbol, "KRW-BTC");
        assert_eq!(book.bids.len(), 1);
        assert_eq!(book.asks.len(), 1);
        assert!((book.bids[0].price - 89_990_000.0).abs() < f64::EPSILON);
    }

    #[test]
    fn parse_orderbook_missing_units_returns_err() {
        let raw = serde_json::json!({"type": "orderbook", "code": "KRW-BTC"});
        let outcome = parse_orderbook(&raw);
        assert!(outcome.is_err());
    }
}