Skip to main content

nautilus_hyperliquid/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    clients::DataClient,
26    live::{runner::get_data_event_sender, runtime::get_runtime},
27    messages::{
28        DataEvent,
29        data::{
30            BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
31            RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
32            SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
33            SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
34            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
35            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
36        },
37    },
38};
39use nautilus_core::{
40    AtomicMap, UnixNanos,
41    datetime::datetime_to_unix_nanos,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45    data::{Bar, BarType, BookOrder, Data, OrderBookDeltas_API},
46    enums::{BarAggregation, BookType, OrderSide},
47    identifiers::{ClientId, InstrumentId, Venue},
48    instruments::{Instrument, InstrumentAny},
49    orderbook::OrderBook,
50    types::{Price, Quantity},
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use crate::{
57    common::{
58        consts::HYPERLIQUID_VENUE,
59        credential::{Secrets, credential_env_vars},
60        parse::bar_type_to_interval,
61    },
62    config::HyperliquidDataClientConfig,
63    http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
64    websocket::{
65        client::HyperliquidWebSocketClient,
66        messages::{HyperliquidWsMessage, NautilusWsMessage},
67        parse::{
68            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
69        },
70    },
71};
72
73#[derive(Debug)]
74pub struct HyperliquidDataClient {
75    client_id: ClientId,
76    #[allow(dead_code)]
77    config: HyperliquidDataClientConfig,
78    http_client: HyperliquidHttpClient,
79    ws_client: HyperliquidWebSocketClient,
80    is_connected: AtomicBool,
81    cancellation_token: CancellationToken,
82    tasks: Vec<JoinHandle<()>>,
83    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
84    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
85    // Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
86    coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
87    clock: &'static AtomicTime,
88    #[allow(dead_code)]
89    instrument_refresh_active: bool,
90}
91
92impl HyperliquidDataClient {
93    /// Creates a new [`HyperliquidDataClient`] instance.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the HTTP client fails to initialize.
98    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
99        let clock = get_atomic_clock_realtime();
100        let data_sender = get_data_event_sender();
101
102        // Only fall back to unauthenticated when credentials are absent,
103        // not when they're invalid (fail fast on malformed keys)
104        let (pk_var, _) = credential_env_vars(config.is_testnet);
105        let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
106
107        let mut http_client = if has_credentials {
108            let secrets = Secrets::resolve(config.private_key.as_deref(), None, config.is_testnet)?;
109            HyperliquidHttpClient::with_secrets(
110                &secrets,
111                config.http_timeout_secs,
112                config.http_proxy_url.clone(),
113            )?
114        } else {
115            HyperliquidHttpClient::new(
116                config.is_testnet,
117                config.http_timeout_secs,
118                config.http_proxy_url.clone(),
119            )?
120        };
121
122        // Apply URL overrides from config (used for testing with mock servers)
123        if let Some(url) = &config.base_url_http {
124            http_client.set_base_info_url(url.clone());
125        }
126
127        let ws_url = config.base_url_ws.clone();
128        let ws_client = HyperliquidWebSocketClient::new(ws_url, config.is_testnet, None);
129
130        Ok(Self {
131            client_id,
132            config,
133            http_client,
134            ws_client,
135            is_connected: AtomicBool::new(false),
136            cancellation_token: CancellationToken::new(),
137            tasks: Vec::new(),
138            data_sender,
139            instruments: Arc::new(AtomicMap::new()),
140            coin_to_instrument_id: Arc::new(AtomicMap::new()),
141            clock,
142            instrument_refresh_active: false,
143        })
144    }
145
146    fn venue(&self) -> Venue {
147        *HYPERLIQUID_VENUE
148    }
149
150    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
151        let instruments = self
152            .http_client
153            .request_instruments()
154            .await
155            .context("failed to fetch instruments during bootstrap")?;
156
157        self.instruments.rcu(|m| {
158            for instrument in &instruments {
159                m.insert(instrument.id(), instrument.clone());
160            }
161        });
162
163        self.coin_to_instrument_id.rcu(|m| {
164            for instrument in &instruments {
165                m.insert(instrument.raw_symbol().inner(), instrument.id());
166            }
167        });
168
169        for instrument in &instruments {
170            self.ws_client.cache_instrument(instrument.clone());
171        }
172
173        log::info!(
174            "Bootstrapped {} instruments with {} coin mappings",
175            self.instruments.len(),
176            self.coin_to_instrument_id.len()
177        );
178        Ok(instruments)
179    }
180
181    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
182        // Clone client before connecting so the clone can have out_rx set
183        let mut ws_client = self.ws_client.clone();
184
185        ws_client
186            .connect()
187            .await
188            .context("failed to connect to Hyperliquid WebSocket")?;
189
190        // Transfer task handle to original so disconnect() can await it
191        if let Some(handle) = ws_client.take_task_handle() {
192            self.ws_client.set_task_handle(handle);
193        }
194
195        let data_sender = self.data_sender.clone();
196        let cancellation_token = self.cancellation_token.clone();
197
198        let task = get_runtime().spawn(async move {
199            log::info!("Hyperliquid WebSocket consumption loop started");
200
201            loop {
202                tokio::select! {
203                    () = cancellation_token.cancelled() => {
204                        log::info!("WebSocket consumption loop cancelled");
205                        break;
206                    }
207                    msg_opt = ws_client.next_event() => {
208                        if let Some(msg) = msg_opt {
209                            match msg {
210                                NautilusWsMessage::Trades(trades) => {
211                                    for trade in trades {
212                                        if let Err(e) = data_sender
213                                            .send(DataEvent::Data(Data::Trade(trade)))
214                                        {
215                                            log::error!("Failed to send trade tick: {e}");
216                                        }
217                                    }
218                                }
219                                NautilusWsMessage::Quote(quote) => {
220                                    if let Err(e) = data_sender
221                                        .send(DataEvent::Data(Data::Quote(quote)))
222                                    {
223                                        log::error!("Failed to send quote tick: {e}");
224                                    }
225                                }
226                                NautilusWsMessage::Deltas(deltas) => {
227                                    if let Err(e) = data_sender
228                                        .send(DataEvent::Data(Data::Deltas(
229                                            OrderBookDeltas_API::new(deltas),
230                                        )))
231                                    {
232                                        log::error!("Failed to send order book deltas: {e}");
233                                    }
234                                }
235                                NautilusWsMessage::Candle(bar) => {
236                                    if let Err(e) = data_sender
237                                        .send(DataEvent::Data(Data::Bar(bar)))
238                                    {
239                                        log::error!("Failed to send bar: {e}");
240                                    }
241                                }
242                                NautilusWsMessage::MarkPrice(update) => {
243                                    if let Err(e) = data_sender
244                                        .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
245                                    {
246                                        log::error!("Failed to send mark price update: {e}");
247                                    }
248                                }
249                                NautilusWsMessage::IndexPrice(update) => {
250                                    if let Err(e) = data_sender
251                                        .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
252                                    {
253                                        log::error!("Failed to send index price update: {e}");
254                                    }
255                                }
256                                NautilusWsMessage::FundingRate(update) => {
257                                    if let Err(e) = data_sender
258                                        .send(DataEvent::FundingRate(update))
259                                    {
260                                        log::error!("Failed to send funding rate update: {e}");
261                                    }
262                                }
263                                NautilusWsMessage::Reconnected => {
264                                    log::info!("WebSocket reconnected");
265                                }
266                                NautilusWsMessage::Error(e) => {
267                                    log::error!("WebSocket error: {e}");
268                                }
269                                NautilusWsMessage::ExecutionReports(_) => {
270                                    // Handled by execution client
271                                }
272                            }
273                        } else {
274                            // Connection closed or error
275                            log::debug!("WebSocket next_event returned None, stream closed");
276                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
277                        }
278                    }
279                }
280            }
281
282            log::info!("Hyperliquid WebSocket consumption loop finished");
283        });
284
285        self.tasks.push(task);
286        log::info!("WebSocket consumption task spawned");
287
288        Ok(())
289    }
290
291    #[allow(dead_code)]
292    fn handle_ws_message(
293        msg: HyperliquidWsMessage,
294        ws_client: &HyperliquidWebSocketClient,
295        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
296        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
297        coin_to_instrument_id: &Arc<AtomicMap<Ustr, InstrumentId>>,
298        _venue: Venue,
299        clock: &'static AtomicTime,
300    ) {
301        match msg {
302            HyperliquidWsMessage::Bbo { data } => {
303                let coin = data.coin;
304                log::debug!("Received BBO message for coin: {coin}");
305
306                let coin_map = coin_to_instrument_id.load();
307                let instrument_id = coin_map.get(&data.coin);
308
309                if let Some(&instrument_id) = instrument_id {
310                    let instruments_map = instruments.load();
311                    if let Some(instrument) = instruments_map.get(&instrument_id) {
312                        let ts_init = clock.get_time_ns();
313
314                        match parse_ws_quote_tick(&data, instrument, ts_init) {
315                            Ok(quote_tick) => {
316                                log::debug!(
317                                    "Parsed quote tick for {}: bid={}, ask={}",
318                                    data.coin,
319                                    quote_tick.bid_price,
320                                    quote_tick.ask_price
321                                );
322
323                                if let Err(e) =
324                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
325                                {
326                                    log::error!("Failed to send quote tick: {e}");
327                                }
328                            }
329                            Err(e) => {
330                                log::error!("Failed to parse quote tick for {}: {e}", data.coin);
331                            }
332                        }
333                    }
334                } else {
335                    log::warn!(
336                        "Received BBO for unknown coin: {} (no matching instrument found)",
337                        data.coin
338                    );
339                }
340            }
341            HyperliquidWsMessage::Trades { data } => {
342                let count = data.len();
343                log::debug!("Received {count} trade(s)");
344
345                for trade_data in data {
346                    let coin = trade_data.coin;
347                    let coin_map = coin_to_instrument_id.load();
348
349                    if let Some(&instrument_id) = coin_map.get(&coin) {
350                        let instruments_map = instruments.load();
351                        if let Some(instrument) = instruments_map.get(&instrument_id) {
352                            let ts_init = clock.get_time_ns();
353
354                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
355                                Ok(trade_tick) => {
356                                    if let Err(e) =
357                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
358                                    {
359                                        log::error!("Failed to send trade tick: {e}");
360                                    }
361                                }
362                                Err(e) => {
363                                    log::error!("Failed to parse trade tick for {coin}: {e}");
364                                }
365                            }
366                        }
367                    } else {
368                        log::warn!("Received trade for unknown coin: {coin}");
369                    }
370                }
371            }
372            HyperliquidWsMessage::L2Book { data } => {
373                let coin = data.coin;
374                log::debug!("Received L2 book update for coin: {coin}");
375
376                let coin_map = coin_to_instrument_id.load();
377                if let Some(&instrument_id) = coin_map.get(&data.coin) {
378                    let instruments_map = instruments.load();
379                    if let Some(instrument) = instruments_map.get(&instrument_id) {
380                        let ts_init = clock.get_time_ns();
381
382                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
383                            Ok(deltas) => {
384                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
385                                    OrderBookDeltas_API::new(deltas),
386                                ))) {
387                                    log::error!("Failed to send order book deltas: {e}");
388                                }
389                            }
390                            Err(e) => {
391                                log::error!(
392                                    "Failed to parse order book deltas for {}: {e}",
393                                    data.coin
394                                );
395                            }
396                        }
397                    }
398                } else {
399                    log::warn!("Received L2 book for unknown coin: {coin}");
400                }
401            }
402            HyperliquidWsMessage::Candle { data } => {
403                let coin = &data.s;
404                let interval = &data.i;
405                log::debug!("Received candle for {coin}:{interval}");
406
407                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
408                    let coin = Ustr::from(&data.s);
409                    let coin_map = coin_to_instrument_id.load();
410
411                    if let Some(&instrument_id) = coin_map.get(&coin) {
412                        let instruments_map = instruments.load();
413                        if let Some(instrument) = instruments_map.get(&instrument_id) {
414                            let ts_init = clock.get_time_ns();
415
416                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
417                                Ok(bar) => {
418                                    if let Err(e) =
419                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
420                                    {
421                                        log::error!("Failed to send bar data: {e}");
422                                    }
423                                }
424                                Err(e) => {
425                                    log::error!("Failed to parse candle for {coin}: {e}");
426                                }
427                            }
428                        }
429                    } else {
430                        log::warn!("Received candle for unknown coin: {coin}");
431                    }
432                } else {
433                    log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
434                }
435            }
436            _ => {
437                log::trace!("Received unhandled WebSocket message: {msg:?}");
438            }
439        }
440    }
441}
442
443impl HyperliquidDataClient {
444    #[allow(dead_code)]
445    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
446        if let Err(e) = sender.send(DataEvent::Data(data)) {
447            log::error!("Failed to emit data event: {e}");
448        }
449    }
450}
451
452#[async_trait::async_trait(?Send)]
453impl DataClient for HyperliquidDataClient {
454    fn client_id(&self) -> ClientId {
455        self.client_id
456    }
457
458    fn venue(&self) -> Option<Venue> {
459        Some(self.venue())
460    }
461
462    fn start(&mut self) -> anyhow::Result<()> {
463        log::info!(
464            "Starting Hyperliquid data client: client_id={}, is_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
465            self.client_id,
466            self.config.is_testnet,
467            self.config.http_proxy_url,
468            self.config.ws_proxy_url,
469        );
470        Ok(())
471    }
472
473    fn stop(&mut self) -> anyhow::Result<()> {
474        log::info!("Stopping Hyperliquid data client {}", self.client_id);
475        self.cancellation_token.cancel();
476        self.is_connected.store(false, Ordering::Relaxed);
477        Ok(())
478    }
479
480    fn reset(&mut self) -> anyhow::Result<()> {
481        log::debug!("Resetting Hyperliquid data client {}", self.client_id);
482        self.is_connected.store(false, Ordering::Relaxed);
483        self.cancellation_token = CancellationToken::new();
484        self.tasks.clear();
485        Ok(())
486    }
487
488    fn dispose(&mut self) -> anyhow::Result<()> {
489        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
490        self.stop()
491    }
492
493    fn is_connected(&self) -> bool {
494        self.is_connected.load(Ordering::Acquire)
495    }
496
497    fn is_disconnected(&self) -> bool {
498        !self.is_connected()
499    }
500
501    async fn connect(&mut self) -> anyhow::Result<()> {
502        if self.is_connected() {
503            return Ok(());
504        }
505
506        let instruments = self
507            .bootstrap_instruments()
508            .await
509            .context("failed to bootstrap instruments")?;
510
511        for instrument in instruments {
512            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
513                log::warn!("Failed to send instrument: {e}");
514            }
515        }
516
517        self.spawn_ws()
518            .await
519            .context("failed to spawn WebSocket client")?;
520
521        self.is_connected.store(true, Ordering::Relaxed);
522        log::info!("Connected: client_id={}", self.client_id);
523
524        Ok(())
525    }
526
527    async fn disconnect(&mut self) -> anyhow::Result<()> {
528        if !self.is_connected() {
529            return Ok(());
530        }
531
532        self.cancellation_token.cancel();
533
534        for task in self.tasks.drain(..) {
535            if let Err(e) = task.await {
536                log::error!("Error waiting for task to complete: {e}");
537            }
538        }
539
540        if let Err(e) = self.ws_client.disconnect().await {
541            log::error!("Error disconnecting WebSocket client: {e}");
542        }
543
544        self.instruments.store(AHashMap::new());
545
546        self.is_connected.store(false, Ordering::Relaxed);
547        log::info!("Disconnected: client_id={}", self.client_id);
548
549        Ok(())
550    }
551
552    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
553        log::debug!("Requesting all instruments");
554
555        let http = self.http_client.clone();
556        let sender = self.data_sender.clone();
557        let instruments_cache = self.instruments.clone();
558        let coin_map = self.coin_to_instrument_id.clone();
559        let ws_instruments = self.ws_client.instruments_cache();
560        let request_id = request.request_id;
561        let client_id = request.client_id.unwrap_or(self.client_id);
562        let venue = self.venue();
563        let start_nanos = datetime_to_unix_nanos(request.start);
564        let end_nanos = datetime_to_unix_nanos(request.end);
565        let params = request.params;
566        let clock = self.clock;
567
568        get_runtime().spawn(async move {
569            match http.request_instruments().await {
570                Ok(instruments) => {
571                    instruments_cache.rcu(|instruments_map| {
572                        coin_map.rcu(|coin_to_id| {
573                            for instrument in &instruments {
574                                let instrument_id = instrument.id();
575                                instruments_map.insert(instrument_id, instrument.clone());
576                                let coin = instrument.raw_symbol().inner();
577                                coin_to_id.insert(coin, instrument_id);
578                                ws_instruments.insert(coin, instrument.clone());
579                            }
580                        });
581                    });
582
583                    let response = DataResponse::Instruments(InstrumentsResponse::new(
584                        request_id,
585                        client_id,
586                        venue,
587                        instruments,
588                        start_nanos,
589                        end_nanos,
590                        clock.get_time_ns(),
591                        params,
592                    ));
593
594                    if let Err(e) = sender.send(DataEvent::Response(response)) {
595                        log::error!("Failed to send instruments response: {e}");
596                    }
597                }
598                Err(e) => {
599                    log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
600                }
601            }
602        });
603
604        Ok(())
605    }
606
607    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
608        log::debug!("Requesting instrument: {}", request.instrument_id);
609
610        let http = self.http_client.clone();
611        let sender = self.data_sender.clone();
612        let instruments_cache = self.instruments.clone();
613        let coin_map = self.coin_to_instrument_id.clone();
614        let ws_instruments = self.ws_client.instruments_cache();
615        let instrument_id = request.instrument_id;
616        let request_id = request.request_id;
617        let client_id = request.client_id.unwrap_or(self.client_id);
618        let start_nanos = datetime_to_unix_nanos(request.start);
619        let end_nanos = datetime_to_unix_nanos(request.end);
620        let params = request.params;
621        let clock = self.clock;
622
623        get_runtime().spawn(async move {
624            match http.request_instruments().await {
625                Ok(all_instruments) => {
626                    instruments_cache.rcu(|instruments_map| {
627                        coin_map.rcu(|coin_to_id| {
628                            for instrument in &all_instruments {
629                                let id = instrument.id();
630                                instruments_map.insert(id, instrument.clone());
631                                let coin = instrument.raw_symbol().inner();
632                                coin_to_id.insert(coin, id);
633                                ws_instruments.insert(coin, instrument.clone());
634                            }
635                        });
636                    });
637
638                    if let Some(instrument) = all_instruments
639                        .into_iter()
640                        .find(|i| i.id() == instrument_id)
641                    {
642                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
643                            request_id,
644                            client_id,
645                            instrument.id(),
646                            instrument,
647                            start_nanos,
648                            end_nanos,
649                            clock.get_time_ns(),
650                            params,
651                        )));
652
653                        if let Err(e) = sender.send(DataEvent::Response(response)) {
654                            log::error!("Failed to send instrument response: {e}");
655                        }
656                    } else {
657                        log::error!("Instrument not found: {instrument_id}");
658                    }
659                }
660                Err(e) => {
661                    log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
662                }
663            }
664        });
665
666        Ok(())
667    }
668
669    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
670        log::debug!("Requesting bars for {}", request.bar_type);
671
672        let http = self.http_client.clone();
673        let sender = self.data_sender.clone();
674        let bar_type = request.bar_type;
675        let start = request.start;
676        let end = request.end;
677        let limit = request.limit.map(|n| n.get() as u32);
678        let request_id = request.request_id;
679        let client_id = request.client_id.unwrap_or(self.client_id);
680        let params = request.params;
681        let clock = self.clock;
682        let start_nanos = datetime_to_unix_nanos(start);
683        let end_nanos = datetime_to_unix_nanos(end);
684        let instruments = Arc::clone(&self.instruments);
685
686        get_runtime().spawn(async move {
687            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
688                Ok(bars) => {
689                    let response = DataResponse::Bars(BarsResponse::new(
690                        request_id,
691                        client_id,
692                        bar_type,
693                        bars,
694                        start_nanos,
695                        end_nanos,
696                        clock.get_time_ns(),
697                        params,
698                    ));
699
700                    if let Err(e) = sender.send(DataEvent::Response(response)) {
701                        log::error!("Failed to send bars response: {e}");
702                    }
703                }
704                Err(e) => log::error!("Bar request failed: {e:?}"),
705            }
706        });
707
708        Ok(())
709    }
710
711    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
712        log::debug!("Requesting trades for {}", request.instrument_id);
713
714        // NOTE: Hyperliquid does not provide public historical trade data via REST API
715        // - Real-time trades are available via WebSocket (subscribe_trades)
716        // - User fills (authenticated) are available via generate_fill_reports
717        // For now, return empty response similar to exchanges without public trade history
718        log::warn!(
719            "Historical trade data not available via REST on Hyperliquid for {}",
720            request.instrument_id
721        );
722
723        let trades = Vec::new();
724
725        let response = DataResponse::Trades(TradesResponse::new(
726            request.request_id,
727            request.client_id.unwrap_or(self.client_id),
728            request.instrument_id,
729            trades,
730            datetime_to_unix_nanos(request.start),
731            datetime_to_unix_nanos(request.end),
732            self.clock.get_time_ns(),
733            request.params,
734        ));
735
736        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
737            log::error!("Failed to send trades response: {e}");
738        }
739
740        Ok(())
741    }
742
743    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
744        let instrument_id = request.instrument_id;
745        let instruments = self.instruments.load();
746        let instrument = instruments
747            .get(&instrument_id)
748            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
749
750        let raw_symbol = instrument.raw_symbol().to_string();
751        let price_precision = instrument.price_precision();
752        let size_precision = instrument.size_precision();
753        let depth = request.depth.map(|d| d.get());
754
755        let http = self.http_client.clone();
756        let sender = self.data_sender.clone();
757        let client_id = request.client_id.unwrap_or(self.client_id);
758        let request_id = request.request_id;
759        let params = request.params;
760        let clock = self.clock;
761
762        get_runtime().spawn(async move {
763            match http.info_l2_book(&raw_symbol).await {
764                Ok(l2_book) => {
765                    let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
766                    let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
767
768                    let all_bids = l2_book
769                        .levels
770                        .first()
771                        .map_or([].as_slice(), |v| v.as_slice());
772                    let all_asks = l2_book
773                        .levels
774                        .get(1)
775                        .map_or([].as_slice(), |v| v.as_slice());
776
777                    let bids = match depth {
778                        Some(d) if d < all_bids.len() => &all_bids[..d],
779                        _ => all_bids,
780                    };
781                    let asks = match depth {
782                        Some(d) if d < all_asks.len() => &all_asks[..d],
783                        _ => all_asks,
784                    };
785
786                    for (i, level) in bids.iter().enumerate() {
787                        let px: f64 = match level.px.parse() {
788                            Ok(v) => v,
789                            Err(_) => continue,
790                        };
791                        let sz: f64 = match level.sz.parse() {
792                            Ok(v) => v,
793                            Err(_) => continue,
794                        };
795
796                        if sz > 0.0 {
797                            let price = Price::new(px, price_precision);
798                            let size = Quantity::new(sz, size_precision);
799                            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
800                            book.add(order, 0, i as u64, ts_event);
801                        }
802                    }
803
804                    let bids_len = bids.len();
805                    for (i, level) in asks.iter().enumerate() {
806                        let px: f64 = match level.px.parse() {
807                            Ok(v) => v,
808                            Err(_) => continue,
809                        };
810                        let sz: f64 = match level.sz.parse() {
811                            Ok(v) => v,
812                            Err(_) => continue,
813                        };
814
815                        if sz > 0.0 {
816                            let price = Price::new(px, price_precision);
817                            let size = Quantity::new(sz, size_precision);
818                            let order =
819                                BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
820                            book.add(order, 0, (bids_len + i) as u64, ts_event);
821                        }
822                    }
823
824                    log::info!(
825                        "Fetched order book for {instrument_id} with {} bids and {} asks",
826                        bids.len(),
827                        asks.len(),
828                    );
829
830                    let response = DataResponse::Book(BookResponse::new(
831                        request_id,
832                        client_id,
833                        instrument_id,
834                        book,
835                        None,
836                        None,
837                        clock.get_time_ns(),
838                        params,
839                    ));
840
841                    if let Err(e) = sender.send(DataEvent::Response(response)) {
842                        log::error!("Failed to send book snapshot response: {e}");
843                    }
844                }
845                Err(e) => log::error!("Book snapshot request failed for {instrument_id}: {e:?}"),
846            }
847        });
848
849        Ok(())
850    }
851
852    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
853        let instruments = self.instruments.load();
854        if let Some(instrument) = instruments.get(&cmd.instrument_id) {
855            if let Err(e) = self
856                .data_sender
857                .send(DataEvent::Instrument(instrument.clone()))
858            {
859                log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
860            }
861        } else {
862            log::warn!("Instrument {} not found in cache", cmd.instrument_id);
863        }
864        Ok(())
865    }
866
867    fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
868        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
869
870        let ws = self.ws_client.clone();
871        let instrument_id = subscription.instrument_id;
872
873        get_runtime().spawn(async move {
874            if let Err(e) = ws.subscribe_trades(instrument_id).await {
875                log::error!("Failed to subscribe to trades: {e:?}");
876            }
877        });
878
879        Ok(())
880    }
881
882    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
883        log::debug!(
884            "Unsubscribing from trades: {}",
885            unsubscription.instrument_id
886        );
887
888        let ws = self.ws_client.clone();
889        let instrument_id = unsubscription.instrument_id;
890
891        get_runtime().spawn(async move {
892            if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
893                log::error!("Failed to unsubscribe from trades: {e:?}");
894            }
895        });
896
897        Ok(())
898    }
899
900    fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
901        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
902
903        if subscription.book_type != BookType::L2_MBP {
904            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
905        }
906
907        let ws = self.ws_client.clone();
908        let instrument_id = subscription.instrument_id;
909
910        get_runtime().spawn(async move {
911            if let Err(e) = ws.subscribe_book(instrument_id).await {
912                log::error!("Failed to subscribe to book deltas: {e:?}");
913            }
914        });
915
916        Ok(())
917    }
918
919    fn unsubscribe_book_deltas(
920        &mut self,
921        unsubscription: &UnsubscribeBookDeltas,
922    ) -> anyhow::Result<()> {
923        log::debug!(
924            "Unsubscribing from book deltas: {}",
925            unsubscription.instrument_id
926        );
927
928        let ws = self.ws_client.clone();
929        let instrument_id = unsubscription.instrument_id;
930
931        get_runtime().spawn(async move {
932            if let Err(e) = ws.unsubscribe_book(instrument_id).await {
933                log::error!("Failed to unsubscribe from book deltas: {e:?}");
934            }
935        });
936
937        Ok(())
938    }
939
940    fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
941        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
942
943        let ws = self.ws_client.clone();
944        let instrument_id = subscription.instrument_id;
945
946        get_runtime().spawn(async move {
947            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
948                log::error!("Failed to subscribe to quotes: {e:?}");
949            }
950        });
951
952        Ok(())
953    }
954
955    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
956        log::debug!(
957            "Unsubscribing from quotes: {}",
958            unsubscription.instrument_id
959        );
960
961        let ws = self.ws_client.clone();
962        let instrument_id = unsubscription.instrument_id;
963
964        get_runtime().spawn(async move {
965            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
966                log::error!("Failed to unsubscribe from quotes: {e:?}");
967            }
968        });
969
970        Ok(())
971    }
972
973    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
974        let ws = self.ws_client.clone();
975        let instrument_id = cmd.instrument_id;
976
977        get_runtime().spawn(async move {
978            if let Err(e) = ws.subscribe_mark_prices(instrument_id).await {
979                log::error!("Failed to subscribe to mark prices: {e:?}");
980            }
981        });
982
983        Ok(())
984    }
985
986    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
987        let ws = self.ws_client.clone();
988        let instrument_id = cmd.instrument_id;
989
990        get_runtime().spawn(async move {
991            if let Err(e) = ws.unsubscribe_mark_prices(instrument_id).await {
992                log::error!("Failed to unsubscribe from mark prices: {e:?}");
993            }
994        });
995
996        Ok(())
997    }
998
999    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1000        let ws = self.ws_client.clone();
1001        let instrument_id = cmd.instrument_id;
1002
1003        get_runtime().spawn(async move {
1004            if let Err(e) = ws.subscribe_index_prices(instrument_id).await {
1005                log::error!("Failed to subscribe to index prices: {e:?}");
1006            }
1007        });
1008
1009        Ok(())
1010    }
1011
1012    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1013        let ws = self.ws_client.clone();
1014        let instrument_id = cmd.instrument_id;
1015
1016        get_runtime().spawn(async move {
1017            if let Err(e) = ws.unsubscribe_index_prices(instrument_id).await {
1018                log::error!("Failed to unsubscribe from index prices: {e:?}");
1019            }
1020        });
1021
1022        Ok(())
1023    }
1024
1025    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1026        let ws = self.ws_client.clone();
1027        let instrument_id = cmd.instrument_id;
1028
1029        get_runtime().spawn(async move {
1030            if let Err(e) = ws.subscribe_funding_rates(instrument_id).await {
1031                log::error!("Failed to subscribe to funding rates: {e:?}");
1032            }
1033        });
1034
1035        Ok(())
1036    }
1037
1038    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1039        let ws = self.ws_client.clone();
1040        let instrument_id = cmd.instrument_id;
1041
1042        get_runtime().spawn(async move {
1043            if let Err(e) = ws.unsubscribe_funding_rates(instrument_id).await {
1044                log::error!("Failed to unsubscribe from funding rates: {e:?}");
1045            }
1046        });
1047
1048        Ok(())
1049    }
1050
1051    fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
1052        log::debug!("Subscribing to bars: {}", subscription.bar_type);
1053
1054        let instrument_id = subscription.bar_type.instrument_id();
1055        if !self.instruments.contains_key(&instrument_id) {
1056            anyhow::bail!("Instrument {instrument_id} not found");
1057        }
1058
1059        let bar_type = subscription.bar_type;
1060        let ws = self.ws_client.clone();
1061
1062        get_runtime().spawn(async move {
1063            if let Err(e) = ws.subscribe_bars(bar_type).await {
1064                log::error!("Failed to subscribe to bars: {e:?}");
1065            }
1066        });
1067
1068        Ok(())
1069    }
1070
1071    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
1072        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
1073
1074        let bar_type = unsubscription.bar_type;
1075        let ws = self.ws_client.clone();
1076
1077        get_runtime().spawn(async move {
1078            if let Err(e) = ws.unsubscribe_bars(bar_type).await {
1079                log::error!("Failed to unsubscribe from bars: {e:?}");
1080            }
1081        });
1082
1083        Ok(())
1084    }
1085}
1086
1087pub(crate) fn candle_to_bar(
1088    candle: &HyperliquidCandle,
1089    bar_type: BarType,
1090    price_precision: u8,
1091    size_precision: u8,
1092) -> anyhow::Result<Bar> {
1093    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1094    let ts_event = ts_init;
1095
1096    let open = candle.open.parse::<f64>().context("parse open price")?;
1097    let high = candle.high.parse::<f64>().context("parse high price")?;
1098    let low = candle.low.parse::<f64>().context("parse low price")?;
1099    let close = candle.close.parse::<f64>().context("parse close price")?;
1100    let volume = candle.volume.parse::<f64>().context("parse volume")?;
1101
1102    Ok(Bar::new(
1103        bar_type,
1104        Price::new(open, price_precision),
1105        Price::new(high, price_precision),
1106        Price::new(low, price_precision),
1107        Price::new(close, price_precision),
1108        Quantity::new(volume, size_precision),
1109        ts_event,
1110        ts_init,
1111    ))
1112}
1113
1114/// Request bars from HTTP API.
1115async fn request_bars_from_http(
1116    http_client: HyperliquidHttpClient,
1117    bar_type: BarType,
1118    start: Option<DateTime<Utc>>,
1119    end: Option<DateTime<Utc>>,
1120    limit: Option<u32>,
1121    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1122) -> anyhow::Result<Vec<Bar>> {
1123    // Get instrument details for precision
1124    let instrument_id = bar_type.instrument_id();
1125    let instrument = instruments
1126        .load()
1127        .get(&instrument_id)
1128        .cloned()
1129        .context("instrument not found in cache")?;
1130
1131    let price_precision = instrument.price_precision();
1132    let size_precision = instrument.size_precision();
1133    let raw_symbol = instrument.raw_symbol();
1134    let coin = raw_symbol.as_str();
1135
1136    let interval = bar_type_to_interval(&bar_type)?;
1137
1138    // Hyperliquid uses millisecond timestamps
1139    let now = Utc::now();
1140    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1141    let start_time = if let Some(start) = start {
1142        start.timestamp_millis() as u64
1143    } else {
1144        // Default to 1000 bars before end_time
1145        let spec = bar_type.spec();
1146        let step_ms = match spec.aggregation {
1147            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1148            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1149            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1150            _ => 60_000,
1151        };
1152        end_time.saturating_sub(1000 * step_ms)
1153    };
1154
1155    let candles = http_client
1156        .info_candle_snapshot(coin, interval, start_time, end_time)
1157        .await
1158        .context("failed to fetch candle snapshot from Hyperliquid")?;
1159
1160    let mut bars: Vec<Bar> = candles
1161        .iter()
1162        .filter_map(|candle| {
1163            candle_to_bar(candle, bar_type, price_precision, size_precision)
1164                .map_err(|e| {
1165                    log::warn!("Failed to convert candle to bar: {e}");
1166                    e
1167                })
1168                .ok()
1169        })
1170        .collect();
1171
1172    if let Some(limit) = limit
1173        && bars.len() > limit as usize
1174    {
1175        bars = bars.into_iter().take(limit as usize).collect();
1176    }
1177
1178    log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1179    Ok(bars)
1180}