ccxt_exchanges/hyperliquid/
ws_exchange_impl.rs1use async_trait::async_trait;
6use ccxt_core::{
7 Error, Result,
8 types::financial::{Amount, Price},
9 types::{Balance, Market, Ohlcv, Order, OrderBook, Ticker, Timeframe, Trade},
10 ws_client::WsConnectionState,
11 ws_exchange::{MessageStream, WsExchange},
12};
13use rust_decimal::Decimal;
14use rust_decimal::prelude::FromStr;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17use tokio::time::{Duration, timeout};
18use tokio_stream::wrappers::UnboundedReceiverStream;
19
20use super::{HyperLiquid, parser};
21
22async fn get_ws(exchange: &HyperLiquid) -> Result<super::ws::HyperLiquidWs> {
23 let mut guard = exchange.ws_connection.write().await;
24 if let Some(ws) = guard.as_ref() {
25 return Ok(ws.clone());
26 }
27 let ws = exchange.create_ws();
28 ws.connect().await?;
29 let ws_clone = ws.clone();
30 *guard = Some(ws);
31 Ok(ws_clone)
32}
33
34async fn resolve_market(
35 exchange: &HyperLiquid,
36 symbol: &str,
37) -> Result<(String, String, Option<Arc<Market>>)> {
38 if !symbol.contains('/') && !symbol.contains(':') {
39 let normalized = format!("{}/USDC:USDC", symbol);
40 return Ok((symbol.to_string(), normalized, None));
41 }
42 exchange.load_markets(false).await?;
43 if let Ok(market) = exchange.base().market(symbol).await {
44 return Ok((market.base.clone(), market.symbol.clone(), Some(market)));
45 }
46 let normalized = format!("{}/USDC:USDC", symbol);
47 if let Ok(market) = exchange.base().market(&normalized).await {
48 return Ok((market.base.clone(), market.symbol.clone(), Some(market)));
49 }
50 Ok((symbol.to_string(), normalized, None))
51}
52
53#[async_trait]
54impl WsExchange for HyperLiquid {
55 async fn ws_connect(&self) -> Result<()> {
58 let _ = get_ws(self).await?;
59 Ok(())
60 }
61
62 async fn ws_disconnect(&self) -> Result<()> {
63 let mut guard = self.ws_connection.write().await;
64 if let Some(ws) = guard.take() {
65 ws.disconnect().await?;
66 }
67 Ok(())
68 }
69
70 fn ws_is_connected(&self) -> bool {
71 if let Ok(guard) = self.ws_connection.try_read() {
72 if let Some(ws) = guard.as_ref() {
73 return ws.is_connected();
74 }
75 }
76 false
77 }
78
79 fn ws_state(&self) -> WsConnectionState {
80 if let Ok(guard) = self.ws_connection.try_read() {
81 if let Some(ws) = guard.as_ref() {
82 return ws.state();
83 }
84 }
85 WsConnectionState::Disconnected
86 }
87
88 fn subscriptions(&self) -> Vec<String> {
89 if let Ok(guard) = self.ws_connection.try_read() {
90 if let Some(ws) = guard.as_ref() {
91 if let Ok(subs) = ws.subscriptions().try_read() {
92 return subs.iter().map(|s| format!("{:?}", s.sub_type)).collect();
93 }
94 }
95 }
96 Vec::new()
97 }
98
99 async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>> {
101 let (base, symbol_name, market) = resolve_market(self, symbol).await?;
102
103 let ws = get_ws(self).await?;
104 ws.subscribe_all_mids().await?;
105 ws.subscribe_l2_book(&base).await?;
106
107 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
108 let ws = ws.clone();
109 tokio::spawn(async move {
110 loop {
111 if !ws.is_connected() {
112 let _ = ws.connect().await;
113 }
114
115 #[allow(clippy::manual_unwrap_or_default)]
116 let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
117 Ok(msg) => msg,
118 Err(_) => None,
119 };
120
121 if let Some(msg) = msg {
122 let channel = msg.get("channel").and_then(|v| v.as_str());
123 if channel == Some("allMids") {
124 let data = msg.get("data").unwrap_or(&msg);
125 if let Some(obj) = data.as_object() {
126 if let Some(mid) = obj.get(&base).and_then(|v| v.as_str()) {
127 if let Ok(price) = Decimal::from_str(mid) {
128 match parser::parse_ticker(
129 &symbol_name,
130 price,
131 market.as_deref(),
132 ) {
133 Ok(ticker) => {
134 if tx.send(Ok(ticker)).is_err() {
135 break;
136 }
137 }
138 Err(e) => {
139 let _ = tx.send(Err(e));
140 }
141 }
142 }
143 }
144 }
145 continue;
146 }
147
148 if channel == Some("l2Book") {
149 let data = msg.get("data").unwrap_or(&msg);
150 if let Some(coin) = data.get("coin").and_then(|v| v.as_str()) {
151 if coin != base {
152 continue;
153 }
154 }
155 if let Ok(book) = parser::parse_orderbook(data, symbol_name.clone()) {
156 if let (Some(best_bid), Some(best_ask)) =
157 (book.bids.first(), book.asks.first())
158 {
159 let mid = (best_bid.price.as_decimal()
160 + best_ask.price.as_decimal())
161 / Decimal::from(2);
162 let timestamp = chrono::Utc::now().timestamp_millis();
163 let ticker = Ticker {
164 symbol: symbol_name.clone(),
165 timestamp,
166 datetime: parser::timestamp_to_datetime(timestamp),
167 high: None,
168 low: None,
169 bid: Some(Price::new(best_bid.price.as_decimal())),
170 bid_volume: Some(Amount::new(best_bid.amount.as_decimal())),
171 ask: Some(Price::new(best_ask.price.as_decimal())),
172 ask_volume: Some(Amount::new(best_ask.amount.as_decimal())),
173 vwap: None,
174 open: None,
175 close: Some(Price::new(mid)),
176 last: Some(Price::new(mid)),
177 previous_close: None,
178 change: None,
179 percentage: None,
180 average: None,
181 base_volume: None,
182 quote_volume: None,
183 funding_rate: None,
184 open_interest: None,
185 index_price: None,
186 mark_price: None,
187 info: std::collections::HashMap::new(),
188 };
189 if tx.send(Ok(ticker)).is_err() {
190 break;
191 }
192 }
193 }
194 }
195 }
196 }
197 });
198
199 let stream = UnboundedReceiverStream::new(rx);
200 Ok(Box::pin(stream))
201 }
202
203 async fn watch_tickers(&self, _symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
204 Err(Error::not_implemented("watch_tickers"))
205 }
206
207 async fn watch_order_book(
208 &self,
209 symbol: &str,
210 _limit: Option<u32>,
211 ) -> Result<MessageStream<OrderBook>> {
212 let (base, symbol_name, _market) = resolve_market(self, symbol).await?;
213
214 let ws = get_ws(self).await?;
215 ws.subscribe_l2_book(&base).await?;
216
217 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
218 let ws = ws.clone();
219 tokio::spawn(async move {
220 loop {
221 if !ws.is_connected() {
222 let _ = ws.connect().await;
223 }
224
225 #[allow(clippy::manual_unwrap_or_default)]
226 let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
227 Ok(msg) => msg,
228 Err(_) => None,
229 };
230
231 if let Some(msg) = msg {
232 let channel = msg.get("channel").and_then(|v| v.as_str());
233 if channel != Some("l2Book") {
234 continue;
235 }
236 let data = msg.get("data").unwrap_or(&msg);
237 if let Some(coin) = data.get("coin").and_then(|v| v.as_str()) {
238 if coin != base {
239 continue;
240 }
241 }
242 match parser::parse_orderbook(data, symbol_name.clone()) {
243 Ok(book) => {
244 if tx.send(Ok(book)).is_err() {
245 break;
246 }
247 }
248 Err(e) => {
249 let _ = tx.send(Err(e));
250 }
251 }
252 }
253 }
254 });
255
256 let stream = UnboundedReceiverStream::new(rx);
257 Ok(Box::pin(stream))
258 }
259
260 async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>> {
261 let (base, _symbol_name, market) = resolve_market(self, symbol).await?;
262
263 let ws = get_ws(self).await?;
264 ws.subscribe_trades(&base).await?;
265
266 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
267 let ws = ws.clone();
268 tokio::spawn(async move {
269 loop {
270 if !ws.is_connected() {
271 let _ = ws.connect().await;
272 }
273
274 #[allow(clippy::manual_unwrap_or_default)]
275 let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
276 Ok(msg) => msg,
277 Err(_) => None,
278 };
279
280 if let Some(msg) = msg {
281 let channel = msg.get("channel").and_then(|v| v.as_str());
282 if channel != Some("trades") {
283 continue;
284 }
285 let data = msg.get("data").unwrap_or(&msg);
286 let mut trades = Vec::new();
287 if let Some(items) = data.as_array() {
288 for item in items {
289 if let Ok(trade) = parser::parse_trade(item, market.as_deref()) {
290 trades.push(trade);
291 }
292 }
293 } else if let Ok(trade) = parser::parse_trade(data, market.as_deref()) {
294 trades.push(trade);
295 }
296 if !trades.is_empty() {
297 if tx.send(Ok(trades)).is_err() {
298 break;
299 }
300 }
301 }
302 }
303 });
304
305 let stream = UnboundedReceiverStream::new(rx);
306 Ok(Box::pin(stream))
307 }
308
309 async fn watch_ohlcv(
310 &self,
311 _symbol: &str,
312 _timeframe: Timeframe,
313 ) -> Result<MessageStream<Ohlcv>> {
314 Err(Error::not_implemented("watch_ohlcv"))
315 }
316
317 async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
318 Err(Error::not_implemented("watch_balance"))
319 }
320
321 async fn watch_orders(&self, _symbol: Option<&str>) -> Result<MessageStream<Order>> {
322 let address = self.wallet_address().ok_or_else(|| {
323 Error::authentication("watch_orders requires authentication (wallet address)")
324 })?;
325
326 self.load_markets(false).await?;
327 let ws = get_ws(self).await?;
328
329 ws.subscribe_order_updates(address).await?;
330
331 let (tx, rx) = mpsc::unbounded_channel::<Result<Order>>();
332 let ws = ws.clone();
333 let symbol_filter = _symbol.map(ToString::to_string);
334 let market_cache = self.base().market_cache.clone();
335 tokio::spawn(async move {
336 loop {
337 if !ws.is_connected() {
338 let _ = ws.connect().await;
339 }
340
341 #[allow(clippy::manual_unwrap_or_default)]
342 let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
343 Ok(msg) => msg,
344 Err(_) => None,
345 };
346
347 if let Some(msg) = msg {
348 let channel = msg.get("channel").and_then(|v| v.as_str());
349 if channel != Some("orderUpdates") {
350 continue;
351 }
352 let data = msg.get("data").unwrap_or(&msg);
353 let mut items = Vec::new();
354 if let Some(array) = data.as_array() {
355 items.extend(array.iter().cloned());
356 } else if let Some(inner) = data.get("data") {
357 if let Some(array) = inner.as_array() {
358 items.extend(array.iter().cloned());
359 } else {
360 items.push(inner.clone());
361 }
362 } else {
363 items.push(data.clone());
364 }
365
366 for item in items {
367 let market = if let Some(coin) = item.get("coin").and_then(|v| v.as_str()) {
368 let symbol = format!("{}/USDC:USDC", coin);
369 market_cache.read().await.get_market(&symbol)
370 } else {
371 None
372 };
373
374 if let Ok(order) = parser::parse_order(&item, market.as_deref()) {
375 if let Some(ref filter) = symbol_filter {
376 if order.symbol != *filter {
377 continue;
378 }
379 }
380 if tx.send(Ok(order)).is_err() {
381 return;
382 }
383 }
384 }
385 }
386 }
387 });
388
389 let stream = UnboundedReceiverStream::new(rx);
390 Ok(Box::pin(stream))
391 }
392
393 async fn watch_my_trades(&self, _symbol: Option<&str>) -> Result<MessageStream<Trade>> {
394 let address = self.wallet_address().ok_or_else(|| {
395 Error::authentication("watch_my_trades requires authentication (wallet address)")
396 })?;
397
398 self.load_markets(false).await?;
399 let ws = get_ws(self).await?;
400
401 ws.subscribe_user_fills(address).await?;
402
403 let (tx, rx) = mpsc::unbounded_channel::<Result<Trade>>();
404 let ws = ws.clone();
405 let symbol_filter = _symbol.map(ToString::to_string);
406 let market_cache = self.base().market_cache.clone();
407 tokio::spawn(async move {
408 loop {
409 if !ws.is_connected() {
410 let _ = ws.connect().await;
411 }
412
413 #[allow(clippy::manual_unwrap_or_default)]
414 let msg = match timeout(Duration::from_secs(2), ws.receive()).await {
415 Ok(msg) => msg,
416 Err(_) => None,
417 };
418
419 if let Some(msg) = msg {
420 let channel = msg.get("channel").and_then(|v| v.as_str());
421 if channel != Some("userFills") {
422 continue;
423 }
424 let data = msg.get("data").unwrap_or(&msg);
425 let mut items = Vec::new();
426 if let Some(array) = data.as_array() {
427 items.extend(array.iter().cloned());
428 } else if let Some(inner) = data.get("data") {
429 if let Some(array) = inner.as_array() {
430 items.extend(array.iter().cloned());
431 } else {
432 items.push(inner.clone());
433 }
434 } else {
435 items.push(data.clone());
436 }
437
438 for item in items {
439 let market = if let Some(coin) = item.get("coin").and_then(|v| v.as_str()) {
440 let symbol = format!("{}/USDC:USDC", coin);
441 market_cache.read().await.get_market(&symbol)
442 } else {
443 None
444 };
445
446 if let Ok(trade) = parser::parse_trade(&item, market.as_deref()) {
447 if let Some(ref filter) = symbol_filter {
448 if trade.symbol != *filter {
449 continue;
450 }
451 }
452 if tx.send(Ok(trade)).is_err() {
453 return;
454 }
455 }
456 }
457 }
458 }
459 });
460
461 let stream = UnboundedReceiverStream::new(rx);
462 Ok(Box::pin(stream))
463 }
464
465 async fn subscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
466 Err(Error::not_implemented("subscribe"))
467 }
468
469 async fn unsubscribe(&self, _channel: &str, _symbol: Option<&str>) -> Result<()> {
470 Err(Error::not_implemented("unsubscribe"))
471 }
472}