Skip to main content

nautilus_hyperliquid/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc, Mutex,
20        atomic::{AtomicBool, Ordering},
21    },
22};
23
24use ahash::AHashMap;
25use anyhow::Context;
26use chrono::{DateTime, Utc};
27use nautilus_common::{
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    messages::{
31        DataEvent,
32        data::{
33            BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
34            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeCustomData, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes,
38            SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
39            UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
40            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43};
44use nautilus_core::{
45    AtomicMap, MUTEX_POISONED, Params, UnixNanos,
46    datetime::datetime_to_unix_nanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50    data::{Bar, BarType, BookOrder, Data, DataType, FundingRateUpdate, OrderBookDeltas_API},
51    enums::{BarAggregation, BookType, OrderSide},
52    identifiers::{ClientId, InstrumentId, Venue},
53    instruments::{Instrument, InstrumentAny},
54    orderbook::OrderBook,
55    types::{Price, Quantity},
56};
57use rust_decimal::Decimal;
58use tokio::task::JoinHandle;
59use tokio_util::sync::CancellationToken;
60use ustr::Ustr;
61
62use crate::{
63    common::{
64        consts::HYPERLIQUID_VENUE,
65        credential::{Secrets, credential_env_vars},
66        parse::bar_type_to_interval,
67    },
68    config::HyperliquidDataClientConfig,
69    data_types::register_hyperliquid_custom_data,
70    http::{
71        client::HyperliquidHttpClient,
72        models::{HyperliquidCandle, HyperliquidFundingHistoryEntry, HyperliquidL2Book},
73    },
74    websocket::{client::HyperliquidWebSocketClient, messages::NautilusWsMessage},
75};
76
77#[derive(Debug)]
78pub struct HyperliquidDataClient {
79    clock: &'static AtomicTime,
80    client_id: ClientId,
81    config: HyperliquidDataClientConfig,
82    http_client: HyperliquidHttpClient,
83    ws_client: HyperliquidWebSocketClient,
84    is_connected: AtomicBool,
85    cancellation_token: CancellationToken,
86    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
87    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
88    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
89    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
90    coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
91}
92
93impl HyperliquidDataClient {
94    /// Creates a new [`HyperliquidDataClient`] instance.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the HTTP client fails to initialize.
99    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
100        let clock = get_atomic_clock_realtime();
101        let data_sender = get_data_event_sender();
102
103        // Only fall back to unauthenticated when credentials are absent,
104        // not when they're invalid (fail fast on malformed keys)
105        let (pk_var, _) = credential_env_vars(config.environment);
106        let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
107
108        let mut http_client = if has_credentials {
109            let secrets =
110                Secrets::resolve(config.private_key.as_deref(), None, config.environment)?;
111            HyperliquidHttpClient::with_secrets(
112                &secrets,
113                config.http_timeout_secs,
114                config.proxy_url.clone(),
115            )?
116        } else {
117            HyperliquidHttpClient::new(
118                config.environment,
119                config.http_timeout_secs,
120                config.proxy_url.clone(),
121            )?
122        };
123
124        if let Some(url) = &config.base_url_http {
125            http_client.set_base_info_url(url.clone());
126        }
127
128        let ws_url = config.base_url_ws.clone();
129        let ws_client = HyperliquidWebSocketClient::new(
130            ws_url,
131            config.environment,
132            None,
133            config.transport_backend,
134            config.proxy_url.clone(),
135        );
136
137        Ok(Self {
138            clock,
139            client_id,
140            config,
141            http_client,
142            ws_client,
143            is_connected: AtomicBool::new(false),
144            cancellation_token: CancellationToken::new(),
145            ws_stream_handle: Mutex::new(None),
146            pending_tasks: Mutex::new(Vec::new()),
147            data_sender,
148            instruments: Arc::new(AtomicMap::new()),
149            coin_to_instrument_id: Arc::new(AtomicMap::new()),
150        })
151    }
152
153    fn spawn_task<F>(&self, description: &'static str, fut: F)
154    where
155        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
156    {
157        let runtime = get_runtime();
158        let handle = runtime.spawn(async move {
159            if let Err(e) = fut.await {
160                log::warn!("{description} failed: {e:?}");
161            }
162        });
163
164        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
165        tasks.retain(|handle| !handle.is_finished());
166        tasks.push(handle);
167    }
168
169    fn abort_pending_tasks(&self) {
170        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
171        for handle in tasks.drain(..) {
172            handle.abort();
173        }
174    }
175
176    fn venue(&self) -> Venue {
177        *HYPERLIQUID_VENUE
178    }
179
180    fn custom_instrument_id(data_type: &DataType) -> anyhow::Result<Option<InstrumentId>> {
181        let Some(raw_instrument_id) = data_type
182            .metadata()
183            .and_then(|m| m.get("instrument_id"))
184            .and_then(|v| v.as_str())
185            .map(str::trim)
186            .filter(|value| !value.is_empty())
187        else {
188            return Ok(None);
189        };
190
191        let instrument_id = InstrumentId::from_str(raw_instrument_id)
192            .with_context(|| format!("invalid instrument_id metadata `{raw_instrument_id}`"))?;
193
194        Ok(Some(instrument_id))
195    }
196
197    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
198        let instruments = self
199            .http_client
200            .request_instruments()
201            .await
202            .context("failed to fetch instruments during bootstrap")?;
203
204        self.instruments.rcu(|m| {
205            for instrument in &instruments {
206                m.insert(instrument.id(), instrument.clone());
207            }
208        });
209
210        self.coin_to_instrument_id.rcu(|m| {
211            for instrument in &instruments {
212                m.insert(instrument.raw_symbol().inner(), instrument.id());
213            }
214        });
215
216        for instrument in &instruments {
217            self.http_client.cache_instrument(instrument);
218            self.ws_client.cache_instrument(instrument.clone());
219        }
220
221        match self
222            .http_client
223            .build_all_dex_asset_ctxs_instrument_ids()
224            .await
225        {
226            Ok(mapping) => {
227                let mapping = mapping
228                    .into_iter()
229                    .map(|(dex, instrument_ids)| (Ustr::from(dex.as_str()), instrument_ids))
230                    .collect();
231                self.ws_client
232                    .cache_all_dex_asset_ctxs_instrument_ids(mapping);
233            }
234            Err(e) => {
235                log::warn!("Failed to build Hyperliquid allDexsAssetCtxs mapping: {e}");
236            }
237        }
238
239        log::info!(
240            "Bootstrapped {} instruments with {} coin mappings",
241            self.instruments.len(),
242            self.coin_to_instrument_id.len()
243        );
244        Ok(instruments)
245    }
246
247    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
248        // Clone client before connecting so the clone can have out_rx set
249        let mut ws_client = self.ws_client.clone();
250
251        ws_client
252            .connect()
253            .await
254            .context("failed to connect to Hyperliquid WebSocket")?;
255
256        // Transfer task handle to original so disconnect() can await it
257        if let Some(handle) = ws_client.take_task_handle() {
258            self.ws_client.set_task_handle(handle);
259        }
260
261        let data_sender = self.data_sender.clone();
262        let cancellation_token = self.cancellation_token.clone();
263
264        let task = get_runtime().spawn(async move {
265            log::info!("Hyperliquid WebSocket consumption loop started");
266
267            loop {
268                tokio::select! {
269                    () = cancellation_token.cancelled() => {
270                        log::info!("WebSocket consumption loop cancelled");
271                        break;
272                    }
273                    msg_opt = ws_client.next_event() => {
274                        if let Some(msg) = msg_opt {
275                            match msg {
276                                NautilusWsMessage::Trades(trades) => {
277                                    for trade in trades {
278                                        if let Err(e) = data_sender
279                                            .send(DataEvent::Data(Data::Trade(trade)))
280                                        {
281                                            log::error!("Failed to send trade tick: {e}");
282                                        }
283                                    }
284                                }
285                                NautilusWsMessage::Quote(quote) => {
286                                    if let Err(e) = data_sender
287                                        .send(DataEvent::Data(Data::Quote(quote)))
288                                    {
289                                        log::error!("Failed to send quote tick: {e}");
290                                    }
291                                }
292                                NautilusWsMessage::Deltas(deltas) => {
293                                    if let Err(e) = data_sender
294                                        .send(DataEvent::Data(Data::Deltas(
295                                            OrderBookDeltas_API::new(deltas),
296                                        )))
297                                    {
298                                        log::error!("Failed to send order book deltas: {e}");
299                                    }
300                                }
301                                NautilusWsMessage::Depth10(depth) => {
302                                    if let Err(e) =
303                                        data_sender.send(DataEvent::Data(Data::Depth10(depth)))
304                                    {
305                                        log::error!("Failed to send order book depth10: {e}");
306                                    }
307                                }
308                                NautilusWsMessage::Candle(bar) => {
309                                    if let Err(e) = data_sender
310                                        .send(DataEvent::Data(Data::Bar(bar)))
311                                    {
312                                        log::error!("Failed to send bar: {e}");
313                                    }
314                                }
315                                NautilusWsMessage::MarkPrice(update) => {
316                                    if let Err(e) = data_sender
317                                        .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
318                                    {
319                                        log::error!("Failed to send mark price update: {e}");
320                                    }
321                                }
322                                NautilusWsMessage::IndexPrice(update) => {
323                                    if let Err(e) = data_sender
324                                        .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
325                                    {
326                                        log::error!("Failed to send index price update: {e}");
327                                    }
328                                }
329                                NautilusWsMessage::FundingRate(update) => {
330                                    if let Err(e) = data_sender
331                                        .send(DataEvent::FundingRate(update))
332                                    {
333                                        log::error!("Failed to send funding rate update: {e}");
334                                    }
335                                }
336                                NautilusWsMessage::CustomData(data) => {
337                                    if let Err(e) = data_sender.send(DataEvent::Data(data)) {
338                                        log::error!("Failed to send custom data: {e}");
339                                    }
340                                }
341                                NautilusWsMessage::Reconnected => {
342                                    log::info!("WebSocket reconnected");
343                                }
344                                NautilusWsMessage::Error(e) => {
345                                    log::error!("WebSocket error: {e}");
346                                }
347                                NautilusWsMessage::ExecutionReports(_) => {
348                                    // Handled by execution client
349                                }
350                            }
351                        } else {
352                            // Connection closed or error
353                            log::debug!("WebSocket next_event returned None, stream closed");
354                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
355                        }
356                    }
357                }
358            }
359
360            log::info!("Hyperliquid WebSocket consumption loop finished");
361        });
362
363        let mut slot = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
364        *slot = Some(task);
365        log::info!("WebSocket consumption task spawned");
366
367        Ok(())
368    }
369}
370
371#[async_trait::async_trait(?Send)]
372impl DataClient for HyperliquidDataClient {
373    fn client_id(&self) -> ClientId {
374        self.client_id
375    }
376
377    fn venue(&self) -> Option<Venue> {
378        Some(self.venue())
379    }
380
381    fn start(&mut self) -> anyhow::Result<()> {
382        log::info!(
383            "Starting Hyperliquid data client: client_id={}, environment={:?}, proxy_url={:?}",
384            self.client_id,
385            self.config.environment,
386            self.config.proxy_url,
387        );
388        Ok(())
389    }
390
391    fn stop(&mut self) -> anyhow::Result<()> {
392        log::info!("Stopping Hyperliquid data client {}", self.client_id);
393        self.cancellation_token.cancel();
394        self.is_connected.store(false, Ordering::Relaxed);
395        Ok(())
396    }
397
398    fn reset(&mut self) -> anyhow::Result<()> {
399        log::debug!("Resetting Hyperliquid data client {}", self.client_id);
400        self.is_connected.store(false, Ordering::Relaxed);
401        self.cancellation_token = CancellationToken::new();
402        self.abort_pending_tasks();
403
404        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
405            handle.abort();
406        }
407        Ok(())
408    }
409
410    fn dispose(&mut self) -> anyhow::Result<()> {
411        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
412        self.stop()
413    }
414
415    fn is_connected(&self) -> bool {
416        self.is_connected.load(Ordering::Acquire)
417    }
418
419    fn is_disconnected(&self) -> bool {
420        !self.is_connected()
421    }
422
423    async fn connect(&mut self) -> anyhow::Result<()> {
424        if self.is_connected() {
425            return Ok(());
426        }
427
428        if self.cancellation_token.is_cancelled() {
429            self.cancellation_token = CancellationToken::new();
430        }
431
432        register_hyperliquid_custom_data();
433
434        let instruments = self
435            .bootstrap_instruments()
436            .await
437            .context("failed to bootstrap instruments")?;
438
439        for instrument in instruments {
440            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
441                log::warn!("Failed to send instrument: {e}");
442            }
443        }
444
445        self.spawn_ws()
446            .await
447            .context("failed to spawn WebSocket client")?;
448
449        self.is_connected.store(true, Ordering::Relaxed);
450        log::info!("Connected: client_id={}", self.client_id);
451
452        Ok(())
453    }
454
455    async fn disconnect(&mut self) -> anyhow::Result<()> {
456        if !self.is_connected() {
457            return Ok(());
458        }
459
460        self.cancellation_token.cancel();
461
462        let ws_stream_handle = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take();
463        if let Some(handle) = ws_stream_handle
464            && let Err(e) = handle.await
465        {
466            log::error!("Error waiting for WebSocket stream task: {e}");
467        }
468
469        self.abort_pending_tasks();
470
471        if let Err(e) = self.ws_client.disconnect().await {
472            log::error!("Error disconnecting WebSocket client: {e}");
473        }
474
475        self.instruments.store(AHashMap::new());
476
477        self.is_connected.store(false, Ordering::Relaxed);
478        log::info!("Disconnected: client_id={}", self.client_id);
479
480        Ok(())
481    }
482
483    fn subscribe(&mut self, cmd: SubscribeCustomData) -> anyhow::Result<()> {
484        let data_type = cmd.data_type.type_name();
485
486        if data_type == "HyperliquidAllMids" {
487            let ws = self.ws_client.clone();
488            let dex = cmd
489                .data_type
490                .metadata()
491                .as_ref()
492                .and_then(|m| m.get("dex"))
493                .and_then(|v| v.as_str())
494                .map(str::trim)
495                .filter(|value| !value.is_empty())
496                .map(ToString::to_string);
497
498            log::debug!("Subscribing to all mids (dex: {:?})", dex.as_deref());
499
500            self.spawn_task("subscribe_all_mids", async move {
501                ws.subscribe_all_mids_with_dex(dex.as_deref()).await
502            });
503
504            return Ok(());
505        }
506
507        if data_type == "HyperliquidAllDexsAssetCtxs" {
508            let ws = self.ws_client.clone();
509
510            self.spawn_task("subscribe_all_dexs_asset_ctxs", async move {
511                ws.subscribe_all_dexs_asset_ctxs().await
512            });
513
514            return Ok(());
515        }
516
517        if data_type == "HyperliquidOpenInterest" {
518            let ws = self.ws_client.clone();
519            let instrument_id = Self::custom_instrument_id(&cmd.data_type)?.context(
520                "HyperliquidOpenInterest subscriptions require metadata['instrument_id']",
521            )?;
522
523            self.spawn_task("subscribe_open_interest", async move {
524                ws.subscribe_open_interest(instrument_id).await
525            });
526
527            return Ok(());
528        }
529
530        log::warn!("Unsupported custom data subscription: {data_type}");
531        Ok(())
532    }
533
534    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
535        let data_type = cmd.data_type.type_name();
536
537        if data_type == "HyperliquidAllMids" {
538            let ws = self.ws_client.clone();
539            let dex = cmd
540                .data_type
541                .metadata()
542                .as_ref()
543                .and_then(|m| m.get("dex"))
544                .and_then(|v| v.as_str())
545                .map(str::trim)
546                .filter(|value| !value.is_empty())
547                .map(ToString::to_string);
548
549            log::debug!("Unsubscribing from all mids (dex: {:?})", dex.as_deref());
550
551            self.spawn_task("unsubscribe_all_mids", async move {
552                ws.unsubscribe_all_mids_with_dex(dex.as_deref()).await
553            });
554
555            return Ok(());
556        }
557
558        if data_type == "HyperliquidAllDexsAssetCtxs" {
559            let ws = self.ws_client.clone();
560
561            self.spawn_task("unsubscribe_all_dexs_asset_ctxs", async move {
562                ws.unsubscribe_all_dexs_asset_ctxs().await
563            });
564
565            return Ok(());
566        }
567
568        if data_type == "HyperliquidOpenInterest" {
569            let ws = self.ws_client.clone();
570            let instrument_id = Self::custom_instrument_id(&cmd.data_type)?.context(
571                "HyperliquidOpenInterest unsubscriptions require metadata['instrument_id']",
572            )?;
573
574            self.spawn_task("unsubscribe_open_interest", async move {
575                ws.unsubscribe_open_interest(instrument_id).await
576            });
577
578            return Ok(());
579        }
580
581        log::warn!("Unsupported custom data unsubscription: {data_type}");
582        Ok(())
583    }
584
585    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
586        let instruments = self.instruments.load();
587        if let Some(instrument) = instruments.get(&cmd.instrument_id) {
588            if let Err(e) = self
589                .data_sender
590                .send(DataEvent::Instrument(instrument.clone()))
591            {
592                log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
593            }
594        } else {
595            log::warn!("Instrument {} not found in cache", cmd.instrument_id);
596        }
597        Ok(())
598    }
599
600    fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
601        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
602
603        if subscription.book_type != BookType::L2_MBP {
604            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
605        }
606
607        let ws = self.ws_client.clone();
608        let instrument_id = subscription.instrument_id;
609        let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
610
611        self.spawn_task("subscribe_book_deltas", async move {
612            ws.subscribe_book_with_options(instrument_id, n_sig_figs, mantissa)
613                .await
614        });
615
616        Ok(())
617    }
618
619    fn subscribe_book_depth10(&mut self, subscription: SubscribeBookDepth10) -> anyhow::Result<()> {
620        log::debug!(
621            "Subscribing to book depth10: {}",
622            subscription.instrument_id
623        );
624
625        if subscription.book_type != BookType::L2_MBP {
626            anyhow::bail!("Hyperliquid only supports L2_MBP order book depth10");
627        }
628
629        let ws = self.ws_client.clone();
630        let instrument_id = subscription.instrument_id;
631        let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
632
633        self.spawn_task("subscribe_book_depth10", async move {
634            ws.subscribe_book_depth10_with_options(instrument_id, n_sig_figs, mantissa)
635                .await
636        });
637
638        Ok(())
639    }
640
641    fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
642        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
643
644        let ws = self.ws_client.clone();
645        let instrument_id = subscription.instrument_id;
646
647        self.spawn_task("subscribe_quotes", async move {
648            ws.subscribe_quotes(instrument_id).await
649        });
650
651        Ok(())
652    }
653
654    fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
655        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
656
657        let ws = self.ws_client.clone();
658        let instrument_id = subscription.instrument_id;
659
660        self.spawn_task("subscribe_trades", async move {
661            ws.subscribe_trades(instrument_id).await
662        });
663
664        Ok(())
665    }
666
667    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
668        let ws = self.ws_client.clone();
669        let instrument_id = cmd.instrument_id;
670
671        self.spawn_task("subscribe_mark_prices", async move {
672            ws.subscribe_mark_prices(instrument_id).await
673        });
674
675        Ok(())
676    }
677
678    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
679        let ws = self.ws_client.clone();
680        let instrument_id = cmd.instrument_id;
681
682        self.spawn_task("subscribe_index_prices", async move {
683            ws.subscribe_index_prices(instrument_id).await
684        });
685
686        Ok(())
687    }
688
689    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
690        let ws = self.ws_client.clone();
691        let instrument_id = cmd.instrument_id;
692
693        self.spawn_task("subscribe_funding_rates", async move {
694            ws.subscribe_funding_rates(instrument_id).await
695        });
696
697        Ok(())
698    }
699
700    fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
701        log::debug!("Subscribing to bars: {}", subscription.bar_type);
702
703        let instrument_id = subscription.bar_type.instrument_id();
704        if !self.instruments.contains_key(&instrument_id) {
705            anyhow::bail!("Instrument {instrument_id} not found");
706        }
707
708        let bar_type = subscription.bar_type;
709        let ws = self.ws_client.clone();
710
711        self.spawn_task("subscribe_bars", async move {
712            ws.subscribe_bars(bar_type).await
713        });
714
715        Ok(())
716    }
717
718    fn unsubscribe_book_deltas(
719        &mut self,
720        unsubscription: &UnsubscribeBookDeltas,
721    ) -> anyhow::Result<()> {
722        log::debug!(
723            "Unsubscribing from book deltas: {}",
724            unsubscription.instrument_id
725        );
726
727        let ws = self.ws_client.clone();
728        let instrument_id = unsubscription.instrument_id;
729
730        self.spawn_task("unsubscribe_book_deltas", async move {
731            ws.unsubscribe_book(instrument_id).await
732        });
733
734        Ok(())
735    }
736
737    fn unsubscribe_book_depth10(
738        &mut self,
739        unsubscription: &UnsubscribeBookDepth10,
740    ) -> anyhow::Result<()> {
741        log::debug!(
742            "Unsubscribing from book depth10: {}",
743            unsubscription.instrument_id
744        );
745
746        let ws = self.ws_client.clone();
747        let instrument_id = unsubscription.instrument_id;
748
749        self.spawn_task("unsubscribe_book_depth10", async move {
750            ws.unsubscribe_book_depth10(instrument_id).await
751        });
752
753        Ok(())
754    }
755
756    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
757        log::debug!(
758            "Unsubscribing from quotes: {}",
759            unsubscription.instrument_id
760        );
761
762        let ws = self.ws_client.clone();
763        let instrument_id = unsubscription.instrument_id;
764
765        self.spawn_task("unsubscribe_quotes", async move {
766            ws.unsubscribe_quotes(instrument_id).await
767        });
768
769        Ok(())
770    }
771
772    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
773        log::debug!(
774            "Unsubscribing from trades: {}",
775            unsubscription.instrument_id
776        );
777
778        let ws = self.ws_client.clone();
779        let instrument_id = unsubscription.instrument_id;
780
781        self.spawn_task("unsubscribe_trades", async move {
782            ws.unsubscribe_trades(instrument_id).await
783        });
784
785        Ok(())
786    }
787
788    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
789        let ws = self.ws_client.clone();
790        let instrument_id = cmd.instrument_id;
791
792        self.spawn_task("unsubscribe_mark_prices", async move {
793            ws.unsubscribe_mark_prices(instrument_id).await
794        });
795
796        Ok(())
797    }
798
799    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
800        let ws = self.ws_client.clone();
801        let instrument_id = cmd.instrument_id;
802
803        self.spawn_task("unsubscribe_index_prices", async move {
804            ws.unsubscribe_index_prices(instrument_id).await
805        });
806
807        Ok(())
808    }
809
810    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
811        let ws = self.ws_client.clone();
812        let instrument_id = cmd.instrument_id;
813
814        self.spawn_task("unsubscribe_funding_rates", async move {
815            ws.unsubscribe_funding_rates(instrument_id).await
816        });
817
818        Ok(())
819    }
820
821    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
822        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
823
824        let bar_type = unsubscription.bar_type;
825        let ws = self.ws_client.clone();
826
827        self.spawn_task("unsubscribe_bars", async move {
828            ws.unsubscribe_bars(bar_type).await
829        });
830
831        Ok(())
832    }
833
834    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
835        log::debug!("Requesting all instruments");
836
837        let http = self.http_client.clone();
838        let sender = self.data_sender.clone();
839        let instruments_cache = self.instruments.clone();
840        let coin_map = self.coin_to_instrument_id.clone();
841        let ws_instruments = self.ws_client.instruments_cache();
842        let request_id = request.request_id;
843        let client_id = request.client_id.unwrap_or(self.client_id);
844        let venue = self.venue();
845        let start_nanos = datetime_to_unix_nanos(request.start);
846        let end_nanos = datetime_to_unix_nanos(request.end);
847        let params = request.params;
848        let clock = self.clock;
849
850        self.spawn_task("request_instruments", async move {
851            let instruments = http
852                .request_instruments()
853                .await
854                .context("failed to fetch instruments from Hyperliquid")?;
855
856            instruments_cache.rcu(|instruments_map| {
857                coin_map.rcu(|coin_to_id| {
858                    for instrument in &instruments {
859                        let instrument_id = instrument.id();
860                        instruments_map.insert(instrument_id, instrument.clone());
861                        let coin = instrument.raw_symbol().inner();
862                        coin_to_id.insert(coin, instrument_id);
863                        ws_instruments.insert(coin, instrument.clone());
864                    }
865                });
866            });
867
868            let response = DataResponse::Instruments(InstrumentsResponse::new(
869                request_id,
870                client_id,
871                venue,
872                instruments,
873                start_nanos,
874                end_nanos,
875                clock.get_time_ns(),
876                params,
877            ));
878
879            if let Err(e) = sender.send(DataEvent::Response(response)) {
880                log::error!("Failed to send instruments response: {e}");
881            }
882            Ok(())
883        });
884
885        Ok(())
886    }
887
888    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
889        log::debug!("Requesting instrument: {}", request.instrument_id);
890
891        let http = self.http_client.clone();
892        let sender = self.data_sender.clone();
893        let instruments_cache = self.instruments.clone();
894        let coin_map = self.coin_to_instrument_id.clone();
895        let ws_instruments = self.ws_client.instruments_cache();
896        let instrument_id = request.instrument_id;
897        let request_id = request.request_id;
898        let client_id = request.client_id.unwrap_or(self.client_id);
899        let start_nanos = datetime_to_unix_nanos(request.start);
900        let end_nanos = datetime_to_unix_nanos(request.end);
901        let params = request.params;
902        let clock = self.clock;
903
904        self.spawn_task("request_instrument", async move {
905            let all_instruments = http
906                .request_instruments()
907                .await
908                .context("failed to fetch instruments from Hyperliquid")?;
909
910            instruments_cache.rcu(|instruments_map| {
911                coin_map.rcu(|coin_to_id| {
912                    for instrument in &all_instruments {
913                        let id = instrument.id();
914                        instruments_map.insert(id, instrument.clone());
915                        let coin = instrument.raw_symbol().inner();
916                        coin_to_id.insert(coin, id);
917                        ws_instruments.insert(coin, instrument.clone());
918                    }
919                });
920            });
921
922            if let Some(instrument) = all_instruments
923                .into_iter()
924                .find(|i| i.id() == instrument_id)
925            {
926                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
927                    request_id,
928                    client_id,
929                    instrument.id(),
930                    instrument,
931                    start_nanos,
932                    end_nanos,
933                    clock.get_time_ns(),
934                    params,
935                )));
936
937                if let Err(e) = sender.send(DataEvent::Response(response)) {
938                    log::error!("Failed to send instrument response: {e}");
939                }
940            } else {
941                log::error!("Instrument not found: {instrument_id}");
942            }
943            Ok(())
944        });
945
946        Ok(())
947    }
948
949    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
950        log::debug!("Requesting bars for {}", request.bar_type);
951
952        let http = self.http_client.clone();
953        let sender = self.data_sender.clone();
954        let bar_type = request.bar_type;
955        let start = request.start;
956        let end = request.end;
957        let limit = request.limit.map(|n| n.get() as u32);
958        let request_id = request.request_id;
959        let client_id = request.client_id.unwrap_or(self.client_id);
960        let params = request.params;
961        let clock = self.clock;
962        let start_nanos = datetime_to_unix_nanos(start);
963        let end_nanos = datetime_to_unix_nanos(end);
964        let instruments = Arc::clone(&self.instruments);
965
966        self.spawn_task("request_bars", async move {
967            let bars = request_bars_from_http(http, bar_type, start, end, limit, instruments)
968                .await
969                .context("bar request failed")?;
970
971            let response = DataResponse::Bars(BarsResponse::new(
972                request_id,
973                client_id,
974                bar_type,
975                bars,
976                start_nanos,
977                end_nanos,
978                clock.get_time_ns(),
979                params,
980            ));
981
982            if let Err(e) = sender.send(DataEvent::Response(response)) {
983                log::error!("Failed to send bars response: {e}");
984            }
985            Ok(())
986        });
987
988        Ok(())
989    }
990
991    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
992        // Hyperliquid has no public trade-tape REST endpoint; real-time
993        // trades are available via the `trades` WebSocket channel and
994        // account-scoped fills via `userFills`/`userFillsByTime`, but
995        // market-wide trade history cannot be served.
996        anyhow::bail!(
997            "Historical trade requests are not supported by Hyperliquid for {}; \
998             subscribe to trades via WebSocket for live trade ticks",
999            request.instrument_id,
1000        )
1001    }
1002
1003    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1004        let instrument_id = request.instrument_id;
1005        log::debug!("Requesting funding rates for {instrument_id}");
1006
1007        let instruments = self.instruments.load();
1008        let instrument = instruments
1009            .get(&instrument_id)
1010            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1011
1012        if !matches!(instrument, InstrumentAny::CryptoPerpetual(_)) {
1013            anyhow::bail!("Funding rates are only available for perpetual instruments");
1014        }
1015
1016        let coin = instrument.raw_symbol().to_string();
1017        let http = self.http_client.clone();
1018        let sender = self.data_sender.clone();
1019        let client_id = request.client_id.unwrap_or(self.client_id);
1020        let request_id = request.request_id;
1021        let params = request.params;
1022        let clock = self.clock;
1023        let limit = request.limit.map(|n| n.get());
1024        let start_dt = request.start;
1025        let end_dt = request.end;
1026        let start_nanos = datetime_to_unix_nanos(start_dt);
1027        let end_nanos = datetime_to_unix_nanos(end_dt);
1028
1029        let now_ms = Utc::now().timestamp_millis() as u64;
1030
1031        // Hyperliquid requires a startTime; default to a 7-day lookback when none given
1032        let default_lookback_ms: u64 = 7 * 86_400_000;
1033        let start_ms = match start_dt {
1034            Some(dt) => dt.timestamp_millis().max(0) as u64,
1035            None => now_ms.saturating_sub(default_lookback_ms),
1036        };
1037        let end_ms = end_dt.map(|dt| dt.timestamp_millis().max(0) as u64);
1038
1039        self.spawn_task("request_funding_rates", async move {
1040            let entries = http
1041                .info_funding_history(&coin, start_ms, end_ms)
1042                .await
1043                .with_context(|| format!("funding rates request failed for {instrument_id}"))?;
1044
1045            let mut funding_rates: Vec<FundingRateUpdate> = entries
1046                .iter()
1047                .filter_map(
1048                    |entry| match funding_entry_to_update(entry, instrument_id) {
1049                        Ok(update) => Some(update),
1050                        Err(e) => {
1051                            log::warn!("Skipping funding history entry for {instrument_id}: {e}",);
1052                            None
1053                        }
1054                    },
1055                )
1056                .collect();
1057
1058            if let Some(limit) = limit
1059                && funding_rates.len() > limit
1060            {
1061                funding_rates.truncate(limit);
1062            }
1063
1064            log::debug!(
1065                "Fetched {} funding rates for {instrument_id}",
1066                funding_rates.len(),
1067            );
1068
1069            let response = DataResponse::FundingRates(FundingRatesResponse::new(
1070                request_id,
1071                client_id,
1072                instrument_id,
1073                funding_rates,
1074                start_nanos,
1075                end_nanos,
1076                clock.get_time_ns(),
1077                params,
1078            ));
1079
1080            if let Err(e) = sender.send(DataEvent::Response(response)) {
1081                log::error!("Failed to send funding rates response: {e}");
1082            }
1083            Ok(())
1084        });
1085
1086        Ok(())
1087    }
1088
1089    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1090        let instrument_id = request.instrument_id;
1091        let instruments = self.instruments.load();
1092        let instrument = instruments
1093            .get(&instrument_id)
1094            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1095
1096        let raw_symbol = instrument.raw_symbol().to_string();
1097        let price_precision = instrument.price_precision();
1098        let size_precision = instrument.size_precision();
1099        let depth = request.depth.map(|d| d.get());
1100
1101        let http = self.http_client.clone();
1102        let sender = self.data_sender.clone();
1103        let client_id = request.client_id.unwrap_or(self.client_id);
1104        let request_id = request.request_id;
1105        let params = request.params;
1106        let clock = self.clock;
1107
1108        self.spawn_task("request_book_snapshot", async move {
1109            let l2_book = http
1110                .info_l2_book(&raw_symbol)
1111                .await
1112                .with_context(|| format!("book snapshot request failed for {instrument_id}"))?;
1113
1114            let book = parse_l2_book_snapshot(
1115                &l2_book,
1116                instrument_id,
1117                price_precision,
1118                size_precision,
1119                depth,
1120            );
1121
1122            let response = DataResponse::Book(BookResponse::new(
1123                request_id,
1124                client_id,
1125                instrument_id,
1126                book,
1127                None,
1128                None,
1129                clock.get_time_ns(),
1130                params,
1131            ));
1132
1133            if let Err(e) = sender.send(DataEvent::Response(response)) {
1134                log::error!("Failed to send book snapshot response: {e}");
1135            }
1136            Ok(())
1137        });
1138
1139        Ok(())
1140    }
1141}
1142
1143// Levels with unparsable px/sz or non-positive size are skipped rather than
1144// erroring; the snapshot's `time` field (ms) becomes `ts_event` after the
1145// ms->ns conversion.
1146pub(crate) fn parse_l2_book_snapshot(
1147    l2_book: &HyperliquidL2Book,
1148    instrument_id: InstrumentId,
1149    price_precision: u8,
1150    size_precision: u8,
1151    depth: Option<usize>,
1152) -> OrderBook {
1153    let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1154    let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
1155
1156    let all_bids = l2_book
1157        .levels
1158        .first()
1159        .map_or([].as_slice(), |v| v.as_slice());
1160    let all_asks = l2_book
1161        .levels
1162        .get(1)
1163        .map_or([].as_slice(), |v| v.as_slice());
1164
1165    let bids = match depth {
1166        Some(d) if d < all_bids.len() => &all_bids[..d],
1167        _ => all_bids,
1168    };
1169    let asks = match depth {
1170        Some(d) if d < all_asks.len() => &all_asks[..d],
1171        _ => all_asks,
1172    };
1173
1174    for (i, level) in bids.iter().enumerate() {
1175        let Ok(px) = level.px.parse::<f64>() else {
1176            continue;
1177        };
1178        let Ok(sz) = level.sz.parse::<f64>() else {
1179            continue;
1180        };
1181
1182        if sz > 0.0 {
1183            let price = Price::new(px, price_precision);
1184            let size = Quantity::new(sz, size_precision);
1185            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1186            book.add(order, 0, i as u64, ts_event);
1187        }
1188    }
1189
1190    let bids_len = bids.len();
1191
1192    for (i, level) in asks.iter().enumerate() {
1193        let Ok(px) = level.px.parse::<f64>() else {
1194            continue;
1195        };
1196        let Ok(sz) = level.sz.parse::<f64>() else {
1197            continue;
1198        };
1199
1200        if sz > 0.0 {
1201            let price = Price::new(px, price_precision);
1202            let size = Quantity::new(sz, size_precision);
1203            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1204            book.add(order, 0, (bids_len + i) as u64, ts_event);
1205        }
1206    }
1207
1208    log::info!(
1209        "Built order book for {instrument_id} with {} bids and {} asks",
1210        bids.len(),
1211        asks.len(),
1212    );
1213
1214    book
1215}
1216
1217// Reads optional `nSigFigs` / `mantissa` L2 precision controls from
1218// `subscribe_params`; bails on non-positive integer values.
1219pub(crate) fn parse_book_precision_params(
1220    params: Option<&Params>,
1221) -> anyhow::Result<(Option<u32>, Option<u32>)> {
1222    let Some(params) = params else {
1223        return Ok((None, None));
1224    };
1225
1226    let read_u32 = |key: &str| -> anyhow::Result<Option<u32>> {
1227        match params.get(key) {
1228            None => Ok(None),
1229            Some(v) => v
1230                .as_u64()
1231                .and_then(|n| u32::try_from(n).ok())
1232                .ok_or_else(|| anyhow::anyhow!("`{key}` must be a positive u32"))
1233                .map(Some),
1234        }
1235    };
1236
1237    Ok((read_u32("n_sig_figs")?, read_u32("mantissa")?))
1238}
1239
1240// Hyperliquid funds perpetuals hourly, so `interval` is fixed at 60 mins;
1241// `time` from the venue marks the end of the funding interval in ms.
1242pub(crate) fn funding_entry_to_update(
1243    entry: &HyperliquidFundingHistoryEntry,
1244    instrument_id: InstrumentId,
1245) -> anyhow::Result<FundingRateUpdate> {
1246    let rate: Decimal = entry
1247        .funding_rate
1248        .parse()
1249        .with_context(|| format!("invalid fundingRate '{}'", entry.funding_rate))?;
1250    let ts = UnixNanos::from(entry.time * 1_000_000);
1251    Ok(FundingRateUpdate::new(
1252        instrument_id,
1253        rate,
1254        Some(60),
1255        None,
1256        ts,
1257        ts,
1258    ))
1259}
1260
1261pub(crate) fn candle_to_bar(
1262    candle: &HyperliquidCandle,
1263    bar_type: BarType,
1264    price_precision: u8,
1265    size_precision: u8,
1266) -> anyhow::Result<Bar> {
1267    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1268    let ts_event = ts_init;
1269
1270    let open = candle.open.parse::<f64>().context("parse open price")?;
1271    let high = candle.high.parse::<f64>().context("parse high price")?;
1272    let low = candle.low.parse::<f64>().context("parse low price")?;
1273    let close = candle.close.parse::<f64>().context("parse close price")?;
1274    let volume = candle.volume.parse::<f64>().context("parse volume")?;
1275
1276    Ok(Bar::new(
1277        bar_type,
1278        Price::new(open, price_precision),
1279        Price::new(high, price_precision),
1280        Price::new(low, price_precision),
1281        Price::new(close, price_precision),
1282        Quantity::new(volume, size_precision),
1283        ts_event,
1284        ts_init,
1285    ))
1286}
1287
1288/// Request bars from HTTP API.
1289async fn request_bars_from_http(
1290    http_client: HyperliquidHttpClient,
1291    bar_type: BarType,
1292    start: Option<DateTime<Utc>>,
1293    end: Option<DateTime<Utc>>,
1294    limit: Option<u32>,
1295    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1296) -> anyhow::Result<Vec<Bar>> {
1297    // Get instrument details for precision
1298    let instrument_id = bar_type.instrument_id();
1299    let instrument = instruments
1300        .load()
1301        .get(&instrument_id)
1302        .cloned()
1303        .context("instrument not found in cache")?;
1304
1305    let price_precision = instrument.price_precision();
1306    let size_precision = instrument.size_precision();
1307    let raw_symbol = instrument.raw_symbol();
1308    let coin = raw_symbol.as_str();
1309
1310    let interval = bar_type_to_interval(&bar_type)?;
1311
1312    // Hyperliquid uses millisecond timestamps
1313    let now = Utc::now();
1314    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1315    let start_time = if let Some(start) = start {
1316        start.timestamp_millis() as u64
1317    } else {
1318        // Default to 1000 bars before end_time
1319        let spec = bar_type.spec();
1320        let step_ms = match spec.aggregation {
1321            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1322            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1323            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1324            _ => 60_000,
1325        };
1326        end_time.saturating_sub(1000 * step_ms)
1327    };
1328
1329    let candles = http_client
1330        .info_candle_snapshot(coin, interval, start_time, end_time)
1331        .await
1332        .context("failed to fetch candle snapshot from Hyperliquid")?;
1333
1334    let mut bars: Vec<Bar> = candles
1335        .iter()
1336        .filter_map(|candle| {
1337            candle_to_bar(candle, bar_type, price_precision, size_precision)
1338                .map_err(|e| {
1339                    log::warn!("Failed to convert candle to bar: {e}");
1340                    e
1341                })
1342                .ok()
1343        })
1344        .collect();
1345
1346    if let Some(limit) = limit
1347        && bars.len() > limit as usize
1348    {
1349        bars = bars.into_iter().take(limit as usize).collect();
1350    }
1351
1352    log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1353    Ok(bars)
1354}
1355
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use rust_decimal_macros::dec;
    use ustr::Ustr;

    use super::*;
    use crate::common::testing::load_test_data;

    // Shared fixtures

    fn btc_perp_id() -> InstrumentId {
        InstrumentId::from("BTC-PERP.HYPERLIQUID")
    }

    fn make_params(json: serde_json::Value) -> Params {
        serde_json::from_value(json).expect("valid params payload")
    }

    fn level(px: &str, sz: &str) -> crate::http::models::HyperliquidLevel {
        crate::http::models::HyperliquidLevel {
            px: String::from(px),
            sz: String::from(sz),
        }
    }

    // Assembles a BTC snapshot from explicit bid and ask ladders.
    fn l2_book(
        bids: Vec<crate::http::models::HyperliquidLevel>,
        asks: Vec<crate::http::models::HyperliquidLevel>,
    ) -> HyperliquidL2Book {
        HyperliquidL2Book {
            coin: Ustr::from("BTC"),
            levels: vec![bids, asks],
            time: 1769908800000,
        }
    }

    fn sample_l2_book() -> HyperliquidL2Book {
        l2_book(
            vec![
                level("98450.50", "2.5"),
                level("98449.00", "1.2"),
                level("98448.00", "0.8"),
            ],
            vec![
                level("98451.00", "1.5"),
                level("98452.00", "2.0"),
                level("98453.00", "0.5"),
            ],
        )
    }

    // funding_entry_to_update

    #[rstest]
    fn test_funding_entry_to_update_parses_positive_rate() {
        let id = btc_perp_id();
        let raw = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: String::from("0.0000125"),
            premium: Some(String::from("0.00029005")),
            time: 1769908800000,
        };

        let converted = funding_entry_to_update(&raw, id).unwrap();

        assert_eq!(converted.instrument_id, id);
        assert_eq!(converted.rate, dec!(0.0000125));
        assert_eq!(converted.interval, Some(60));
        assert!(converted.next_funding_ns.is_none());
        assert_eq!(
            converted.ts_event,
            UnixNanos::from(1769908800000 * 1_000_000)
        );
        assert_eq!(converted.ts_init, converted.ts_event);
    }

    #[rstest]
    fn test_funding_entry_to_update_handles_negative_rate() {
        let raw = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: String::from("-0.0000081"),
            premium: None,
            time: 1769912400000,
        };

        let converted = funding_entry_to_update(&raw, btc_perp_id()).unwrap();

        assert_eq!(converted.rate, dec!(-0.0000081));
    }

    #[rstest]
    fn test_funding_entry_to_update_rejects_invalid_rate() {
        let raw = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: String::from("not-a-number"),
            premium: None,
            time: 1769912400000,
        };

        assert!(funding_entry_to_update(&raw, btc_perp_id()).is_err());
    }

    #[rstest]
    fn test_funding_history_fixture_parses() {
        let entries: Vec<HyperliquidFundingHistoryEntry> =
            load_test_data("http_funding_history.json");

        // The raw fixture deserializes as expected.
        assert_eq!(entries.len(), 3);
        let first = &entries[0];
        assert_eq!(first.coin.as_str(), "BTC");
        assert_eq!(first.funding_rate, "0.0000125");
        assert_eq!(first.premium.as_deref(), Some("0.00029005"));
        assert!(entries[2].premium.is_none());

        // Every fixture entry converts and keeps its rate.
        let id = btc_perp_id();
        let mut updates: Vec<FundingRateUpdate> = Vec::new();
        for entry in &entries {
            updates.push(funding_entry_to_update(entry, id).unwrap());
        }

        assert_eq!(updates.len(), 3);
        assert_eq!(updates[0].rate, dec!(0.0000125));
        assert_eq!(updates[1].rate, dec!(-0.0000081));
        assert_eq!(updates[2].rate, dec!(0.0000033));
    }

    // parse_book_precision_params

    #[rstest]
    fn test_parse_book_precision_params_none() {
        let (sig_figs, mantissa) = parse_book_precision_params(None).unwrap();

        assert_eq!(sig_figs, None);
        assert_eq!(mantissa, None);
    }

    #[rstest]
    fn test_parse_book_precision_params_only_n_sig_figs() {
        let payload = make_params(serde_json::json!({"n_sig_figs": 4}));

        let (sig_figs, mantissa) = parse_book_precision_params(Some(&payload)).unwrap();

        assert_eq!(sig_figs, Some(4));
        assert_eq!(mantissa, None);
    }

    #[rstest]
    fn test_parse_book_precision_params_both() {
        let payload = make_params(serde_json::json!({"n_sig_figs": 5, "mantissa": 2}));

        let (sig_figs, mantissa) = parse_book_precision_params(Some(&payload)).unwrap();

        assert_eq!(sig_figs, Some(5));
        assert_eq!(mantissa, Some(2));
    }

    #[rstest]
    fn test_parse_book_precision_params_rejects_negative() {
        let payload = make_params(serde_json::json!({"n_sig_figs": -1}));

        let failure = parse_book_precision_params(Some(&payload)).unwrap_err();

        // The error names the offending key.
        assert!(failure.to_string().contains("n_sig_figs"));
    }

    // parse_l2_book_snapshot

    #[rstest]
    fn test_parse_l2_book_snapshot_populates_both_sides() {
        let id = btc_perp_id();
        let snapshot = sample_l2_book();

        let parsed = parse_l2_book_snapshot(&snapshot, id, 2, 4, None);

        assert_eq!(parsed.instrument_id, id);
        assert_eq!(parsed.book_type, BookType::L2_MBP);
        assert_eq!(parsed.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(parsed.best_ask_price(), Some(Price::new(98451.00, 2)));
        assert_eq!(parsed.best_bid_size(), Some(Quantity::new(2.5, 4)));
        assert_eq!(parsed.best_ask_size(), Some(Quantity::new(1.5, 4)));
        assert_eq!(parsed.update_count, 6);
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_truncates_to_depth() {
        let snapshot = sample_l2_book();

        let parsed = parse_l2_book_snapshot(&snapshot, btc_perp_id(), 2, 4, Some(1));

        // depth=1 keeps the top of book on each side, drops the rest.
        assert_eq!(parsed.update_count, 2);
        assert_eq!(parsed.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(parsed.best_ask_price(), Some(Price::new(98451.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_uses_venue_time_as_ts_event() {
        let snapshot = sample_l2_book();
        let venue_ts = UnixNanos::from(1769908800000_u64 * 1_000_000);

        let parsed = parse_l2_book_snapshot(&snapshot, btc_perp_id(), 2, 4, None);

        // ts_last reflects the last applied delta; every added order
        // carries the venue time after the ms->ns conversion.
        assert_eq!(parsed.ts_last, venue_ts);
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_skips_non_positive_size() {
        let snapshot = l2_book(
            vec![level("98450.50", "2.5"), level("98449.00", "0")],
            vec![level("98451.00", "0"), level("98452.00", "1.5")],
        );

        let parsed = parse_l2_book_snapshot(&snapshot, btc_perp_id(), 2, 4, None);

        assert_eq!(parsed.update_count, 2, "zero-sized levels must be skipped");
        assert_eq!(parsed.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(parsed.best_ask_price(), Some(Price::new(98452.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_skips_unparsable_levels() {
        let snapshot = l2_book(
            vec![level("not-a-number", "1.0"), level("98449.00", "1.2")],
            vec![level("98451.00", "garbage"), level("98452.00", "1.5")],
        );

        let parsed = parse_l2_book_snapshot(&snapshot, btc_perp_id(), 2, 4, None);

        // Each side has one parseable level remaining.
        assert_eq!(parsed.update_count, 2);
        assert_eq!(parsed.best_bid_price(), Some(Price::new(98449.00, 2)));
        assert_eq!(parsed.best_ask_price(), Some(Price::new(98452.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_empty_levels_yields_empty_book() {
        // No side vectors at all, not merely two empty sides.
        let snapshot = HyperliquidL2Book {
            coin: Ustr::from("BTC"),
            levels: vec![],
            time: 1769908800000,
        };

        let parsed = parse_l2_book_snapshot(&snapshot, btc_perp_id(), 2, 4, None);

        assert_eq!(parsed.update_count, 0);
        assert!(parsed.best_bid_price().is_none());
        assert!(parsed.best_ask_price().is_none());
    }
}