Skip to main content

nautilus_hyperliquid/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc, Mutex,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    clients::DataClient,
26    live::{runner::get_data_event_sender, runtime::get_runtime},
27    messages::{
28        DataEvent,
29        data::{
30            BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
31            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
32            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
33            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeCustomData, SubscribeFundingRates,
34            SubscribeIndexPrices, SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes,
35            SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
36            UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
37            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38        },
39    },
40};
41use nautilus_core::{
42    AtomicMap, MUTEX_POISONED, Params, UnixNanos,
43    datetime::datetime_to_unix_nanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47    data::{Bar, BarType, BookOrder, Data, FundingRateUpdate, OrderBookDeltas_API},
48    enums::{BarAggregation, BookType, OrderSide},
49    identifiers::{ClientId, InstrumentId, Venue},
50    instruments::{Instrument, InstrumentAny},
51    orderbook::OrderBook,
52    types::{Price, Quantity},
53};
54use rust_decimal::Decimal;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60    common::{
61        consts::HYPERLIQUID_VENUE,
62        credential::{Secrets, credential_env_vars},
63        parse::bar_type_to_interval,
64    },
65    config::HyperliquidDataClientConfig,
66    data_types::register_hyperliquid_custom_data,
67    http::{
68        client::HyperliquidHttpClient,
69        models::{HyperliquidCandle, HyperliquidFundingHistoryEntry, HyperliquidL2Book},
70    },
71    websocket::{
72        client::HyperliquidWebSocketClient,
73        messages::{HyperliquidWsMessage, NautilusWsMessage},
74        parse::{
75            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
76        },
77    },
78};
79
/// Hyperliquid data client that bootstraps instruments over HTTP and forwards
/// WebSocket market data as [`DataEvent`]s on `data_sender`.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Clock handle obtained from `get_atomic_clock_realtime()` in `new`.
    clock: &'static AtomicTime,
    /// Identifier returned by `client_id()` and included in log messages.
    client_id: ClientId,
    /// Configuration the client was constructed from.
    #[allow(dead_code)]
    config: HyperliquidDataClientConfig,
    /// HTTP client used to request instruments during bootstrap.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; cloned into the consumption loop and into each
    /// subscribe/unsubscribe task.
    ws_client: HyperliquidWebSocketClient,
    /// Connection flag: set at the end of `connect()`, cleared by `stop()`,
    /// `reset()` and `disconnect()`.
    is_connected: AtomicBool,
    /// Token whose cancellation ends the WebSocket consumption loop.
    cancellation_token: CancellationToken,
    /// Handle of the consumption task spawned by `spawn_ws`, if any.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// Handles of the tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Channel on which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instruments keyed by instrument ID, filled by `bootstrap_instruments`.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Maps an instrument's raw symbol to its instrument ID, filled by
    /// `bootstrap_instruments`.
    coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
}
96
97impl HyperliquidDataClient {
98    /// Creates a new [`HyperliquidDataClient`] instance.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if the HTTP client fails to initialize.
103    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
104        let clock = get_atomic_clock_realtime();
105        let data_sender = get_data_event_sender();
106
107        // Only fall back to unauthenticated when credentials are absent,
108        // not when they're invalid (fail fast on malformed keys)
109        let (pk_var, _) = credential_env_vars(config.environment);
110        let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
111
112        let mut http_client = if has_credentials {
113            let secrets =
114                Secrets::resolve(config.private_key.as_deref(), None, config.environment)?;
115            HyperliquidHttpClient::with_secrets(
116                &secrets,
117                config.http_timeout_secs,
118                config.proxy_url.clone(),
119            )?
120        } else {
121            HyperliquidHttpClient::new(
122                config.environment,
123                config.http_timeout_secs,
124                config.proxy_url.clone(),
125            )?
126        };
127
128        if let Some(url) = &config.base_url_http {
129            http_client.set_base_info_url(url.clone());
130        }
131
132        let ws_url = config.base_url_ws.clone();
133        let ws_client = HyperliquidWebSocketClient::new(
134            ws_url,
135            config.environment,
136            None,
137            config.transport_backend,
138            config.proxy_url.clone(),
139        );
140
141        Ok(Self {
142            clock,
143            client_id,
144            config,
145            http_client,
146            ws_client,
147            is_connected: AtomicBool::new(false),
148            cancellation_token: CancellationToken::new(),
149            ws_stream_handle: Mutex::new(None),
150            pending_tasks: Mutex::new(Vec::new()),
151            data_sender,
152            instruments: Arc::new(AtomicMap::new()),
153            coin_to_instrument_id: Arc::new(AtomicMap::new()),
154        })
155    }
156
157    fn spawn_task<F>(&self, description: &'static str, fut: F)
158    where
159        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
160    {
161        let runtime = get_runtime();
162        let handle = runtime.spawn(async move {
163            if let Err(e) = fut.await {
164                log::warn!("{description} failed: {e:?}");
165            }
166        });
167
168        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
169        tasks.retain(|handle| !handle.is_finished());
170        tasks.push(handle);
171    }
172
173    fn abort_pending_tasks(&self) {
174        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
175        for handle in tasks.drain(..) {
176            handle.abort();
177        }
178    }
179
180    fn venue(&self) -> Venue {
181        *HYPERLIQUID_VENUE
182    }
183
184    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
185        let instruments = self
186            .http_client
187            .request_instruments()
188            .await
189            .context("failed to fetch instruments during bootstrap")?;
190
191        self.instruments.rcu(|m| {
192            for instrument in &instruments {
193                m.insert(instrument.id(), instrument.clone());
194            }
195        });
196
197        self.coin_to_instrument_id.rcu(|m| {
198            for instrument in &instruments {
199                m.insert(instrument.raw_symbol().inner(), instrument.id());
200            }
201        });
202
203        for instrument in &instruments {
204            self.ws_client.cache_instrument(instrument.clone());
205        }
206
207        log::info!(
208            "Bootstrapped {} instruments with {} coin mappings",
209            self.instruments.len(),
210            self.coin_to_instrument_id.len()
211        );
212        Ok(instruments)
213    }
214
215    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
216        // Clone client before connecting so the clone can have out_rx set
217        let mut ws_client = self.ws_client.clone();
218
219        ws_client
220            .connect()
221            .await
222            .context("failed to connect to Hyperliquid WebSocket")?;
223
224        // Transfer task handle to original so disconnect() can await it
225        if let Some(handle) = ws_client.take_task_handle() {
226            self.ws_client.set_task_handle(handle);
227        }
228
229        let data_sender = self.data_sender.clone();
230        let cancellation_token = self.cancellation_token.clone();
231
232        let task = get_runtime().spawn(async move {
233            log::info!("Hyperliquid WebSocket consumption loop started");
234
235            loop {
236                tokio::select! {
237                    () = cancellation_token.cancelled() => {
238                        log::info!("WebSocket consumption loop cancelled");
239                        break;
240                    }
241                    msg_opt = ws_client.next_event() => {
242                        if let Some(msg) = msg_opt {
243                            match msg {
244                                NautilusWsMessage::Trades(trades) => {
245                                    for trade in trades {
246                                        if let Err(e) = data_sender
247                                            .send(DataEvent::Data(Data::Trade(trade)))
248                                        {
249                                            log::error!("Failed to send trade tick: {e}");
250                                        }
251                                    }
252                                }
253                                NautilusWsMessage::Quote(quote) => {
254                                    if let Err(e) = data_sender
255                                        .send(DataEvent::Data(Data::Quote(quote)))
256                                    {
257                                        log::error!("Failed to send quote tick: {e}");
258                                    }
259                                }
260                                NautilusWsMessage::Deltas(deltas) => {
261                                    if let Err(e) = data_sender
262                                        .send(DataEvent::Data(Data::Deltas(
263                                            OrderBookDeltas_API::new(deltas),
264                                        )))
265                                    {
266                                        log::error!("Failed to send order book deltas: {e}");
267                                    }
268                                }
269                                NautilusWsMessage::Depth10(depth) => {
270                                    if let Err(e) =
271                                        data_sender.send(DataEvent::Data(Data::Depth10(depth)))
272                                    {
273                                        log::error!("Failed to send order book depth10: {e}");
274                                    }
275                                }
276                                NautilusWsMessage::Candle(bar) => {
277                                    if let Err(e) = data_sender
278                                        .send(DataEvent::Data(Data::Bar(bar)))
279                                    {
280                                        log::error!("Failed to send bar: {e}");
281                                    }
282                                }
283                                NautilusWsMessage::MarkPrice(update) => {
284                                    if let Err(e) = data_sender
285                                        .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
286                                    {
287                                        log::error!("Failed to send mark price update: {e}");
288                                    }
289                                }
290                                NautilusWsMessage::IndexPrice(update) => {
291                                    if let Err(e) = data_sender
292                                        .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
293                                    {
294                                        log::error!("Failed to send index price update: {e}");
295                                    }
296                                }
297                                NautilusWsMessage::FundingRate(update) => {
298                                    if let Err(e) = data_sender
299                                        .send(DataEvent::FundingRate(update))
300                                    {
301                                        log::error!("Failed to send funding rate update: {e}");
302                                    }
303                                }
304                                NautilusWsMessage::CustomData(data) => {
305                                    if let Err(e) = data_sender.send(DataEvent::Data(data)) {
306                                        log::error!("Failed to send custom data: {e}");
307                                    }
308                                }
309                                NautilusWsMessage::Reconnected => {
310                                    log::info!("WebSocket reconnected");
311                                }
312                                NautilusWsMessage::Error(e) => {
313                                    log::error!("WebSocket error: {e}");
314                                }
315                                NautilusWsMessage::ExecutionReports(_) => {
316                                    // Handled by execution client
317                                }
318                            }
319                        } else {
320                            // Connection closed or error
321                            log::debug!("WebSocket next_event returned None, stream closed");
322                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
323                        }
324                    }
325                }
326            }
327
328            log::info!("Hyperliquid WebSocket consumption loop finished");
329        });
330
331        let mut slot = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
332        *slot = Some(task);
333        log::info!("WebSocket consumption task spawned");
334
335        Ok(())
336    }
337
338    #[allow(dead_code)]
339    fn handle_ws_message(
340        msg: HyperliquidWsMessage,
341        ws_client: &HyperliquidWebSocketClient,
342        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
343        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
344        coin_to_instrument_id: &Arc<AtomicMap<Ustr, InstrumentId>>,
345        _venue: Venue,
346        clock: &'static AtomicTime,
347    ) {
348        match msg {
349            HyperliquidWsMessage::Bbo { data } => {
350                let coin = data.coin;
351                log::debug!("Received BBO message for coin: {coin}");
352
353                let coin_map = coin_to_instrument_id.load();
354                let instrument_id = coin_map.get(&data.coin);
355
356                if let Some(&instrument_id) = instrument_id {
357                    let instruments_map = instruments.load();
358                    if let Some(instrument) = instruments_map.get(&instrument_id) {
359                        let ts_init = clock.get_time_ns();
360
361                        match parse_ws_quote_tick(&data, instrument, ts_init) {
362                            Ok(quote_tick) => {
363                                log::debug!(
364                                    "Parsed quote tick for {}: bid={}, ask={}",
365                                    data.coin,
366                                    quote_tick.bid_price,
367                                    quote_tick.ask_price
368                                );
369
370                                if let Err(e) =
371                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
372                                {
373                                    log::error!("Failed to send quote tick: {e}");
374                                }
375                            }
376                            Err(e) => {
377                                log::error!("Failed to parse quote tick for {}: {e}", data.coin);
378                            }
379                        }
380                    }
381                } else {
382                    log::warn!(
383                        "Received BBO for unknown coin: {} (no matching instrument found)",
384                        data.coin
385                    );
386                }
387            }
388            HyperliquidWsMessage::Trades { data } => {
389                let count = data.len();
390                log::debug!("Received {count} trade(s)");
391
392                for trade_data in data {
393                    let coin = trade_data.coin;
394                    let coin_map = coin_to_instrument_id.load();
395
396                    if let Some(&instrument_id) = coin_map.get(&coin) {
397                        let instruments_map = instruments.load();
398                        if let Some(instrument) = instruments_map.get(&instrument_id) {
399                            let ts_init = clock.get_time_ns();
400
401                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
402                                Ok(trade_tick) => {
403                                    if let Err(e) =
404                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
405                                    {
406                                        log::error!("Failed to send trade tick: {e}");
407                                    }
408                                }
409                                Err(e) => {
410                                    log::error!("Failed to parse trade tick for {coin}: {e}");
411                                }
412                            }
413                        }
414                    } else {
415                        log::warn!("Received trade for unknown coin: {coin}");
416                    }
417                }
418            }
419            HyperliquidWsMessage::L2Book { data } => {
420                let coin = data.coin;
421                log::debug!("Received L2 book update for coin: {coin}");
422
423                let coin_map = coin_to_instrument_id.load();
424                if let Some(&instrument_id) = coin_map.get(&data.coin) {
425                    let instruments_map = instruments.load();
426                    if let Some(instrument) = instruments_map.get(&instrument_id) {
427                        let ts_init = clock.get_time_ns();
428
429                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
430                            Ok(deltas) => {
431                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
432                                    OrderBookDeltas_API::new(deltas),
433                                ))) {
434                                    log::error!("Failed to send order book deltas: {e}");
435                                }
436                            }
437                            Err(e) => {
438                                log::error!(
439                                    "Failed to parse order book deltas for {}: {e}",
440                                    data.coin
441                                );
442                            }
443                        }
444                    }
445                } else {
446                    log::warn!("Received L2 book for unknown coin: {coin}");
447                }
448            }
449            HyperliquidWsMessage::Candle { data } => {
450                let coin = &data.s;
451                let interval = &data.i;
452                log::debug!("Received candle for {coin}:{interval}");
453
454                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
455                    let coin = Ustr::from(&data.s);
456                    let coin_map = coin_to_instrument_id.load();
457
458                    if let Some(&instrument_id) = coin_map.get(&coin) {
459                        let instruments_map = instruments.load();
460                        if let Some(instrument) = instruments_map.get(&instrument_id) {
461                            let ts_init = clock.get_time_ns();
462
463                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
464                                Ok(bar) => {
465                                    if let Err(e) =
466                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
467                                    {
468                                        log::error!("Failed to send bar data: {e}");
469                                    }
470                                }
471                                Err(e) => {
472                                    log::error!("Failed to parse candle for {coin}: {e}");
473                                }
474                            }
475                        }
476                    } else {
477                        log::warn!("Received candle for unknown coin: {coin}");
478                    }
479                } else {
480                    log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
481                }
482            }
483            _ => {
484                log::trace!("Received unhandled WebSocket message: {msg:?}");
485            }
486        }
487    }
488}
489
490impl HyperliquidDataClient {
491    #[allow(dead_code)]
492    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
493        if let Err(e) = sender.send(DataEvent::Data(data)) {
494            log::error!("Failed to emit data event: {e}");
495        }
496    }
497}
498
499#[async_trait::async_trait(?Send)]
500impl DataClient for HyperliquidDataClient {
501    fn client_id(&self) -> ClientId {
502        self.client_id
503    }
504
505    fn venue(&self) -> Option<Venue> {
506        Some(self.venue())
507    }
508
509    fn start(&mut self) -> anyhow::Result<()> {
510        log::info!(
511            "Starting Hyperliquid data client: client_id={}, environment={:?}, proxy_url={:?}",
512            self.client_id,
513            self.config.environment,
514            self.config.proxy_url,
515        );
516        Ok(())
517    }
518
519    fn stop(&mut self) -> anyhow::Result<()> {
520        log::info!("Stopping Hyperliquid data client {}", self.client_id);
521        self.cancellation_token.cancel();
522        self.is_connected.store(false, Ordering::Relaxed);
523        Ok(())
524    }
525
526    fn reset(&mut self) -> anyhow::Result<()> {
527        log::debug!("Resetting Hyperliquid data client {}", self.client_id);
528        self.is_connected.store(false, Ordering::Relaxed);
529        self.cancellation_token = CancellationToken::new();
530        self.abort_pending_tasks();
531
532        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
533            handle.abort();
534        }
535        Ok(())
536    }
537
538    fn dispose(&mut self) -> anyhow::Result<()> {
539        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
540        self.stop()
541    }
542
543    fn is_connected(&self) -> bool {
544        self.is_connected.load(Ordering::Acquire)
545    }
546
547    fn is_disconnected(&self) -> bool {
548        !self.is_connected()
549    }
550
551    async fn connect(&mut self) -> anyhow::Result<()> {
552        if self.is_connected() {
553            return Ok(());
554        }
555
556        if self.cancellation_token.is_cancelled() {
557            self.cancellation_token = CancellationToken::new();
558        }
559
560        register_hyperliquid_custom_data();
561
562        let instruments = self
563            .bootstrap_instruments()
564            .await
565            .context("failed to bootstrap instruments")?;
566
567        for instrument in instruments {
568            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
569                log::warn!("Failed to send instrument: {e}");
570            }
571        }
572
573        self.spawn_ws()
574            .await
575            .context("failed to spawn WebSocket client")?;
576
577        self.is_connected.store(true, Ordering::Relaxed);
578        log::info!("Connected: client_id={}", self.client_id);
579
580        Ok(())
581    }
582
583    async fn disconnect(&mut self) -> anyhow::Result<()> {
584        if !self.is_connected() {
585            return Ok(());
586        }
587
588        self.cancellation_token.cancel();
589
590        let ws_stream_handle = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take();
591        if let Some(handle) = ws_stream_handle
592            && let Err(e) = handle.await
593        {
594            log::error!("Error waiting for WebSocket stream task: {e}");
595        }
596
597        self.abort_pending_tasks();
598
599        if let Err(e) = self.ws_client.disconnect().await {
600            log::error!("Error disconnecting WebSocket client: {e}");
601        }
602
603        self.instruments.store(AHashMap::new());
604
605        self.is_connected.store(false, Ordering::Relaxed);
606        log::info!("Disconnected: client_id={}", self.client_id);
607
608        Ok(())
609    }
610
611    fn subscribe(&mut self, cmd: SubscribeCustomData) -> anyhow::Result<()> {
612        let data_type = cmd.data_type.type_name();
613
614        if data_type == "HyperliquidAllMids" {
615            let ws = self.ws_client.clone();
616            let dex = cmd
617                .data_type
618                .metadata()
619                .as_ref()
620                .and_then(|m| m.get("dex"))
621                .and_then(|v| v.as_str())
622                .map(str::trim)
623                .filter(|value| !value.is_empty())
624                .map(ToString::to_string);
625
626            log::debug!("Subscribing to all mids (dex: {:?})", dex.as_deref());
627
628            self.spawn_task("subscribe_all_mids", async move {
629                ws.subscribe_all_mids_with_dex(dex.as_deref()).await
630            });
631
632            return Ok(());
633        }
634
635        log::warn!("Unsupported custom data subscription: {data_type}");
636        Ok(())
637    }
638
639    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
640        let data_type = cmd.data_type.type_name();
641
642        if data_type == "HyperliquidAllMids" {
643            let ws = self.ws_client.clone();
644            let dex = cmd
645                .data_type
646                .metadata()
647                .as_ref()
648                .and_then(|m| m.get("dex"))
649                .and_then(|v| v.as_str())
650                .map(str::trim)
651                .filter(|value| !value.is_empty())
652                .map(ToString::to_string);
653
654            log::debug!("Unsubscribing from all mids (dex: {:?})", dex.as_deref());
655
656            self.spawn_task("unsubscribe_all_mids", async move {
657                ws.unsubscribe_all_mids_with_dex(dex.as_deref()).await
658            });
659
660            return Ok(());
661        }
662
663        log::warn!("Unsupported custom data unsubscription: {data_type}");
664        Ok(())
665    }
666
667    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
668        let instruments = self.instruments.load();
669        if let Some(instrument) = instruments.get(&cmd.instrument_id) {
670            if let Err(e) = self
671                .data_sender
672                .send(DataEvent::Instrument(instrument.clone()))
673            {
674                log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
675            }
676        } else {
677            log::warn!("Instrument {} not found in cache", cmd.instrument_id);
678        }
679        Ok(())
680    }
681
682    fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
683        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
684
685        if subscription.book_type != BookType::L2_MBP {
686            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
687        }
688
689        let ws = self.ws_client.clone();
690        let instrument_id = subscription.instrument_id;
691        let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
692
693        self.spawn_task("subscribe_book_deltas", async move {
694            ws.subscribe_book_with_options(instrument_id, n_sig_figs, mantissa)
695                .await
696        });
697
698        Ok(())
699    }
700
701    fn subscribe_book_depth10(&mut self, subscription: SubscribeBookDepth10) -> anyhow::Result<()> {
702        log::debug!(
703            "Subscribing to book depth10: {}",
704            subscription.instrument_id
705        );
706
707        if subscription.book_type != BookType::L2_MBP {
708            anyhow::bail!("Hyperliquid only supports L2_MBP order book depth10");
709        }
710
711        let ws = self.ws_client.clone();
712        let instrument_id = subscription.instrument_id;
713        let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
714
715        self.spawn_task("subscribe_book_depth10", async move {
716            ws.subscribe_book_depth10_with_options(instrument_id, n_sig_figs, mantissa)
717                .await
718        });
719
720        Ok(())
721    }
722
723    fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
724        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
725
726        let ws = self.ws_client.clone();
727        let instrument_id = subscription.instrument_id;
728
729        self.spawn_task("subscribe_quotes", async move {
730            ws.subscribe_quotes(instrument_id).await
731        });
732
733        Ok(())
734    }
735
736    fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
737        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
738
739        let ws = self.ws_client.clone();
740        let instrument_id = subscription.instrument_id;
741
742        self.spawn_task("subscribe_trades", async move {
743            ws.subscribe_trades(instrument_id).await
744        });
745
746        Ok(())
747    }
748
749    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
750        let ws = self.ws_client.clone();
751        let instrument_id = cmd.instrument_id;
752
753        self.spawn_task("subscribe_mark_prices", async move {
754            ws.subscribe_mark_prices(instrument_id).await
755        });
756
757        Ok(())
758    }
759
760    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
761        let ws = self.ws_client.clone();
762        let instrument_id = cmd.instrument_id;
763
764        self.spawn_task("subscribe_index_prices", async move {
765            ws.subscribe_index_prices(instrument_id).await
766        });
767
768        Ok(())
769    }
770
771    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
772        let ws = self.ws_client.clone();
773        let instrument_id = cmd.instrument_id;
774
775        self.spawn_task("subscribe_funding_rates", async move {
776            ws.subscribe_funding_rates(instrument_id).await
777        });
778
779        Ok(())
780    }
781
782    fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
783        log::debug!("Subscribing to bars: {}", subscription.bar_type);
784
785        let instrument_id = subscription.bar_type.instrument_id();
786        if !self.instruments.contains_key(&instrument_id) {
787            anyhow::bail!("Instrument {instrument_id} not found");
788        }
789
790        let bar_type = subscription.bar_type;
791        let ws = self.ws_client.clone();
792
793        self.spawn_task("subscribe_bars", async move {
794            ws.subscribe_bars(bar_type).await
795        });
796
797        Ok(())
798    }
799
800    fn unsubscribe_book_deltas(
801        &mut self,
802        unsubscription: &UnsubscribeBookDeltas,
803    ) -> anyhow::Result<()> {
804        log::debug!(
805            "Unsubscribing from book deltas: {}",
806            unsubscription.instrument_id
807        );
808
809        let ws = self.ws_client.clone();
810        let instrument_id = unsubscription.instrument_id;
811
812        self.spawn_task("unsubscribe_book_deltas", async move {
813            ws.unsubscribe_book(instrument_id).await
814        });
815
816        Ok(())
817    }
818
819    fn unsubscribe_book_depth10(
820        &mut self,
821        unsubscription: &UnsubscribeBookDepth10,
822    ) -> anyhow::Result<()> {
823        log::debug!(
824            "Unsubscribing from book depth10: {}",
825            unsubscription.instrument_id
826        );
827
828        let ws = self.ws_client.clone();
829        let instrument_id = unsubscription.instrument_id;
830
831        self.spawn_task("unsubscribe_book_depth10", async move {
832            ws.unsubscribe_book_depth10(instrument_id).await
833        });
834
835        Ok(())
836    }
837
838    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
839        log::debug!(
840            "Unsubscribing from quotes: {}",
841            unsubscription.instrument_id
842        );
843
844        let ws = self.ws_client.clone();
845        let instrument_id = unsubscription.instrument_id;
846
847        self.spawn_task("unsubscribe_quotes", async move {
848            ws.unsubscribe_quotes(instrument_id).await
849        });
850
851        Ok(())
852    }
853
854    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
855        log::debug!(
856            "Unsubscribing from trades: {}",
857            unsubscription.instrument_id
858        );
859
860        let ws = self.ws_client.clone();
861        let instrument_id = unsubscription.instrument_id;
862
863        self.spawn_task("unsubscribe_trades", async move {
864            ws.unsubscribe_trades(instrument_id).await
865        });
866
867        Ok(())
868    }
869
870    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
871        let ws = self.ws_client.clone();
872        let instrument_id = cmd.instrument_id;
873
874        self.spawn_task("unsubscribe_mark_prices", async move {
875            ws.unsubscribe_mark_prices(instrument_id).await
876        });
877
878        Ok(())
879    }
880
881    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
882        let ws = self.ws_client.clone();
883        let instrument_id = cmd.instrument_id;
884
885        self.spawn_task("unsubscribe_index_prices", async move {
886            ws.unsubscribe_index_prices(instrument_id).await
887        });
888
889        Ok(())
890    }
891
892    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
893        let ws = self.ws_client.clone();
894        let instrument_id = cmd.instrument_id;
895
896        self.spawn_task("unsubscribe_funding_rates", async move {
897            ws.unsubscribe_funding_rates(instrument_id).await
898        });
899
900        Ok(())
901    }
902
903    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
904        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
905
906        let bar_type = unsubscription.bar_type;
907        let ws = self.ws_client.clone();
908
909        self.spawn_task("unsubscribe_bars", async move {
910            ws.unsubscribe_bars(bar_type).await
911        });
912
913        Ok(())
914    }
915
916    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
917        log::debug!("Requesting all instruments");
918
919        let http = self.http_client.clone();
920        let sender = self.data_sender.clone();
921        let instruments_cache = self.instruments.clone();
922        let coin_map = self.coin_to_instrument_id.clone();
923        let ws_instruments = self.ws_client.instruments_cache();
924        let request_id = request.request_id;
925        let client_id = request.client_id.unwrap_or(self.client_id);
926        let venue = self.venue();
927        let start_nanos = datetime_to_unix_nanos(request.start);
928        let end_nanos = datetime_to_unix_nanos(request.end);
929        let params = request.params;
930        let clock = self.clock;
931
932        self.spawn_task("request_instruments", async move {
933            let instruments = http
934                .request_instruments()
935                .await
936                .context("failed to fetch instruments from Hyperliquid")?;
937
938            instruments_cache.rcu(|instruments_map| {
939                coin_map.rcu(|coin_to_id| {
940                    for instrument in &instruments {
941                        let instrument_id = instrument.id();
942                        instruments_map.insert(instrument_id, instrument.clone());
943                        let coin = instrument.raw_symbol().inner();
944                        coin_to_id.insert(coin, instrument_id);
945                        ws_instruments.insert(coin, instrument.clone());
946                    }
947                });
948            });
949
950            let response = DataResponse::Instruments(InstrumentsResponse::new(
951                request_id,
952                client_id,
953                venue,
954                instruments,
955                start_nanos,
956                end_nanos,
957                clock.get_time_ns(),
958                params,
959            ));
960
961            if let Err(e) = sender.send(DataEvent::Response(response)) {
962                log::error!("Failed to send instruments response: {e}");
963            }
964            Ok(())
965        });
966
967        Ok(())
968    }
969
970    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
971        log::debug!("Requesting instrument: {}", request.instrument_id);
972
973        let http = self.http_client.clone();
974        let sender = self.data_sender.clone();
975        let instruments_cache = self.instruments.clone();
976        let coin_map = self.coin_to_instrument_id.clone();
977        let ws_instruments = self.ws_client.instruments_cache();
978        let instrument_id = request.instrument_id;
979        let request_id = request.request_id;
980        let client_id = request.client_id.unwrap_or(self.client_id);
981        let start_nanos = datetime_to_unix_nanos(request.start);
982        let end_nanos = datetime_to_unix_nanos(request.end);
983        let params = request.params;
984        let clock = self.clock;
985
986        self.spawn_task("request_instrument", async move {
987            let all_instruments = http
988                .request_instruments()
989                .await
990                .context("failed to fetch instruments from Hyperliquid")?;
991
992            instruments_cache.rcu(|instruments_map| {
993                coin_map.rcu(|coin_to_id| {
994                    for instrument in &all_instruments {
995                        let id = instrument.id();
996                        instruments_map.insert(id, instrument.clone());
997                        let coin = instrument.raw_symbol().inner();
998                        coin_to_id.insert(coin, id);
999                        ws_instruments.insert(coin, instrument.clone());
1000                    }
1001                });
1002            });
1003
1004            if let Some(instrument) = all_instruments
1005                .into_iter()
1006                .find(|i| i.id() == instrument_id)
1007            {
1008                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1009                    request_id,
1010                    client_id,
1011                    instrument.id(),
1012                    instrument,
1013                    start_nanos,
1014                    end_nanos,
1015                    clock.get_time_ns(),
1016                    params,
1017                )));
1018
1019                if let Err(e) = sender.send(DataEvent::Response(response)) {
1020                    log::error!("Failed to send instrument response: {e}");
1021                }
1022            } else {
1023                log::error!("Instrument not found: {instrument_id}");
1024            }
1025            Ok(())
1026        });
1027
1028        Ok(())
1029    }
1030
1031    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1032        log::debug!("Requesting bars for {}", request.bar_type);
1033
1034        let http = self.http_client.clone();
1035        let sender = self.data_sender.clone();
1036        let bar_type = request.bar_type;
1037        let start = request.start;
1038        let end = request.end;
1039        let limit = request.limit.map(|n| n.get() as u32);
1040        let request_id = request.request_id;
1041        let client_id = request.client_id.unwrap_or(self.client_id);
1042        let params = request.params;
1043        let clock = self.clock;
1044        let start_nanos = datetime_to_unix_nanos(start);
1045        let end_nanos = datetime_to_unix_nanos(end);
1046        let instruments = Arc::clone(&self.instruments);
1047
1048        self.spawn_task("request_bars", async move {
1049            let bars = request_bars_from_http(http, bar_type, start, end, limit, instruments)
1050                .await
1051                .context("bar request failed")?;
1052
1053            let response = DataResponse::Bars(BarsResponse::new(
1054                request_id,
1055                client_id,
1056                bar_type,
1057                bars,
1058                start_nanos,
1059                end_nanos,
1060                clock.get_time_ns(),
1061                params,
1062            ));
1063
1064            if let Err(e) = sender.send(DataEvent::Response(response)) {
1065                log::error!("Failed to send bars response: {e}");
1066            }
1067            Ok(())
1068        });
1069
1070        Ok(())
1071    }
1072
1073    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1074        // Hyperliquid has no public trade-tape REST endpoint; real-time
1075        // trades are available via the `trades` WebSocket channel and
1076        // account-scoped fills via `userFills`/`userFillsByTime`, but
1077        // market-wide trade history cannot be served.
1078        anyhow::bail!(
1079            "Historical trade requests are not supported by Hyperliquid for {}; \
1080             subscribe to trades via WebSocket for live trade ticks",
1081            request.instrument_id,
1082        )
1083    }
1084
1085    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1086        let instrument_id = request.instrument_id;
1087        log::debug!("Requesting funding rates for {instrument_id}");
1088
1089        let instruments = self.instruments.load();
1090        let instrument = instruments
1091            .get(&instrument_id)
1092            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1093
1094        if !matches!(instrument, InstrumentAny::CryptoPerpetual(_)) {
1095            anyhow::bail!("Funding rates are only available for perpetual instruments");
1096        }
1097
1098        let coin = instrument.raw_symbol().to_string();
1099        let http = self.http_client.clone();
1100        let sender = self.data_sender.clone();
1101        let client_id = request.client_id.unwrap_or(self.client_id);
1102        let request_id = request.request_id;
1103        let params = request.params;
1104        let clock = self.clock;
1105        let limit = request.limit.map(|n| n.get());
1106        let start_dt = request.start;
1107        let end_dt = request.end;
1108        let start_nanos = datetime_to_unix_nanos(start_dt);
1109        let end_nanos = datetime_to_unix_nanos(end_dt);
1110
1111        let now_ms = Utc::now().timestamp_millis() as u64;
1112
1113        // Hyperliquid requires a startTime; default to a 7-day lookback when none given
1114        let default_lookback_ms: u64 = 7 * 86_400_000;
1115        let start_ms = match start_dt {
1116            Some(dt) => dt.timestamp_millis().max(0) as u64,
1117            None => now_ms.saturating_sub(default_lookback_ms),
1118        };
1119        let end_ms = end_dt.map(|dt| dt.timestamp_millis().max(0) as u64);
1120
1121        self.spawn_task("request_funding_rates", async move {
1122            let entries = http
1123                .info_funding_history(&coin, start_ms, end_ms)
1124                .await
1125                .with_context(|| format!("funding rates request failed for {instrument_id}"))?;
1126
1127            let mut funding_rates: Vec<FundingRateUpdate> = entries
1128                .iter()
1129                .filter_map(
1130                    |entry| match funding_entry_to_update(entry, instrument_id) {
1131                        Ok(update) => Some(update),
1132                        Err(e) => {
1133                            log::warn!("Skipping funding history entry for {instrument_id}: {e}",);
1134                            None
1135                        }
1136                    },
1137                )
1138                .collect();
1139
1140            if let Some(limit) = limit
1141                && funding_rates.len() > limit
1142            {
1143                funding_rates.truncate(limit);
1144            }
1145
1146            log::debug!(
1147                "Fetched {} funding rates for {instrument_id}",
1148                funding_rates.len(),
1149            );
1150
1151            let response = DataResponse::FundingRates(FundingRatesResponse::new(
1152                request_id,
1153                client_id,
1154                instrument_id,
1155                funding_rates,
1156                start_nanos,
1157                end_nanos,
1158                clock.get_time_ns(),
1159                params,
1160            ));
1161
1162            if let Err(e) = sender.send(DataEvent::Response(response)) {
1163                log::error!("Failed to send funding rates response: {e}");
1164            }
1165            Ok(())
1166        });
1167
1168        Ok(())
1169    }
1170
1171    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1172        let instrument_id = request.instrument_id;
1173        let instruments = self.instruments.load();
1174        let instrument = instruments
1175            .get(&instrument_id)
1176            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1177
1178        let raw_symbol = instrument.raw_symbol().to_string();
1179        let price_precision = instrument.price_precision();
1180        let size_precision = instrument.size_precision();
1181        let depth = request.depth.map(|d| d.get());
1182
1183        let http = self.http_client.clone();
1184        let sender = self.data_sender.clone();
1185        let client_id = request.client_id.unwrap_or(self.client_id);
1186        let request_id = request.request_id;
1187        let params = request.params;
1188        let clock = self.clock;
1189
1190        self.spawn_task("request_book_snapshot", async move {
1191            let l2_book = http
1192                .info_l2_book(&raw_symbol)
1193                .await
1194                .with_context(|| format!("book snapshot request failed for {instrument_id}"))?;
1195
1196            let book = parse_l2_book_snapshot(
1197                &l2_book,
1198                instrument_id,
1199                price_precision,
1200                size_precision,
1201                depth,
1202            );
1203
1204            let response = DataResponse::Book(BookResponse::new(
1205                request_id,
1206                client_id,
1207                instrument_id,
1208                book,
1209                None,
1210                None,
1211                clock.get_time_ns(),
1212                params,
1213            ));
1214
1215            if let Err(e) = sender.send(DataEvent::Response(response)) {
1216                log::error!("Failed to send book snapshot response: {e}");
1217            }
1218            Ok(())
1219        });
1220
1221        Ok(())
1222    }
1223}
1224
1225// Levels with unparsable px/sz or non-positive size are skipped rather than
1226// erroring; the snapshot's `time` field (ms) becomes `ts_event` after the
1227// ms->ns conversion.
1228pub(crate) fn parse_l2_book_snapshot(
1229    l2_book: &HyperliquidL2Book,
1230    instrument_id: InstrumentId,
1231    price_precision: u8,
1232    size_precision: u8,
1233    depth: Option<usize>,
1234) -> OrderBook {
1235    let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1236    let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
1237
1238    let all_bids = l2_book
1239        .levels
1240        .first()
1241        .map_or([].as_slice(), |v| v.as_slice());
1242    let all_asks = l2_book
1243        .levels
1244        .get(1)
1245        .map_or([].as_slice(), |v| v.as_slice());
1246
1247    let bids = match depth {
1248        Some(d) if d < all_bids.len() => &all_bids[..d],
1249        _ => all_bids,
1250    };
1251    let asks = match depth {
1252        Some(d) if d < all_asks.len() => &all_asks[..d],
1253        _ => all_asks,
1254    };
1255
1256    for (i, level) in bids.iter().enumerate() {
1257        let Ok(px) = level.px.parse::<f64>() else {
1258            continue;
1259        };
1260        let Ok(sz) = level.sz.parse::<f64>() else {
1261            continue;
1262        };
1263
1264        if sz > 0.0 {
1265            let price = Price::new(px, price_precision);
1266            let size = Quantity::new(sz, size_precision);
1267            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1268            book.add(order, 0, i as u64, ts_event);
1269        }
1270    }
1271
1272    let bids_len = bids.len();
1273
1274    for (i, level) in asks.iter().enumerate() {
1275        let Ok(px) = level.px.parse::<f64>() else {
1276            continue;
1277        };
1278        let Ok(sz) = level.sz.parse::<f64>() else {
1279            continue;
1280        };
1281
1282        if sz > 0.0 {
1283            let price = Price::new(px, price_precision);
1284            let size = Quantity::new(sz, size_precision);
1285            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1286            book.add(order, 0, (bids_len + i) as u64, ts_event);
1287        }
1288    }
1289
1290    log::info!(
1291        "Built order book for {instrument_id} with {} bids and {} asks",
1292        bids.len(),
1293        asks.len(),
1294    );
1295
1296    book
1297}
1298
1299// Reads optional `nSigFigs` / `mantissa` L2 precision controls from
1300// `subscribe_params`; bails on non-positive integer values.
1301pub(crate) fn parse_book_precision_params(
1302    params: Option<&Params>,
1303) -> anyhow::Result<(Option<u32>, Option<u32>)> {
1304    let Some(params) = params else {
1305        return Ok((None, None));
1306    };
1307
1308    let read_u32 = |key: &str| -> anyhow::Result<Option<u32>> {
1309        match params.get(key) {
1310            None => Ok(None),
1311            Some(v) => v
1312                .as_u64()
1313                .and_then(|n| u32::try_from(n).ok())
1314                .ok_or_else(|| anyhow::anyhow!("`{key}` must be a positive u32"))
1315                .map(Some),
1316        }
1317    };
1318
1319    Ok((read_u32("n_sig_figs")?, read_u32("mantissa")?))
1320}
1321
1322// Hyperliquid funds perpetuals hourly, so `interval` is fixed at 60 mins;
1323// `time` from the venue marks the end of the funding interval in ms.
1324pub(crate) fn funding_entry_to_update(
1325    entry: &HyperliquidFundingHistoryEntry,
1326    instrument_id: InstrumentId,
1327) -> anyhow::Result<FundingRateUpdate> {
1328    let rate: Decimal = entry
1329        .funding_rate
1330        .parse()
1331        .with_context(|| format!("invalid fundingRate '{}'", entry.funding_rate))?;
1332    let ts = UnixNanos::from(entry.time * 1_000_000);
1333    Ok(FundingRateUpdate::new(
1334        instrument_id,
1335        rate,
1336        Some(60),
1337        None,
1338        ts,
1339        ts,
1340    ))
1341}
1342
1343pub(crate) fn candle_to_bar(
1344    candle: &HyperliquidCandle,
1345    bar_type: BarType,
1346    price_precision: u8,
1347    size_precision: u8,
1348) -> anyhow::Result<Bar> {
1349    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1350    let ts_event = ts_init;
1351
1352    let open = candle.open.parse::<f64>().context("parse open price")?;
1353    let high = candle.high.parse::<f64>().context("parse high price")?;
1354    let low = candle.low.parse::<f64>().context("parse low price")?;
1355    let close = candle.close.parse::<f64>().context("parse close price")?;
1356    let volume = candle.volume.parse::<f64>().context("parse volume")?;
1357
1358    Ok(Bar::new(
1359        bar_type,
1360        Price::new(open, price_precision),
1361        Price::new(high, price_precision),
1362        Price::new(low, price_precision),
1363        Price::new(close, price_precision),
1364        Quantity::new(volume, size_precision),
1365        ts_event,
1366        ts_init,
1367    ))
1368}
1369
1370/// Request bars from HTTP API.
1371async fn request_bars_from_http(
1372    http_client: HyperliquidHttpClient,
1373    bar_type: BarType,
1374    start: Option<DateTime<Utc>>,
1375    end: Option<DateTime<Utc>>,
1376    limit: Option<u32>,
1377    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1378) -> anyhow::Result<Vec<Bar>> {
1379    // Get instrument details for precision
1380    let instrument_id = bar_type.instrument_id();
1381    let instrument = instruments
1382        .load()
1383        .get(&instrument_id)
1384        .cloned()
1385        .context("instrument not found in cache")?;
1386
1387    let price_precision = instrument.price_precision();
1388    let size_precision = instrument.size_precision();
1389    let raw_symbol = instrument.raw_symbol();
1390    let coin = raw_symbol.as_str();
1391
1392    let interval = bar_type_to_interval(&bar_type)?;
1393
1394    // Hyperliquid uses millisecond timestamps
1395    let now = Utc::now();
1396    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1397    let start_time = if let Some(start) = start {
1398        start.timestamp_millis() as u64
1399    } else {
1400        // Default to 1000 bars before end_time
1401        let spec = bar_type.spec();
1402        let step_ms = match spec.aggregation {
1403            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1404            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1405            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1406            _ => 60_000,
1407        };
1408        end_time.saturating_sub(1000 * step_ms)
1409    };
1410
1411    let candles = http_client
1412        .info_candle_snapshot(coin, interval, start_time, end_time)
1413        .await
1414        .context("failed to fetch candle snapshot from Hyperliquid")?;
1415
1416    let mut bars: Vec<Bar> = candles
1417        .iter()
1418        .filter_map(|candle| {
1419            candle_to_bar(candle, bar_type, price_precision, size_precision)
1420                .map_err(|e| {
1421                    log::warn!("Failed to convert candle to bar: {e}");
1422                    e
1423                })
1424                .ok()
1425        })
1426        .collect();
1427
1428    if let Some(limit) = limit
1429        && bars.len() > limit as usize
1430    {
1431        bars = bars.into_iter().take(limit as usize).collect();
1432    }
1433
1434    log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1435    Ok(bars)
1436}
1437
1438#[cfg(test)]
1439mod tests {
1440    use rstest::rstest;
1441    use rust_decimal_macros::dec;
1442    use ustr::Ustr;
1443
1444    use super::*;
1445    use crate::common::testing::load_test_data;
1446
1447    fn btc_perp_id() -> InstrumentId {
1448        InstrumentId::from("BTC-PERP.HYPERLIQUID")
1449    }
1450
1451    #[rstest]
1452    fn test_funding_entry_to_update_parses_positive_rate() {
1453        let entry = HyperliquidFundingHistoryEntry {
1454            coin: Ustr::from("BTC"),
1455            funding_rate: "0.0000125".to_string(),
1456            premium: Some("0.00029005".to_string()),
1457            time: 1769908800000,
1458        };
1459        let instrument_id = btc_perp_id();
1460
1461        let update = funding_entry_to_update(&entry, instrument_id).unwrap();
1462
1463        assert_eq!(update.instrument_id, instrument_id);
1464        assert_eq!(update.rate, dec!(0.0000125));
1465        assert_eq!(update.interval, Some(60));
1466        assert!(update.next_funding_ns.is_none());
1467        assert_eq!(update.ts_event, UnixNanos::from(1769908800000 * 1_000_000));
1468        assert_eq!(update.ts_init, update.ts_event);
1469    }
1470
1471    #[rstest]
1472    fn test_funding_entry_to_update_handles_negative_rate() {
1473        let entry = HyperliquidFundingHistoryEntry {
1474            coin: Ustr::from("BTC"),
1475            funding_rate: "-0.0000081".to_string(),
1476            premium: None,
1477            time: 1769912400000,
1478        };
1479        let update = funding_entry_to_update(&entry, btc_perp_id()).unwrap();
1480        assert_eq!(update.rate, dec!(-0.0000081));
1481    }
1482
1483    #[rstest]
1484    fn test_funding_entry_to_update_rejects_invalid_rate() {
1485        let entry = HyperliquidFundingHistoryEntry {
1486            coin: Ustr::from("BTC"),
1487            funding_rate: "not-a-number".to_string(),
1488            premium: None,
1489            time: 1769912400000,
1490        };
1491        let result = funding_entry_to_update(&entry, btc_perp_id());
1492        assert!(result.is_err());
1493    }
1494
1495    #[rstest]
1496    fn test_parse_book_precision_params_none() {
1497        let (n, m) = parse_book_precision_params(None).unwrap();
1498        assert_eq!(n, None);
1499        assert_eq!(m, None);
1500    }
1501
1502    fn make_params(json: serde_json::Value) -> Params {
1503        serde_json::from_value(json).expect("valid params payload")
1504    }
1505
1506    #[rstest]
1507    fn test_parse_book_precision_params_only_n_sig_figs() {
1508        let params = make_params(serde_json::json!({"n_sig_figs": 4}));
1509        let (n, m) = parse_book_precision_params(Some(&params)).unwrap();
1510        assert_eq!(n, Some(4));
1511        assert_eq!(m, None);
1512    }
1513
1514    #[rstest]
1515    fn test_parse_book_precision_params_both() {
1516        let params = make_params(serde_json::json!({"n_sig_figs": 5, "mantissa": 2}));
1517        let (n, m) = parse_book_precision_params(Some(&params)).unwrap();
1518        assert_eq!(n, Some(5));
1519        assert_eq!(m, Some(2));
1520    }
1521
1522    #[rstest]
1523    fn test_parse_book_precision_params_rejects_negative() {
1524        let params = make_params(serde_json::json!({"n_sig_figs": -1}));
1525        let err = parse_book_precision_params(Some(&params)).unwrap_err();
1526        assert!(err.to_string().contains("n_sig_figs"));
1527    }
1528
1529    #[rstest]
1530    fn test_funding_history_fixture_parses() {
1531        let entries: Vec<HyperliquidFundingHistoryEntry> =
1532            load_test_data("http_funding_history.json");
1533        assert_eq!(entries.len(), 3);
1534        assert_eq!(entries[0].coin.as_str(), "BTC");
1535        assert_eq!(entries[0].funding_rate, "0.0000125");
1536        assert_eq!(entries[0].premium.as_deref(), Some("0.00029005"));
1537        assert!(entries[2].premium.is_none());
1538
1539        let updates: Vec<FundingRateUpdate> = entries
1540            .iter()
1541            .map(|e| funding_entry_to_update(e, btc_perp_id()).unwrap())
1542            .collect();
1543        assert_eq!(updates.len(), 3);
1544        assert_eq!(updates[0].rate, dec!(0.0000125));
1545        assert_eq!(updates[1].rate, dec!(-0.0000081));
1546        assert_eq!(updates[2].rate, dec!(0.0000033));
1547    }
1548
1549    fn level(px: &str, sz: &str) -> crate::http::models::HyperliquidLevel {
1550        crate::http::models::HyperliquidLevel {
1551            px: px.to_string(),
1552            sz: sz.to_string(),
1553        }
1554    }
1555
1556    fn sample_l2_book() -> HyperliquidL2Book {
1557        HyperliquidL2Book {
1558            coin: Ustr::from("BTC"),
1559            levels: vec![
1560                vec![
1561                    level("98450.50", "2.5"),
1562                    level("98449.00", "1.2"),
1563                    level("98448.00", "0.8"),
1564                ],
1565                vec![
1566                    level("98451.00", "1.5"),
1567                    level("98452.00", "2.0"),
1568                    level("98453.00", "0.5"),
1569                ],
1570            ],
1571            time: 1769908800000,
1572        }
1573    }
1574
1575    #[rstest]
1576    fn test_parse_l2_book_snapshot_populates_both_sides() {
1577        let book_data = sample_l2_book();
1578        let instrument_id = btc_perp_id();
1579        let book = parse_l2_book_snapshot(&book_data, instrument_id, 2, 4, None);
1580
1581        assert_eq!(book.instrument_id, instrument_id);
1582        assert_eq!(book.book_type, BookType::L2_MBP);
1583        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
1584        assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
1585        assert_eq!(book.best_bid_size(), Some(Quantity::new(2.5, 4)));
1586        assert_eq!(book.best_ask_size(), Some(Quantity::new(1.5, 4)));
1587        assert_eq!(book.update_count, 6);
1588    }
1589
1590    #[rstest]
1591    fn test_parse_l2_book_snapshot_truncates_to_depth() {
1592        let book_data = sample_l2_book();
1593        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, Some(1));
1594
1595        // depth=1 keeps the top of book on each side, drops the rest.
1596        assert_eq!(book.update_count, 2);
1597        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
1598        assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
1599    }
1600
1601    #[rstest]
1602    fn test_parse_l2_book_snapshot_uses_venue_time_as_ts_event() {
1603        let book_data = sample_l2_book();
1604        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1605        let expected_ts = UnixNanos::from(1769908800000_u64 * 1_000_000);
1606
1607        // ts_last reflects the last applied delta; every added order
1608        // carries the venue time after the ms->ns conversion.
1609        assert_eq!(book.ts_last, expected_ts);
1610    }
1611
1612    #[rstest]
1613    fn test_parse_l2_book_snapshot_skips_non_positive_size() {
1614        let book_data = HyperliquidL2Book {
1615            coin: Ustr::from("BTC"),
1616            levels: vec![
1617                vec![level("98450.50", "2.5"), level("98449.00", "0")],
1618                vec![level("98451.00", "0"), level("98452.00", "1.5")],
1619            ],
1620            time: 1769908800000,
1621        };
1622        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1623
1624        assert_eq!(book.update_count, 2, "zero-sized levels must be skipped");
1625        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
1626        assert_eq!(book.best_ask_price(), Some(Price::new(98452.00, 2)));
1627    }
1628
1629    #[rstest]
1630    fn test_parse_l2_book_snapshot_skips_unparsable_levels() {
1631        let book_data = HyperliquidL2Book {
1632            coin: Ustr::from("BTC"),
1633            levels: vec![
1634                vec![level("not-a-number", "1.0"), level("98449.00", "1.2")],
1635                vec![level("98451.00", "garbage"), level("98452.00", "1.5")],
1636            ],
1637            time: 1769908800000,
1638        };
1639        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1640
1641        // Each side has one parseable level remaining.
1642        assert_eq!(book.update_count, 2);
1643        assert_eq!(book.best_bid_price(), Some(Price::new(98449.00, 2)));
1644        assert_eq!(book.best_ask_price(), Some(Price::new(98452.00, 2)));
1645    }
1646
1647    #[rstest]
1648    fn test_parse_l2_book_snapshot_empty_levels_yields_empty_book() {
1649        let book_data = HyperliquidL2Book {
1650            coin: Ustr::from("BTC"),
1651            levels: vec![],
1652            time: 1769908800000,
1653        };
1654        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1655
1656        assert_eq!(book.update_count, 0);
1657        assert!(book.best_bid_price().is_none());
1658        assert!(book.best_ask_price().is_none());
1659    }
1660}