1use std::sync::{
17 Arc,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 clients::DataClient,
26 live::{runner::get_data_event_sender, runtime::get_runtime},
27 messages::{
28 DataEvent,
29 data::{
30 BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
31 RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
32 SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
33 SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
34 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
35 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
36 },
37 },
38};
39use nautilus_core::{
40 AtomicMap, UnixNanos,
41 datetime::datetime_to_unix_nanos,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45 data::{Bar, BarType, BookOrder, Data, OrderBookDeltas_API},
46 enums::{BarAggregation, BookType, OrderSide},
47 identifiers::{ClientId, InstrumentId, Venue},
48 instruments::{Instrument, InstrumentAny},
49 orderbook::OrderBook,
50 types::{Price, Quantity},
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use crate::{
57 common::{
58 consts::HYPERLIQUID_VENUE,
59 credential::{Secrets, credential_env_vars},
60 parse::bar_type_to_interval,
61 },
62 config::HyperliquidDataClientConfig,
63 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
64 websocket::{
65 client::HyperliquidWebSocketClient,
66 messages::{HyperliquidWsMessage, NautilusWsMessage},
67 parse::{
68 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
69 },
70 },
71};
72
/// Data client for the Hyperliquid venue.
///
/// Bundles the HTTP and WebSocket clients with the instrument caches and the
/// channel on which [`DataEvent`]s are emitted.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Identifier of this client; also the fallback `client_id` on responses.
    client_id: ClientId,
    /// Configuration the client was constructed with.
    #[allow(dead_code)]
    config: HyperliquidDataClientConfig,
    /// HTTP client used for instrument, candle and order book requests.
    http_client: HyperliquidHttpClient,
    /// WebSocket client used for streaming subscriptions.
    ws_client: HyperliquidWebSocketClient,
    /// Connection flag, set by `connect()` and cleared by `stop()`, `reset()` and `disconnect()`.
    is_connected: AtomicBool,
    /// Token observed by the WebSocket consumption loop to know when to exit.
    cancellation_token: CancellationToken,
    /// Handles of background tasks spawned by this client.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Maps an instrument's raw symbol (the venue "coin") to its instrument ID.
    coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
    /// Realtime clock used to stamp outgoing events and responses.
    clock: &'static AtomicTime,
    /// Initialised to `false`; not read anywhere in the visible code.
    #[allow(dead_code)]
    instrument_refresh_active: bool,
}
91
92impl HyperliquidDataClient {
93 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
99 let clock = get_atomic_clock_realtime();
100 let data_sender = get_data_event_sender();
101
102 let (pk_var, _) = credential_env_vars(config.is_testnet);
105 let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
106
107 let mut http_client = if has_credentials {
108 let secrets = Secrets::resolve(config.private_key.as_deref(), None, config.is_testnet)?;
109 HyperliquidHttpClient::with_secrets(
110 &secrets,
111 config.http_timeout_secs,
112 config.http_proxy_url.clone(),
113 )?
114 } else {
115 HyperliquidHttpClient::new(
116 config.is_testnet,
117 config.http_timeout_secs,
118 config.http_proxy_url.clone(),
119 )?
120 };
121
122 if let Some(url) = &config.base_url_http {
124 http_client.set_base_info_url(url.clone());
125 }
126
127 let ws_url = config.base_url_ws.clone();
128 let ws_client = HyperliquidWebSocketClient::new(ws_url, config.is_testnet, None);
129
130 Ok(Self {
131 client_id,
132 config,
133 http_client,
134 ws_client,
135 is_connected: AtomicBool::new(false),
136 cancellation_token: CancellationToken::new(),
137 tasks: Vec::new(),
138 data_sender,
139 instruments: Arc::new(AtomicMap::new()),
140 coin_to_instrument_id: Arc::new(AtomicMap::new()),
141 clock,
142 instrument_refresh_active: false,
143 })
144 }
145
146 fn venue(&self) -> Venue {
147 *HYPERLIQUID_VENUE
148 }
149
150 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
151 let instruments = self
152 .http_client
153 .request_instruments()
154 .await
155 .context("failed to fetch instruments during bootstrap")?;
156
157 self.instruments.rcu(|m| {
158 for instrument in &instruments {
159 m.insert(instrument.id(), instrument.clone());
160 }
161 });
162
163 self.coin_to_instrument_id.rcu(|m| {
164 for instrument in &instruments {
165 m.insert(instrument.raw_symbol().inner(), instrument.id());
166 }
167 });
168
169 for instrument in &instruments {
170 self.ws_client.cache_instrument(instrument.clone());
171 }
172
173 log::info!(
174 "Bootstrapped {} instruments with {} coin mappings",
175 self.instruments.len(),
176 self.coin_to_instrument_id.len()
177 );
178 Ok(instruments)
179 }
180
181 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
182 let mut ws_client = self.ws_client.clone();
184
185 ws_client
186 .connect()
187 .await
188 .context("failed to connect to Hyperliquid WebSocket")?;
189
190 if let Some(handle) = ws_client.take_task_handle() {
192 self.ws_client.set_task_handle(handle);
193 }
194
195 let data_sender = self.data_sender.clone();
196 let cancellation_token = self.cancellation_token.clone();
197
198 let task = get_runtime().spawn(async move {
199 log::info!("Hyperliquid WebSocket consumption loop started");
200
201 loop {
202 tokio::select! {
203 () = cancellation_token.cancelled() => {
204 log::info!("WebSocket consumption loop cancelled");
205 break;
206 }
207 msg_opt = ws_client.next_event() => {
208 if let Some(msg) = msg_opt {
209 match msg {
210 NautilusWsMessage::Trades(trades) => {
211 for trade in trades {
212 if let Err(e) = data_sender
213 .send(DataEvent::Data(Data::Trade(trade)))
214 {
215 log::error!("Failed to send trade tick: {e}");
216 }
217 }
218 }
219 NautilusWsMessage::Quote(quote) => {
220 if let Err(e) = data_sender
221 .send(DataEvent::Data(Data::Quote(quote)))
222 {
223 log::error!("Failed to send quote tick: {e}");
224 }
225 }
226 NautilusWsMessage::Deltas(deltas) => {
227 if let Err(e) = data_sender
228 .send(DataEvent::Data(Data::Deltas(
229 OrderBookDeltas_API::new(deltas),
230 )))
231 {
232 log::error!("Failed to send order book deltas: {e}");
233 }
234 }
235 NautilusWsMessage::Candle(bar) => {
236 if let Err(e) = data_sender
237 .send(DataEvent::Data(Data::Bar(bar)))
238 {
239 log::error!("Failed to send bar: {e}");
240 }
241 }
242 NautilusWsMessage::MarkPrice(update) => {
243 if let Err(e) = data_sender
244 .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
245 {
246 log::error!("Failed to send mark price update: {e}");
247 }
248 }
249 NautilusWsMessage::IndexPrice(update) => {
250 if let Err(e) = data_sender
251 .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
252 {
253 log::error!("Failed to send index price update: {e}");
254 }
255 }
256 NautilusWsMessage::FundingRate(update) => {
257 if let Err(e) = data_sender
258 .send(DataEvent::FundingRate(update))
259 {
260 log::error!("Failed to send funding rate update: {e}");
261 }
262 }
263 NautilusWsMessage::Reconnected => {
264 log::info!("WebSocket reconnected");
265 }
266 NautilusWsMessage::Error(e) => {
267 log::error!("WebSocket error: {e}");
268 }
269 NautilusWsMessage::ExecutionReports(_) => {
270 }
272 }
273 } else {
274 log::debug!("WebSocket next_event returned None, stream closed");
276 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
277 }
278 }
279 }
280 }
281
282 log::info!("Hyperliquid WebSocket consumption loop finished");
283 });
284
285 self.tasks.push(task);
286 log::info!("WebSocket consumption task spawned");
287
288 Ok(())
289 }
290
291 #[allow(dead_code)]
292 fn handle_ws_message(
293 msg: HyperliquidWsMessage,
294 ws_client: &HyperliquidWebSocketClient,
295 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
296 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
297 coin_to_instrument_id: &Arc<AtomicMap<Ustr, InstrumentId>>,
298 _venue: Venue,
299 clock: &'static AtomicTime,
300 ) {
301 match msg {
302 HyperliquidWsMessage::Bbo { data } => {
303 let coin = data.coin;
304 log::debug!("Received BBO message for coin: {coin}");
305
306 let coin_map = coin_to_instrument_id.load();
307 let instrument_id = coin_map.get(&data.coin);
308
309 if let Some(&instrument_id) = instrument_id {
310 let instruments_map = instruments.load();
311 if let Some(instrument) = instruments_map.get(&instrument_id) {
312 let ts_init = clock.get_time_ns();
313
314 match parse_ws_quote_tick(&data, instrument, ts_init) {
315 Ok(quote_tick) => {
316 log::debug!(
317 "Parsed quote tick for {}: bid={}, ask={}",
318 data.coin,
319 quote_tick.bid_price,
320 quote_tick.ask_price
321 );
322
323 if let Err(e) =
324 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
325 {
326 log::error!("Failed to send quote tick: {e}");
327 }
328 }
329 Err(e) => {
330 log::error!("Failed to parse quote tick for {}: {e}", data.coin);
331 }
332 }
333 }
334 } else {
335 log::warn!(
336 "Received BBO for unknown coin: {} (no matching instrument found)",
337 data.coin
338 );
339 }
340 }
341 HyperliquidWsMessage::Trades { data } => {
342 let count = data.len();
343 log::debug!("Received {count} trade(s)");
344
345 for trade_data in data {
346 let coin = trade_data.coin;
347 let coin_map = coin_to_instrument_id.load();
348
349 if let Some(&instrument_id) = coin_map.get(&coin) {
350 let instruments_map = instruments.load();
351 if let Some(instrument) = instruments_map.get(&instrument_id) {
352 let ts_init = clock.get_time_ns();
353
354 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
355 Ok(trade_tick) => {
356 if let Err(e) =
357 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
358 {
359 log::error!("Failed to send trade tick: {e}");
360 }
361 }
362 Err(e) => {
363 log::error!("Failed to parse trade tick for {coin}: {e}");
364 }
365 }
366 }
367 } else {
368 log::warn!("Received trade for unknown coin: {coin}");
369 }
370 }
371 }
372 HyperliquidWsMessage::L2Book { data } => {
373 let coin = data.coin;
374 log::debug!("Received L2 book update for coin: {coin}");
375
376 let coin_map = coin_to_instrument_id.load();
377 if let Some(&instrument_id) = coin_map.get(&data.coin) {
378 let instruments_map = instruments.load();
379 if let Some(instrument) = instruments_map.get(&instrument_id) {
380 let ts_init = clock.get_time_ns();
381
382 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
383 Ok(deltas) => {
384 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
385 OrderBookDeltas_API::new(deltas),
386 ))) {
387 log::error!("Failed to send order book deltas: {e}");
388 }
389 }
390 Err(e) => {
391 log::error!(
392 "Failed to parse order book deltas for {}: {e}",
393 data.coin
394 );
395 }
396 }
397 }
398 } else {
399 log::warn!("Received L2 book for unknown coin: {coin}");
400 }
401 }
402 HyperliquidWsMessage::Candle { data } => {
403 let coin = &data.s;
404 let interval = &data.i;
405 log::debug!("Received candle for {coin}:{interval}");
406
407 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
408 let coin = Ustr::from(&data.s);
409 let coin_map = coin_to_instrument_id.load();
410
411 if let Some(&instrument_id) = coin_map.get(&coin) {
412 let instruments_map = instruments.load();
413 if let Some(instrument) = instruments_map.get(&instrument_id) {
414 let ts_init = clock.get_time_ns();
415
416 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
417 Ok(bar) => {
418 if let Err(e) =
419 data_sender.send(DataEvent::Data(Data::Bar(bar)))
420 {
421 log::error!("Failed to send bar data: {e}");
422 }
423 }
424 Err(e) => {
425 log::error!("Failed to parse candle for {coin}: {e}");
426 }
427 }
428 }
429 } else {
430 log::warn!("Received candle for unknown coin: {coin}");
431 }
432 } else {
433 log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
434 }
435 }
436 _ => {
437 log::trace!("Received unhandled WebSocket message: {msg:?}");
438 }
439 }
440 }
441}
442
443impl HyperliquidDataClient {
444 #[allow(dead_code)]
445 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
446 if let Err(e) = sender.send(DataEvent::Data(data)) {
447 log::error!("Failed to emit data event: {e}");
448 }
449 }
450}
451
452#[async_trait::async_trait(?Send)]
453impl DataClient for HyperliquidDataClient {
454 fn client_id(&self) -> ClientId {
455 self.client_id
456 }
457
458 fn venue(&self) -> Option<Venue> {
459 Some(self.venue())
460 }
461
462 fn start(&mut self) -> anyhow::Result<()> {
463 log::info!(
464 "Starting Hyperliquid data client: client_id={}, is_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
465 self.client_id,
466 self.config.is_testnet,
467 self.config.http_proxy_url,
468 self.config.ws_proxy_url,
469 );
470 Ok(())
471 }
472
473 fn stop(&mut self) -> anyhow::Result<()> {
474 log::info!("Stopping Hyperliquid data client {}", self.client_id);
475 self.cancellation_token.cancel();
476 self.is_connected.store(false, Ordering::Relaxed);
477 Ok(())
478 }
479
480 fn reset(&mut self) -> anyhow::Result<()> {
481 log::debug!("Resetting Hyperliquid data client {}", self.client_id);
482 self.is_connected.store(false, Ordering::Relaxed);
483 self.cancellation_token = CancellationToken::new();
484 self.tasks.clear();
485 Ok(())
486 }
487
488 fn dispose(&mut self) -> anyhow::Result<()> {
489 log::debug!("Disposing Hyperliquid data client {}", self.client_id);
490 self.stop()
491 }
492
493 fn is_connected(&self) -> bool {
494 self.is_connected.load(Ordering::Acquire)
495 }
496
497 fn is_disconnected(&self) -> bool {
498 !self.is_connected()
499 }
500
501 async fn connect(&mut self) -> anyhow::Result<()> {
502 if self.is_connected() {
503 return Ok(());
504 }
505
506 let instruments = self
507 .bootstrap_instruments()
508 .await
509 .context("failed to bootstrap instruments")?;
510
511 for instrument in instruments {
512 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
513 log::warn!("Failed to send instrument: {e}");
514 }
515 }
516
517 self.spawn_ws()
518 .await
519 .context("failed to spawn WebSocket client")?;
520
521 self.is_connected.store(true, Ordering::Relaxed);
522 log::info!("Connected: client_id={}", self.client_id);
523
524 Ok(())
525 }
526
527 async fn disconnect(&mut self) -> anyhow::Result<()> {
528 if !self.is_connected() {
529 return Ok(());
530 }
531
532 self.cancellation_token.cancel();
533
534 for task in self.tasks.drain(..) {
535 if let Err(e) = task.await {
536 log::error!("Error waiting for task to complete: {e}");
537 }
538 }
539
540 if let Err(e) = self.ws_client.disconnect().await {
541 log::error!("Error disconnecting WebSocket client: {e}");
542 }
543
544 self.instruments.store(AHashMap::new());
545
546 self.is_connected.store(false, Ordering::Relaxed);
547 log::info!("Disconnected: client_id={}", self.client_id);
548
549 Ok(())
550 }
551
552 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
553 log::debug!("Requesting all instruments");
554
555 let http = self.http_client.clone();
556 let sender = self.data_sender.clone();
557 let instruments_cache = self.instruments.clone();
558 let coin_map = self.coin_to_instrument_id.clone();
559 let ws_instruments = self.ws_client.instruments_cache();
560 let request_id = request.request_id;
561 let client_id = request.client_id.unwrap_or(self.client_id);
562 let venue = self.venue();
563 let start_nanos = datetime_to_unix_nanos(request.start);
564 let end_nanos = datetime_to_unix_nanos(request.end);
565 let params = request.params;
566 let clock = self.clock;
567
568 get_runtime().spawn(async move {
569 match http.request_instruments().await {
570 Ok(instruments) => {
571 instruments_cache.rcu(|instruments_map| {
572 coin_map.rcu(|coin_to_id| {
573 for instrument in &instruments {
574 let instrument_id = instrument.id();
575 instruments_map.insert(instrument_id, instrument.clone());
576 let coin = instrument.raw_symbol().inner();
577 coin_to_id.insert(coin, instrument_id);
578 ws_instruments.insert(coin, instrument.clone());
579 }
580 });
581 });
582
583 let response = DataResponse::Instruments(InstrumentsResponse::new(
584 request_id,
585 client_id,
586 venue,
587 instruments,
588 start_nanos,
589 end_nanos,
590 clock.get_time_ns(),
591 params,
592 ));
593
594 if let Err(e) = sender.send(DataEvent::Response(response)) {
595 log::error!("Failed to send instruments response: {e}");
596 }
597 }
598 Err(e) => {
599 log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
600 }
601 }
602 });
603
604 Ok(())
605 }
606
607 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
608 log::debug!("Requesting instrument: {}", request.instrument_id);
609
610 let http = self.http_client.clone();
611 let sender = self.data_sender.clone();
612 let instruments_cache = self.instruments.clone();
613 let coin_map = self.coin_to_instrument_id.clone();
614 let ws_instruments = self.ws_client.instruments_cache();
615 let instrument_id = request.instrument_id;
616 let request_id = request.request_id;
617 let client_id = request.client_id.unwrap_or(self.client_id);
618 let start_nanos = datetime_to_unix_nanos(request.start);
619 let end_nanos = datetime_to_unix_nanos(request.end);
620 let params = request.params;
621 let clock = self.clock;
622
623 get_runtime().spawn(async move {
624 match http.request_instruments().await {
625 Ok(all_instruments) => {
626 instruments_cache.rcu(|instruments_map| {
627 coin_map.rcu(|coin_to_id| {
628 for instrument in &all_instruments {
629 let id = instrument.id();
630 instruments_map.insert(id, instrument.clone());
631 let coin = instrument.raw_symbol().inner();
632 coin_to_id.insert(coin, id);
633 ws_instruments.insert(coin, instrument.clone());
634 }
635 });
636 });
637
638 if let Some(instrument) = all_instruments
639 .into_iter()
640 .find(|i| i.id() == instrument_id)
641 {
642 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
643 request_id,
644 client_id,
645 instrument.id(),
646 instrument,
647 start_nanos,
648 end_nanos,
649 clock.get_time_ns(),
650 params,
651 )));
652
653 if let Err(e) = sender.send(DataEvent::Response(response)) {
654 log::error!("Failed to send instrument response: {e}");
655 }
656 } else {
657 log::error!("Instrument not found: {instrument_id}");
658 }
659 }
660 Err(e) => {
661 log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
662 }
663 }
664 });
665
666 Ok(())
667 }
668
669 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
670 log::debug!("Requesting bars for {}", request.bar_type);
671
672 let http = self.http_client.clone();
673 let sender = self.data_sender.clone();
674 let bar_type = request.bar_type;
675 let start = request.start;
676 let end = request.end;
677 let limit = request.limit.map(|n| n.get() as u32);
678 let request_id = request.request_id;
679 let client_id = request.client_id.unwrap_or(self.client_id);
680 let params = request.params;
681 let clock = self.clock;
682 let start_nanos = datetime_to_unix_nanos(start);
683 let end_nanos = datetime_to_unix_nanos(end);
684 let instruments = Arc::clone(&self.instruments);
685
686 get_runtime().spawn(async move {
687 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
688 Ok(bars) => {
689 let response = DataResponse::Bars(BarsResponse::new(
690 request_id,
691 client_id,
692 bar_type,
693 bars,
694 start_nanos,
695 end_nanos,
696 clock.get_time_ns(),
697 params,
698 ));
699
700 if let Err(e) = sender.send(DataEvent::Response(response)) {
701 log::error!("Failed to send bars response: {e}");
702 }
703 }
704 Err(e) => log::error!("Bar request failed: {e:?}"),
705 }
706 });
707
708 Ok(())
709 }
710
711 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
712 log::debug!("Requesting trades for {}", request.instrument_id);
713
714 log::warn!(
719 "Historical trade data not available via REST on Hyperliquid for {}",
720 request.instrument_id
721 );
722
723 let trades = Vec::new();
724
725 let response = DataResponse::Trades(TradesResponse::new(
726 request.request_id,
727 request.client_id.unwrap_or(self.client_id),
728 request.instrument_id,
729 trades,
730 datetime_to_unix_nanos(request.start),
731 datetime_to_unix_nanos(request.end),
732 self.clock.get_time_ns(),
733 request.params,
734 ));
735
736 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
737 log::error!("Failed to send trades response: {e}");
738 }
739
740 Ok(())
741 }
742
743 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
744 let instrument_id = request.instrument_id;
745 let instruments = self.instruments.load();
746 let instrument = instruments
747 .get(&instrument_id)
748 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
749
750 let raw_symbol = instrument.raw_symbol().to_string();
751 let price_precision = instrument.price_precision();
752 let size_precision = instrument.size_precision();
753 let depth = request.depth.map(|d| d.get());
754
755 let http = self.http_client.clone();
756 let sender = self.data_sender.clone();
757 let client_id = request.client_id.unwrap_or(self.client_id);
758 let request_id = request.request_id;
759 let params = request.params;
760 let clock = self.clock;
761
762 get_runtime().spawn(async move {
763 match http.info_l2_book(&raw_symbol).await {
764 Ok(l2_book) => {
765 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
766 let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
767
768 let all_bids = l2_book
769 .levels
770 .first()
771 .map_or([].as_slice(), |v| v.as_slice());
772 let all_asks = l2_book
773 .levels
774 .get(1)
775 .map_or([].as_slice(), |v| v.as_slice());
776
777 let bids = match depth {
778 Some(d) if d < all_bids.len() => &all_bids[..d],
779 _ => all_bids,
780 };
781 let asks = match depth {
782 Some(d) if d < all_asks.len() => &all_asks[..d],
783 _ => all_asks,
784 };
785
786 for (i, level) in bids.iter().enumerate() {
787 let px: f64 = match level.px.parse() {
788 Ok(v) => v,
789 Err(_) => continue,
790 };
791 let sz: f64 = match level.sz.parse() {
792 Ok(v) => v,
793 Err(_) => continue,
794 };
795
796 if sz > 0.0 {
797 let price = Price::new(px, price_precision);
798 let size = Quantity::new(sz, size_precision);
799 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
800 book.add(order, 0, i as u64, ts_event);
801 }
802 }
803
804 let bids_len = bids.len();
805 for (i, level) in asks.iter().enumerate() {
806 let px: f64 = match level.px.parse() {
807 Ok(v) => v,
808 Err(_) => continue,
809 };
810 let sz: f64 = match level.sz.parse() {
811 Ok(v) => v,
812 Err(_) => continue,
813 };
814
815 if sz > 0.0 {
816 let price = Price::new(px, price_precision);
817 let size = Quantity::new(sz, size_precision);
818 let order =
819 BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
820 book.add(order, 0, (bids_len + i) as u64, ts_event);
821 }
822 }
823
824 log::info!(
825 "Fetched order book for {instrument_id} with {} bids and {} asks",
826 bids.len(),
827 asks.len(),
828 );
829
830 let response = DataResponse::Book(BookResponse::new(
831 request_id,
832 client_id,
833 instrument_id,
834 book,
835 None,
836 None,
837 clock.get_time_ns(),
838 params,
839 ));
840
841 if let Err(e) = sender.send(DataEvent::Response(response)) {
842 log::error!("Failed to send book snapshot response: {e}");
843 }
844 }
845 Err(e) => log::error!("Book snapshot request failed for {instrument_id}: {e:?}"),
846 }
847 });
848
849 Ok(())
850 }
851
852 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
853 let instruments = self.instruments.load();
854 if let Some(instrument) = instruments.get(&cmd.instrument_id) {
855 if let Err(e) = self
856 .data_sender
857 .send(DataEvent::Instrument(instrument.clone()))
858 {
859 log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
860 }
861 } else {
862 log::warn!("Instrument {} not found in cache", cmd.instrument_id);
863 }
864 Ok(())
865 }
866
867 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
868 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
869
870 let ws = self.ws_client.clone();
871 let instrument_id = subscription.instrument_id;
872
873 get_runtime().spawn(async move {
874 if let Err(e) = ws.subscribe_trades(instrument_id).await {
875 log::error!("Failed to subscribe to trades: {e:?}");
876 }
877 });
878
879 Ok(())
880 }
881
882 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
883 log::debug!(
884 "Unsubscribing from trades: {}",
885 unsubscription.instrument_id
886 );
887
888 let ws = self.ws_client.clone();
889 let instrument_id = unsubscription.instrument_id;
890
891 get_runtime().spawn(async move {
892 if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
893 log::error!("Failed to unsubscribe from trades: {e:?}");
894 }
895 });
896
897 Ok(())
898 }
899
900 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
901 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
902
903 if subscription.book_type != BookType::L2_MBP {
904 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
905 }
906
907 let ws = self.ws_client.clone();
908 let instrument_id = subscription.instrument_id;
909
910 get_runtime().spawn(async move {
911 if let Err(e) = ws.subscribe_book(instrument_id).await {
912 log::error!("Failed to subscribe to book deltas: {e:?}");
913 }
914 });
915
916 Ok(())
917 }
918
919 fn unsubscribe_book_deltas(
920 &mut self,
921 unsubscription: &UnsubscribeBookDeltas,
922 ) -> anyhow::Result<()> {
923 log::debug!(
924 "Unsubscribing from book deltas: {}",
925 unsubscription.instrument_id
926 );
927
928 let ws = self.ws_client.clone();
929 let instrument_id = unsubscription.instrument_id;
930
931 get_runtime().spawn(async move {
932 if let Err(e) = ws.unsubscribe_book(instrument_id).await {
933 log::error!("Failed to unsubscribe from book deltas: {e:?}");
934 }
935 });
936
937 Ok(())
938 }
939
940 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
941 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
942
943 let ws = self.ws_client.clone();
944 let instrument_id = subscription.instrument_id;
945
946 get_runtime().spawn(async move {
947 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
948 log::error!("Failed to subscribe to quotes: {e:?}");
949 }
950 });
951
952 Ok(())
953 }
954
955 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
956 log::debug!(
957 "Unsubscribing from quotes: {}",
958 unsubscription.instrument_id
959 );
960
961 let ws = self.ws_client.clone();
962 let instrument_id = unsubscription.instrument_id;
963
964 get_runtime().spawn(async move {
965 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
966 log::error!("Failed to unsubscribe from quotes: {e:?}");
967 }
968 });
969
970 Ok(())
971 }
972
973 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
974 let ws = self.ws_client.clone();
975 let instrument_id = cmd.instrument_id;
976
977 get_runtime().spawn(async move {
978 if let Err(e) = ws.subscribe_mark_prices(instrument_id).await {
979 log::error!("Failed to subscribe to mark prices: {e:?}");
980 }
981 });
982
983 Ok(())
984 }
985
986 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
987 let ws = self.ws_client.clone();
988 let instrument_id = cmd.instrument_id;
989
990 get_runtime().spawn(async move {
991 if let Err(e) = ws.unsubscribe_mark_prices(instrument_id).await {
992 log::error!("Failed to unsubscribe from mark prices: {e:?}");
993 }
994 });
995
996 Ok(())
997 }
998
999 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1000 let ws = self.ws_client.clone();
1001 let instrument_id = cmd.instrument_id;
1002
1003 get_runtime().spawn(async move {
1004 if let Err(e) = ws.subscribe_index_prices(instrument_id).await {
1005 log::error!("Failed to subscribe to index prices: {e:?}");
1006 }
1007 });
1008
1009 Ok(())
1010 }
1011
1012 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1013 let ws = self.ws_client.clone();
1014 let instrument_id = cmd.instrument_id;
1015
1016 get_runtime().spawn(async move {
1017 if let Err(e) = ws.unsubscribe_index_prices(instrument_id).await {
1018 log::error!("Failed to unsubscribe from index prices: {e:?}");
1019 }
1020 });
1021
1022 Ok(())
1023 }
1024
1025 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1026 let ws = self.ws_client.clone();
1027 let instrument_id = cmd.instrument_id;
1028
1029 get_runtime().spawn(async move {
1030 if let Err(e) = ws.subscribe_funding_rates(instrument_id).await {
1031 log::error!("Failed to subscribe to funding rates: {e:?}");
1032 }
1033 });
1034
1035 Ok(())
1036 }
1037
1038 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1039 let ws = self.ws_client.clone();
1040 let instrument_id = cmd.instrument_id;
1041
1042 get_runtime().spawn(async move {
1043 if let Err(e) = ws.unsubscribe_funding_rates(instrument_id).await {
1044 log::error!("Failed to unsubscribe from funding rates: {e:?}");
1045 }
1046 });
1047
1048 Ok(())
1049 }
1050
1051 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
1052 log::debug!("Subscribing to bars: {}", subscription.bar_type);
1053
1054 let instrument_id = subscription.bar_type.instrument_id();
1055 if !self.instruments.contains_key(&instrument_id) {
1056 anyhow::bail!("Instrument {instrument_id} not found");
1057 }
1058
1059 let bar_type = subscription.bar_type;
1060 let ws = self.ws_client.clone();
1061
1062 get_runtime().spawn(async move {
1063 if let Err(e) = ws.subscribe_bars(bar_type).await {
1064 log::error!("Failed to subscribe to bars: {e:?}");
1065 }
1066 });
1067
1068 Ok(())
1069 }
1070
1071 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
1072 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
1073
1074 let bar_type = unsubscription.bar_type;
1075 let ws = self.ws_client.clone();
1076
1077 get_runtime().spawn(async move {
1078 if let Err(e) = ws.unsubscribe_bars(bar_type).await {
1079 log::error!("Failed to unsubscribe from bars: {e:?}");
1080 }
1081 });
1082
1083 Ok(())
1084 }
1085}
1086
1087pub(crate) fn candle_to_bar(
1088 candle: &HyperliquidCandle,
1089 bar_type: BarType,
1090 price_precision: u8,
1091 size_precision: u8,
1092) -> anyhow::Result<Bar> {
1093 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1094 let ts_event = ts_init;
1095
1096 let open = candle.open.parse::<f64>().context("parse open price")?;
1097 let high = candle.high.parse::<f64>().context("parse high price")?;
1098 let low = candle.low.parse::<f64>().context("parse low price")?;
1099 let close = candle.close.parse::<f64>().context("parse close price")?;
1100 let volume = candle.volume.parse::<f64>().context("parse volume")?;
1101
1102 Ok(Bar::new(
1103 bar_type,
1104 Price::new(open, price_precision),
1105 Price::new(high, price_precision),
1106 Price::new(low, price_precision),
1107 Price::new(close, price_precision),
1108 Quantity::new(volume, size_precision),
1109 ts_event,
1110 ts_init,
1111 ))
1112}
1113
1114async fn request_bars_from_http(
1116 http_client: HyperliquidHttpClient,
1117 bar_type: BarType,
1118 start: Option<DateTime<Utc>>,
1119 end: Option<DateTime<Utc>>,
1120 limit: Option<u32>,
1121 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1122) -> anyhow::Result<Vec<Bar>> {
1123 let instrument_id = bar_type.instrument_id();
1125 let instrument = instruments
1126 .load()
1127 .get(&instrument_id)
1128 .cloned()
1129 .context("instrument not found in cache")?;
1130
1131 let price_precision = instrument.price_precision();
1132 let size_precision = instrument.size_precision();
1133 let raw_symbol = instrument.raw_symbol();
1134 let coin = raw_symbol.as_str();
1135
1136 let interval = bar_type_to_interval(&bar_type)?;
1137
1138 let now = Utc::now();
1140 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1141 let start_time = if let Some(start) = start {
1142 start.timestamp_millis() as u64
1143 } else {
1144 let spec = bar_type.spec();
1146 let step_ms = match spec.aggregation {
1147 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1148 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1149 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1150 _ => 60_000,
1151 };
1152 end_time.saturating_sub(1000 * step_ms)
1153 };
1154
1155 let candles = http_client
1156 .info_candle_snapshot(coin, interval, start_time, end_time)
1157 .await
1158 .context("failed to fetch candle snapshot from Hyperliquid")?;
1159
1160 let mut bars: Vec<Bar> = candles
1161 .iter()
1162 .filter_map(|candle| {
1163 candle_to_bar(candle, bar_type, price_precision, size_precision)
1164 .map_err(|e| {
1165 log::warn!("Failed to convert candle to bar: {e}");
1166 e
1167 })
1168 .ok()
1169 })
1170 .collect();
1171
1172 if let Some(limit) = limit
1173 && bars.len() > limit as usize
1174 {
1175 bars = bars.into_iter().take(limit as usize).collect();
1176 }
1177
1178 log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1179 Ok(bars)
1180}