Skip to main content

nautilus_hyperliquid/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Hyperliquid.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::cache::fifo::FifoCache;
25use nautilus_core::{
26    AtomicTime, MUTEX_POISONED, Params, nanos::UnixNanos, time::get_atomic_clock_realtime,
27};
28use nautilus_model::{
29    data::{BarType, CustomData, Data, DataType},
30    identifiers::AccountId,
31    instruments::{Instrument, InstrumentAny},
32    types::Price,
33};
34use nautilus_network::{
35    RECONNECTED,
36    retry::{RetryManager, create_websocket_retry_manager},
37    websocket::{SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43    client::{AssetContextDataType, CloidCache},
44    enums::HyperliquidWsChannel,
45    error::HyperliquidWsError,
46    messages::{
47        CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
48        SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
49    },
50    parse::{
51        parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
52        parse_ws_order_book_depth10, parse_ws_order_status_report, parse_ws_quote_tick,
53        parse_ws_trade_tick,
54    },
55};
56use crate::data_types::HyperliquidAllMids;
57
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are delivered over an unbounded channel and applied one at a time
/// by the handler's `next` loop.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
#[allow(private_interfaces)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    ///
    /// Until this is received the handler has no client and outbound sends fail.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler disconnects the client (if any), raises its stop signal and
    /// ends its message loop.
    Disconnect,
    /// Subscribe to the given subscriptions.
    ///
    /// Each subscription is marked as pending, serialized and sent; it is marked
    /// as failed if serialization or sending fails.
    Subscribe {
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Unsubscribe from the given subscriptions.
    ///
    /// Serialization or send failures are logged only.
    Unsubscribe {
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Initialize the instruments cache with the given instruments.
    ///
    /// Instruments are keyed by their raw symbol; existing entries with the same
    /// key are replaced.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    UpdateInstrument(InstrumentAny),
    /// Add a bar type mapping for candle parsing.
    ///
    /// `key` must match the key the handler derives for incoming candles
    /// (`candle:{symbol}:{interval}`).
    AddBarType { key: String, bar_type: BarType },
    /// Remove a bar type mapping.
    ///
    /// Also drops the cached candle stored under the same key.
    RemoveBarType { key: String },
    /// Update asset context subscriptions for a coin.
    ///
    /// An empty `data_types` set removes the coin's entry entirely.
    UpdateAssetContextSubs {
        coin: Ustr,
        data_types: AHashSet<AssetContextDataType>,
    },
    /// Cache spot fill coin mappings for instrument lookup.
    ///
    /// The handler currently accepts and ignores this command.
    CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
    /// Flag whether the `l2Book` stream for `coin` should also be emitted
    /// as [`NautilusWsMessage::Depth10`] snapshots.
    SetDepth10Sub { coin: Ustr, subscribed: bool },
}
97
/// Inner WebSocket message handler.
///
/// Consumes [`HandlerCommand`]s and raw WebSocket frames, and turns venue
/// messages into [`NautilusWsMessage`]s.
pub(super) struct FeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Stop flag: read by `is_stopped`, set when a disconnect command is handled.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until a `SetClient` command arrives.
    client: Option<WebSocketClient>,
    /// Incoming commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Account used for order and fill reports; those are skipped when `None`.
    account_id: Option<AccountId>,
    /// Subscription bookkeeping (pending / failed / unsubscribing marks).
    subscriptions: SubscriptionState,
    /// Retry policy applied to outbound text sends.
    retry_manager: RetryManager<HyperliquidWsError>,
    /// Messages parsed from a single frame that have not yet been returned by `next`.
    message_buffer: Vec<NautilusWsMessage>,
    /// Instruments keyed by raw symbol (the venue coin).
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Shared cache used to map venue cloids back to real client order IDs.
    cloid_cache: CloidCache,
    /// Bar types keyed by candle key (`candle:{symbol}:{interval}`).
    bar_types_cache: AHashMap<String, BarType>,
    /// Most recent candle per candle key, used to detect when a bar period closes.
    bar_cache: AHashMap<String, CandleData>,
    /// Asset context data types subscribed per coin.
    asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
    /// Coins whose `l2Book` stream is also emitted as depth-10 snapshots.
    depth10_subs: AHashSet<Ustr>,
    /// Trade IDs of fills already reported, used to drop duplicates.
    processed_trade_ids: FifoCache<u64, 10_000>,
    // NOTE(review): the three caches below are only passed through to the asset
    // context handler; presumably they hold the last seen value per coin so
    // unchanged values can be suppressed - confirm against `handle_asset_context`.
    mark_price_cache: AHashMap<Ustr, String>,
    index_price_cache: AHashMap<Ustr, String>,
    funding_rate_cache: AHashMap<Ustr, String>,
}
120
121impl FeedHandler {
122    /// Creates a new [`FeedHandler`] instance.
123    pub(super) fn new(
124        signal: Arc<AtomicBool>,
125        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
126        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
127        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
128        account_id: Option<AccountId>,
129        subscriptions: SubscriptionState,
130        cloid_cache: CloidCache,
131    ) -> Self {
132        Self {
133            clock: get_atomic_clock_realtime(),
134            signal,
135            client: None,
136            cmd_rx,
137            raw_rx,
138            out_tx,
139            account_id,
140            subscriptions,
141            retry_manager: create_websocket_retry_manager(),
142            message_buffer: Vec::new(),
143            instruments: AHashMap::new(),
144            cloid_cache,
145            bar_types_cache: AHashMap::new(),
146            bar_cache: AHashMap::new(),
147            asset_context_subs: AHashMap::new(),
148            depth10_subs: AHashSet::new(),
149            processed_trade_ids: FifoCache::new(),
150            mark_price_cache: AHashMap::new(),
151            index_price_cache: AHashMap::new(),
152            funding_rate_cache: AHashMap::new(),
153        }
154    }
155
156    /// Send a message to the output channel.
157    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
158        self.out_tx
159            .send(msg)
160            .map_err(|e| format!("Failed to send message: {e}"))
161    }
162
163    /// Check if the handler has received a stop signal.
164    pub(super) fn is_stopped(&self) -> bool {
165        self.signal.load(Ordering::Relaxed)
166    }
167
168    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
169        if let Some(client) = &self.client {
170            self.retry_manager
171                .execute_with_retry(
172                    "websocket_send",
173                    || {
174                        let payload = payload.clone();
175                        async move {
176                            client.send_text(payload, None).await.map_err(|e| {
177                                HyperliquidWsError::ClientError(format!("Send failed: {e}"))
178                            })
179                        }
180                    },
181                    should_retry_hyperliquid_error,
182                    create_hyperliquid_timeout_error,
183                )
184                .await
185                .map_err(|e| anyhow::anyhow!("{e}"))
186        } else {
187            Err(anyhow::anyhow!("No WebSocket client available"))
188        }
189    }
190
191    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
192        if !self.message_buffer.is_empty() {
193            return Some(self.message_buffer.remove(0));
194        }
195
196        loop {
197            tokio::select! {
198                Some(cmd) = self.cmd_rx.recv() => {
199                    match cmd {
200                        HandlerCommand::SetClient(client) => {
201                            log::debug!("Setting WebSocket client in handler");
202                            self.client = Some(client);
203                        }
204                        HandlerCommand::Disconnect => {
205                            log::debug!("Handler received disconnect command");
206
207                            if let Some(ref client) = self.client {
208                                client.disconnect().await;
209                            }
210                            self.signal.store(true, Ordering::SeqCst);
211                            return None;
212                        }
213                        HandlerCommand::Subscribe { subscriptions } => {
214                            for subscription in subscriptions {
215                                let key = subscription_to_key(&subscription);
216                                self.subscriptions.mark_subscribe(&key);
217
218                                let request = HyperliquidWsRequest::Subscribe { subscription };
219                                match serde_json::to_string(&request) {
220                                    Ok(payload) => {
221                                        log::debug!("Sending subscribe payload: {payload}");
222                                        if let Err(e) = self.send_with_retry(payload).await {
223                                            log::error!("Error subscribing to {key}: {e}");
224                                            self.subscriptions.mark_failure(&key);
225                                        }
226                                    }
227                                    Err(e) => {
228                                        log::error!("Error serializing subscription for {key}: {e}");
229                                        self.subscriptions.mark_failure(&key);
230                                    }
231                                }
232                            }
233                        }
234                        HandlerCommand::Unsubscribe { subscriptions } => {
235                            for subscription in subscriptions {
236                                let key = subscription_to_key(&subscription);
237                                self.subscriptions.mark_unsubscribe(&key);
238
239                                let request = HyperliquidWsRequest::Unsubscribe { subscription };
240                                match serde_json::to_string(&request) {
241                                    Ok(payload) => {
242                                        log::debug!("Sending unsubscribe payload: {payload}");
243                                        if let Err(e) = self.send_with_retry(payload).await {
244                                            log::error!("Error unsubscribing from {key}: {e}");
245                                        }
246                                    }
247                                    Err(e) => {
248                                        log::error!("Error serializing unsubscription for {key}: {e}");
249                                    }
250                                }
251                            }
252                        }
253                        HandlerCommand::InitializeInstruments(instruments) => {
254                            for inst in instruments {
255                                let coin = inst.raw_symbol().inner();
256                                self.instruments.insert(coin, inst);
257                            }
258                        }
259                        HandlerCommand::UpdateInstrument(inst) => {
260                            let coin = inst.raw_symbol().inner();
261                            self.instruments.insert(coin, inst);
262                        }
263                        HandlerCommand::AddBarType { key, bar_type } => {
264                            self.bar_types_cache.insert(key, bar_type);
265                        }
266                        HandlerCommand::RemoveBarType { key } => {
267                            self.bar_types_cache.remove(&key);
268                            self.bar_cache.remove(&key);
269                        }
270                        HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
271                            if data_types.is_empty() {
272                                self.asset_context_subs.remove(&coin);
273                            } else {
274                                self.asset_context_subs.insert(coin, data_types);
275                            }
276                        }
277                        HandlerCommand::CacheSpotFillCoins(_) => {
278                            // No longer needed - raw_symbol now contains the proper format
279                        }
280                        HandlerCommand::SetDepth10Sub { coin, subscribed } => {
281                            if subscribed {
282                                self.depth10_subs.insert(coin);
283                            } else {
284                                self.depth10_subs.remove(&coin);
285                            }
286                        }
287                    }
288                }
289
290                Some(raw_msg) = self.raw_rx.recv() => {
291                    match raw_msg {
292                        Message::Text(text) => {
293                            if text == RECONNECTED {
294                                log::info!("Received RECONNECTED sentinel");
295                                return Some(NautilusWsMessage::Reconnected);
296                            }
297
298                            match serde_json::from_str::<HyperliquidWsMessage>(&text) {
299                                Ok(msg) => {
300                                    let ts_init = self.clock.get_time_ns();
301                                    let all_mids_data_types =
302                                        Self::all_mids_data_types(&self.subscriptions);
303
304                                    let nautilus_msgs = Self::parse_to_nautilus_messages(
305                                        msg,
306                                        &self.instruments,
307                                        &self.cloid_cache,
308                                        &self.bar_types_cache,
309                                        self.account_id,
310                                        ts_init,
311                                        &self.asset_context_subs,
312                                        &self.depth10_subs,
313                                        &mut self.processed_trade_ids,
314                                        &mut self.mark_price_cache,
315                                        &mut self.index_price_cache,
316                                        &mut self.funding_rate_cache,
317                                        &mut self.bar_cache,
318                                        &all_mids_data_types,
319                                    );
320
321                                    if !nautilus_msgs.is_empty() {
322                                        let mut iter = nautilus_msgs.into_iter();
323                                        let first = iter.next().unwrap();
324                                        self.message_buffer.extend(iter);
325                                        return Some(first);
326                                    }
327                                }
328                                Err(e) => {
329                                    log::error!("Error parsing WebSocket message: {e}, text: {text}");
330                                }
331                            }
332                        }
333                        Message::Ping(data) => {
334                            if let Some(ref client) = self.client
335                                && let Err(e) = client.send_pong(data.to_vec()).await {
336                                log::error!("Error sending pong: {e}");
337                            }
338                        }
339                        Message::Close(_) => {
340                            log::info!("Received WebSocket close frame");
341                            return None;
342                        }
343                        _ => {}
344                    }
345                }
346
347                else => {
348                    log::debug!("Handler shutting down: stream ended or command channel closed");
349                    return None;
350                }
351            }
352        }
353    }
354
355    #[expect(clippy::too_many_arguments)]
356    fn parse_to_nautilus_messages(
357        msg: HyperliquidWsMessage,
358        instruments: &AHashMap<Ustr, InstrumentAny>,
359        cloid_cache: &CloidCache,
360        bar_types: &AHashMap<String, BarType>,
361        account_id: Option<AccountId>,
362        ts_init: UnixNanos,
363        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
364        depth10_subs: &AHashSet<Ustr>,
365        processed_trade_ids: &mut FifoCache<u64, 10_000>,
366        mark_price_cache: &mut AHashMap<Ustr, String>,
367        index_price_cache: &mut AHashMap<Ustr, String>,
368        funding_rate_cache: &mut AHashMap<Ustr, String>,
369        bar_cache: &mut AHashMap<String, CandleData>,
370        all_mids_data_types: &[DataType],
371    ) -> Vec<NautilusWsMessage> {
372        let mut result = Vec::new();
373
374        match msg {
375            HyperliquidWsMessage::OrderUpdates { data } => {
376                if let Some(account_id) = account_id
377                    && let Some(msg) = Self::handle_order_updates(
378                        &data,
379                        instruments,
380                        cloid_cache,
381                        account_id,
382                        ts_init,
383                    )
384                {
385                    result.push(msg);
386                }
387            }
388            HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
389                // Process fills from userEvents channel (userFills channel is redundant)
390                match data {
391                    WsUserEventData::Fills { fills } => {
392                        log::debug!("Received {} fill(s) from userEvents channel", fills.len());
393                        for fill in &fills {
394                            log::debug!(
395                                "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
396                                fill.oid,
397                                fill.coin,
398                                fill.side,
399                                fill.sz,
400                                fill.px
401                            );
402                        }
403
404                        if let Some(account_id) = account_id {
405                            log::debug!("Processing fills with account_id={account_id}");
406
407                            if let Some(msg) = Self::handle_user_fills(
408                                &fills,
409                                instruments,
410                                cloid_cache,
411                                account_id,
412                                ts_init,
413                                processed_trade_ids,
414                            ) {
415                                log::debug!("Successfully created fill message");
416                                result.push(msg);
417                            } else {
418                                log::debug!("handle_user_fills returned None (no new fills)");
419                            }
420                        } else {
421                            log::warn!("Cannot process fills: account_id is None");
422                        }
423                    }
424                    WsUserEventData::Liquidation { liquidation } => {
425                        log::warn!(
426                            "Liquidation event: lid={}, liquidator={}, liquidated_user={}, ntl_pos={}, account_value={}",
427                            liquidation.lid,
428                            liquidation.liquidator,
429                            liquidation.liquidated_user,
430                            liquidation.liquidated_ntl_pos,
431                            liquidation.liquidated_account_value,
432                        );
433                    }
434                    _ => {
435                        log::debug!("Received non-fill user event: {data:?}");
436                    }
437                }
438            }
439            HyperliquidWsMessage::UserFills { data } => {
440                // UserFills channel is redundant with userEvents, but handle it for
441                // backwards compatibility if explicitly subscribed
442                if let Some(account_id) = account_id
443                    && let Some(msg) = Self::handle_user_fills(
444                        &data.fills,
445                        instruments,
446                        cloid_cache,
447                        account_id,
448                        ts_init,
449                        processed_trade_ids,
450                    )
451                {
452                    result.push(msg);
453                }
454            }
455            HyperliquidWsMessage::Trades { data } => {
456                if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
457                    result.push(msg);
458                }
459            }
460            HyperliquidWsMessage::AllMids { data } => {
461                let mut mids = std::collections::HashMap::new();
462                for (coin, mid_str) in &data.mids {
463                    let coin_ustr = Ustr::from(coin.as_str());
464                    if let Some(instrument) = instruments.get(&coin_ustr) {
465                        match mid_str.parse::<Price>() {
466                            Ok(price) => {
467                                mids.insert(instrument.id(), price);
468                            }
469                            Err(e) => {
470                                log::warn!("Failed to parse mid price for {coin}: {e}");
471                            }
472                        }
473                    } else {
474                        log::debug!("No instrument found for coin: {coin}");
475                    }
476                }
477
478                if !mids.is_empty() {
479                    for data_type in all_mids_data_types {
480                        let all_mids = HyperliquidAllMids::new(mids.clone(), ts_init, ts_init);
481                        result.push(NautilusWsMessage::CustomData(Data::Custom(
482                            CustomData::new(Arc::new(all_mids), data_type.clone()),
483                        )));
484                    }
485                }
486            }
487            HyperliquidWsMessage::Bbo { data } => {
488                if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
489                    result.push(msg);
490                }
491            }
492            HyperliquidWsMessage::L2Book { data } => {
493                result.extend(Self::handle_l2_book(
494                    &data,
495                    instruments,
496                    depth10_subs,
497                    ts_init,
498                ));
499            }
500            HyperliquidWsMessage::Candle { data } => {
501                if let Some(msg) =
502                    Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
503                {
504                    result.push(msg);
505                }
506            }
507            HyperliquidWsMessage::ActiveAssetCtx { data }
508            | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
509                result.extend(Self::handle_asset_context(
510                    &data,
511                    instruments,
512                    asset_context_subs,
513                    mark_price_cache,
514                    index_price_cache,
515                    funding_rate_cache,
516                    ts_init,
517                ));
518            }
519            HyperliquidWsMessage::Error { data } => {
520                log::warn!("Received error from Hyperliquid WebSocket: {data}");
521            }
522            // Ignore other message types (subscription confirmations, etc)
523            _ => {}
524        }
525
526        result
527    }
528
529    fn handle_order_updates(
530        data: &[super::messages::WsOrderData],
531        instruments: &AHashMap<Ustr, InstrumentAny>,
532        cloid_cache: &CloidCache,
533        account_id: AccountId,
534        ts_init: UnixNanos,
535    ) -> Option<NautilusWsMessage> {
536        let mut exec_reports = Vec::new();
537
538        for order_update in data {
539            let instrument = instruments.get(&order_update.order.coin);
540
541            if let Some(instrument) = instrument {
542                match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
543                    Ok(mut report) => {
544                        // Resolve cloid to real client_order_id if cached
545                        if let Some(cloid) = &order_update.order.cloid {
546                            let cloid_ustr = Ustr::from(cloid.as_str());
547                            let resolved = cloid_cache
548                                .lock()
549                                .expect(MUTEX_POISONED)
550                                .get(&cloid_ustr)
551                                .copied();
552
553                            if let Some(real_client_order_id) = resolved {
554                                log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
555                                report.client_order_id = Some(real_client_order_id);
556                            }
557                        }
558                        exec_reports.push(ExecutionReport::Order(report));
559                    }
560                    Err(e) => {
561                        log::error!("Error parsing order update: {e}");
562                    }
563                }
564            } else {
565                log::debug!("No instrument found for coin: {}", order_update.order.coin);
566            }
567        }
568
569        if exec_reports.is_empty() {
570            None
571        } else {
572            Some(NautilusWsMessage::ExecutionReports(exec_reports))
573        }
574    }
575
576    fn handle_user_fills(
577        fills: &[super::messages::WsFillData],
578        instruments: &AHashMap<Ustr, InstrumentAny>,
579        cloid_cache: &CloidCache,
580        account_id: AccountId,
581        ts_init: UnixNanos,
582        processed_trade_ids: &mut FifoCache<u64, 10_000>,
583    ) -> Option<NautilusWsMessage> {
584        let mut exec_reports = Vec::new();
585
586        for fill in fills {
587            if processed_trade_ids.contains(&fill.tid) {
588                log::debug!("Skipping duplicate fill: tid={}", fill.tid);
589                continue;
590            }
591
592            let instrument = instruments.get(&fill.coin);
593
594            if let Some(instrument) = instrument {
595                log::debug!("Found instrument for fill coin={}", fill.coin);
596                match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
597                    Ok(mut report) => {
598                        // Mark processed only after successful parse
599                        processed_trade_ids.add(fill.tid);
600
601                        if let Some(cloid) = &fill.cloid {
602                            let cloid_ustr = Ustr::from(cloid.as_str());
603                            let resolved = cloid_cache
604                                .lock()
605                                .expect(MUTEX_POISONED)
606                                .get(&cloid_ustr)
607                                .copied();
608
609                            if let Some(real_client_order_id) = resolved {
610                                log::debug!(
611                                    "Resolved fill cloid {cloid} -> {real_client_order_id}"
612                                );
613                                report.client_order_id = Some(real_client_order_id);
614                            }
615                        }
616                        log::debug!(
617                            "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
618                            report.venue_order_id,
619                            report.trade_id
620                        );
621                        exec_reports.push(ExecutionReport::Fill(report));
622                    }
623                    Err(e) => {
624                        log::error!("Error parsing fill: {e}");
625                    }
626                }
627            } else {
628                // Not marked as processed so fill is retried if instrument loads later
629                log::warn!(
630                    "No instrument found for fill coin={}. Keys: {:?}",
631                    fill.coin,
632                    instruments.keys().collect::<Vec<_>>()
633                );
634            }
635        }
636
637        if exec_reports.is_empty() {
638            None
639        } else {
640            Some(NautilusWsMessage::ExecutionReports(exec_reports))
641        }
642    }
643
644    fn handle_trades(
645        data: &[super::messages::WsTradeData],
646        instruments: &AHashMap<Ustr, InstrumentAny>,
647        ts_init: UnixNanos,
648    ) -> Option<NautilusWsMessage> {
649        let mut trade_ticks = Vec::new();
650
651        for trade in data {
652            if let Some(instrument) = instruments.get(&trade.coin) {
653                match parse_ws_trade_tick(trade, instrument, ts_init) {
654                    Ok(tick) => trade_ticks.push(tick),
655                    Err(e) => {
656                        log::error!("Error parsing trade tick: {e}");
657                    }
658                }
659            } else {
660                log::debug!("No instrument found for coin: {}", trade.coin);
661            }
662        }
663
664        if trade_ticks.is_empty() {
665            None
666        } else {
667            Some(NautilusWsMessage::Trades(trade_ticks))
668        }
669    }
670
671    fn handle_bbo(
672        data: &super::messages::WsBboData,
673        instruments: &AHashMap<Ustr, InstrumentAny>,
674        ts_init: UnixNanos,
675    ) -> Option<NautilusWsMessage> {
676        if let Some(instrument) = instruments.get(&data.coin) {
677            match parse_ws_quote_tick(data, instrument, ts_init) {
678                Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
679                Err(e) => {
680                    log::error!("Error parsing quote tick: {e}");
681                    None
682                }
683            }
684        } else {
685            log::debug!("No instrument found for coin: {}", data.coin);
686            None
687        }
688    }
689
690    fn handle_l2_book(
691        data: &super::messages::WsBookData,
692        instruments: &AHashMap<Ustr, InstrumentAny>,
693        depth10_subs: &AHashSet<Ustr>,
694        ts_init: UnixNanos,
695    ) -> Vec<NautilusWsMessage> {
696        let mut out = Vec::new();
697
698        let Some(instrument) = instruments.get(&data.coin) else {
699            log::debug!("No instrument found for coin: {}", data.coin);
700            return out;
701        };
702
703        match parse_ws_order_book_deltas(data, instrument, ts_init) {
704            Ok(deltas) => out.push(NautilusWsMessage::Deltas(deltas)),
705            Err(e) => log::error!("Error parsing order book deltas: {e}"),
706        }
707
708        if depth10_subs.contains(&data.coin) {
709            match parse_ws_order_book_depth10(data, instrument, ts_init) {
710                Ok(depth) => out.push(NautilusWsMessage::Depth10(Box::new(depth))),
711                Err(e) => log::error!("Error parsing order book depth10: {e}"),
712            }
713        }
714
715        out
716    }
717
718    fn handle_candle(
719        data: &CandleData,
720        instruments: &AHashMap<Ustr, InstrumentAny>,
721        bar_types: &AHashMap<String, BarType>,
722        bar_cache: &mut AHashMap<String, CandleData>,
723        ts_init: UnixNanos,
724    ) -> Option<NautilusWsMessage> {
725        let key = format!("candle:{}:{}", data.s, data.i);
726
727        let mut closed_bar = None;
728
729        if let Some(cached) = bar_cache.get(&key) {
730            // Emit cached bar when close_time changes, indicating the previous period closed
731            if cached.close_time != data.close_time {
732                log::debug!(
733                    "Bar period changed for {}: prev_close_time={}, new_close_time={}",
734                    data.s,
735                    cached.close_time,
736                    data.close_time
737                );
738                closed_bar = Some(cached.clone());
739            }
740        }
741
742        bar_cache.insert(key.clone(), data.clone());
743
744        if let Some(closed_data) = closed_bar {
745            if let Some(bar_type) = bar_types.get(&key) {
746                if let Some(instrument) = instruments.get(&data.s) {
747                    match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
748                        Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
749                        Err(e) => {
750                            log::error!("Error parsing closed candle: {e}");
751                        }
752                    }
753                } else {
754                    log::debug!("No instrument found for coin: {}", data.s);
755                }
756            } else {
757                log::debug!("No bar type found for key: {key}");
758            }
759        }
760
761        None
762    }
763
764    fn handle_asset_context(
765        data: &WsActiveAssetCtxData,
766        instruments: &AHashMap<Ustr, InstrumentAny>,
767        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
768        mark_price_cache: &mut AHashMap<Ustr, String>,
769        index_price_cache: &mut AHashMap<Ustr, String>,
770        funding_rate_cache: &mut AHashMap<Ustr, String>,
771        ts_init: UnixNanos,
772    ) -> Vec<NautilusWsMessage> {
773        let mut result = Vec::new();
774
775        let coin = match data {
776            WsActiveAssetCtxData::Perp { coin, .. } => coin,
777            WsActiveAssetCtxData::Spot { coin, .. } => coin,
778        };
779
780        if let Some(instrument) = instruments.get(coin) {
781            let (mark_px, oracle_px, funding) = match data {
782                WsActiveAssetCtxData::Perp { ctx, .. } => (
783                    &ctx.shared.mark_px,
784                    Some(&ctx.oracle_px),
785                    Some(&ctx.funding),
786                ),
787                WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
788            };
789
790            let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
791            let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
792            let funding_changed =
793                funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
794
795            let subscribed_types = asset_context_subs.get(coin);
796
797            if mark_changed || index_changed || funding_changed {
798                match parse_ws_asset_context(data, instrument, ts_init) {
799                    Ok((mark_price, index_price, funding_rate)) => {
800                        if mark_changed
801                            && subscribed_types
802                                .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
803                        {
804                            mark_price_cache.insert(*coin, mark_px.clone());
805                            result.push(NautilusWsMessage::MarkPrice(mark_price));
806                        }
807
808                        if index_changed
809                            && subscribed_types
810                                .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
811                        {
812                            if let Some(px) = oracle_px {
813                                index_price_cache.insert(*coin, px.clone());
814                            }
815
816                            if let Some(index) = index_price {
817                                result.push(NautilusWsMessage::IndexPrice(index));
818                            }
819                        }
820
821                        if funding_changed
822                            && subscribed_types
823                                .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
824                        {
825                            if let Some(rate) = funding {
826                                funding_rate_cache.insert(*coin, rate.clone());
827                            }
828
829                            if let Some(funding) = funding_rate {
830                                result.push(NautilusWsMessage::FundingRate(funding));
831                            }
832                        }
833                    }
834                    Err(e) => {
835                        log::error!("Error parsing asset context: {e}");
836                    }
837                }
838            }
839        } else {
840            log::debug!("No instrument found for coin: {coin}");
841        }
842
843        result
844    }
845
846    fn all_mids_data_types(subscriptions: &SubscriptionState) -> Vec<DataType> {
847        let mut topics = subscriptions.all_topics();
848        topics.sort_unstable();
849        topics.dedup();
850
851        let all_mids_channel = HyperliquidWsChannel::AllMids.as_str();
852        let all_mids_prefix = format!("{all_mids_channel}:");
853        let mut data_types = Vec::new();
854
855        for topic in topics {
856            if topic == all_mids_channel {
857                data_types.push(DataType::new("HyperliquidAllMids", None, None));
858            } else if let Some(dex) = topic.strip_prefix(&all_mids_prefix) {
859                let mut metadata = Params::new();
860                metadata.insert(
861                    "dex".to_string(),
862                    serde_json::Value::String(dex.to_string()),
863                );
864                data_types.push(DataType::new("HyperliquidAllMids", Some(metadata), None));
865            }
866        }
867
868        if data_types.is_empty() {
869            data_types.push(DataType::new("HyperliquidAllMids", None, None));
870        }
871
872        data_types
873    }
874}
875
876pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
877    match sub {
878        SubscriptionRequest::AllMids { dex } => {
879            if let Some(dex_name) = dex {
880                format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
881            } else {
882                HyperliquidWsChannel::AllMids.as_str().to_string()
883            }
884        }
885        SubscriptionRequest::Notification { user } => {
886            format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
887        }
888        SubscriptionRequest::WebData2 { user } => {
889            format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
890        }
891        SubscriptionRequest::Candle { coin, interval } => {
892            format!(
893                "{}:{coin}:{}",
894                HyperliquidWsChannel::Candle.as_str(),
895                interval.as_str()
896            )
897        }
898        SubscriptionRequest::L2Book { coin, .. } => {
899            format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
900        }
901        SubscriptionRequest::Trades { coin } => {
902            format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
903        }
904        SubscriptionRequest::OrderUpdates { user } => {
905            format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
906        }
907        SubscriptionRequest::UserEvents { user } => {
908            format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
909        }
910        SubscriptionRequest::UserFills { user, .. } => {
911            format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
912        }
913        SubscriptionRequest::UserFundings { user } => {
914            format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
915        }
916        SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
917            format!(
918                "{}:{user}",
919                HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
920            )
921        }
922        SubscriptionRequest::ActiveAssetCtx { coin } => {
923            format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
924        }
925        SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
926            format!(
927                "{}:{coin}",
928                HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
929            )
930        }
931        SubscriptionRequest::ActiveAssetData { user, coin } => {
932            format!(
933                "{}:{user}:{coin}",
934                HyperliquidWsChannel::ActiveAssetData.as_str()
935            )
936        }
937        SubscriptionRequest::UserTwapSliceFills { user } => {
938            format!(
939                "{}:{user}",
940                HyperliquidWsChannel::UserTwapSliceFills.as_str()
941            )
942        }
943        SubscriptionRequest::UserTwapHistory { user } => {
944            format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
945        }
946        SubscriptionRequest::Bbo { coin } => {
947            format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
948        }
949    }
950}
951
952/// Determines whether a Hyperliquid WebSocket error should trigger a retry.
953pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
954    match error {
955        HyperliquidWsError::TungsteniteError(_) => true,
956        HyperliquidWsError::ClientError(msg) => {
957            let msg_lower = msg.to_lowercase();
958            msg_lower.contains("timeout")
959                || msg_lower.contains("timed out")
960                || msg_lower.contains("connection")
961                || msg_lower.contains("network")
962        }
963        _ => false,
964    }
965}
966
967/// Creates a timeout error for Hyperliquid retry logic.
968pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
969    HyperliquidWsError::ClientError(msg)
970}
971
#[cfg(test)]
mod tests {
    use ahash::{AHashMap, AHashSet};
    use nautilus_core::nanos::UnixNanos;
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol},
        instruments::{CryptoPerpetual, InstrumentAny},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::{
        super::messages::{NautilusWsMessage, WsBookData, WsLevelData},
        FeedHandler,
    };
    use crate::common::consts::HYPERLIQUID_VENUE;

    /// Builds the BTC perpetual fixture with every optional constructor argument unset.
    fn btc_perp() -> InstrumentAny {
        let instrument_id = InstrumentId::new(Symbol::new("BTC-PERP"), *HYPERLIQUID_VENUE);
        let ts = UnixNanos::default();

        let perp = CryptoPerpetual::new(
            instrument_id,
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false,
            2,
            3,
            Price::from("0.01"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            ts,
            ts,
        );

        InstrumentAny::CryptoPerpetual(perp)
    }

    /// Builds a single book level of size `1.0` from one order at the given price.
    fn level(px: &str) -> WsLevelData {
        WsLevelData {
            px: px.to_string(),
            sz: "1.0".to_string(),
            n: 1,
        }
    }

    /// Builds a BTC book with one level in each of its two sides.
    fn one_level_book() -> WsBookData {
        WsBookData {
            coin: Ustr::from("BTC"),
            levels: [vec![level("100.00")], vec![level("100.01")]],
            time: 1_700_000_000_000,
        }
    }

    /// Builds an instrument map that knows only the BTC perpetual.
    fn btc_instruments() -> AHashMap<Ustr, InstrumentAny> {
        let mut instruments = AHashMap::new();
        instruments.insert(Ustr::from("BTC"), btc_perp());
        instruments
    }

    /// Runs `handle_l2_book` on the one-level fixture book with a default timestamp.
    fn run_l2_book(
        instruments: &AHashMap<Ustr, InstrumentAny>,
        depth10_subs: &AHashSet<Ustr>,
    ) -> Vec<NautilusWsMessage> {
        FeedHandler::handle_l2_book(
            &one_level_book(),
            instruments,
            depth10_subs,
            UnixNanos::default(),
        )
    }

    #[rstest]
    fn handle_l2_book_emits_deltas_only_when_not_in_depth10_subs() {
        let no_depth10 = AHashSet::<Ustr>::new();

        let msgs = run_l2_book(&btc_instruments(), &no_depth10);

        assert_eq!(msgs.len(), 1);
        assert!(matches!(
            msgs.as_slice(),
            [NautilusWsMessage::Deltas(_)]
        ));
    }

    #[rstest]
    fn handle_l2_book_emits_deltas_and_depth10_when_coin_in_subs() {
        let mut with_btc = AHashSet::<Ustr>::new();
        with_btc.insert(Ustr::from("BTC"));

        let msgs = run_l2_book(&btc_instruments(), &with_btc);

        assert_eq!(msgs.len(), 2);
        assert!(matches!(
            msgs.as_slice(),
            [NautilusWsMessage::Deltas(_), NautilusWsMessage::Depth10(_)]
        ));
    }

    #[rstest]
    fn handle_l2_book_returns_empty_when_instrument_unknown() {
        let no_instruments = AHashMap::<Ustr, InstrumentAny>::new();
        let no_depth10 = AHashSet::<Ustr>::new();

        let msgs = run_l2_book(&no_instruments, &no_depth10);

        assert!(msgs.is_empty());
    }
}