1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::BarType,
29 identifiers::{AccountId, ClientOrderId},
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33 RECONNECTED,
34 retry::{RetryManager, create_websocket_retry_manager},
35 websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41 client::AssetContextDataType,
42 enums::HyperliquidWsChannel,
43 error::HyperliquidWsError,
44 messages::{
45 CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46 SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47 },
48 parse::{
49 parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50 parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
51 },
52};
53
/// Commands delivered to the [`FeedHandler`] through its command channel and
/// applied inside [`FeedHandler::next`].
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
// NOTE(review): presumably required because some payload types (e.g. the
// subscription request / asset-context types) are less visible than this
// `pub` enum — confirm before removing.
#[allow(private_interfaces)]
pub enum HandlerCommand {
    /// Installs the WebSocket client the handler uses for outbound sends,
    /// pong replies and disconnecting.
    SetClient(WebSocketClient),
    /// Disconnects the installed client (if any), raises the stop signal and
    /// makes the handler's `next` return `None`.
    Disconnect,
    /// Marks each subscription as pending and sends a subscribe request for it;
    /// a serialization or send failure marks that subscription as failed.
    Subscribe {
        /// Subscriptions to request, processed in order.
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Marks each subscription as unsubscribing and sends an unsubscribe
    /// request for it; failures are only logged.
    Unsubscribe {
        /// Subscriptions to drop, processed in order.
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Inserts every instrument into the handler's lookup table, keyed by the
    /// instrument's raw symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument, keyed by its raw symbol.
    UpdateInstrument(InstrumentAny),
    /// Registers the bar type to use for candles whose cache key equals `key`.
    /// The handler looks candles up with keys of the form
    /// `candle:{symbol}:{interval}`.
    AddBarType { key: String, bar_type: BarType },
    /// Removes the bar type registered under `key` together with the candle
    /// cached for that key.
    RemoveBarType { key: String },
    /// Replaces the set of asset-context data types wanted for `coin`;
    /// an empty set removes the entry for that coin.
    UpdateAssetContextSubs {
        /// Coin whose asset-context subscriptions are being updated.
        coin: Ustr,
        /// Data types to emit for this coin.
        data_types: AHashSet<AssetContextDataType>,
    },
    /// Accepted by the handler but currently ignored (the payload is dropped).
    CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
}
90
/// Consumes raw WebSocket frames and handler commands and turns them into
/// [`NautilusWsMessage`] values, one per call to `next`.
pub(super) struct FeedHandler {
    /// Clock used to stamp `ts_init` on every parsed message.
    clock: &'static AtomicTime,
    /// Stop flag: set to `true` on `Disconnect`, read by `is_stopped`.
    signal: Arc<AtomicBool>,
    /// Outbound client; `None` until a `SetClient` command arrives.
    client: Option<WebSocketClient>,
    /// Incoming handler commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Account the execution reports are attributed to; order updates and
    /// fills are not turned into reports while this is `None`.
    account_id: Option<AccountId>,
    /// Subscription bookkeeping (subscribe / unsubscribe / failure marks).
    subscriptions: SubscriptionState,
    /// Retry policy applied to outbound text sends.
    retry_manager: RetryManager<HyperliquidWsError>,
    /// Messages parsed from one frame but not yet returned by `next`.
    message_buffer: Vec<NautilusWsMessage>,
    /// Instruments keyed by raw symbol (the venue "coin").
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Shared map from venue `cloid` to the real client order ID; consulted
    /// when order updates and fills carry a `cloid`.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Bar types keyed by candle cache key.
    bar_types_cache: AHashMap<String, BarType>,
    /// Most recent candle received per cache key.
    bar_cache: AHashMap<String, CandleData>,
    /// Asset-context data types to emit, per coin.
    asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
    /// Trade IDs of fills already reported, used to skip duplicates.
    processed_trade_ids: FifoCache<u64, 10_000>,
    /// Last emitted mark price string per coin.
    mark_price_cache: AHashMap<Ustr, String>,
    /// Last emitted index (oracle) price string per coin.
    index_price_cache: AHashMap<Ustr, String>,
    /// Last emitted funding rate string per coin.
    funding_rate_cache: AHashMap<Ustr, String>,
}
112
113impl FeedHandler {
114 pub(super) fn new(
116 signal: Arc<AtomicBool>,
117 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
118 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
120 account_id: Option<AccountId>,
121 subscriptions: SubscriptionState,
122 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
123 ) -> Self {
124 Self {
125 clock: get_atomic_clock_realtime(),
126 signal,
127 client: None,
128 cmd_rx,
129 raw_rx,
130 out_tx,
131 account_id,
132 subscriptions,
133 retry_manager: create_websocket_retry_manager(),
134 message_buffer: Vec::new(),
135 instruments: AHashMap::new(),
136 cloid_cache,
137 bar_types_cache: AHashMap::new(),
138 bar_cache: AHashMap::new(),
139 asset_context_subs: AHashMap::new(),
140 processed_trade_ids: FifoCache::new(),
141 mark_price_cache: AHashMap::new(),
142 index_price_cache: AHashMap::new(),
143 funding_rate_cache: AHashMap::new(),
144 }
145 }
146
147 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
149 self.out_tx
150 .send(msg)
151 .map_err(|e| format!("Failed to send message: {e}"))
152 }
153
154 pub(super) fn is_stopped(&self) -> bool {
156 self.signal.load(Ordering::Relaxed)
157 }
158
159 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
160 if let Some(client) = &self.client {
161 self.retry_manager
162 .execute_with_retry(
163 "websocket_send",
164 || {
165 let payload = payload.clone();
166 async move {
167 client.send_text(payload, None).await.map_err(|e| {
168 HyperliquidWsError::ClientError(format!("Send failed: {e}"))
169 })
170 }
171 },
172 should_retry_hyperliquid_error,
173 create_hyperliquid_timeout_error,
174 )
175 .await
176 .map_err(|e| anyhow::anyhow!("{e}"))
177 } else {
178 Err(anyhow::anyhow!("No WebSocket client available"))
179 }
180 }
181
182 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
183 if !self.message_buffer.is_empty() {
184 return Some(self.message_buffer.remove(0));
185 }
186
187 loop {
188 tokio::select! {
189 Some(cmd) = self.cmd_rx.recv() => {
190 match cmd {
191 HandlerCommand::SetClient(client) => {
192 log::debug!("Setting WebSocket client in handler");
193 self.client = Some(client);
194 }
195 HandlerCommand::Disconnect => {
196 log::debug!("Handler received disconnect command");
197 if let Some(ref client) = self.client {
198 client.disconnect().await;
199 }
200 self.signal.store(true, Ordering::SeqCst);
201 return None;
202 }
203 HandlerCommand::Subscribe { subscriptions } => {
204 for subscription in subscriptions {
205 let key = subscription_to_key(&subscription);
206 self.subscriptions.mark_subscribe(&key);
207
208 let request = HyperliquidWsRequest::Subscribe { subscription };
209 match serde_json::to_string(&request) {
210 Ok(payload) => {
211 log::debug!("Sending subscribe payload: {payload}");
212 if let Err(e) = self.send_with_retry(payload).await {
213 log::error!("Error subscribing to {key}: {e}");
214 self.subscriptions.mark_failure(&key);
215 }
216 }
217 Err(e) => {
218 log::error!("Error serializing subscription for {key}: {e}");
219 self.subscriptions.mark_failure(&key);
220 }
221 }
222 }
223 }
224 HandlerCommand::Unsubscribe { subscriptions } => {
225 for subscription in subscriptions {
226 let key = subscription_to_key(&subscription);
227 self.subscriptions.mark_unsubscribe(&key);
228
229 let request = HyperliquidWsRequest::Unsubscribe { subscription };
230 match serde_json::to_string(&request) {
231 Ok(payload) => {
232 log::debug!("Sending unsubscribe payload: {payload}");
233 if let Err(e) = self.send_with_retry(payload).await {
234 log::error!("Error unsubscribing from {key}: {e}");
235 }
236 }
237 Err(e) => {
238 log::error!("Error serializing unsubscription for {key}: {e}");
239 }
240 }
241 }
242 }
243 HandlerCommand::InitializeInstruments(instruments) => {
244 for inst in instruments {
245 let coin = inst.raw_symbol().inner();
246 self.instruments.insert(coin, inst);
247 }
248 }
249 HandlerCommand::UpdateInstrument(inst) => {
250 let coin = inst.raw_symbol().inner();
251 self.instruments.insert(coin, inst);
252 }
253 HandlerCommand::AddBarType { key, bar_type } => {
254 self.bar_types_cache.insert(key, bar_type);
255 }
256 HandlerCommand::RemoveBarType { key } => {
257 self.bar_types_cache.remove(&key);
258 self.bar_cache.remove(&key);
259 }
260 HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
261 if data_types.is_empty() {
262 self.asset_context_subs.remove(&coin);
263 } else {
264 self.asset_context_subs.insert(coin, data_types);
265 }
266 }
267 HandlerCommand::CacheSpotFillCoins(_) => {
268 }
270 }
271 continue;
272 }
273
274 Some(raw_msg) = self.raw_rx.recv() => {
275 match raw_msg {
276 Message::Text(text) => {
277 if text == RECONNECTED {
278 log::info!("Received RECONNECTED sentinel");
279 return Some(NautilusWsMessage::Reconnected);
280 }
281
282 match serde_json::from_str::<HyperliquidWsMessage>(&text) {
283 Ok(msg) => {
284 let ts_init = self.clock.get_time_ns();
285
286 let nautilus_msgs = Self::parse_to_nautilus_messages(
287 msg,
288 &self.instruments,
289 &self.cloid_cache,
290 &self.bar_types_cache,
291 self.account_id,
292 ts_init,
293 &self.asset_context_subs,
294 &mut self.processed_trade_ids,
295 &mut self.mark_price_cache,
296 &mut self.index_price_cache,
297 &mut self.funding_rate_cache,
298 &mut self.bar_cache,
299 );
300
301 if !nautilus_msgs.is_empty() {
302 let mut iter = nautilus_msgs.into_iter();
303 let first = iter.next().unwrap();
304 self.message_buffer.extend(iter);
305 return Some(first);
306 }
307 }
308 Err(e) => {
309 log::error!("Error parsing WebSocket message: {e}, text: {text}");
310 }
311 }
312 }
313 Message::Ping(data) => {
314 if let Some(ref client) = self.client
315 && let Err(e) = client.send_pong(data.to_vec()).await {
316 log::error!("Error sending pong: {e}");
317 }
318 }
319 Message::Close(_) => {
320 log::info!("Received WebSocket close frame");
321 return None;
322 }
323 _ => {}
324 }
325 }
326
327 else => {
328 log::debug!("Handler shutting down: stream ended or command channel closed");
329 return None;
330 }
331 }
332 }
333 }
334
335 #[allow(clippy::too_many_arguments)]
336 fn parse_to_nautilus_messages(
337 msg: HyperliquidWsMessage,
338 instruments: &AHashMap<Ustr, InstrumentAny>,
339 cloid_cache: &DashMap<Ustr, ClientOrderId>,
340 bar_types: &AHashMap<String, BarType>,
341 account_id: Option<AccountId>,
342 ts_init: UnixNanos,
343 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
344 processed_trade_ids: &mut FifoCache<u64, 10_000>,
345 mark_price_cache: &mut AHashMap<Ustr, String>,
346 index_price_cache: &mut AHashMap<Ustr, String>,
347 funding_rate_cache: &mut AHashMap<Ustr, String>,
348 bar_cache: &mut AHashMap<String, CandleData>,
349 ) -> Vec<NautilusWsMessage> {
350 let mut result = Vec::new();
351
352 match msg {
353 HyperliquidWsMessage::OrderUpdates { data } => {
354 if let Some(account_id) = account_id
355 && let Some(msg) = Self::handle_order_updates(
356 &data,
357 instruments,
358 cloid_cache,
359 account_id,
360 ts_init,
361 )
362 {
363 result.push(msg);
364 }
365 }
366 HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
367 match data {
369 WsUserEventData::Fills { fills } => {
370 log::debug!("Received {} fill(s) from userEvents channel", fills.len());
371 for fill in &fills {
372 log::debug!(
373 "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
374 fill.oid,
375 fill.coin,
376 fill.side,
377 fill.sz,
378 fill.px
379 );
380 }
381 if let Some(account_id) = account_id {
382 log::debug!("Processing fills with account_id={account_id}");
383 if let Some(msg) = Self::handle_user_fills(
384 &fills,
385 instruments,
386 cloid_cache,
387 account_id,
388 ts_init,
389 processed_trade_ids,
390 ) {
391 log::debug!("Successfully created fill message");
392 result.push(msg);
393 } else {
394 log::debug!("handle_user_fills returned None (no new fills)");
395 }
396 } else {
397 log::warn!("Cannot process fills: account_id is None");
398 }
399 }
400 _ => {
401 log::debug!("Received non-fill user event: {data:?}");
402 }
403 }
404 }
405 HyperliquidWsMessage::UserFills { data } => {
406 if let Some(account_id) = account_id
409 && let Some(msg) = Self::handle_user_fills(
410 &data.fills,
411 instruments,
412 cloid_cache,
413 account_id,
414 ts_init,
415 processed_trade_ids,
416 )
417 {
418 result.push(msg);
419 }
420 }
421 HyperliquidWsMessage::Trades { data } => {
422 if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
423 result.push(msg);
424 }
425 }
426 HyperliquidWsMessage::Bbo { data } => {
427 if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
428 result.push(msg);
429 }
430 }
431 HyperliquidWsMessage::L2Book { data } => {
432 if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
433 result.push(msg);
434 }
435 }
436 HyperliquidWsMessage::Candle { data } => {
437 if let Some(msg) =
438 Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
439 {
440 result.push(msg);
441 }
442 }
443 HyperliquidWsMessage::ActiveAssetCtx { data }
444 | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
445 result.extend(Self::handle_asset_context(
446 &data,
447 instruments,
448 asset_context_subs,
449 mark_price_cache,
450 index_price_cache,
451 funding_rate_cache,
452 ts_init,
453 ));
454 }
455 HyperliquidWsMessage::Error { data } => {
456 log::warn!("Received error from Hyperliquid WebSocket: {data}");
457 }
458 _ => {}
460 }
461
462 result
463 }
464
465 fn handle_order_updates(
466 data: &[super::messages::WsOrderData],
467 instruments: &AHashMap<Ustr, InstrumentAny>,
468 cloid_cache: &DashMap<Ustr, ClientOrderId>,
469 account_id: AccountId,
470 ts_init: UnixNanos,
471 ) -> Option<NautilusWsMessage> {
472 let mut exec_reports = Vec::new();
473
474 for order_update in data {
475 let instrument = instruments.get(&order_update.order.coin);
476
477 if let Some(instrument) = instrument {
478 match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
479 Ok(mut report) => {
480 if let Some(cloid) = &order_update.order.cloid {
482 let cloid_ustr = Ustr::from(cloid.as_str());
483 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
484 let real_client_order_id = *entry.value();
485 log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
486 report.client_order_id = Some(real_client_order_id);
487 }
488 }
489 exec_reports.push(ExecutionReport::Order(report));
490 }
491 Err(e) => {
492 log::error!("Error parsing order update: {e}");
493 }
494 }
495 } else {
496 log::debug!("No instrument found for coin: {}", order_update.order.coin);
497 }
498 }
499
500 if exec_reports.is_empty() {
501 None
502 } else {
503 Some(NautilusWsMessage::ExecutionReports(exec_reports))
504 }
505 }
506
507 fn handle_user_fills(
508 fills: &[super::messages::WsFillData],
509 instruments: &AHashMap<Ustr, InstrumentAny>,
510 cloid_cache: &DashMap<Ustr, ClientOrderId>,
511 account_id: AccountId,
512 ts_init: UnixNanos,
513 processed_trade_ids: &mut FifoCache<u64, 10_000>,
514 ) -> Option<NautilusWsMessage> {
515 let mut exec_reports = Vec::new();
516
517 for fill in fills {
518 if processed_trade_ids.contains(&fill.tid) {
519 log::debug!("Skipping duplicate fill: tid={}", fill.tid);
520 continue;
521 }
522
523 let instrument = instruments.get(&fill.coin);
524
525 if let Some(instrument) = instrument {
526 log::debug!("Found instrument for fill coin={}", fill.coin);
527 match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
528 Ok(mut report) => {
529 processed_trade_ids.add(fill.tid);
531
532 if let Some(cloid) = &fill.cloid {
533 let cloid_ustr = Ustr::from(cloid.as_str());
534 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
535 let real_client_order_id = *entry.value();
536 log::debug!(
537 "Resolved fill cloid {cloid} -> {real_client_order_id}"
538 );
539 report.client_order_id = Some(real_client_order_id);
540 }
541 }
542 log::debug!(
543 "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
544 report.venue_order_id,
545 report.trade_id
546 );
547 exec_reports.push(ExecutionReport::Fill(report));
548 }
549 Err(e) => {
550 log::error!("Error parsing fill: {e}");
551 }
552 }
553 } else {
554 log::warn!(
556 "No instrument found for fill coin={}. Keys: {:?}",
557 fill.coin,
558 instruments.keys().collect::<Vec<_>>()
559 );
560 }
561 }
562
563 if exec_reports.is_empty() {
564 None
565 } else {
566 Some(NautilusWsMessage::ExecutionReports(exec_reports))
567 }
568 }
569
570 fn handle_trades(
571 data: &[super::messages::WsTradeData],
572 instruments: &AHashMap<Ustr, InstrumentAny>,
573 ts_init: UnixNanos,
574 ) -> Option<NautilusWsMessage> {
575 let mut trade_ticks = Vec::new();
576
577 for trade in data {
578 if let Some(instrument) = instruments.get(&trade.coin) {
579 match parse_ws_trade_tick(trade, instrument, ts_init) {
580 Ok(tick) => trade_ticks.push(tick),
581 Err(e) => {
582 log::error!("Error parsing trade tick: {e}");
583 }
584 }
585 } else {
586 log::debug!("No instrument found for coin: {}", trade.coin);
587 }
588 }
589
590 if trade_ticks.is_empty() {
591 None
592 } else {
593 Some(NautilusWsMessage::Trades(trade_ticks))
594 }
595 }
596
597 fn handle_bbo(
598 data: &super::messages::WsBboData,
599 instruments: &AHashMap<Ustr, InstrumentAny>,
600 ts_init: UnixNanos,
601 ) -> Option<NautilusWsMessage> {
602 if let Some(instrument) = instruments.get(&data.coin) {
603 match parse_ws_quote_tick(data, instrument, ts_init) {
604 Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
605 Err(e) => {
606 log::error!("Error parsing quote tick: {e}");
607 None
608 }
609 }
610 } else {
611 log::debug!("No instrument found for coin: {}", data.coin);
612 None
613 }
614 }
615
616 fn handle_l2_book(
617 data: &super::messages::WsBookData,
618 instruments: &AHashMap<Ustr, InstrumentAny>,
619 ts_init: UnixNanos,
620 ) -> Option<NautilusWsMessage> {
621 if let Some(instrument) = instruments.get(&data.coin) {
622 match parse_ws_order_book_deltas(data, instrument, ts_init) {
623 Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
624 Err(e) => {
625 log::error!("Error parsing order book deltas: {e}");
626 None
627 }
628 }
629 } else {
630 log::debug!("No instrument found for coin: {}", data.coin);
631 None
632 }
633 }
634
635 fn handle_candle(
636 data: &CandleData,
637 instruments: &AHashMap<Ustr, InstrumentAny>,
638 bar_types: &AHashMap<String, BarType>,
639 bar_cache: &mut AHashMap<String, CandleData>,
640 ts_init: UnixNanos,
641 ) -> Option<NautilusWsMessage> {
642 let key = format!("candle:{}:{}", data.s, data.i);
643
644 let mut closed_bar = None;
645 if let Some(cached) = bar_cache.get(&key) {
646 if cached.close_time != data.close_time {
648 log::debug!(
649 "Bar period changed for {}: prev_close_time={}, new_close_time={}",
650 data.s,
651 cached.close_time,
652 data.close_time
653 );
654 closed_bar = Some(cached.clone());
655 }
656 }
657
658 bar_cache.insert(key.clone(), data.clone());
659
660 if let Some(closed_data) = closed_bar {
661 if let Some(bar_type) = bar_types.get(&key) {
662 if let Some(instrument) = instruments.get(&data.s) {
663 match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
664 Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
665 Err(e) => {
666 log::error!("Error parsing closed candle: {e}");
667 }
668 }
669 } else {
670 log::debug!("No instrument found for coin: {}", data.s);
671 }
672 } else {
673 log::debug!("No bar type found for key: {key}");
674 }
675 }
676
677 None
678 }
679
680 fn handle_asset_context(
681 data: &WsActiveAssetCtxData,
682 instruments: &AHashMap<Ustr, InstrumentAny>,
683 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
684 mark_price_cache: &mut AHashMap<Ustr, String>,
685 index_price_cache: &mut AHashMap<Ustr, String>,
686 funding_rate_cache: &mut AHashMap<Ustr, String>,
687 ts_init: UnixNanos,
688 ) -> Vec<NautilusWsMessage> {
689 let mut result = Vec::new();
690
691 let coin = match data {
692 WsActiveAssetCtxData::Perp { coin, .. } => coin,
693 WsActiveAssetCtxData::Spot { coin, .. } => coin,
694 };
695
696 if let Some(instrument) = instruments.get(coin) {
697 let (mark_px, oracle_px, funding) = match data {
698 WsActiveAssetCtxData::Perp { ctx, .. } => (
699 &ctx.shared.mark_px,
700 Some(&ctx.oracle_px),
701 Some(&ctx.funding),
702 ),
703 WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
704 };
705
706 let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
707 let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
708 let funding_changed =
709 funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
710
711 let subscribed_types = asset_context_subs.get(coin);
712
713 if mark_changed || index_changed || funding_changed {
714 match parse_ws_asset_context(data, instrument, ts_init) {
715 Ok((mark_price, index_price, funding_rate)) => {
716 if mark_changed
717 && subscribed_types
718 .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
719 {
720 mark_price_cache.insert(*coin, mark_px.clone());
721 result.push(NautilusWsMessage::MarkPrice(mark_price));
722 }
723 if index_changed
724 && subscribed_types
725 .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
726 {
727 if let Some(px) = oracle_px {
728 index_price_cache.insert(*coin, px.clone());
729 }
730 if let Some(index) = index_price {
731 result.push(NautilusWsMessage::IndexPrice(index));
732 }
733 }
734 if funding_changed
735 && subscribed_types
736 .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
737 {
738 if let Some(rate) = funding {
739 funding_rate_cache.insert(*coin, rate.clone());
740 }
741 if let Some(funding) = funding_rate {
742 result.push(NautilusWsMessage::FundingRate(funding));
743 }
744 }
745 }
746 Err(e) => {
747 log::error!("Error parsing asset context: {e}");
748 }
749 }
750 }
751 } else {
752 log::debug!("No instrument found for coin: {coin}");
753 }
754
755 result
756 }
757}
758
759pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
760 match sub {
761 SubscriptionRequest::AllMids { dex } => {
762 if let Some(dex_name) = dex {
763 format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
764 } else {
765 HyperliquidWsChannel::AllMids.as_str().to_string()
766 }
767 }
768 SubscriptionRequest::Notification { user } => {
769 format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
770 }
771 SubscriptionRequest::WebData2 { user } => {
772 format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
773 }
774 SubscriptionRequest::Candle { coin, interval } => {
775 format!(
776 "{}:{coin}:{}",
777 HyperliquidWsChannel::Candle.as_str(),
778 interval.as_str()
779 )
780 }
781 SubscriptionRequest::L2Book { coin, .. } => {
782 format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
783 }
784 SubscriptionRequest::Trades { coin } => {
785 format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
786 }
787 SubscriptionRequest::OrderUpdates { user } => {
788 format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
789 }
790 SubscriptionRequest::UserEvents { user } => {
791 format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
792 }
793 SubscriptionRequest::UserFills { user, .. } => {
794 format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
795 }
796 SubscriptionRequest::UserFundings { user } => {
797 format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
798 }
799 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
800 format!(
801 "{}:{user}",
802 HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
803 )
804 }
805 SubscriptionRequest::ActiveAssetCtx { coin } => {
806 format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
807 }
808 SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
809 format!(
810 "{}:{coin}",
811 HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
812 )
813 }
814 SubscriptionRequest::ActiveAssetData { user, coin } => {
815 format!(
816 "{}:{user}:{coin}",
817 HyperliquidWsChannel::ActiveAssetData.as_str()
818 )
819 }
820 SubscriptionRequest::UserTwapSliceFills { user } => {
821 format!(
822 "{}:{user}",
823 HyperliquidWsChannel::UserTwapSliceFills.as_str()
824 )
825 }
826 SubscriptionRequest::UserTwapHistory { user } => {
827 format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
828 }
829 SubscriptionRequest::Bbo { coin } => {
830 format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
831 }
832 }
833}
834
835pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
837 match error {
838 HyperliquidWsError::TungsteniteError(_) => true,
839 HyperliquidWsError::ClientError(msg) => {
840 let msg_lower = msg.to_lowercase();
841 msg_lower.contains("timeout")
842 || msg_lower.contains("timed out")
843 || msg_lower.contains("connection")
844 || msg_lower.contains("network")
845 }
846 _ => false,
847 }
848}
849
850pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
852 HyperliquidWsError::ClientError(msg)
853}