use std::cell::Cell;
use std::sync::OnceLock;
use serde_json::{json, Value};
use url::Url;
use crate::core::rt::WsFrame;
use crate::core::traits::Credentials;
use crate::core::types::{
AccountType, StreamEvent, WebSocketError, WebSocketResult,
};
use crate::core::websocket::{KlineInterval, StreamKind, StreamSpec, TopicKey, TopicRegistry, WsProtocol};
use super::endpoints::{LighterUrls, map_kline_interval, symbol_to_market_id};
use super::websocket::{
parse_candle, parse_orderbook, parse_ticker_channel, parse_trade, parse_market_stats,
};
thread_local! {
    /// Per-thread slot holding the resolution segment of the most recently
    /// routed candle message. `extract_topic` writes it and `wrap_kline`
    /// drains it.
    static CURRENT_RESOLUTION: Cell<Option<String>> = const { Cell::new(None) };
}

/// Stores `res` in this thread's resolution slot, discarding any previous value.
fn set_current_resolution(res: impl Into<String>) {
    let value: String = res.into();
    CURRENT_RESOLUTION.with(|slot| {
        slot.replace(Some(value));
    });
}

/// Removes and returns this thread's stored resolution.
///
/// The slot is left empty afterwards; an empty `String` is returned when
/// nothing was stored.
pub(super) fn take_current_resolution() -> String {
    match CURRENT_RESOLUTION.with(|slot| slot.take()) {
        Some(resolution) => resolution,
        None => String::new(),
    }
}
/// Shared topic registry, built at most once by `build_registry` the first
/// time `topic_registry` is called.
static REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
/// `WsProtocol` implementation for the "lighter" exchange WebSocket feed.
pub struct LighterProtocol {
// When true, `endpoint` returns the testnet WebSocket URL instead of mainnet.
testnet: bool,
}
impl LighterProtocol {
    /// Creates a protocol adapter; `testnet` selects the testnet endpoint.
    pub fn new(testnet: bool) -> Self {
        Self { testnet }
    }

    /// Resolves a stream spec into `(subscribe channel, message type prefix)`.
    ///
    /// The market is looked up first, so an unknown symbol is reported before
    /// the stream kind is examined. `Orderbook`/`OrderbookDelta` share the
    /// `order_book` channel and `FundingRate`/`MarkPrice` share `market_stats`.
    /// Kline channels additionally carry the mapped interval.
    ///
    /// # Errors
    ///
    /// Returns `WebSocketError::NotSupported` for an unknown market, for
    /// `AggTrade`, and for any stream kind without a channel mapping here.
    fn channel_for_spec(spec: &StreamSpec) -> Result<(String, &'static str), WebSocketError> {
        let base = extract_base(spec);
        let Some(market_id) = symbol_to_market_id(&base) else {
            return Err(WebSocketError::NotSupported(format!(
                "Lighter: unknown market for '{}'. \
                 Known perp markets: ETH(0), BTC(1), SOL(2), ARB(3), OP(4), DOGE(5), ...",
                base
            )));
        };

        // Channels that follow the plain `<name>/<market_id>` layout.
        let per_market = |chan_name: &str| format!("{}/{}", chan_name, market_id);

        match &spec.kind {
            StreamKind::Orderbook | StreamKind::OrderbookDelta => {
                Ok((per_market("order_book"), "update/order_book"))
            }
            StreamKind::Trade => Ok((per_market("trade"), "update/trade")),
            StreamKind::AggTrade => Err(WebSocketError::NotSupported(
                "Lighter has no AggTrade channel — subscribe to Trade instead. \
                 The exchange publishes individual trades only."
                    .into(),
            )),
            StreamKind::Ticker => Ok((per_market("ticker"), "update/ticker")),
            StreamKind::FundingRate | StreamKind::MarkPrice => {
                Ok((per_market("market_stats"), "update/market_stats"))
            }
            StreamKind::Kline { interval } => {
                // Candle channels embed the resolution after the market id.
                let res = map_kline_interval(interval.as_str());
                Ok((format!("candle/{}/{}", market_id, res), "update/candle"))
            }
            other => Err(WebSocketError::NotSupported(format!(
                "Lighter WS has no public channel for {:?} \
                 (auth-gated channels are native-only by design)",
                other
            ))),
        }
    }
}
/// Returns the upper-cased base asset of the spec's symbol.
///
/// Everything before the first `/` is treated as the base; a symbol without
/// a `/` is used whole.
fn extract_base(spec: &StreamSpec) -> String {
    let symbol = spec.symbol.to_string();
    match symbol.split_once('/') {
        Some((base, _quote)) => base.to_uppercase(),
        None => symbol.to_uppercase(),
    }
}
/// Returns the last segment of a channel name.
///
/// The delimiter is `:` when the channel contains one, otherwise `/`. A
/// channel without the chosen delimiter is returned unchanged.
pub(super) fn extract_market_id_from_channel(channel: &str) -> &str {
    let delimiter = match channel.find(':') {
        Some(_) => ':',
        None => '/',
    };
    match channel.rsplit_once(delimiter) {
        Some((_, tail)) => tail,
        None => channel,
    }
}
impl WsProtocol for LighterProtocol {
fn name(&self) -> &'static str {
"lighter"
}
fn endpoint(&self, _account_type: AccountType, _testnet: bool) -> Url {
let url = if self.testnet {
LighterUrls::TESTNET.ws
} else {
LighterUrls::MAINNET.ws
};
Url::parse(url).expect("lighter ws endpoint url is valid")
}
fn ping_frame(&self) -> Option<WsFrame> {
None
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let (channel, _) = Self::channel_for_spec(spec)?;
let frame = json!({
"type": "subscribe",
"channel": channel,
});
Ok(WsFrame::Text(frame.to_string()))
}
fn unsubscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let (channel, _) = Self::channel_for_spec(spec)?;
let frame = json!({
"type": "unsubscribe",
"channel": channel,
});
Ok(WsFrame::Text(frame.to_string()))
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<WsFrame, WebSocketError>> {
None
}
fn is_subscribe_ack(&self, raw: &Value) -> bool {
matches!(
raw.get("type").and_then(|v| v.as_str()),
Some("connected") | Some("pong")
)
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
let msg_type = raw.get("type").and_then(|v| v.as_str())?;
let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
if msg_type == "update/candle" || msg_type == "subscribed/candle" {
let sep = if channel.contains(':') { ':' } else { '/' };
let mut parts = channel.splitn(3, sep);
let _name = parts.next(); let market_id = parts.next().unwrap_or("0");
let resolution = parts.next().unwrap_or("");
set_current_resolution(resolution);
return Some(TopicKey::new(&format!("update/candle:{}", market_id)));
}
if !msg_type.starts_with("update/") {
return None;
}
let market_id = extract_market_id_from_channel(channel);
Some(TopicKey::new(&format!("{}:{}", msg_type, market_id)))
}
fn topic_registry(&self, _account_type: AccountType) -> &TopicRegistry {
REGISTRY.get_or_init(build_registry)
}
}
/// Builds the topic registry mapping each supported stream kind to its
/// wildcard topic pattern and parser wrapper.
///
/// Every entry is registered under `AccountType::FuturesCross`. The Kline
/// entry is registered with an empty interval.
fn build_registry() -> TopicRegistry {
    let account = AccountType::FuturesCross;
    let any_kline = StreamKind::Kline {
        interval: KlineInterval::new(""),
    };

    TopicRegistry::builder()
        // Both orderbook kinds are served by the same topic and parser.
        .register(StreamKind::Orderbook, account, "update/order_book:*", wrap_orderbook)
        .register(StreamKind::OrderbookDelta, account, "update/order_book:*", wrap_orderbook)
        .register(StreamKind::Trade, account, "update/trade:*", wrap_trade)
        .register(StreamKind::Ticker, account, "update/ticker:*", wrap_ticker)
        // Funding rate and mark price both come from the market_stats topic.
        .register(StreamKind::FundingRate, account, "update/market_stats:*", wrap_market_stats)
        .register(StreamKind::MarkPrice, account, "update/market_stats:*", wrap_market_stats)
        .register(any_kline, account, "update/candle:*", wrap_kline)
        .build()
}
fn wrap_orderbook(raw: &Value) -> WebSocketResult<StreamEvent> {
let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
parse_orderbook(raw, channel)
.ok_or_else(|| WebSocketError::Parse("lighter: orderbook parse returned None".into()))
}
fn wrap_trade(raw: &Value) -> WebSocketResult<StreamEvent> {
let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
let events = parse_trade(raw, channel);
events
.into_iter()
.next()
.ok_or_else(|| WebSocketError::FieldAbsent("lighter: trades[0]".into()))
}
fn wrap_ticker(raw: &Value) -> WebSocketResult<StreamEvent> {
let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
parse_ticker_channel(raw, channel)
.ok_or_else(|| WebSocketError::Parse("lighter: ticker parse returned None".into()))
}
fn wrap_market_stats(raw: &Value) -> WebSocketResult<StreamEvent> {
let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
parse_market_stats(raw, channel)
.ok_or_else(|| WebSocketError::Parse("lighter: market_stats parse returned None".into()))
}
/// Returns the resolution segment of a candle channel name.
///
/// Channels look like `candle:<market_id>:<resolution>` or
/// `candle/<market_id>/<resolution>`; `:` is the separator when present,
/// otherwise `/`. Returns `""` when the channel has fewer than three segments.
fn candle_resolution_from_channel(channel: &str) -> &str {
    let sep = if channel.contains(':') { ':' } else { '/' };
    channel.splitn(3, sep).nth(2).unwrap_or("")
}

/// Registry handler for candle messages.
///
/// The resolution normally comes from the thread-local slot that
/// `extract_topic` fills just before dispatch. The slot is always drained
/// here. If it turns out empty (for example, this handler ran without a
/// preceding `extract_topic` on the same thread), the resolution is recovered
/// from the message's own `channel` field rather than passing an empty
/// resolution to the parser.
///
/// # Errors
///
/// Returns `WebSocketError::Parse` when `parse_candle` yields `None`.
fn wrap_kline(raw: &Value) -> WebSocketResult<StreamEvent> {
    let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
    let mut resolution = take_current_resolution();
    if resolution.is_empty() {
        // Fallback: the channel carries the same resolution segment that
        // `extract_topic` would have stored.
        resolution = candle_resolution_from_channel(channel).to_owned();
    }
    parse_candle(raw, channel, &resolution)
        .ok_or_else(|| WebSocketError::Parse("lighter: candle parse returned None".into()))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::OwnedSymbolInput;

    /// Mainnet protocol instance used by most tests.
    fn proto() -> LighterProtocol {
        LighterProtocol::new(false)
    }

    /// Builds a `FuturesCross` spec for a raw symbol with no depth/speed options.
    fn make_spec(kind: StreamKind, sym: &str) -> StreamSpec {
        StreamSpec {
            kind,
            symbol: OwnedSymbolInput::Raw(sym.to_string()),
            account_type: AccountType::FuturesCross,
            depth: None,
            speed_ms: None,
        }
    }

    /// Decodes a text frame into JSON; any other frame type fails the test.
    fn frame_json(frame: WsFrame) -> Value {
        match frame {
            WsFrame::Text(text) => serde_json::from_str(&text).unwrap(),
            _ => panic!("expected Text frame"),
        }
    }

    /// Builds the subscribe frame for `kind`/`sym` and returns its JSON payload.
    fn subscribe_json(kind: StreamKind, sym: &str) -> Value {
        let spec = make_spec(kind, sym);
        let frame = proto().subscribe_frame(&spec).expect("subscribe frame");
        frame_json(frame)
    }

    /// Runs `extract_topic` on `raw` with the mainnet protocol.
    fn topic_of(raw: Value) -> Option<TopicKey> {
        proto().extract_topic(&raw)
    }

    // --- endpoints and ping ---

    #[test]
    fn mainnet_endpoint() {
        let url = proto().endpoint(AccountType::FuturesCross, false);
        assert_eq!(url.as_str(), "wss://mainnet.zklighter.elliot.ai/stream");
    }

    #[test]
    fn testnet_endpoint() {
        let url = LighterProtocol::new(true).endpoint(AccountType::FuturesCross, true);
        assert_eq!(url.as_str(), "wss://testnet.zklighter.elliot.ai/stream");
    }

    #[test]
    fn ping_frame_returns_none() {
        assert!(proto().ping_frame().is_none());
    }

    // --- subscribe / unsubscribe frames ---

    #[test]
    fn subscribe_orderbook_eth() {
        let v = subscribe_json(StreamKind::Orderbook, "ETH");
        assert_eq!(v["type"], "subscribe");
        assert_eq!(v["channel"], "order_book/0");
    }

    #[test]
    fn subscribe_orderbook_btc() {
        let v = subscribe_json(StreamKind::Orderbook, "BTC");
        assert_eq!(v["channel"], "order_book/1");
    }

    #[test]
    fn subscribe_trade_btc() {
        let v = subscribe_json(StreamKind::Trade, "BTC");
        assert_eq!(v["channel"], "trade/1");
    }

    #[test]
    fn subscribe_ticker_eth() {
        let v = subscribe_json(StreamKind::Ticker, "ETH");
        assert_eq!(v["channel"], "ticker/0");
    }

    #[test]
    fn subscribe_unknown_market_err() {
        let spec = make_spec(StreamKind::Trade, "NONEXISTENT");
        assert!(proto().subscribe_frame(&spec).is_err());
    }

    #[test]
    fn subscribe_agg_trade_returns_not_supported() {
        let spec = make_spec(StreamKind::AggTrade, "BTC");
        let result = proto().subscribe_frame(&spec);
        assert!(
            matches!(result, Err(WebSocketError::NotSupported(_))),
            "AggTrade must return NotSupported for Lighter, got {:?}",
            result
        );
    }

    #[test]
    fn subscribe_slash_symbol_eth() {
        let v = subscribe_json(StreamKind::Orderbook, "ETH/USDC");
        assert_eq!(v["channel"], "order_book/0");
    }

    #[test]
    fn unsubscribe_orderbook_eth() {
        let spec = make_spec(StreamKind::Orderbook, "ETH");
        let frame = proto().unsubscribe_frame(&spec).expect("unsub frame");
        let v = frame_json(frame);
        assert_eq!(v["type"], "unsubscribe");
        assert_eq!(v["channel"], "order_book/0");
    }

    // --- subscribe acks ---

    #[test]
    fn ack_connected() {
        let msg = serde_json::json!({"type":"connected","session_id":"abc"});
        assert!(proto().is_subscribe_ack(&msg));
    }

    #[test]
    fn ack_pong() {
        let msg = serde_json::json!({"type":"pong"});
        assert!(proto().is_subscribe_ack(&msg));
    }

    #[test]
    fn ack_update_is_false() {
        let msg = serde_json::json!({
            "type": "update/order_book",
            "channel": "order_book:0",
        });
        assert!(!proto().is_subscribe_ack(&msg));
    }

    // --- topic extraction ---

    #[test]
    fn topic_from_update_order_book() {
        let topic = topic_of(serde_json::json!({
            "type": "update/order_book",
            "channel": "order_book:0",
            "asks": [],
            "bids": []
        }));
        assert_eq!(topic, Some(TopicKey::new("update/order_book:0")));
    }

    #[test]
    fn topic_from_update_trade() {
        let topic = topic_of(serde_json::json!({
            "type": "update/trade",
            "channel": "trade:1",
            "trades": []
        }));
        assert_eq!(topic, Some(TopicKey::new("update/trade:1")));
    }

    #[test]
    fn topic_from_update_market_stats() {
        let topic = topic_of(serde_json::json!({
            "type": "update/market_stats",
            "channel": "market_stats:0",
        }));
        assert_eq!(topic, Some(TopicKey::new("update/market_stats:0")));
    }

    #[test]
    fn topic_none_for_connected() {
        let topic = topic_of(serde_json::json!({"type":"connected","session_id":"x"}));
        assert_eq!(topic, None);
    }

    #[test]
    fn topic_none_for_ping() {
        let topic = topic_of(serde_json::json!({"type":"ping","timestamp":12345}));
        assert_eq!(topic, None);
    }

    #[test]
    fn topic_slash_channel_format() {
        let topic = topic_of(serde_json::json!({
            "type": "update/order_book",
            "channel": "order_book/0",
        }));
        assert_eq!(topic, Some(TopicKey::new("update/order_book:0")));
    }

    #[test]
    fn extract_market_id_colon() {
        assert_eq!(extract_market_id_from_channel("order_book:0"), "0");
        assert_eq!(extract_market_id_from_channel("market_stats:1"), "1");
    }

    #[test]
    fn extract_market_id_slash() {
        assert_eq!(extract_market_id_from_channel("order_book/0"), "0");
        assert_eq!(extract_market_id_from_channel("trade/1"), "1");
    }

    // --- klines ---

    #[test]
    fn subscribe_kline_btc_1m() {
        let kind = StreamKind::Kline { interval: KlineInterval::new("1m") };
        let v = subscribe_json(kind, "BTC");
        assert_eq!(v["type"], "subscribe");
        assert_eq!(v["channel"], "candle/1/1m");
    }

    #[test]
    fn subscribe_kline_eth_4h() {
        let kind = StreamKind::Kline { interval: KlineInterval::new("4h") };
        let v = subscribe_json(kind, "ETH");
        assert_eq!(v["channel"], "candle/0/4h");
    }

    #[test]
    fn extract_topic_update_candle() {
        let topic = topic_of(serde_json::json!({
            "type": "update/candle",
            "channel": "candle:1:1m",
            "timestamp": 1780012392587i64,
            "candles": [{"t": 1780012380000i64, "o": 73517.4, "h": 73522.1, "l": 73517.4, "c": 73520.5, "v": 0.14261}]
        }));
        assert_eq!(topic, Some(TopicKey::new("update/candle:1")));
    }

    #[test]
    fn extract_topic_subscribed_candle_routes_same_key() {
        let topic = topic_of(serde_json::json!({
            "type": "subscribed/candle",
            "channel": "candle:1:1m",
            "candles": [{"t": 1780012380000i64, "o": 73517.4, "h": 73522.1, "l": 73517.4, "c": 73520.5, "v": 0.14261}]
        }));
        assert_eq!(topic, Some(TopicKey::new("update/candle:1")));
    }

    #[test]
    fn extract_topic_candle_stores_resolution() {
        let raw = serde_json::json!({
            "type": "update/candle",
            "channel": "candle:1:4h",
            "candles": []
        });
        proto().extract_topic(&raw);
        assert_eq!(take_current_resolution(), "4h");
    }

    // --- registry ---

    #[test]
    fn registry_supports_public_channels() {
        let p = proto();
        let at = AccountType::FuturesCross;
        let reg = p.topic_registry(at);
        let supported = [
            (StreamKind::Orderbook, "Orderbook"),
            (StreamKind::Trade, "Trade"),
            (StreamKind::Ticker, "Ticker"),
            (StreamKind::FundingRate, "FundingRate"),
            (StreamKind::MarkPrice, "MarkPrice"),
            (StreamKind::Kline { interval: KlineInterval::new("") }, "Kline"),
        ];
        for (kind, label) in supported {
            assert!(reg.supports(&kind, at), "{}", label);
        }
        assert!(!reg.supports(&StreamKind::AggTrade, at), "AggTrade must NOT be registered");
    }

    #[test]
    fn registry_wildcard_dispatches() {
        let p = proto();
        let reg = p.topic_registry(AccountType::FuturesCross);
        let topics = [
            "update/order_book:0",
            "update/trade:1",
            "update/ticker:0",
            "update/market_stats:2",
            "update/candle:1",
        ];
        for topic in topics {
            assert!(reg.dispatch(&TopicKey::new(topic)).is_some(), "{}", topic);
        }
    }
}