1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::cache::fifo::FifoCache;
25use nautilus_core::{
26 AtomicTime, MUTEX_POISONED, Params, nanos::UnixNanos, time::get_atomic_clock_realtime,
27};
28use nautilus_model::{
29 data::{BarType, CustomData, Data, DataType},
30 identifiers::AccountId,
31 instruments::{Instrument, InstrumentAny},
32 types::Price,
33};
34use nautilus_network::{
35 RECONNECTED,
36 retry::{RetryManager, create_websocket_retry_manager},
37 websocket::{SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43 client::{AssetContextDataType, CloidCache},
44 enums::HyperliquidWsChannel,
45 error::HyperliquidWsError,
46 messages::{
47 CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
48 SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
49 },
50 parse::{
51 parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
52 parse_ws_order_book_depth10, parse_ws_order_status_report, parse_ws_quote_tick,
53 parse_ws_trade_tick,
54 },
55};
56use crate::data_types::HyperliquidAllMids;
57
/// Commands accepted by the feed handler on its command channel.
///
/// Every variant is consumed by the `match` inside `FeedHandler::next`.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
#[allow(private_interfaces)]
pub enum HandlerCommand {
    /// Stores the WebSocket client the handler uses for outbound sends and pongs.
    SetClient(WebSocketClient),
    /// Disconnects the stored client (if any), raises the stop signal and ends the handler loop.
    Disconnect,
    /// Records each subscription in the subscription state and sends a subscribe request for it.
    Subscribe {
        /// Subscriptions to request.
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Records each unsubscription in the subscription state and sends an unsubscribe request for it.
    Unsubscribe {
        /// Subscriptions to cancel.
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Inserts every instrument into the handler's instrument map, keyed by its raw symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts or replaces a single instrument, keyed by its raw symbol.
    UpdateInstrument(InstrumentAny),
    /// Registers `bar_type` under `key` in the handler's bar type cache.
    AddBarType { key: String, bar_type: BarType },
    /// Removes the bar type and any cached candle stored under `key`.
    RemoveBarType { key: String },
    /// Replaces the asset-context data types tracked for `coin`; an empty set removes the entry.
    UpdateAssetContextSubs {
        /// Coin whose asset-context subscriptions are being updated.
        coin: Ustr,
        /// Data types to emit for `coin`.
        data_types: AHashSet<AssetContextDataType>,
    },
    /// Accepted but currently ignored by the handler (its match arm is empty).
    CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
    /// Adds `coin` to (`subscribed == true`) or removes it from the set of coins
    /// for which L2 book updates also produce a depth-10 message.
    SetDepth10Sub { coin: Ustr, subscribed: bool },
}
97
/// Inner message loop state for the Hyperliquid WebSocket feed.
///
/// Receives commands and raw WebSocket frames, parses frames into
/// `NautilusWsMessage` values and hands them out one at a time through `next`.
pub(super) struct FeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Shared stop flag; set to `true` when a `Disconnect` command is processed.
    signal: Arc<AtomicBool>,
    /// WebSocket client used for outbound sends; `None` until `SetClient` is received.
    client: Option<WebSocketClient>,
    /// Incoming handler commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Account the execution reports are attributed to; order updates and fills
    /// are not processed while this is `None`.
    account_id: Option<AccountId>,
    /// Subscription bookkeeping, keyed by the strings from `subscription_to_key`.
    subscriptions: SubscriptionState,
    /// Retry policy applied to outbound text sends.
    retry_manager: RetryManager<HyperliquidWsError>,
    /// Parsed messages from a multi-message frame that `next` has not yet returned.
    message_buffer: Vec<NautilusWsMessage>,
    /// Known instruments keyed by the venue coin (the instrument's raw symbol).
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Shared map used to replace a venue cloid with the real client order ID on reports.
    cloid_cache: CloidCache,
    /// Bar types keyed by candle subscription key.
    bar_types_cache: AHashMap<String, BarType>,
    /// Most recent candle seen per candle key; used to detect when a bar period rolls over.
    bar_cache: AHashMap<String, CandleData>,
    /// Asset-context data types to emit, per coin.
    asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
    /// Coins whose L2 book updates also produce a depth-10 message.
    depth10_subs: AHashSet<Ustr>,
    /// Trade IDs of fills already reported, used to skip duplicates.
    processed_trade_ids: FifoCache<u64, 10_000>,
    /// Last emitted raw mark price string per coin.
    mark_price_cache: AHashMap<Ustr, String>,
    /// Last emitted raw oracle (index) price string per coin.
    index_price_cache: AHashMap<Ustr, String>,
    /// Last emitted raw funding rate string per coin.
    funding_rate_cache: AHashMap<Ustr, String>,
}
120
121impl FeedHandler {
122 pub(super) fn new(
124 signal: Arc<AtomicBool>,
125 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
126 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
127 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
128 account_id: Option<AccountId>,
129 subscriptions: SubscriptionState,
130 cloid_cache: CloidCache,
131 ) -> Self {
132 Self {
133 clock: get_atomic_clock_realtime(),
134 signal,
135 client: None,
136 cmd_rx,
137 raw_rx,
138 out_tx,
139 account_id,
140 subscriptions,
141 retry_manager: create_websocket_retry_manager(),
142 message_buffer: Vec::new(),
143 instruments: AHashMap::new(),
144 cloid_cache,
145 bar_types_cache: AHashMap::new(),
146 bar_cache: AHashMap::new(),
147 asset_context_subs: AHashMap::new(),
148 depth10_subs: AHashSet::new(),
149 processed_trade_ids: FifoCache::new(),
150 mark_price_cache: AHashMap::new(),
151 index_price_cache: AHashMap::new(),
152 funding_rate_cache: AHashMap::new(),
153 }
154 }
155
156 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
158 self.out_tx
159 .send(msg)
160 .map_err(|e| format!("Failed to send message: {e}"))
161 }
162
    /// Returns the current value of the shared stop signal.
    ///
    /// The flag is set when `next` processes a `Disconnect` command; it is read
    /// here with `Relaxed` ordering.
    pub(super) fn is_stopped(&self) -> bool {
        self.signal.load(Ordering::Relaxed)
    }
167
168 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
169 if let Some(client) = &self.client {
170 self.retry_manager
171 .execute_with_retry(
172 "websocket_send",
173 || {
174 let payload = payload.clone();
175 async move {
176 client.send_text(payload, None).await.map_err(|e| {
177 HyperliquidWsError::ClientError(format!("Send failed: {e}"))
178 })
179 }
180 },
181 should_retry_hyperliquid_error,
182 create_hyperliquid_timeout_error,
183 )
184 .await
185 .map_err(|e| anyhow::anyhow!("{e}"))
186 } else {
187 Err(anyhow::anyhow!("No WebSocket client available"))
188 }
189 }
190
191 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
192 if !self.message_buffer.is_empty() {
193 return Some(self.message_buffer.remove(0));
194 }
195
196 loop {
197 tokio::select! {
198 Some(cmd) = self.cmd_rx.recv() => {
199 match cmd {
200 HandlerCommand::SetClient(client) => {
201 log::debug!("Setting WebSocket client in handler");
202 self.client = Some(client);
203 }
204 HandlerCommand::Disconnect => {
205 log::debug!("Handler received disconnect command");
206
207 if let Some(ref client) = self.client {
208 client.disconnect().await;
209 }
210 self.signal.store(true, Ordering::SeqCst);
211 return None;
212 }
213 HandlerCommand::Subscribe { subscriptions } => {
214 for subscription in subscriptions {
215 let key = subscription_to_key(&subscription);
216 self.subscriptions.mark_subscribe(&key);
217
218 let request = HyperliquidWsRequest::Subscribe { subscription };
219 match serde_json::to_string(&request) {
220 Ok(payload) => {
221 log::debug!("Sending subscribe payload: {payload}");
222 if let Err(e) = self.send_with_retry(payload).await {
223 log::error!("Error subscribing to {key}: {e}");
224 self.subscriptions.mark_failure(&key);
225 }
226 }
227 Err(e) => {
228 log::error!("Error serializing subscription for {key}: {e}");
229 self.subscriptions.mark_failure(&key);
230 }
231 }
232 }
233 }
234 HandlerCommand::Unsubscribe { subscriptions } => {
235 for subscription in subscriptions {
236 let key = subscription_to_key(&subscription);
237 self.subscriptions.mark_unsubscribe(&key);
238
239 let request = HyperliquidWsRequest::Unsubscribe { subscription };
240 match serde_json::to_string(&request) {
241 Ok(payload) => {
242 log::debug!("Sending unsubscribe payload: {payload}");
243 if let Err(e) = self.send_with_retry(payload).await {
244 log::error!("Error unsubscribing from {key}: {e}");
245 }
246 }
247 Err(e) => {
248 log::error!("Error serializing unsubscription for {key}: {e}");
249 }
250 }
251 }
252 }
253 HandlerCommand::InitializeInstruments(instruments) => {
254 for inst in instruments {
255 let coin = inst.raw_symbol().inner();
256 self.instruments.insert(coin, inst);
257 }
258 }
259 HandlerCommand::UpdateInstrument(inst) => {
260 let coin = inst.raw_symbol().inner();
261 self.instruments.insert(coin, inst);
262 }
263 HandlerCommand::AddBarType { key, bar_type } => {
264 self.bar_types_cache.insert(key, bar_type);
265 }
266 HandlerCommand::RemoveBarType { key } => {
267 self.bar_types_cache.remove(&key);
268 self.bar_cache.remove(&key);
269 }
270 HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
271 if data_types.is_empty() {
272 self.asset_context_subs.remove(&coin);
273 } else {
274 self.asset_context_subs.insert(coin, data_types);
275 }
276 }
277 HandlerCommand::CacheSpotFillCoins(_) => {
278 }
280 HandlerCommand::SetDepth10Sub { coin, subscribed } => {
281 if subscribed {
282 self.depth10_subs.insert(coin);
283 } else {
284 self.depth10_subs.remove(&coin);
285 }
286 }
287 }
288 }
289
290 Some(raw_msg) = self.raw_rx.recv() => {
291 match raw_msg {
292 Message::Text(text) => {
293 if text == RECONNECTED {
294 log::info!("Received RECONNECTED sentinel");
295 return Some(NautilusWsMessage::Reconnected);
296 }
297
298 match serde_json::from_str::<HyperliquidWsMessage>(&text) {
299 Ok(msg) => {
300 let ts_init = self.clock.get_time_ns();
301 let all_mids_data_types =
302 Self::all_mids_data_types(&self.subscriptions);
303
304 let nautilus_msgs = Self::parse_to_nautilus_messages(
305 msg,
306 &self.instruments,
307 &self.cloid_cache,
308 &self.bar_types_cache,
309 self.account_id,
310 ts_init,
311 &self.asset_context_subs,
312 &self.depth10_subs,
313 &mut self.processed_trade_ids,
314 &mut self.mark_price_cache,
315 &mut self.index_price_cache,
316 &mut self.funding_rate_cache,
317 &mut self.bar_cache,
318 &all_mids_data_types,
319 );
320
321 if !nautilus_msgs.is_empty() {
322 let mut iter = nautilus_msgs.into_iter();
323 let first = iter.next().unwrap();
324 self.message_buffer.extend(iter);
325 return Some(first);
326 }
327 }
328 Err(e) => {
329 log::error!("Error parsing WebSocket message: {e}, text: {text}");
330 }
331 }
332 }
333 Message::Ping(data) => {
334 if let Some(ref client) = self.client
335 && let Err(e) = client.send_pong(data.to_vec()).await {
336 log::error!("Error sending pong: {e}");
337 }
338 }
339 Message::Close(_) => {
340 log::info!("Received WebSocket close frame");
341 return None;
342 }
343 _ => {}
344 }
345 }
346
347 else => {
348 log::debug!("Handler shutting down: stream ended or command channel closed");
349 return None;
350 }
351 }
352 }
353 }
354
    /// Converts one decoded venue message into zero or more `NautilusWsMessage` values.
    ///
    /// Dispatches on the message variant to the matching `handle_*` helper.
    /// Order updates and fills are only processed when `account_id` is `Some`.
    /// The mutable caches are updated by the helpers they are passed to:
    /// `processed_trade_ids` by fill handling, the three price/rate caches by
    /// asset-context handling and `bar_cache` by candle handling.
    /// `all_mids_data_types` is only read for `AllMids` messages, which emit one
    /// custom-data message per entry.
    #[expect(clippy::too_many_arguments)]
    fn parse_to_nautilus_messages(
        msg: HyperliquidWsMessage,
        instruments: &AHashMap<Ustr, InstrumentAny>,
        cloid_cache: &CloidCache,
        bar_types: &AHashMap<String, BarType>,
        account_id: Option<AccountId>,
        ts_init: UnixNanos,
        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
        depth10_subs: &AHashSet<Ustr>,
        processed_trade_ids: &mut FifoCache<u64, 10_000>,
        mark_price_cache: &mut AHashMap<Ustr, String>,
        index_price_cache: &mut AHashMap<Ustr, String>,
        funding_rate_cache: &mut AHashMap<Ustr, String>,
        bar_cache: &mut AHashMap<String, CandleData>,
        all_mids_data_types: &[DataType],
    ) -> Vec<NautilusWsMessage> {
        let mut result = Vec::new();

        match msg {
            HyperliquidWsMessage::OrderUpdates { data } => {
                // Without an account there is nothing to attribute the reports to.
                if let Some(account_id) = account_id
                    && let Some(msg) = Self::handle_order_updates(
                        &data,
                        instruments,
                        cloid_cache,
                        account_id,
                        ts_init,
                    )
                {
                    result.push(msg);
                }
            }
            HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
                match data {
                    WsUserEventData::Fills { fills } => {
                        log::debug!("Received {} fill(s) from userEvents channel", fills.len());
                        for fill in &fills {
                            log::debug!(
                                "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
                                fill.oid,
                                fill.coin,
                                fill.side,
                                fill.sz,
                                fill.px
                            );
                        }

                        if let Some(account_id) = account_id {
                            log::debug!("Processing fills with account_id={account_id}");

                            if let Some(msg) = Self::handle_user_fills(
                                &fills,
                                instruments,
                                cloid_cache,
                                account_id,
                                ts_init,
                                processed_trade_ids,
                            ) {
                                log::debug!("Successfully created fill message");
                                result.push(msg);
                            } else {
                                log::debug!("handle_user_fills returned None (no new fills)");
                            }
                        } else {
                            log::warn!("Cannot process fills: account_id is None");
                        }
                    }
                    WsUserEventData::Liquidation { liquidation } => {
                        // Liquidations are only logged; no message is emitted for them.
                        log::warn!(
                            "Liquidation event: lid={}, liquidator={}, liquidated_user={}, ntl_pos={}, account_value={}",
                            liquidation.lid,
                            liquidation.liquidator,
                            liquidation.liquidated_user,
                            liquidation.liquidated_ntl_pos,
                            liquidation.liquidated_account_value,
                        );
                    }
                    _ => {
                        log::debug!("Received non-fill user event: {data:?}");
                    }
                }
            }
            HyperliquidWsMessage::UserFills { data } => {
                // Shares `processed_trade_ids` with the user-events path, so a fill
                // already reported from either channel is skipped here.
                if let Some(account_id) = account_id
                    && let Some(msg) = Self::handle_user_fills(
                        &data.fills,
                        instruments,
                        cloid_cache,
                        account_id,
                        ts_init,
                        processed_trade_ids,
                    )
                {
                    result.push(msg);
                }
            }
            HyperliquidWsMessage::Trades { data } => {
                if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
                    result.push(msg);
                }
            }
            HyperliquidWsMessage::AllMids { data } => {
                // Keep only mids for known instruments whose price string parses.
                let mut mids = std::collections::HashMap::new();
                for (coin, mid_str) in &data.mids {
                    let coin_ustr = Ustr::from(coin.as_str());
                    if let Some(instrument) = instruments.get(&coin_ustr) {
                        match mid_str.parse::<Price>() {
                            Ok(price) => {
                                mids.insert(instrument.id(), price);
                            }
                            Err(e) => {
                                log::warn!("Failed to parse mid price for {coin}: {e}");
                            }
                        }
                    } else {
                        log::debug!("No instrument found for coin: {coin}");
                    }
                }

                // Emit one copy of the snapshot per subscribed data type.
                if !mids.is_empty() {
                    for data_type in all_mids_data_types {
                        let all_mids = HyperliquidAllMids::new(mids.clone(), ts_init, ts_init);
                        result.push(NautilusWsMessage::CustomData(Data::Custom(
                            CustomData::new(Arc::new(all_mids), data_type.clone()),
                        )));
                    }
                }
            }
            HyperliquidWsMessage::Bbo { data } => {
                if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
                    result.push(msg);
                }
            }
            HyperliquidWsMessage::L2Book { data } => {
                result.extend(Self::handle_l2_book(
                    &data,
                    instruments,
                    depth10_subs,
                    ts_init,
                ));
            }
            HyperliquidWsMessage::Candle { data } => {
                if let Some(msg) =
                    Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
                {
                    result.push(msg);
                }
            }
            HyperliquidWsMessage::ActiveAssetCtx { data }
            | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
                result.extend(Self::handle_asset_context(
                    &data,
                    instruments,
                    asset_context_subs,
                    mark_price_cache,
                    index_price_cache,
                    funding_rate_cache,
                    ts_init,
                ));
            }
            HyperliquidWsMessage::Error { data } => {
                log::warn!("Received error from Hyperliquid WebSocket: {data}");
            }
            // Every other message variant is ignored.
            _ => {}
        }

        result
    }
528
529 fn handle_order_updates(
530 data: &[super::messages::WsOrderData],
531 instruments: &AHashMap<Ustr, InstrumentAny>,
532 cloid_cache: &CloidCache,
533 account_id: AccountId,
534 ts_init: UnixNanos,
535 ) -> Option<NautilusWsMessage> {
536 let mut exec_reports = Vec::new();
537
538 for order_update in data {
539 let instrument = instruments.get(&order_update.order.coin);
540
541 if let Some(instrument) = instrument {
542 match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
543 Ok(mut report) => {
544 if let Some(cloid) = &order_update.order.cloid {
546 let cloid_ustr = Ustr::from(cloid.as_str());
547 let resolved = cloid_cache
548 .lock()
549 .expect(MUTEX_POISONED)
550 .get(&cloid_ustr)
551 .copied();
552
553 if let Some(real_client_order_id) = resolved {
554 log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
555 report.client_order_id = Some(real_client_order_id);
556 }
557 }
558 exec_reports.push(ExecutionReport::Order(report));
559 }
560 Err(e) => {
561 log::error!("Error parsing order update: {e}");
562 }
563 }
564 } else {
565 log::debug!("No instrument found for coin: {}", order_update.order.coin);
566 }
567 }
568
569 if exec_reports.is_empty() {
570 None
571 } else {
572 Some(NautilusWsMessage::ExecutionReports(exec_reports))
573 }
574 }
575
576 fn handle_user_fills(
577 fills: &[super::messages::WsFillData],
578 instruments: &AHashMap<Ustr, InstrumentAny>,
579 cloid_cache: &CloidCache,
580 account_id: AccountId,
581 ts_init: UnixNanos,
582 processed_trade_ids: &mut FifoCache<u64, 10_000>,
583 ) -> Option<NautilusWsMessage> {
584 let mut exec_reports = Vec::new();
585
586 for fill in fills {
587 if processed_trade_ids.contains(&fill.tid) {
588 log::debug!("Skipping duplicate fill: tid={}", fill.tid);
589 continue;
590 }
591
592 let instrument = instruments.get(&fill.coin);
593
594 if let Some(instrument) = instrument {
595 log::debug!("Found instrument for fill coin={}", fill.coin);
596 match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
597 Ok(mut report) => {
598 processed_trade_ids.add(fill.tid);
600
601 if let Some(cloid) = &fill.cloid {
602 let cloid_ustr = Ustr::from(cloid.as_str());
603 let resolved = cloid_cache
604 .lock()
605 .expect(MUTEX_POISONED)
606 .get(&cloid_ustr)
607 .copied();
608
609 if let Some(real_client_order_id) = resolved {
610 log::debug!(
611 "Resolved fill cloid {cloid} -> {real_client_order_id}"
612 );
613 report.client_order_id = Some(real_client_order_id);
614 }
615 }
616 log::debug!(
617 "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
618 report.venue_order_id,
619 report.trade_id
620 );
621 exec_reports.push(ExecutionReport::Fill(report));
622 }
623 Err(e) => {
624 log::error!("Error parsing fill: {e}");
625 }
626 }
627 } else {
628 log::warn!(
630 "No instrument found for fill coin={}. Keys: {:?}",
631 fill.coin,
632 instruments.keys().collect::<Vec<_>>()
633 );
634 }
635 }
636
637 if exec_reports.is_empty() {
638 None
639 } else {
640 Some(NautilusWsMessage::ExecutionReports(exec_reports))
641 }
642 }
643
644 fn handle_trades(
645 data: &[super::messages::WsTradeData],
646 instruments: &AHashMap<Ustr, InstrumentAny>,
647 ts_init: UnixNanos,
648 ) -> Option<NautilusWsMessage> {
649 let mut trade_ticks = Vec::new();
650
651 for trade in data {
652 if let Some(instrument) = instruments.get(&trade.coin) {
653 match parse_ws_trade_tick(trade, instrument, ts_init) {
654 Ok(tick) => trade_ticks.push(tick),
655 Err(e) => {
656 log::error!("Error parsing trade tick: {e}");
657 }
658 }
659 } else {
660 log::debug!("No instrument found for coin: {}", trade.coin);
661 }
662 }
663
664 if trade_ticks.is_empty() {
665 None
666 } else {
667 Some(NautilusWsMessage::Trades(trade_ticks))
668 }
669 }
670
671 fn handle_bbo(
672 data: &super::messages::WsBboData,
673 instruments: &AHashMap<Ustr, InstrumentAny>,
674 ts_init: UnixNanos,
675 ) -> Option<NautilusWsMessage> {
676 if let Some(instrument) = instruments.get(&data.coin) {
677 match parse_ws_quote_tick(data, instrument, ts_init) {
678 Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
679 Err(e) => {
680 log::error!("Error parsing quote tick: {e}");
681 None
682 }
683 }
684 } else {
685 log::debug!("No instrument found for coin: {}", data.coin);
686 None
687 }
688 }
689
690 fn handle_l2_book(
691 data: &super::messages::WsBookData,
692 instruments: &AHashMap<Ustr, InstrumentAny>,
693 depth10_subs: &AHashSet<Ustr>,
694 ts_init: UnixNanos,
695 ) -> Vec<NautilusWsMessage> {
696 let mut out = Vec::new();
697
698 let Some(instrument) = instruments.get(&data.coin) else {
699 log::debug!("No instrument found for coin: {}", data.coin);
700 return out;
701 };
702
703 match parse_ws_order_book_deltas(data, instrument, ts_init) {
704 Ok(deltas) => out.push(NautilusWsMessage::Deltas(deltas)),
705 Err(e) => log::error!("Error parsing order book deltas: {e}"),
706 }
707
708 if depth10_subs.contains(&data.coin) {
709 match parse_ws_order_book_depth10(data, instrument, ts_init) {
710 Ok(depth) => out.push(NautilusWsMessage::Depth10(Box::new(depth))),
711 Err(e) => log::error!("Error parsing order book depth10: {e}"),
712 }
713 }
714
715 out
716 }
717
718 fn handle_candle(
719 data: &CandleData,
720 instruments: &AHashMap<Ustr, InstrumentAny>,
721 bar_types: &AHashMap<String, BarType>,
722 bar_cache: &mut AHashMap<String, CandleData>,
723 ts_init: UnixNanos,
724 ) -> Option<NautilusWsMessage> {
725 let key = format!("candle:{}:{}", data.s, data.i);
726
727 let mut closed_bar = None;
728
729 if let Some(cached) = bar_cache.get(&key) {
730 if cached.close_time != data.close_time {
732 log::debug!(
733 "Bar period changed for {}: prev_close_time={}, new_close_time={}",
734 data.s,
735 cached.close_time,
736 data.close_time
737 );
738 closed_bar = Some(cached.clone());
739 }
740 }
741
742 bar_cache.insert(key.clone(), data.clone());
743
744 if let Some(closed_data) = closed_bar {
745 if let Some(bar_type) = bar_types.get(&key) {
746 if let Some(instrument) = instruments.get(&data.s) {
747 match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
748 Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
749 Err(e) => {
750 log::error!("Error parsing closed candle: {e}");
751 }
752 }
753 } else {
754 log::debug!("No instrument found for coin: {}", data.s);
755 }
756 } else {
757 log::debug!("No bar type found for key: {key}");
758 }
759 }
760
761 None
762 }
763
764 fn handle_asset_context(
765 data: &WsActiveAssetCtxData,
766 instruments: &AHashMap<Ustr, InstrumentAny>,
767 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
768 mark_price_cache: &mut AHashMap<Ustr, String>,
769 index_price_cache: &mut AHashMap<Ustr, String>,
770 funding_rate_cache: &mut AHashMap<Ustr, String>,
771 ts_init: UnixNanos,
772 ) -> Vec<NautilusWsMessage> {
773 let mut result = Vec::new();
774
775 let coin = match data {
776 WsActiveAssetCtxData::Perp { coin, .. } => coin,
777 WsActiveAssetCtxData::Spot { coin, .. } => coin,
778 };
779
780 if let Some(instrument) = instruments.get(coin) {
781 let (mark_px, oracle_px, funding) = match data {
782 WsActiveAssetCtxData::Perp { ctx, .. } => (
783 &ctx.shared.mark_px,
784 Some(&ctx.oracle_px),
785 Some(&ctx.funding),
786 ),
787 WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
788 };
789
790 let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
791 let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
792 let funding_changed =
793 funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
794
795 let subscribed_types = asset_context_subs.get(coin);
796
797 if mark_changed || index_changed || funding_changed {
798 match parse_ws_asset_context(data, instrument, ts_init) {
799 Ok((mark_price, index_price, funding_rate)) => {
800 if mark_changed
801 && subscribed_types
802 .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
803 {
804 mark_price_cache.insert(*coin, mark_px.clone());
805 result.push(NautilusWsMessage::MarkPrice(mark_price));
806 }
807
808 if index_changed
809 && subscribed_types
810 .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
811 {
812 if let Some(px) = oracle_px {
813 index_price_cache.insert(*coin, px.clone());
814 }
815
816 if let Some(index) = index_price {
817 result.push(NautilusWsMessage::IndexPrice(index));
818 }
819 }
820
821 if funding_changed
822 && subscribed_types
823 .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
824 {
825 if let Some(rate) = funding {
826 funding_rate_cache.insert(*coin, rate.clone());
827 }
828
829 if let Some(funding) = funding_rate {
830 result.push(NautilusWsMessage::FundingRate(funding));
831 }
832 }
833 }
834 Err(e) => {
835 log::error!("Error parsing asset context: {e}");
836 }
837 }
838 }
839 } else {
840 log::debug!("No instrument found for coin: {coin}");
841 }
842
843 result
844 }
845
846 fn all_mids_data_types(subscriptions: &SubscriptionState) -> Vec<DataType> {
847 let mut topics = subscriptions.all_topics();
848 topics.sort_unstable();
849 topics.dedup();
850
851 let all_mids_channel = HyperliquidWsChannel::AllMids.as_str();
852 let all_mids_prefix = format!("{all_mids_channel}:");
853 let mut data_types = Vec::new();
854
855 for topic in topics {
856 if topic == all_mids_channel {
857 data_types.push(DataType::new("HyperliquidAllMids", None, None));
858 } else if let Some(dex) = topic.strip_prefix(&all_mids_prefix) {
859 let mut metadata = Params::new();
860 metadata.insert(
861 "dex".to_string(),
862 serde_json::Value::String(dex.to_string()),
863 );
864 data_types.push(DataType::new("HyperliquidAllMids", Some(metadata), None));
865 }
866 }
867
868 if data_types.is_empty() {
869 data_types.push(DataType::new("HyperliquidAllMids", None, None));
870 }
871
872 data_types
873 }
874}
875
876pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
877 match sub {
878 SubscriptionRequest::AllMids { dex } => {
879 if let Some(dex_name) = dex {
880 format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
881 } else {
882 HyperliquidWsChannel::AllMids.as_str().to_string()
883 }
884 }
885 SubscriptionRequest::Notification { user } => {
886 format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
887 }
888 SubscriptionRequest::WebData2 { user } => {
889 format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
890 }
891 SubscriptionRequest::Candle { coin, interval } => {
892 format!(
893 "{}:{coin}:{}",
894 HyperliquidWsChannel::Candle.as_str(),
895 interval.as_str()
896 )
897 }
898 SubscriptionRequest::L2Book { coin, .. } => {
899 format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
900 }
901 SubscriptionRequest::Trades { coin } => {
902 format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
903 }
904 SubscriptionRequest::OrderUpdates { user } => {
905 format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
906 }
907 SubscriptionRequest::UserEvents { user } => {
908 format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
909 }
910 SubscriptionRequest::UserFills { user, .. } => {
911 format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
912 }
913 SubscriptionRequest::UserFundings { user } => {
914 format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
915 }
916 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
917 format!(
918 "{}:{user}",
919 HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
920 )
921 }
922 SubscriptionRequest::ActiveAssetCtx { coin } => {
923 format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
924 }
925 SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
926 format!(
927 "{}:{coin}",
928 HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
929 )
930 }
931 SubscriptionRequest::ActiveAssetData { user, coin } => {
932 format!(
933 "{}:{user}:{coin}",
934 HyperliquidWsChannel::ActiveAssetData.as_str()
935 )
936 }
937 SubscriptionRequest::UserTwapSliceFills { user } => {
938 format!(
939 "{}:{user}",
940 HyperliquidWsChannel::UserTwapSliceFills.as_str()
941 )
942 }
943 SubscriptionRequest::UserTwapHistory { user } => {
944 format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
945 }
946 SubscriptionRequest::Bbo { coin } => {
947 format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
948 }
949 }
950}
951
952pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
954 match error {
955 HyperliquidWsError::TungsteniteError(_) => true,
956 HyperliquidWsError::ClientError(msg) => {
957 let msg_lower = msg.to_lowercase();
958 msg_lower.contains("timeout")
959 || msg_lower.contains("timed out")
960 || msg_lower.contains("connection")
961 || msg_lower.contains("network")
962 }
963 _ => false,
964 }
965}
966
/// Wraps `msg` in a `HyperliquidWsError::ClientError`.
///
/// Passed to the retry manager in `send_with_retry` as its timeout-error constructor.
pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
    HyperliquidWsError::ClientError(msg)
}
971
// Unit tests for `FeedHandler::handle_l2_book`.
#[cfg(test)]
mod tests {
    use ahash::{AHashMap, AHashSet};
    use nautilus_core::nanos::UnixNanos;
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol},
        instruments::{CryptoPerpetual, InstrumentAny},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::{
        super::messages::{NautilusWsMessage, WsBookData, WsLevelData},
        FeedHandler,
    };
    use crate::common::consts::HYPERLIQUID_VENUE;

    // Minimal BTC perpetual fixture; the tests register it under the coin key "BTC".
    fn btc_perp() -> InstrumentAny {
        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
            InstrumentId::new(Symbol::new("BTC-PERP"), *HYPERLIQUID_VENUE),
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false,
            2,
            3,
            Price::from("0.01"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        ))
    }

    // Book snapshot for coin "BTC" with a single level in each of its two level lists.
    fn one_level_book() -> WsBookData {
        WsBookData {
            coin: Ustr::from("BTC"),
            levels: [
                vec![WsLevelData {
                    px: "100.00".to_string(),
                    sz: "1.0".to_string(),
                    n: 1,
                }],
                vec![WsLevelData {
                    px: "100.01".to_string(),
                    sz: "1.0".to_string(),
                    n: 1,
                }],
            ],
            time: 1_700_000_000_000,
        }
    }

    // With no depth-10 subscription, only the deltas message is produced.
    #[rstest]
    fn handle_l2_book_emits_deltas_only_when_not_in_depth10_subs() {
        let mut instruments = AHashMap::new();
        instruments.insert(Ustr::from("BTC"), btc_perp());
        let depth10_subs = AHashSet::<Ustr>::new();

        let msgs = FeedHandler::handle_l2_book(
            &one_level_book(),
            &instruments,
            &depth10_subs,
            UnixNanos::default(),
        );

        assert_eq!(msgs.len(), 1);
        assert!(matches!(msgs[0], NautilusWsMessage::Deltas(_)));
    }

    // With the coin in `depth10_subs`, deltas come first and depth-10 second.
    #[rstest]
    fn handle_l2_book_emits_deltas_and_depth10_when_coin_in_subs() {
        let mut instruments = AHashMap::new();
        instruments.insert(Ustr::from("BTC"), btc_perp());
        let mut depth10_subs = AHashSet::<Ustr>::new();
        depth10_subs.insert(Ustr::from("BTC"));

        let msgs = FeedHandler::handle_l2_book(
            &one_level_book(),
            &instruments,
            &depth10_subs,
            UnixNanos::default(),
        );

        assert_eq!(msgs.len(), 2);
        assert!(matches!(msgs[0], NautilusWsMessage::Deltas(_)));
        assert!(matches!(msgs[1], NautilusWsMessage::Depth10(_)));
    }

    // An unknown coin produces no messages at all.
    #[rstest]
    fn handle_l2_book_returns_empty_when_instrument_unknown() {
        let instruments = AHashMap::<Ustr, InstrumentAny>::new();
        let depth10_subs = AHashSet::<Ustr>::new();

        let msgs = FeedHandler::handle_l2_book(
            &one_level_book(),
            &instruments,
            &depth10_subs,
            UnixNanos::default(),
        );

        assert!(msgs.is_empty());
    }
}