Skip to main content

nautilus_hyperliquid/websocket/
handler.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! WebSocket message handler for Hyperliquid.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::BarType,
29    identifiers::{AccountId, ClientOrderId},
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33    RECONNECTED,
34    retry::{RetryManager, create_websocket_retry_manager},
35    websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41    client::AssetContextDataType,
42    enums::HyperliquidWsChannel,
43    error::HyperliquidWsError,
44    messages::{
45        CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46        SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47    },
48    parse::{
49        parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50        parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
51    },
52};
53
54/// Commands sent from the outer client to the inner message handler.
55#[derive(Debug)]
56#[allow(
57    clippy::large_enum_variant,
58    reason = "Commands are ephemeral and immediately consumed"
59)]
60#[allow(private_interfaces)]
61pub enum HandlerCommand {
62    /// Set the WebSocketClient for the handler to use.
63    SetClient(WebSocketClient),
64    /// Disconnect the WebSocket connection.
65    Disconnect,
66    /// Subscribe to the given subscriptions.
67    Subscribe {
68        subscriptions: Vec<SubscriptionRequest>,
69    },
70    /// Unsubscribe from the given subscriptions.
71    Unsubscribe {
72        subscriptions: Vec<SubscriptionRequest>,
73    },
74    /// Initialize the instruments cache with the given instruments.
75    InitializeInstruments(Vec<InstrumentAny>),
76    /// Update a single instrument in the cache.
77    UpdateInstrument(InstrumentAny),
78    /// Add a bar type mapping for candle parsing.
79    AddBarType { key: String, bar_type: BarType },
80    /// Remove a bar type mapping.
81    RemoveBarType { key: String },
82    /// Update asset context subscriptions for a coin.
83    UpdateAssetContextSubs {
84        coin: Ustr,
85        data_types: AHashSet<AssetContextDataType>,
86    },
87    /// Cache spot fill coin mappings for instrument lookup.
88    CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
89}
90
91pub(super) struct FeedHandler {
92    clock: &'static AtomicTime,
93    signal: Arc<AtomicBool>,
94    client: Option<WebSocketClient>,
95    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
96    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
97    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
98    account_id: Option<AccountId>,
99    subscriptions: SubscriptionState,
100    retry_manager: RetryManager<HyperliquidWsError>,
101    message_buffer: Vec<NautilusWsMessage>,
102    instruments: AHashMap<Ustr, InstrumentAny>,
103    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
104    bar_types_cache: AHashMap<String, BarType>,
105    bar_cache: AHashMap<String, CandleData>,
106    asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
107    processed_trade_ids: FifoCache<u64, 10_000>,
108    mark_price_cache: AHashMap<Ustr, String>,
109    index_price_cache: AHashMap<Ustr, String>,
110    funding_rate_cache: AHashMap<Ustr, String>,
111}
112
113impl FeedHandler {
114    /// Creates a new [`FeedHandler`] instance.
115    pub(super) fn new(
116        signal: Arc<AtomicBool>,
117        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
118        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
120        account_id: Option<AccountId>,
121        subscriptions: SubscriptionState,
122        cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
123    ) -> Self {
124        Self {
125            clock: get_atomic_clock_realtime(),
126            signal,
127            client: None,
128            cmd_rx,
129            raw_rx,
130            out_tx,
131            account_id,
132            subscriptions,
133            retry_manager: create_websocket_retry_manager(),
134            message_buffer: Vec::new(),
135            instruments: AHashMap::new(),
136            cloid_cache,
137            bar_types_cache: AHashMap::new(),
138            bar_cache: AHashMap::new(),
139            asset_context_subs: AHashMap::new(),
140            processed_trade_ids: FifoCache::new(),
141            mark_price_cache: AHashMap::new(),
142            index_price_cache: AHashMap::new(),
143            funding_rate_cache: AHashMap::new(),
144        }
145    }
146
147    /// Send a message to the output channel.
148    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
149        self.out_tx
150            .send(msg)
151            .map_err(|e| format!("Failed to send message: {e}"))
152    }
153
154    /// Check if the handler has received a stop signal.
155    pub(super) fn is_stopped(&self) -> bool {
156        self.signal.load(Ordering::Relaxed)
157    }
158
159    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
160        if let Some(client) = &self.client {
161            self.retry_manager
162                .execute_with_retry(
163                    "websocket_send",
164                    || {
165                        let payload = payload.clone();
166                        async move {
167                            client.send_text(payload, None).await.map_err(|e| {
168                                HyperliquidWsError::ClientError(format!("Send failed: {e}"))
169                            })
170                        }
171                    },
172                    should_retry_hyperliquid_error,
173                    create_hyperliquid_timeout_error,
174                )
175                .await
176                .map_err(|e| anyhow::anyhow!("{e}"))
177        } else {
178            Err(anyhow::anyhow!("No WebSocket client available"))
179        }
180    }
181
182    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
183        if !self.message_buffer.is_empty() {
184            return Some(self.message_buffer.remove(0));
185        }
186
187        loop {
188            tokio::select! {
189                Some(cmd) = self.cmd_rx.recv() => {
190                    match cmd {
191                        HandlerCommand::SetClient(client) => {
192                            log::debug!("Setting WebSocket client in handler");
193                            self.client = Some(client);
194                        }
195                        HandlerCommand::Disconnect => {
196                            log::debug!("Handler received disconnect command");
197
198                            if let Some(ref client) = self.client {
199                                client.disconnect().await;
200                            }
201                            self.signal.store(true, Ordering::SeqCst);
202                            return None;
203                        }
204                        HandlerCommand::Subscribe { subscriptions } => {
205                            for subscription in subscriptions {
206                                let key = subscription_to_key(&subscription);
207                                self.subscriptions.mark_subscribe(&key);
208
209                                let request = HyperliquidWsRequest::Subscribe { subscription };
210                                match serde_json::to_string(&request) {
211                                    Ok(payload) => {
212                                        log::debug!("Sending subscribe payload: {payload}");
213                                        if let Err(e) = self.send_with_retry(payload).await {
214                                            log::error!("Error subscribing to {key}: {e}");
215                                            self.subscriptions.mark_failure(&key);
216                                        }
217                                    }
218                                    Err(e) => {
219                                        log::error!("Error serializing subscription for {key}: {e}");
220                                        self.subscriptions.mark_failure(&key);
221                                    }
222                                }
223                            }
224                        }
225                        HandlerCommand::Unsubscribe { subscriptions } => {
226                            for subscription in subscriptions {
227                                let key = subscription_to_key(&subscription);
228                                self.subscriptions.mark_unsubscribe(&key);
229
230                                let request = HyperliquidWsRequest::Unsubscribe { subscription };
231                                match serde_json::to_string(&request) {
232                                    Ok(payload) => {
233                                        log::debug!("Sending unsubscribe payload: {payload}");
234                                        if let Err(e) = self.send_with_retry(payload).await {
235                                            log::error!("Error unsubscribing from {key}: {e}");
236                                        }
237                                    }
238                                    Err(e) => {
239                                        log::error!("Error serializing unsubscription for {key}: {e}");
240                                    }
241                                }
242                            }
243                        }
244                        HandlerCommand::InitializeInstruments(instruments) => {
245                            for inst in instruments {
246                                let coin = inst.raw_symbol().inner();
247                                self.instruments.insert(coin, inst);
248                            }
249                        }
250                        HandlerCommand::UpdateInstrument(inst) => {
251                            let coin = inst.raw_symbol().inner();
252                            self.instruments.insert(coin, inst);
253                        }
254                        HandlerCommand::AddBarType { key, bar_type } => {
255                            self.bar_types_cache.insert(key, bar_type);
256                        }
257                        HandlerCommand::RemoveBarType { key } => {
258                            self.bar_types_cache.remove(&key);
259                            self.bar_cache.remove(&key);
260                        }
261                        HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
262                            if data_types.is_empty() {
263                                self.asset_context_subs.remove(&coin);
264                            } else {
265                                self.asset_context_subs.insert(coin, data_types);
266                            }
267                        }
268                        HandlerCommand::CacheSpotFillCoins(_) => {
269                            // No longer needed - raw_symbol now contains the proper format
270                        }
271                    }
272                }
273
274                Some(raw_msg) = self.raw_rx.recv() => {
275                    match raw_msg {
276                        Message::Text(text) => {
277                            if text == RECONNECTED {
278                                log::info!("Received RECONNECTED sentinel");
279                                return Some(NautilusWsMessage::Reconnected);
280                            }
281
282                            match serde_json::from_str::<HyperliquidWsMessage>(&text) {
283                                Ok(msg) => {
284                                    let ts_init = self.clock.get_time_ns();
285
286                                    let nautilus_msgs = Self::parse_to_nautilus_messages(
287                                        msg,
288                                        &self.instruments,
289                                        &self.cloid_cache,
290                                        &self.bar_types_cache,
291                                        self.account_id,
292                                        ts_init,
293                                        &self.asset_context_subs,
294                                        &mut self.processed_trade_ids,
295                                        &mut self.mark_price_cache,
296                                        &mut self.index_price_cache,
297                                        &mut self.funding_rate_cache,
298                                        &mut self.bar_cache,
299                                    );
300
301                                    if !nautilus_msgs.is_empty() {
302                                        let mut iter = nautilus_msgs.into_iter();
303                                        let first = iter.next().unwrap();
304                                        self.message_buffer.extend(iter);
305                                        return Some(first);
306                                    }
307                                }
308                                Err(e) => {
309                                    log::error!("Error parsing WebSocket message: {e}, text: {text}");
310                                }
311                            }
312                        }
313                        Message::Ping(data) => {
314                            if let Some(ref client) = self.client
315                                && let Err(e) = client.send_pong(data.to_vec()).await {
316                                log::error!("Error sending pong: {e}");
317                            }
318                        }
319                        Message::Close(_) => {
320                            log::info!("Received WebSocket close frame");
321                            return None;
322                        }
323                        _ => {}
324                    }
325                }
326
327                else => {
328                    log::debug!("Handler shutting down: stream ended or command channel closed");
329                    return None;
330                }
331            }
332        }
333    }
334
335    #[allow(clippy::too_many_arguments)]
336    fn parse_to_nautilus_messages(
337        msg: HyperliquidWsMessage,
338        instruments: &AHashMap<Ustr, InstrumentAny>,
339        cloid_cache: &DashMap<Ustr, ClientOrderId>,
340        bar_types: &AHashMap<String, BarType>,
341        account_id: Option<AccountId>,
342        ts_init: UnixNanos,
343        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
344        processed_trade_ids: &mut FifoCache<u64, 10_000>,
345        mark_price_cache: &mut AHashMap<Ustr, String>,
346        index_price_cache: &mut AHashMap<Ustr, String>,
347        funding_rate_cache: &mut AHashMap<Ustr, String>,
348        bar_cache: &mut AHashMap<String, CandleData>,
349    ) -> Vec<NautilusWsMessage> {
350        let mut result = Vec::new();
351
352        match msg {
353            HyperliquidWsMessage::OrderUpdates { data } => {
354                if let Some(account_id) = account_id
355                    && let Some(msg) = Self::handle_order_updates(
356                        &data,
357                        instruments,
358                        cloid_cache,
359                        account_id,
360                        ts_init,
361                    )
362                {
363                    result.push(msg);
364                }
365            }
366            HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
367                // Process fills from userEvents channel (userFills channel is redundant)
368                match data {
369                    WsUserEventData::Fills { fills } => {
370                        log::debug!("Received {} fill(s) from userEvents channel", fills.len());
371                        for fill in &fills {
372                            log::debug!(
373                                "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
374                                fill.oid,
375                                fill.coin,
376                                fill.side,
377                                fill.sz,
378                                fill.px
379                            );
380                        }
381
382                        if let Some(account_id) = account_id {
383                            log::debug!("Processing fills with account_id={account_id}");
384
385                            if let Some(msg) = Self::handle_user_fills(
386                                &fills,
387                                instruments,
388                                cloid_cache,
389                                account_id,
390                                ts_init,
391                                processed_trade_ids,
392                            ) {
393                                log::debug!("Successfully created fill message");
394                                result.push(msg);
395                            } else {
396                                log::debug!("handle_user_fills returned None (no new fills)");
397                            }
398                        } else {
399                            log::warn!("Cannot process fills: account_id is None");
400                        }
401                    }
402                    _ => {
403                        log::debug!("Received non-fill user event: {data:?}");
404                    }
405                }
406            }
407            HyperliquidWsMessage::UserFills { data } => {
408                // UserFills channel is redundant with userEvents, but handle it for
409                // backwards compatibility if explicitly subscribed
410                if let Some(account_id) = account_id
411                    && let Some(msg) = Self::handle_user_fills(
412                        &data.fills,
413                        instruments,
414                        cloid_cache,
415                        account_id,
416                        ts_init,
417                        processed_trade_ids,
418                    )
419                {
420                    result.push(msg);
421                }
422            }
423            HyperliquidWsMessage::Trades { data } => {
424                if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
425                    result.push(msg);
426                }
427            }
428            HyperliquidWsMessage::Bbo { data } => {
429                if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
430                    result.push(msg);
431                }
432            }
433            HyperliquidWsMessage::L2Book { data } => {
434                if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
435                    result.push(msg);
436                }
437            }
438            HyperliquidWsMessage::Candle { data } => {
439                if let Some(msg) =
440                    Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
441                {
442                    result.push(msg);
443                }
444            }
445            HyperliquidWsMessage::ActiveAssetCtx { data }
446            | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
447                result.extend(Self::handle_asset_context(
448                    &data,
449                    instruments,
450                    asset_context_subs,
451                    mark_price_cache,
452                    index_price_cache,
453                    funding_rate_cache,
454                    ts_init,
455                ));
456            }
457            HyperliquidWsMessage::Error { data } => {
458                log::warn!("Received error from Hyperliquid WebSocket: {data}");
459            }
460            // Ignore other message types (subscription confirmations, etc)
461            _ => {}
462        }
463
464        result
465    }
466
467    fn handle_order_updates(
468        data: &[super::messages::WsOrderData],
469        instruments: &AHashMap<Ustr, InstrumentAny>,
470        cloid_cache: &DashMap<Ustr, ClientOrderId>,
471        account_id: AccountId,
472        ts_init: UnixNanos,
473    ) -> Option<NautilusWsMessage> {
474        let mut exec_reports = Vec::new();
475
476        for order_update in data {
477            let instrument = instruments.get(&order_update.order.coin);
478
479            if let Some(instrument) = instrument {
480                match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
481                    Ok(mut report) => {
482                        // Resolve cloid to real client_order_id if cached
483                        if let Some(cloid) = &order_update.order.cloid {
484                            let cloid_ustr = Ustr::from(cloid.as_str());
485                            if let Some(entry) = cloid_cache.get(&cloid_ustr) {
486                                let real_client_order_id = *entry.value();
487                                log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
488                                report.client_order_id = Some(real_client_order_id);
489                            }
490                        }
491                        exec_reports.push(ExecutionReport::Order(report));
492                    }
493                    Err(e) => {
494                        log::error!("Error parsing order update: {e}");
495                    }
496                }
497            } else {
498                log::debug!("No instrument found for coin: {}", order_update.order.coin);
499            }
500        }
501
502        if exec_reports.is_empty() {
503            None
504        } else {
505            Some(NautilusWsMessage::ExecutionReports(exec_reports))
506        }
507    }
508
509    fn handle_user_fills(
510        fills: &[super::messages::WsFillData],
511        instruments: &AHashMap<Ustr, InstrumentAny>,
512        cloid_cache: &DashMap<Ustr, ClientOrderId>,
513        account_id: AccountId,
514        ts_init: UnixNanos,
515        processed_trade_ids: &mut FifoCache<u64, 10_000>,
516    ) -> Option<NautilusWsMessage> {
517        let mut exec_reports = Vec::new();
518
519        for fill in fills {
520            if processed_trade_ids.contains(&fill.tid) {
521                log::debug!("Skipping duplicate fill: tid={}", fill.tid);
522                continue;
523            }
524
525            let instrument = instruments.get(&fill.coin);
526
527            if let Some(instrument) = instrument {
528                log::debug!("Found instrument for fill coin={}", fill.coin);
529                match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
530                    Ok(mut report) => {
531                        // Mark processed only after successful parse
532                        processed_trade_ids.add(fill.tid);
533
534                        if let Some(cloid) = &fill.cloid {
535                            let cloid_ustr = Ustr::from(cloid.as_str());
536                            if let Some(entry) = cloid_cache.get(&cloid_ustr) {
537                                let real_client_order_id = *entry.value();
538                                log::debug!(
539                                    "Resolved fill cloid {cloid} -> {real_client_order_id}"
540                                );
541                                report.client_order_id = Some(real_client_order_id);
542                            }
543                        }
544                        log::debug!(
545                            "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
546                            report.venue_order_id,
547                            report.trade_id
548                        );
549                        exec_reports.push(ExecutionReport::Fill(report));
550                    }
551                    Err(e) => {
552                        log::error!("Error parsing fill: {e}");
553                    }
554                }
555            } else {
556                // Not marked as processed so fill is retried if instrument loads later
557                log::warn!(
558                    "No instrument found for fill coin={}. Keys: {:?}",
559                    fill.coin,
560                    instruments.keys().collect::<Vec<_>>()
561                );
562            }
563        }
564
565        if exec_reports.is_empty() {
566            None
567        } else {
568            Some(NautilusWsMessage::ExecutionReports(exec_reports))
569        }
570    }
571
572    fn handle_trades(
573        data: &[super::messages::WsTradeData],
574        instruments: &AHashMap<Ustr, InstrumentAny>,
575        ts_init: UnixNanos,
576    ) -> Option<NautilusWsMessage> {
577        let mut trade_ticks = Vec::new();
578
579        for trade in data {
580            if let Some(instrument) = instruments.get(&trade.coin) {
581                match parse_ws_trade_tick(trade, instrument, ts_init) {
582                    Ok(tick) => trade_ticks.push(tick),
583                    Err(e) => {
584                        log::error!("Error parsing trade tick: {e}");
585                    }
586                }
587            } else {
588                log::debug!("No instrument found for coin: {}", trade.coin);
589            }
590        }
591
592        if trade_ticks.is_empty() {
593            None
594        } else {
595            Some(NautilusWsMessage::Trades(trade_ticks))
596        }
597    }
598
599    fn handle_bbo(
600        data: &super::messages::WsBboData,
601        instruments: &AHashMap<Ustr, InstrumentAny>,
602        ts_init: UnixNanos,
603    ) -> Option<NautilusWsMessage> {
604        if let Some(instrument) = instruments.get(&data.coin) {
605            match parse_ws_quote_tick(data, instrument, ts_init) {
606                Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
607                Err(e) => {
608                    log::error!("Error parsing quote tick: {e}");
609                    None
610                }
611            }
612        } else {
613            log::debug!("No instrument found for coin: {}", data.coin);
614            None
615        }
616    }
617
618    fn handle_l2_book(
619        data: &super::messages::WsBookData,
620        instruments: &AHashMap<Ustr, InstrumentAny>,
621        ts_init: UnixNanos,
622    ) -> Option<NautilusWsMessage> {
623        if let Some(instrument) = instruments.get(&data.coin) {
624            match parse_ws_order_book_deltas(data, instrument, ts_init) {
625                Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
626                Err(e) => {
627                    log::error!("Error parsing order book deltas: {e}");
628                    None
629                }
630            }
631        } else {
632            log::debug!("No instrument found for coin: {}", data.coin);
633            None
634        }
635    }
636
637    fn handle_candle(
638        data: &CandleData,
639        instruments: &AHashMap<Ustr, InstrumentAny>,
640        bar_types: &AHashMap<String, BarType>,
641        bar_cache: &mut AHashMap<String, CandleData>,
642        ts_init: UnixNanos,
643    ) -> Option<NautilusWsMessage> {
644        let key = format!("candle:{}:{}", data.s, data.i);
645
646        let mut closed_bar = None;
647
648        if let Some(cached) = bar_cache.get(&key) {
649            // Emit cached bar when close_time changes, indicating the previous period closed
650            if cached.close_time != data.close_time {
651                log::debug!(
652                    "Bar period changed for {}: prev_close_time={}, new_close_time={}",
653                    data.s,
654                    cached.close_time,
655                    data.close_time
656                );
657                closed_bar = Some(cached.clone());
658            }
659        }
660
661        bar_cache.insert(key.clone(), data.clone());
662
663        if let Some(closed_data) = closed_bar {
664            if let Some(bar_type) = bar_types.get(&key) {
665                if let Some(instrument) = instruments.get(&data.s) {
666                    match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
667                        Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
668                        Err(e) => {
669                            log::error!("Error parsing closed candle: {e}");
670                        }
671                    }
672                } else {
673                    log::debug!("No instrument found for coin: {}", data.s);
674                }
675            } else {
676                log::debug!("No bar type found for key: {key}");
677            }
678        }
679
680        None
681    }
682
683    fn handle_asset_context(
684        data: &WsActiveAssetCtxData,
685        instruments: &AHashMap<Ustr, InstrumentAny>,
686        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
687        mark_price_cache: &mut AHashMap<Ustr, String>,
688        index_price_cache: &mut AHashMap<Ustr, String>,
689        funding_rate_cache: &mut AHashMap<Ustr, String>,
690        ts_init: UnixNanos,
691    ) -> Vec<NautilusWsMessage> {
692        let mut result = Vec::new();
693
694        let coin = match data {
695            WsActiveAssetCtxData::Perp { coin, .. } => coin,
696            WsActiveAssetCtxData::Spot { coin, .. } => coin,
697        };
698
699        if let Some(instrument) = instruments.get(coin) {
700            let (mark_px, oracle_px, funding) = match data {
701                WsActiveAssetCtxData::Perp { ctx, .. } => (
702                    &ctx.shared.mark_px,
703                    Some(&ctx.oracle_px),
704                    Some(&ctx.funding),
705                ),
706                WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
707            };
708
709            let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
710            let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
711            let funding_changed =
712                funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
713
714            let subscribed_types = asset_context_subs.get(coin);
715
716            if mark_changed || index_changed || funding_changed {
717                match parse_ws_asset_context(data, instrument, ts_init) {
718                    Ok((mark_price, index_price, funding_rate)) => {
719                        if mark_changed
720                            && subscribed_types
721                                .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
722                        {
723                            mark_price_cache.insert(*coin, mark_px.clone());
724                            result.push(NautilusWsMessage::MarkPrice(mark_price));
725                        }
726
727                        if index_changed
728                            && subscribed_types
729                                .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
730                        {
731                            if let Some(px) = oracle_px {
732                                index_price_cache.insert(*coin, px.clone());
733                            }
734
735                            if let Some(index) = index_price {
736                                result.push(NautilusWsMessage::IndexPrice(index));
737                            }
738                        }
739
740                        if funding_changed
741                            && subscribed_types
742                                .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
743                        {
744                            if let Some(rate) = funding {
745                                funding_rate_cache.insert(*coin, rate.clone());
746                            }
747
748                            if let Some(funding) = funding_rate {
749                                result.push(NautilusWsMessage::FundingRate(funding));
750                            }
751                        }
752                    }
753                    Err(e) => {
754                        log::error!("Error parsing asset context: {e}");
755                    }
756                }
757            }
758        } else {
759            log::debug!("No instrument found for coin: {coin}");
760        }
761
762        result
763    }
764}
765
766pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
767    match sub {
768        SubscriptionRequest::AllMids { dex } => {
769            if let Some(dex_name) = dex {
770                format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
771            } else {
772                HyperliquidWsChannel::AllMids.as_str().to_string()
773            }
774        }
775        SubscriptionRequest::Notification { user } => {
776            format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
777        }
778        SubscriptionRequest::WebData2 { user } => {
779            format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
780        }
781        SubscriptionRequest::Candle { coin, interval } => {
782            format!(
783                "{}:{coin}:{}",
784                HyperliquidWsChannel::Candle.as_str(),
785                interval.as_str()
786            )
787        }
788        SubscriptionRequest::L2Book { coin, .. } => {
789            format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
790        }
791        SubscriptionRequest::Trades { coin } => {
792            format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
793        }
794        SubscriptionRequest::OrderUpdates { user } => {
795            format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
796        }
797        SubscriptionRequest::UserEvents { user } => {
798            format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
799        }
800        SubscriptionRequest::UserFills { user, .. } => {
801            format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
802        }
803        SubscriptionRequest::UserFundings { user } => {
804            format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
805        }
806        SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
807            format!(
808                "{}:{user}",
809                HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
810            )
811        }
812        SubscriptionRequest::ActiveAssetCtx { coin } => {
813            format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
814        }
815        SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
816            format!(
817                "{}:{coin}",
818                HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
819            )
820        }
821        SubscriptionRequest::ActiveAssetData { user, coin } => {
822            format!(
823                "{}:{user}:{coin}",
824                HyperliquidWsChannel::ActiveAssetData.as_str()
825            )
826        }
827        SubscriptionRequest::UserTwapSliceFills { user } => {
828            format!(
829                "{}:{user}",
830                HyperliquidWsChannel::UserTwapSliceFills.as_str()
831            )
832        }
833        SubscriptionRequest::UserTwapHistory { user } => {
834            format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
835        }
836        SubscriptionRequest::Bbo { coin } => {
837            format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
838        }
839    }
840}
841
842/// Determines whether a Hyperliquid WebSocket error should trigger a retry.
843pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
844    match error {
845        HyperliquidWsError::TungsteniteError(_) => true,
846        HyperliquidWsError::ClientError(msg) => {
847            let msg_lower = msg.to_lowercase();
848            msg_lower.contains("timeout")
849                || msg_lower.contains("timed out")
850                || msg_lower.contains("connection")
851                || msg_lower.contains("network")
852        }
853        _ => false,
854    }
855}
856
857/// Creates a timeout error for Hyperliquid retry logic.
858pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
859    HyperliquidWsError::ClientError(msg)
860}