ccxt_exchanges/hyperliquid/
ws_exchange_impl.rs

1//! WsExchange trait implementation for HyperLiquid
2//!
3//! This module implements the unified `WsExchange` trait from `ccxt-core` for HyperLiquid.
4
5use async_trait::async_trait;
6use ccxt_core::{
7    Error, Result,
8    types::financial::{Amount, Price},
9    types::{Balance, Market, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
10    ws_client::WsConnectionState,
11    ws_exchange::{MessageStream, WsExchange},
12};
13use rust_decimal::Decimal;
14use rust_decimal::prelude::FromStr;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17use tokio::time::{Duration, timeout};
18use tokio_stream::wrappers::UnboundedReceiverStream;
19
20use super::{HyperLiquid, parser};
21
22async fn get_ws(exchange: &HyperLiquid) -> Result<super::ws::HyperLiquidWs> {
23    let mut guard = exchange.ws_connection.write().await;
24    if let Some(ws) = guard.as_ref() {
25        return Ok(ws.clone());
26    }
27    let ws = exchange.create_ws();
28    ws.connect().await?;
29    let ws_clone = ws.clone();
30    *guard = Some(ws);
31    Ok(ws_clone)
32}
33
34async fn resolve_market(
35    exchange: &HyperLiquid,
36    symbol: &str,
37) -> Result<(String, String, Option<Arc<Market>>)> {
38    if !symbol.contains('/') && !symbol.contains(':') {
39        let normalized = format!("{}/USDC:USDC", symbol);
40        return Ok((symbol.to_string(), normalized, None));
41    }
42    exchange.load_markets(false).await?;
43    if let Ok(market) = exchange.base().market(symbol).await {
44        return Ok((market.base.clone(), market.symbol.clone(), Some(market)));
45    }
46    let normalized = format!("{}/USDC:USDC", symbol);
47    if let Ok(market) = exchange.base().market(&normalized).await {
48        return Ok((market.base.clone(), market.symbol.clone(), Some(market)));
49    }
50    Ok((symbol.to_string(), normalized, None))
51}
52
53#[async_trait]
54impl WsExchange for HyperLiquid {
55    // ==================== Connection Management ====================
56
57    async fn ws_connect(&self) -> Result<()> {
58        let _ = get_ws(self).await?;
59        Ok(())
60    }
61
62    async fn ws_disconnect(&self) -> Result<()> {
63        let mut guard = self.ws_connection.write().await;
64        if let Some(ws) = guard.take() {
65            ws.disconnect().await?;
66        }
67        Ok(())
68    }
69
70    fn ws_is_connected(&self) -> bool {
71        if let Ok(guard) = self.ws_connection.try_read() {
72            if let Some(ws) = guard.as_ref() {
73                return ws.is_connected();
74            }
75        }
76        false
77    }
78
79    fn ws_state(&self) -> WsConnectionState {
80        if let Ok(guard) = self.ws_connection.try_read() {
81            if let Some(ws) = guard.as_ref() {
82                return ws.state();
83            }
84        }
85        WsConnectionState::Disconnected
86    }
87
88    fn subscriptions(&self) -> Vec<String> {
89        if let Ok(guard) = self.ws_connection.try_read() {
90            if let Some(ws) = guard.as_ref() {
91                if let Ok(subs) = ws.subscriptions().try_read() {
92                    return subs.iter().map(|s| format!("{:?}", s.sub_type)).collect();
93                }
94            }
95        }
96        Vec::new()
97    }
98
99    // ==================== Data Streams ====================
100    async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
101        let (base, symbol_name, market) = resolve_market(self, symbol).await?;
102
103        let ws = get_ws(self).await?;
104        ws.subscribe_all_mids().await?;
105        ws.subscribe_l2_book(&base).await?;
106
107        let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
108        let ws = ws.clone();
109        tokio::spawn(async move {
110            loop {
111                if !ws.is_connected() {
112                    let _ = ws.connect().await;
113                }
114
115                #[allow(clippy::manual_unwrap_or_default)]
116                let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
117                    Ok(msg) => msg,
118                    Err(_) => None,
119                };
120
121                if let Some(msg) = msg {
122                    let channel = msg.get("channel").and_then(|v| v.as_str());
123                    if channel == Some("allMids") {
124                        let data = msg.get("data").unwrap_or(&msg);
125                        if let Some(obj) = data.as_object() {
126                            if let Some(mid) = obj.get(&base).and_then(|v| v.as_str()) {
127                                if let Ok(price) = Decimal::from_str(mid) {
128                                    match parser::parse_ticker(
129                                        &symbol_name,
130                                        price,
131                                        market.as_deref(),
132                                    ) {
133                                        Ok(ticker) => {
134                                            if tx.send(Ok(ticker)).is_err() {
135                                                break;
136                                            }
137                                        }
138                                        Err(e) => {
139                                            let _ = tx.send(Err(e));
140                                        }
141                                    }
142                                }
143                            }
144                        }
145                        continue;
146                    }
147
148                    if channel == Some("l2Book") {
149                        let data = msg.get("data").unwrap_or(&msg);
150                        if let Some(coin) = data.get("coin").and_then(|v| v.as_str()) {
151                            if coin != base {
152                                continue;
153                            }
154                        }
155                        if let Ok(book) = parser::parse_orderbook(data, symbol_name.clone()) {
156                            if let (Some(best_bid), Some(best_ask)) =
157                                (book.bids.first(), book.asks.first())
158                            {
159                                let mid = (best_bid.price.as_decimal()
160                                    + best_ask.price.as_decimal())
161                                    / Decimal::from(2);
162                                let timestamp = chrono::Utc::now().timestamp_millis();
163                                let ticker = Ticker {
164                                    symbol: symbol_name.clone(),
165                                    timestamp,
166                                    datetime: parser::timestamp_to_datetime(timestamp),
167                                    high: None,
168                                    low: None,
169                                    bid: Some(Price::new(best_bid.price.as_decimal())),
170                                    bid_volume: Some(Amount::new(best_bid.amount.as_decimal())),
171                                    ask: Some(Price::new(best_ask.price.as_decimal())),
172                                    ask_volume: Some(Amount::new(best_ask.amount.as_decimal())),
173                                    vwap: None,
174                                    open: None,
175                                    close: Some(Price::new(mid)),
176                                    last: Some(Price::new(mid)),
177                                    previous_close: None,
178                                    change: None,
179                                    percentage: None,
180                                    average: None,
181                                    base_volume: None,
182                                    quote_volume: None,
183                                    funding_rate: None,
184                                    open_interest: None,
185                                    index_price: None,
186                                    mark_price: None,
187                                    info: std::collections::HashMap::new(),
188                                };
189                                if tx.send(Ok(ticker)).is_err() {
190                                    break;
191                                }
192                            }
193                        }
194                    }
195                }
196            }
197        });
198
199        let stream = UnboundedReceiverStream::new(rx);
200        Ok(Box::pin(stream))
201    }
202
203    async fn watch_tickers(&self, _symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
204        Err(Error::not_implemented("watch_tickers"))
205    }
206
207    async fn watch_order_book(
208        &self,
209        symbol: &str,
210        _limit: Option<u32>,
211    ) -> Result<MessageStream<OrderBook>> {
212        let (base, symbol_name, _market) = resolve_market(self, symbol).await?;
213
214        let ws = get_ws(self).await?;
215        ws.subscribe_l2_book(&base).await?;
216
217        let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
218        let ws = ws.clone();
219        tokio::spawn(async move {
220            loop {
221                if !ws.is_connected() {
222                    let _ = ws.connect().await;
223                }
224
225                #[allow(clippy::manual_unwrap_or_default)]
226                let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
227                    Ok(msg) => msg,
228                    Err(_) => None,
229                };
230
231                if let Some(msg) = msg {
232                    let channel = msg.get("channel").and_then(|v| v.as_str());
233                    if channel != Some("l2Book") {
234                        continue;
235                    }
236                    let data = msg.get("data").unwrap_or(&msg);
237                    if let Some(coin) = data.get("coin").and_then(|v| v.as_str()) {
238                        if coin != base {
239                            continue;
240                        }
241                    }
242                    match parser::parse_orderbook(data, symbol_name.clone()) {
243                        Ok(book) => {
244                            if tx.send(Ok(book)).is_err() {
245                                break;
246                            }
247                        }
248                        Err(e) => {
249                            let _ = tx.send(Err(e));
250                        }
251                    }
252                }
253            }
254        });
255
256        let stream = UnboundedReceiverStream::new(rx);
257        Ok(Box::pin(stream))
258    }
259
260    async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
261        let (base, _symbol_name, market) = resolve_market(self, symbol).await?;
262
263        let ws = get_ws(self).await?;
264        ws.subscribe_trades(&base).await?;
265
266        let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
267        let ws = ws.clone();
268        tokio::spawn(async move {
269            loop {
270                if !ws.is_connected() {
271                    let _ = ws.connect().await;
272                }
273
274                #[allow(clippy::manual_unwrap_or_default)]
275                let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
276                    Ok(msg) => msg,
277                    Err(_) => None,
278                };
279
280                if let Some(msg) = msg {
281                    let channel = msg.get("channel").and_then(|v| v.as_str());
282                    if channel != Some("trades") {
283                        continue;
284                    }
285                    let data = msg.get("data").unwrap_or(&msg);
286                    let mut trades = Vec::new();
287                    if let Some(items) = data.as_array() {
288                        for item in items {
289                            if let Ok(trade) = parser::parse_trade(item, market.as_deref()) {
290                                trades.push(trade);
291                            }
292                        }
293                    } else if let Ok(trade) = parser::parse_trade(data, market.as_deref()) {
294                        trades.push(trade);
295                    }
296                    if !trades.is_empty() {
297                        if tx.send(Ok(trades)).is_err() {
298                            break;
299                        }
300                    }
301                }
302            }
303        });
304
305        let stream = UnboundedReceiverStream::new(rx);
306        Ok(Box::pin(stream))
307    }
308
309    async fn watch_ohlcv(
310        &self,
311        _symbol: &str,
312        _timeframe: Timeframe,
313    ) -> Result<MessageStream<Ohlcv>> {
314        Err(Error::not_implemented("watch_ohlcv"))
315    }
316
317    async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
318        Err(Error::not_implemented("watch_balance"))
319    }
320
321    async fn watch_orders(&self, _symbol: Option<&str>) -> Result<MessageStream<Order>> {
322        let address = self.wallet_address().ok_or_else(|| {
323            Error::authentication("watch_orders requires authentication (wallet address)")
324        })?;
325
326        self.load_markets(false).await?;
327        let ws = get_ws(self).await?;
328
329        ws.subscribe_order_updates(address).await?;
330
331        let (tx, rx) = mpsc::unbounded_channel::<Result<Order>>();
332        let ws = ws.clone();
333        let symbol_filter = _symbol.map(ToString::to_string);
334        let market_cache = self.base().market_cache.clone();
335        tokio::spawn(async move {
336            loop {
337                if !ws.is_connected() {
338                    let _ = ws.connect().await;
339                }
340
341                #[allow(clippy::manual_unwrap_or_default)]
342                let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
343                    Ok(msg) => msg,
344                    Err(_) => None,
345                };
346
347                if let Some(msg) = msg {
348                    let channel = msg.get("channel").and_then(|v| v.as_str());
349                    if channel != Some("orderUpdates") {
350                        continue;
351                    }
352                    let data = msg.get("data").unwrap_or(&msg);
353                    let mut items = Vec::new();
354                    if let Some(array) = data.as_array() {
355                        items.extend(array.iter().cloned());
356                    } else if let Some(inner) = data.get("data") {
357                        if let Some(array) = inner.as_array() {
358                            items.extend(array.iter().cloned());
359                        } else {
360                            items.push(inner.clone());
361                        }
362                    } else {
363                        items.push(data.clone());
364                    }
365
366                    for item in items {
367                        let market = if let Some(coin) = item.get("coin").and_then(|v| v.as_str()) {
368                            let symbol = format!("{}/USDC:USDC", coin);
369                            market_cache.read().await.get_market(&symbol)
370                        } else {
371                            None
372                        };
373
374                        if let Ok(order) = parser::parse_order(&item, market.as_deref()) {
375                            if let Some(ref filter) = symbol_filter {
376                                if order.symbol != *filter {
377                                    continue;
378                                }
379                            }
380                            if tx.send(Ok(order)).is_err() {
381                                return;
382                            }
383                        }
384                    }
385                }
386            }
387        });
388
389        let stream = UnboundedReceiverStream::new(rx);
390        Ok(Box::pin(stream))
391    }
392
393    async fn watch_my_trades(&self, _symbol: Option<&str>) -> Result<MessageStream<Trade>> {
394        let address = self.wallet_address().ok_or_else(|| {
395            Error::authentication("watch_my_trades requires authentication (wallet address)")
396        })?;
397
398        self.load_markets(false).await?;
399        let ws = get_ws(self).await?;
400
401        ws.subscribe_user_fills(address).await?;
402
403        let (tx, rx) = mpsc::unbounded_channel::<Result<Trade>>();
404        let ws = ws.clone();
405        let symbol_filter = _symbol.map(ToString::to_string);
406        let market_cache = self.base().market_cache.clone();
407        tokio::spawn(async move {
408            loop {
409                if !ws.is_connected() {
410                    let _ = ws.connect().await;
411                }
412
413                #[allow(clippy::manual_unwrap_or_default)]
414                let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
415                    Ok(msg) => msg,
416                    Err(_) => None,
417                };
418
419                if let Some(msg) = msg {
420                    let channel = msg.get("channel").and_then(|v| v.as_str());
421                    if channel != Some("userFills") {
422                        continue;
423                    }
424                    let data = msg.get("data").unwrap_or(&msg);
425                    let mut items = Vec::new();
426                    if let Some(array) = data.as_array() {
427                        items.extend(array.iter().cloned());
428                    } else if let Some(inner) = data.get("data") {
429                        if let Some(array) = inner.as_array() {
430                            items.extend(array.iter().cloned());
431                        } else {
432                            items.push(inner.clone());
433                        }
434                    } else {
435                        items.push(data.clone());
436                    }
437
438                    for item in items {
439                        let market = if let Some(coin) = item.get("coin").and_then(|v| v.as_str()) {
440                            let symbol = format!("{}/USDC:USDC", coin);
441                            market_cache.read().await.get_market(&symbol)
442                        } else {
443                            None
444                        };
445
446                        if let Ok(trade) = parser::parse_trade(&item, market.as_deref()) {
447                            if let Some(ref filter) = symbol_filter {
448                                if trade.symbol != *filter {
449                                    continue;
450                                }
451                            }
452                            if tx.send(Ok(trade)).is_err() {
453                                return;
454                            }
455                        }
456                    }
457                }
458            }
459        });
460
461        let stream = UnboundedReceiverStream::new(rx);
462        Ok(Box::pin(stream))
463    }
464
465    async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
466        Err(Error::not_implemented("subscribe"))
467    }
468
469    async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
470        Err(Error::not_implemented("unsubscribe"))
471    }
472}