1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::BarType,
29 identifiers::{AccountId, ClientOrderId},
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33 RECONNECTED,
34 retry::{RetryManager, create_websocket_retry_manager},
35 websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41 client::AssetContextDataType,
42 enums::HyperliquidWsChannel,
43 error::HyperliquidWsError,
44 messages::{
45 CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46 SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47 },
48 parse::{
49 parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50 parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
51 },
52};
53
54#[derive(Debug)]
56#[allow(
57 clippy::large_enum_variant,
58 reason = "Commands are ephemeral and immediately consumed"
59)]
60#[allow(private_interfaces)]
61pub enum HandlerCommand {
62 SetClient(WebSocketClient),
64 Disconnect,
66 Subscribe {
68 subscriptions: Vec<SubscriptionRequest>,
69 },
70 Unsubscribe {
72 subscriptions: Vec<SubscriptionRequest>,
73 },
74 InitializeInstruments(Vec<InstrumentAny>),
76 UpdateInstrument(InstrumentAny),
78 AddBarType { key: String, bar_type: BarType },
80 RemoveBarType { key: String },
82 UpdateAssetContextSubs {
84 coin: Ustr,
85 data_types: AHashSet<AssetContextDataType>,
86 },
87 CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
89}
90
91pub(super) struct FeedHandler {
92 clock: &'static AtomicTime,
93 signal: Arc<AtomicBool>,
94 client: Option<WebSocketClient>,
95 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
96 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
97 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
98 account_id: Option<AccountId>,
99 subscriptions: SubscriptionState,
100 retry_manager: RetryManager<HyperliquidWsError>,
101 message_buffer: Vec<NautilusWsMessage>,
102 instruments: AHashMap<Ustr, InstrumentAny>,
103 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
104 bar_types_cache: AHashMap<String, BarType>,
105 bar_cache: AHashMap<String, CandleData>,
106 asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
107 processed_trade_ids: FifoCache<u64, 10_000>,
108 mark_price_cache: AHashMap<Ustr, String>,
109 index_price_cache: AHashMap<Ustr, String>,
110 funding_rate_cache: AHashMap<Ustr, String>,
111}
112
113impl FeedHandler {
114 pub(super) fn new(
116 signal: Arc<AtomicBool>,
117 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
118 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
120 account_id: Option<AccountId>,
121 subscriptions: SubscriptionState,
122 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
123 ) -> Self {
124 Self {
125 clock: get_atomic_clock_realtime(),
126 signal,
127 client: None,
128 cmd_rx,
129 raw_rx,
130 out_tx,
131 account_id,
132 subscriptions,
133 retry_manager: create_websocket_retry_manager(),
134 message_buffer: Vec::new(),
135 instruments: AHashMap::new(),
136 cloid_cache,
137 bar_types_cache: AHashMap::new(),
138 bar_cache: AHashMap::new(),
139 asset_context_subs: AHashMap::new(),
140 processed_trade_ids: FifoCache::new(),
141 mark_price_cache: AHashMap::new(),
142 index_price_cache: AHashMap::new(),
143 funding_rate_cache: AHashMap::new(),
144 }
145 }
146
147 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
149 self.out_tx
150 .send(msg)
151 .map_err(|e| format!("Failed to send message: {e}"))
152 }
153
154 pub(super) fn is_stopped(&self) -> bool {
156 self.signal.load(Ordering::Relaxed)
157 }
158
159 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
160 if let Some(client) = &self.client {
161 self.retry_manager
162 .execute_with_retry(
163 "websocket_send",
164 || {
165 let payload = payload.clone();
166 async move {
167 client.send_text(payload, None).await.map_err(|e| {
168 HyperliquidWsError::ClientError(format!("Send failed: {e}"))
169 })
170 }
171 },
172 should_retry_hyperliquid_error,
173 create_hyperliquid_timeout_error,
174 )
175 .await
176 .map_err(|e| anyhow::anyhow!("{e}"))
177 } else {
178 Err(anyhow::anyhow!("No WebSocket client available"))
179 }
180 }
181
182 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
183 if !self.message_buffer.is_empty() {
184 return Some(self.message_buffer.remove(0));
185 }
186
187 loop {
188 tokio::select! {
189 Some(cmd) = self.cmd_rx.recv() => {
190 match cmd {
191 HandlerCommand::SetClient(client) => {
192 log::debug!("Setting WebSocket client in handler");
193 self.client = Some(client);
194 }
195 HandlerCommand::Disconnect => {
196 log::debug!("Handler received disconnect command");
197
198 if let Some(ref client) = self.client {
199 client.disconnect().await;
200 }
201 self.signal.store(true, Ordering::SeqCst);
202 return None;
203 }
204 HandlerCommand::Subscribe { subscriptions } => {
205 for subscription in subscriptions {
206 let key = subscription_to_key(&subscription);
207 self.subscriptions.mark_subscribe(&key);
208
209 let request = HyperliquidWsRequest::Subscribe { subscription };
210 match serde_json::to_string(&request) {
211 Ok(payload) => {
212 log::debug!("Sending subscribe payload: {payload}");
213 if let Err(e) = self.send_with_retry(payload).await {
214 log::error!("Error subscribing to {key}: {e}");
215 self.subscriptions.mark_failure(&key);
216 }
217 }
218 Err(e) => {
219 log::error!("Error serializing subscription for {key}: {e}");
220 self.subscriptions.mark_failure(&key);
221 }
222 }
223 }
224 }
225 HandlerCommand::Unsubscribe { subscriptions } => {
226 for subscription in subscriptions {
227 let key = subscription_to_key(&subscription);
228 self.subscriptions.mark_unsubscribe(&key);
229
230 let request = HyperliquidWsRequest::Unsubscribe { subscription };
231 match serde_json::to_string(&request) {
232 Ok(payload) => {
233 log::debug!("Sending unsubscribe payload: {payload}");
234 if let Err(e) = self.send_with_retry(payload).await {
235 log::error!("Error unsubscribing from {key}: {e}");
236 }
237 }
238 Err(e) => {
239 log::error!("Error serializing unsubscription for {key}: {e}");
240 }
241 }
242 }
243 }
244 HandlerCommand::InitializeInstruments(instruments) => {
245 for inst in instruments {
246 let coin = inst.raw_symbol().inner();
247 self.instruments.insert(coin, inst);
248 }
249 }
250 HandlerCommand::UpdateInstrument(inst) => {
251 let coin = inst.raw_symbol().inner();
252 self.instruments.insert(coin, inst);
253 }
254 HandlerCommand::AddBarType { key, bar_type } => {
255 self.bar_types_cache.insert(key, bar_type);
256 }
257 HandlerCommand::RemoveBarType { key } => {
258 self.bar_types_cache.remove(&key);
259 self.bar_cache.remove(&key);
260 }
261 HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
262 if data_types.is_empty() {
263 self.asset_context_subs.remove(&coin);
264 } else {
265 self.asset_context_subs.insert(coin, data_types);
266 }
267 }
268 HandlerCommand::CacheSpotFillCoins(_) => {
269 }
271 }
272 }
273
274 Some(raw_msg) = self.raw_rx.recv() => {
275 match raw_msg {
276 Message::Text(text) => {
277 if text == RECONNECTED {
278 log::info!("Received RECONNECTED sentinel");
279 return Some(NautilusWsMessage::Reconnected);
280 }
281
282 match serde_json::from_str::<HyperliquidWsMessage>(&text) {
283 Ok(msg) => {
284 let ts_init = self.clock.get_time_ns();
285
286 let nautilus_msgs = Self::parse_to_nautilus_messages(
287 msg,
288 &self.instruments,
289 &self.cloid_cache,
290 &self.bar_types_cache,
291 self.account_id,
292 ts_init,
293 &self.asset_context_subs,
294 &mut self.processed_trade_ids,
295 &mut self.mark_price_cache,
296 &mut self.index_price_cache,
297 &mut self.funding_rate_cache,
298 &mut self.bar_cache,
299 );
300
301 if !nautilus_msgs.is_empty() {
302 let mut iter = nautilus_msgs.into_iter();
303 let first = iter.next().unwrap();
304 self.message_buffer.extend(iter);
305 return Some(first);
306 }
307 }
308 Err(e) => {
309 log::error!("Error parsing WebSocket message: {e}, text: {text}");
310 }
311 }
312 }
313 Message::Ping(data) => {
314 if let Some(ref client) = self.client
315 && let Err(e) = client.send_pong(data.to_vec()).await {
316 log::error!("Error sending pong: {e}");
317 }
318 }
319 Message::Close(_) => {
320 log::info!("Received WebSocket close frame");
321 return None;
322 }
323 _ => {}
324 }
325 }
326
327 else => {
328 log::debug!("Handler shutting down: stream ended or command channel closed");
329 return None;
330 }
331 }
332 }
333 }
334
335 #[allow(clippy::too_many_arguments)]
336 fn parse_to_nautilus_messages(
337 msg: HyperliquidWsMessage,
338 instruments: &AHashMap<Ustr, InstrumentAny>,
339 cloid_cache: &DashMap<Ustr, ClientOrderId>,
340 bar_types: &AHashMap<String, BarType>,
341 account_id: Option<AccountId>,
342 ts_init: UnixNanos,
343 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
344 processed_trade_ids: &mut FifoCache<u64, 10_000>,
345 mark_price_cache: &mut AHashMap<Ustr, String>,
346 index_price_cache: &mut AHashMap<Ustr, String>,
347 funding_rate_cache: &mut AHashMap<Ustr, String>,
348 bar_cache: &mut AHashMap<String, CandleData>,
349 ) -> Vec<NautilusWsMessage> {
350 let mut result = Vec::new();
351
352 match msg {
353 HyperliquidWsMessage::OrderUpdates { data } => {
354 if let Some(account_id) = account_id
355 && let Some(msg) = Self::handle_order_updates(
356 &data,
357 instruments,
358 cloid_cache,
359 account_id,
360 ts_init,
361 )
362 {
363 result.push(msg);
364 }
365 }
366 HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
367 match data {
369 WsUserEventData::Fills { fills } => {
370 log::debug!("Received {} fill(s) from userEvents channel", fills.len());
371 for fill in &fills {
372 log::debug!(
373 "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
374 fill.oid,
375 fill.coin,
376 fill.side,
377 fill.sz,
378 fill.px
379 );
380 }
381
382 if let Some(account_id) = account_id {
383 log::debug!("Processing fills with account_id={account_id}");
384
385 if let Some(msg) = Self::handle_user_fills(
386 &fills,
387 instruments,
388 cloid_cache,
389 account_id,
390 ts_init,
391 processed_trade_ids,
392 ) {
393 log::debug!("Successfully created fill message");
394 result.push(msg);
395 } else {
396 log::debug!("handle_user_fills returned None (no new fills)");
397 }
398 } else {
399 log::warn!("Cannot process fills: account_id is None");
400 }
401 }
402 _ => {
403 log::debug!("Received non-fill user event: {data:?}");
404 }
405 }
406 }
407 HyperliquidWsMessage::UserFills { data } => {
408 if let Some(account_id) = account_id
411 && let Some(msg) = Self::handle_user_fills(
412 &data.fills,
413 instruments,
414 cloid_cache,
415 account_id,
416 ts_init,
417 processed_trade_ids,
418 )
419 {
420 result.push(msg);
421 }
422 }
423 HyperliquidWsMessage::Trades { data } => {
424 if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
425 result.push(msg);
426 }
427 }
428 HyperliquidWsMessage::Bbo { data } => {
429 if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
430 result.push(msg);
431 }
432 }
433 HyperliquidWsMessage::L2Book { data } => {
434 if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
435 result.push(msg);
436 }
437 }
438 HyperliquidWsMessage::Candle { data } => {
439 if let Some(msg) =
440 Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
441 {
442 result.push(msg);
443 }
444 }
445 HyperliquidWsMessage::ActiveAssetCtx { data }
446 | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
447 result.extend(Self::handle_asset_context(
448 &data,
449 instruments,
450 asset_context_subs,
451 mark_price_cache,
452 index_price_cache,
453 funding_rate_cache,
454 ts_init,
455 ));
456 }
457 HyperliquidWsMessage::Error { data } => {
458 log::warn!("Received error from Hyperliquid WebSocket: {data}");
459 }
460 _ => {}
462 }
463
464 result
465 }
466
467 fn handle_order_updates(
468 data: &[super::messages::WsOrderData],
469 instruments: &AHashMap<Ustr, InstrumentAny>,
470 cloid_cache: &DashMap<Ustr, ClientOrderId>,
471 account_id: AccountId,
472 ts_init: UnixNanos,
473 ) -> Option<NautilusWsMessage> {
474 let mut exec_reports = Vec::new();
475
476 for order_update in data {
477 let instrument = instruments.get(&order_update.order.coin);
478
479 if let Some(instrument) = instrument {
480 match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
481 Ok(mut report) => {
482 if let Some(cloid) = &order_update.order.cloid {
484 let cloid_ustr = Ustr::from(cloid.as_str());
485 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
486 let real_client_order_id = *entry.value();
487 log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
488 report.client_order_id = Some(real_client_order_id);
489 }
490 }
491 exec_reports.push(ExecutionReport::Order(report));
492 }
493 Err(e) => {
494 log::error!("Error parsing order update: {e}");
495 }
496 }
497 } else {
498 log::debug!("No instrument found for coin: {}", order_update.order.coin);
499 }
500 }
501
502 if exec_reports.is_empty() {
503 None
504 } else {
505 Some(NautilusWsMessage::ExecutionReports(exec_reports))
506 }
507 }
508
509 fn handle_user_fills(
510 fills: &[super::messages::WsFillData],
511 instruments: &AHashMap<Ustr, InstrumentAny>,
512 cloid_cache: &DashMap<Ustr, ClientOrderId>,
513 account_id: AccountId,
514 ts_init: UnixNanos,
515 processed_trade_ids: &mut FifoCache<u64, 10_000>,
516 ) -> Option<NautilusWsMessage> {
517 let mut exec_reports = Vec::new();
518
519 for fill in fills {
520 if processed_trade_ids.contains(&fill.tid) {
521 log::debug!("Skipping duplicate fill: tid={}", fill.tid);
522 continue;
523 }
524
525 let instrument = instruments.get(&fill.coin);
526
527 if let Some(instrument) = instrument {
528 log::debug!("Found instrument for fill coin={}", fill.coin);
529 match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
530 Ok(mut report) => {
531 processed_trade_ids.add(fill.tid);
533
534 if let Some(cloid) = &fill.cloid {
535 let cloid_ustr = Ustr::from(cloid.as_str());
536 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
537 let real_client_order_id = *entry.value();
538 log::debug!(
539 "Resolved fill cloid {cloid} -> {real_client_order_id}"
540 );
541 report.client_order_id = Some(real_client_order_id);
542 }
543 }
544 log::debug!(
545 "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
546 report.venue_order_id,
547 report.trade_id
548 );
549 exec_reports.push(ExecutionReport::Fill(report));
550 }
551 Err(e) => {
552 log::error!("Error parsing fill: {e}");
553 }
554 }
555 } else {
556 log::warn!(
558 "No instrument found for fill coin={}. Keys: {:?}",
559 fill.coin,
560 instruments.keys().collect::<Vec<_>>()
561 );
562 }
563 }
564
565 if exec_reports.is_empty() {
566 None
567 } else {
568 Some(NautilusWsMessage::ExecutionReports(exec_reports))
569 }
570 }
571
572 fn handle_trades(
573 data: &[super::messages::WsTradeData],
574 instruments: &AHashMap<Ustr, InstrumentAny>,
575 ts_init: UnixNanos,
576 ) -> Option<NautilusWsMessage> {
577 let mut trade_ticks = Vec::new();
578
579 for trade in data {
580 if let Some(instrument) = instruments.get(&trade.coin) {
581 match parse_ws_trade_tick(trade, instrument, ts_init) {
582 Ok(tick) => trade_ticks.push(tick),
583 Err(e) => {
584 log::error!("Error parsing trade tick: {e}");
585 }
586 }
587 } else {
588 log::debug!("No instrument found for coin: {}", trade.coin);
589 }
590 }
591
592 if trade_ticks.is_empty() {
593 None
594 } else {
595 Some(NautilusWsMessage::Trades(trade_ticks))
596 }
597 }
598
599 fn handle_bbo(
600 data: &super::messages::WsBboData,
601 instruments: &AHashMap<Ustr, InstrumentAny>,
602 ts_init: UnixNanos,
603 ) -> Option<NautilusWsMessage> {
604 if let Some(instrument) = instruments.get(&data.coin) {
605 match parse_ws_quote_tick(data, instrument, ts_init) {
606 Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
607 Err(e) => {
608 log::error!("Error parsing quote tick: {e}");
609 None
610 }
611 }
612 } else {
613 log::debug!("No instrument found for coin: {}", data.coin);
614 None
615 }
616 }
617
618 fn handle_l2_book(
619 data: &super::messages::WsBookData,
620 instruments: &AHashMap<Ustr, InstrumentAny>,
621 ts_init: UnixNanos,
622 ) -> Option<NautilusWsMessage> {
623 if let Some(instrument) = instruments.get(&data.coin) {
624 match parse_ws_order_book_deltas(data, instrument, ts_init) {
625 Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
626 Err(e) => {
627 log::error!("Error parsing order book deltas: {e}");
628 None
629 }
630 }
631 } else {
632 log::debug!("No instrument found for coin: {}", data.coin);
633 None
634 }
635 }
636
637 fn handle_candle(
638 data: &CandleData,
639 instruments: &AHashMap<Ustr, InstrumentAny>,
640 bar_types: &AHashMap<String, BarType>,
641 bar_cache: &mut AHashMap<String, CandleData>,
642 ts_init: UnixNanos,
643 ) -> Option<NautilusWsMessage> {
644 let key = format!("candle:{}:{}", data.s, data.i);
645
646 let mut closed_bar = None;
647
648 if let Some(cached) = bar_cache.get(&key) {
649 if cached.close_time != data.close_time {
651 log::debug!(
652 "Bar period changed for {}: prev_close_time={}, new_close_time={}",
653 data.s,
654 cached.close_time,
655 data.close_time
656 );
657 closed_bar = Some(cached.clone());
658 }
659 }
660
661 bar_cache.insert(key.clone(), data.clone());
662
663 if let Some(closed_data) = closed_bar {
664 if let Some(bar_type) = bar_types.get(&key) {
665 if let Some(instrument) = instruments.get(&data.s) {
666 match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
667 Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
668 Err(e) => {
669 log::error!("Error parsing closed candle: {e}");
670 }
671 }
672 } else {
673 log::debug!("No instrument found for coin: {}", data.s);
674 }
675 } else {
676 log::debug!("No bar type found for key: {key}");
677 }
678 }
679
680 None
681 }
682
683 fn handle_asset_context(
684 data: &WsActiveAssetCtxData,
685 instruments: &AHashMap<Ustr, InstrumentAny>,
686 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
687 mark_price_cache: &mut AHashMap<Ustr, String>,
688 index_price_cache: &mut AHashMap<Ustr, String>,
689 funding_rate_cache: &mut AHashMap<Ustr, String>,
690 ts_init: UnixNanos,
691 ) -> Vec<NautilusWsMessage> {
692 let mut result = Vec::new();
693
694 let coin = match data {
695 WsActiveAssetCtxData::Perp { coin, .. } => coin,
696 WsActiveAssetCtxData::Spot { coin, .. } => coin,
697 };
698
699 if let Some(instrument) = instruments.get(coin) {
700 let (mark_px, oracle_px, funding) = match data {
701 WsActiveAssetCtxData::Perp { ctx, .. } => (
702 &ctx.shared.mark_px,
703 Some(&ctx.oracle_px),
704 Some(&ctx.funding),
705 ),
706 WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
707 };
708
709 let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
710 let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
711 let funding_changed =
712 funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
713
714 let subscribed_types = asset_context_subs.get(coin);
715
716 if mark_changed || index_changed || funding_changed {
717 match parse_ws_asset_context(data, instrument, ts_init) {
718 Ok((mark_price, index_price, funding_rate)) => {
719 if mark_changed
720 && subscribed_types
721 .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
722 {
723 mark_price_cache.insert(*coin, mark_px.clone());
724 result.push(NautilusWsMessage::MarkPrice(mark_price));
725 }
726
727 if index_changed
728 && subscribed_types
729 .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
730 {
731 if let Some(px) = oracle_px {
732 index_price_cache.insert(*coin, px.clone());
733 }
734
735 if let Some(index) = index_price {
736 result.push(NautilusWsMessage::IndexPrice(index));
737 }
738 }
739
740 if funding_changed
741 && subscribed_types
742 .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
743 {
744 if let Some(rate) = funding {
745 funding_rate_cache.insert(*coin, rate.clone());
746 }
747
748 if let Some(funding) = funding_rate {
749 result.push(NautilusWsMessage::FundingRate(funding));
750 }
751 }
752 }
753 Err(e) => {
754 log::error!("Error parsing asset context: {e}");
755 }
756 }
757 }
758 } else {
759 log::debug!("No instrument found for coin: {coin}");
760 }
761
762 result
763 }
764}
765
766pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
767 match sub {
768 SubscriptionRequest::AllMids { dex } => {
769 if let Some(dex_name) = dex {
770 format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
771 } else {
772 HyperliquidWsChannel::AllMids.as_str().to_string()
773 }
774 }
775 SubscriptionRequest::Notification { user } => {
776 format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
777 }
778 SubscriptionRequest::WebData2 { user } => {
779 format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
780 }
781 SubscriptionRequest::Candle { coin, interval } => {
782 format!(
783 "{}:{coin}:{}",
784 HyperliquidWsChannel::Candle.as_str(),
785 interval.as_str()
786 )
787 }
788 SubscriptionRequest::L2Book { coin, .. } => {
789 format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
790 }
791 SubscriptionRequest::Trades { coin } => {
792 format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
793 }
794 SubscriptionRequest::OrderUpdates { user } => {
795 format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
796 }
797 SubscriptionRequest::UserEvents { user } => {
798 format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
799 }
800 SubscriptionRequest::UserFills { user, .. } => {
801 format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
802 }
803 SubscriptionRequest::UserFundings { user } => {
804 format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
805 }
806 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
807 format!(
808 "{}:{user}",
809 HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
810 )
811 }
812 SubscriptionRequest::ActiveAssetCtx { coin } => {
813 format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
814 }
815 SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
816 format!(
817 "{}:{coin}",
818 HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
819 )
820 }
821 SubscriptionRequest::ActiveAssetData { user, coin } => {
822 format!(
823 "{}:{user}:{coin}",
824 HyperliquidWsChannel::ActiveAssetData.as_str()
825 )
826 }
827 SubscriptionRequest::UserTwapSliceFills { user } => {
828 format!(
829 "{}:{user}",
830 HyperliquidWsChannel::UserTwapSliceFills.as_str()
831 )
832 }
833 SubscriptionRequest::UserTwapHistory { user } => {
834 format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
835 }
836 SubscriptionRequest::Bbo { coin } => {
837 format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
838 }
839 }
840}
841
842pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
844 match error {
845 HyperliquidWsError::TungsteniteError(_) => true,
846 HyperliquidWsError::ClientError(msg) => {
847 let msg_lower = msg.to_lowercase();
848 msg_lower.contains("timeout")
849 || msg_lower.contains("timed out")
850 || msg_lower.contains("connection")
851 || msg_lower.contains("network")
852 }
853 _ => false,
854 }
855}
856
857pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
859 HyperliquidWsError::ClientError(msg)
860}