1use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 clients::DataClient,
26 live::{runner::get_data_event_sender, runtime::get_runtime},
27 messages::{
28 DataEvent,
29 data::{
30 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
31 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
32 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
33 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
34 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
35 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
36 },
37 },
38};
39use nautilus_core::{
40 UnixNanos,
41 datetime::datetime_to_unix_nanos,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45 data::{Bar, BarType, Data, OrderBookDeltas_API},
46 enums::{BarAggregation, BookType},
47 identifiers::{ClientId, InstrumentId, Venue},
48 instruments::{Instrument, InstrumentAny},
49 types::{Price, Quantity},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53use ustr::Ustr;
54
55use crate::{
56 common::{consts::HYPERLIQUID_VENUE, parse::bar_type_to_interval},
57 config::HyperliquidDataClientConfig,
58 http::{client::HyperliquidHttpClient, models::HyperliquidCandle},
59 websocket::{
60 client::HyperliquidWebSocketClient,
61 messages::{HyperliquidWsMessage, NautilusWsMessage},
62 parse::{
63 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
64 },
65 },
66};
67
68#[derive(Debug)]
69pub struct HyperliquidDataClient {
70 client_id: ClientId,
71 #[allow(dead_code)]
72 config: HyperliquidDataClientConfig,
73 http_client: HyperliquidHttpClient,
74 ws_client: HyperliquidWebSocketClient,
75 is_connected: AtomicBool,
76 cancellation_token: CancellationToken,
77 tasks: Vec<JoinHandle<()>>,
78 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
79 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
80 coin_to_instrument_id: Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
83 clock: &'static AtomicTime,
84 #[allow(dead_code)]
85 instrument_refresh_active: bool,
86}
87
88impl HyperliquidDataClient {
89 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
95 let clock = get_atomic_clock_realtime();
96 let data_sender = get_data_event_sender();
97
98 let mut http_client = if let Some(private_key_str) = &config.private_key {
99 let secrets = crate::common::credential::Secrets {
100 private_key: crate::common::credential::EvmPrivateKey::new(
101 private_key_str.clone(),
102 )?,
103 is_testnet: config.is_testnet,
104 vault_address: None,
105 };
106 HyperliquidHttpClient::with_secrets(
107 &secrets,
108 config.http_timeout_secs,
109 config.http_proxy_url.clone(),
110 )?
111 } else {
112 HyperliquidHttpClient::new(
113 config.is_testnet,
114 config.http_timeout_secs,
115 config.http_proxy_url.clone(),
116 )?
117 };
118
119 if let Some(url) = &config.base_url_http {
121 http_client.set_base_info_url(url.clone());
122 }
123
124 let ws_url = config.base_url_ws.clone();
125 let ws_client = HyperliquidWebSocketClient::new(ws_url, config.is_testnet, None);
126
127 Ok(Self {
128 client_id,
129 config,
130 http_client,
131 ws_client,
132 is_connected: AtomicBool::new(false),
133 cancellation_token: CancellationToken::new(),
134 tasks: Vec::new(),
135 data_sender,
136 instruments: Arc::new(RwLock::new(AHashMap::new())),
137 coin_to_instrument_id: Arc::new(RwLock::new(AHashMap::new())),
138 clock,
139 instrument_refresh_active: false,
140 })
141 }
142
    /// Returns the venue for this client, which is always the `HYPERLIQUID_VENUE` constant.
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
146
147 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
148 let instruments = self
149 .http_client
150 .request_instruments()
151 .await
152 .context("failed to fetch instruments during bootstrap")?;
153
154 let mut instruments_map = self.instruments.write().unwrap();
155 let mut coin_map = self.coin_to_instrument_id.write().unwrap();
156
157 for instrument in &instruments {
158 let instrument_id = instrument.id();
159 instruments_map.insert(instrument_id, instrument.clone());
160
161 let coin = instrument.raw_symbol().inner();
162 coin_map.insert(coin, instrument_id);
163
164 self.ws_client.cache_instrument(instrument.clone());
165 }
166
167 log::info!(
168 "Bootstrapped {} instruments with {} coin mappings",
169 instruments_map.len(),
170 coin_map.len()
171 );
172 Ok(instruments)
173 }
174
175 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
176 let mut ws_client = self.ws_client.clone();
178
179 ws_client
180 .connect()
181 .await
182 .context("failed to connect to Hyperliquid WebSocket")?;
183
184 if let Some(handle) = ws_client.take_task_handle() {
186 self.ws_client.set_task_handle(handle);
187 }
188
189 let data_sender = self.data_sender.clone();
190 let cancellation_token = self.cancellation_token.clone();
191
192 let task = get_runtime().spawn(async move {
193 log::info!("Hyperliquid WebSocket consumption loop started");
194
195 loop {
196 tokio::select! {
197 () = cancellation_token.cancelled() => {
198 log::info!("WebSocket consumption loop cancelled");
199 break;
200 }
201 msg_opt = ws_client.next_event() => {
202 if let Some(msg) = msg_opt {
203 match msg {
204 NautilusWsMessage::Trades(trades) => {
205 for trade in trades {
206 if let Err(e) = data_sender
207 .send(DataEvent::Data(Data::Trade(trade)))
208 {
209 log::error!("Failed to send trade tick: {e}");
210 }
211 }
212 }
213 NautilusWsMessage::Quote(quote) => {
214 if let Err(e) = data_sender
215 .send(DataEvent::Data(Data::Quote(quote)))
216 {
217 log::error!("Failed to send quote tick: {e}");
218 }
219 }
220 NautilusWsMessage::Deltas(deltas) => {
221 if let Err(e) = data_sender
222 .send(DataEvent::Data(Data::Deltas(
223 OrderBookDeltas_API::new(deltas),
224 )))
225 {
226 log::error!("Failed to send order book deltas: {e}");
227 }
228 }
229 NautilusWsMessage::Candle(bar) => {
230 if let Err(e) = data_sender
231 .send(DataEvent::Data(Data::Bar(bar)))
232 {
233 log::error!("Failed to send bar: {e}");
234 }
235 }
236 NautilusWsMessage::MarkPrice(update) => {
237 if let Err(e) = data_sender
238 .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
239 {
240 log::error!("Failed to send mark price update: {e}");
241 }
242 }
243 NautilusWsMessage::IndexPrice(update) => {
244 if let Err(e) = data_sender
245 .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
246 {
247 log::error!("Failed to send index price update: {e}");
248 }
249 }
250 NautilusWsMessage::FundingRate(update) => {
251 if let Err(e) = data_sender
252 .send(DataEvent::FundingRate(update))
253 {
254 log::error!("Failed to send funding rate update: {e}");
255 }
256 }
257 NautilusWsMessage::Reconnected => {
258 log::info!("WebSocket reconnected");
259 }
260 NautilusWsMessage::Error(e) => {
261 log::error!("WebSocket error: {e}");
262 }
263 NautilusWsMessage::ExecutionReports(_) => {
264 }
266 }
267 } else {
268 log::warn!("WebSocket next_event returned None, connection may be closed");
270 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
271 }
272 }
273 }
274 }
275
276 log::info!("Hyperliquid WebSocket consumption loop finished");
277 });
278
279 self.tasks.push(task);
280 log::info!("WebSocket consumption task spawned");
281
282 Ok(())
283 }
284
285 #[allow(dead_code)]
286 fn handle_ws_message(
287 msg: HyperliquidWsMessage,
288 ws_client: &HyperliquidWebSocketClient,
289 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
290 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
291 coin_to_instrument_id: &Arc<RwLock<AHashMap<Ustr, InstrumentId>>>,
292 _venue: Venue,
293 clock: &'static AtomicTime,
294 ) {
295 match msg {
296 HyperliquidWsMessage::Bbo { data } => {
297 let coin = data.coin;
298 log::debug!("Received BBO message for coin: {coin}");
299
300 let coin_map = coin_to_instrument_id.read().unwrap();
303 let instrument_id = coin_map.get(&data.coin);
304
305 if let Some(&instrument_id) = instrument_id {
306 let instruments_map = instruments.read().unwrap();
307 if let Some(instrument) = instruments_map.get(&instrument_id) {
308 let ts_init = clock.get_time_ns();
309
310 match parse_ws_quote_tick(&data, instrument, ts_init) {
311 Ok(quote_tick) => {
312 log::debug!(
313 "Parsed quote tick for {}: bid={}, ask={}",
314 data.coin,
315 quote_tick.bid_price,
316 quote_tick.ask_price
317 );
318 if let Err(e) =
319 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
320 {
321 log::error!("Failed to send quote tick: {e}");
322 }
323 }
324 Err(e) => {
325 log::error!("Failed to parse quote tick for {}: {e}", data.coin);
326 }
327 }
328 }
329 } else {
330 log::warn!(
331 "Received BBO for unknown coin: {} (no matching instrument found)",
332 data.coin
333 );
334 }
335 }
336 HyperliquidWsMessage::Trades { data } => {
337 let count = data.len();
338 log::debug!("Received {count} trade(s)");
339
340 for trade_data in data {
342 let coin = trade_data.coin;
343 let coin_map = coin_to_instrument_id.read().unwrap();
344
345 if let Some(&instrument_id) = coin_map.get(&coin) {
346 let instruments_map = instruments.read().unwrap();
347 if let Some(instrument) = instruments_map.get(&instrument_id) {
348 let ts_init = clock.get_time_ns();
349
350 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
351 Ok(trade_tick) => {
352 if let Err(e) =
353 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
354 {
355 log::error!("Failed to send trade tick: {e}");
356 }
357 }
358 Err(e) => {
359 log::error!("Failed to parse trade tick for {coin}: {e}");
360 }
361 }
362 }
363 } else {
364 log::warn!("Received trade for unknown coin: {coin}");
365 }
366 }
367 }
368 HyperliquidWsMessage::L2Book { data } => {
369 let coin = data.coin;
370 log::debug!("Received L2 book update for coin: {coin}");
371
372 let coin_map = coin_to_instrument_id.read().unwrap();
373 if let Some(&instrument_id) = coin_map.get(&data.coin) {
374 let instruments_map = instruments.read().unwrap();
375 if let Some(instrument) = instruments_map.get(&instrument_id) {
376 let ts_init = clock.get_time_ns();
377
378 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
379 Ok(deltas) => {
380 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
381 OrderBookDeltas_API::new(deltas),
382 ))) {
383 log::error!("Failed to send order book deltas: {e}");
384 }
385 }
386 Err(e) => {
387 log::error!(
388 "Failed to parse order book deltas for {}: {e}",
389 data.coin
390 );
391 }
392 }
393 }
394 } else {
395 log::warn!("Received L2 book for unknown coin: {coin}");
396 }
397 }
398 HyperliquidWsMessage::Candle { data } => {
399 let coin = &data.s;
400 let interval = &data.i;
401 log::debug!("Received candle for {coin}:{interval}");
402
403 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
404 let coin = Ustr::from(&data.s);
405 let coin_map = coin_to_instrument_id.read().unwrap();
406
407 if let Some(&instrument_id) = coin_map.get(&coin) {
408 let instruments_map = instruments.read().unwrap();
409 if let Some(instrument) = instruments_map.get(&instrument_id) {
410 let ts_init = clock.get_time_ns();
411
412 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
413 Ok(bar) => {
414 if let Err(e) =
415 data_sender.send(DataEvent::Data(Data::Bar(bar)))
416 {
417 log::error!("Failed to send bar data: {e}");
418 }
419 }
420 Err(e) => {
421 log::error!("Failed to parse candle for {coin}: {e}");
422 }
423 }
424 }
425 } else {
426 log::warn!("Received candle for unknown coin: {coin}");
427 }
428 } else {
429 log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
430 }
431 }
432 _ => {
433 log::trace!("Received unhandled WebSocket message: {msg:?}");
435 }
436 }
437 }
438
439 fn get_instrument(&self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
440 let instruments = self.instruments.read().unwrap();
441 instruments
442 .get(instrument_id)
443 .cloned()
444 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))
445 }
446}
447
448impl HyperliquidDataClient {
449 #[allow(dead_code)]
450 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
451 if let Err(e) = sender.send(DataEvent::Data(data)) {
452 log::error!("Failed to emit data event: {e}");
453 }
454 }
455}
456
457#[async_trait::async_trait(?Send)]
458impl DataClient for HyperliquidDataClient {
    /// Returns the client identifier this data client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
462
    /// Returns the Hyperliquid venue, always as `Some`.
    fn venue(&self) -> Option<Venue> {
        // `self.venue()` resolves to the inherent method returning a bare `Venue`.
        Some(self.venue())
    }
466
467 fn start(&mut self) -> anyhow::Result<()> {
468 log::info!(
469 "Starting Hyperliquid data client: client_id={}, is_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
470 self.client_id,
471 self.config.is_testnet,
472 self.config.http_proxy_url,
473 self.config.ws_proxy_url,
474 );
475 Ok(())
476 }
477
478 fn stop(&mut self) -> anyhow::Result<()> {
479 log::info!("Stopping Hyperliquid data client {}", self.client_id);
480 self.cancellation_token.cancel();
481 self.is_connected.store(false, Ordering::Relaxed);
482 Ok(())
483 }
484
485 fn reset(&mut self) -> anyhow::Result<()> {
486 log::debug!("Resetting Hyperliquid data client {}", self.client_id);
487 self.is_connected.store(false, Ordering::Relaxed);
488 self.cancellation_token = CancellationToken::new();
489 self.tasks.clear();
490 Ok(())
491 }
492
    /// Disposes the client by logging and delegating to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
        self.stop()
    }
497
    /// Returns the current value of the connection flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
501
    /// Returns the negation of `is_connected`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
505
506 async fn connect(&mut self) -> anyhow::Result<()> {
507 if self.is_connected() {
508 return Ok(());
509 }
510
511 let instruments = self
512 .bootstrap_instruments()
513 .await
514 .context("failed to bootstrap instruments")?;
515
516 for instrument in instruments {
517 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
518 log::warn!("Failed to send instrument: {e}");
519 }
520 }
521
522 self.spawn_ws()
523 .await
524 .context("failed to spawn WebSocket client")?;
525
526 self.is_connected.store(true, Ordering::Relaxed);
527 log::info!("Connected: client_id={}", self.client_id);
528
529 Ok(())
530 }
531
532 async fn disconnect(&mut self) -> anyhow::Result<()> {
533 if !self.is_connected() {
534 return Ok(());
535 }
536
537 self.cancellation_token.cancel();
539
540 for task in self.tasks.drain(..) {
542 if let Err(e) = task.await {
543 log::error!("Error waiting for task to complete: {e}");
544 }
545 }
546
547 if let Err(e) = self.ws_client.disconnect().await {
549 log::error!("Error disconnecting WebSocket client: {e}");
550 }
551
552 {
554 let mut instruments = self.instruments.write().unwrap();
555 instruments.clear();
556 }
557
558 self.is_connected.store(false, Ordering::Relaxed);
559 log::info!("Disconnected: client_id={}", self.client_id);
560
561 Ok(())
562 }
563
564 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
565 log::debug!("Requesting all instruments");
566
567 let instruments = {
568 let instruments_map = self.instruments.read().unwrap();
569 instruments_map.values().cloned().collect()
570 };
571
572 let response = DataResponse::Instruments(InstrumentsResponse::new(
573 request.request_id,
574 request.client_id.unwrap_or(self.client_id),
575 self.venue(),
576 instruments,
577 datetime_to_unix_nanos(request.start),
578 datetime_to_unix_nanos(request.end),
579 self.clock.get_time_ns(),
580 request.params,
581 ));
582
583 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
584 log::error!("Failed to send instruments response: {e}");
585 }
586
587 Ok(())
588 }
589
590 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
591 log::debug!("Requesting instrument: {}", request.instrument_id);
592
593 let instrument = self.get_instrument(&request.instrument_id)?;
594
595 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
596 request.request_id,
597 request.client_id.unwrap_or(self.client_id),
598 instrument.id(),
599 instrument,
600 datetime_to_unix_nanos(request.start),
601 datetime_to_unix_nanos(request.end),
602 self.clock.get_time_ns(),
603 request.params,
604 )));
605
606 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
607 log::error!("Failed to send instrument response: {e}");
608 }
609
610 Ok(())
611 }
612
613 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
614 log::debug!("Requesting bars for {}", request.bar_type);
615
616 let http = self.http_client.clone();
617 let sender = self.data_sender.clone();
618 let bar_type = request.bar_type;
619 let start = request.start;
620 let end = request.end;
621 let limit = request.limit.map(|n| n.get() as u32);
622 let request_id = request.request_id;
623 let client_id = request.client_id.unwrap_or(self.client_id);
624 let params = request.params;
625 let clock = self.clock;
626 let start_nanos = datetime_to_unix_nanos(start);
627 let end_nanos = datetime_to_unix_nanos(end);
628 let instruments = Arc::clone(&self.instruments);
629
630 get_runtime().spawn(async move {
631 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
632 Ok(bars) => {
633 let response = DataResponse::Bars(BarsResponse::new(
634 request_id,
635 client_id,
636 bar_type,
637 bars,
638 start_nanos,
639 end_nanos,
640 clock.get_time_ns(),
641 params,
642 ));
643 if let Err(e) = sender.send(DataEvent::Response(response)) {
644 log::error!("Failed to send bars response: {e}");
645 }
646 }
647 Err(e) => log::error!("Bar request failed: {e:?}"),
648 }
649 });
650
651 Ok(())
652 }
653
654 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
655 log::debug!("Requesting trades for {}", request.instrument_id);
656
657 log::warn!(
662 "Historical trade data not available via REST on Hyperliquid for {}",
663 request.instrument_id
664 );
665
666 let trades = Vec::new();
667
668 let response = DataResponse::Trades(TradesResponse::new(
669 request.request_id,
670 request.client_id.unwrap_or(self.client_id),
671 request.instrument_id,
672 trades,
673 datetime_to_unix_nanos(request.start),
674 datetime_to_unix_nanos(request.end),
675 self.clock.get_time_ns(),
676 request.params,
677 ));
678
679 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
680 log::error!("Failed to send trades response: {e}");
681 }
682
683 Ok(())
684 }
685
686 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
687 let instruments = self.instruments.read().unwrap();
688 if let Some(instrument) = instruments.get(&cmd.instrument_id) {
689 if let Err(e) = self
690 .data_sender
691 .send(DataEvent::Instrument(instrument.clone()))
692 {
693 log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
694 }
695 } else {
696 log::warn!("Instrument {} not found in cache", cmd.instrument_id);
697 }
698 Ok(())
699 }
700
701 fn subscribe_trades(&mut self, subscription: &SubscribeTrades) -> anyhow::Result<()> {
702 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
703
704 let ws = self.ws_client.clone();
705 let instrument_id = subscription.instrument_id;
706
707 get_runtime().spawn(async move {
708 if let Err(e) = ws.subscribe_trades(instrument_id).await {
709 log::error!("Failed to subscribe to trades: {e:?}");
710 }
711 });
712
713 Ok(())
714 }
715
716 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
717 log::debug!(
718 "Unsubscribing from trades: {}",
719 unsubscription.instrument_id
720 );
721
722 let ws = self.ws_client.clone();
723 let instrument_id = unsubscription.instrument_id;
724
725 get_runtime().spawn(async move {
726 if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
727 log::error!("Failed to unsubscribe from trades: {e:?}");
728 }
729 });
730
731 Ok(())
732 }
733
734 fn subscribe_book_deltas(&mut self, subscription: &SubscribeBookDeltas) -> anyhow::Result<()> {
735 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
736
737 if subscription.book_type != BookType::L2_MBP {
738 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
739 }
740
741 let ws = self.ws_client.clone();
742 let instrument_id = subscription.instrument_id;
743
744 get_runtime().spawn(async move {
745 if let Err(e) = ws.subscribe_book(instrument_id).await {
746 log::error!("Failed to subscribe to book deltas: {e:?}");
747 }
748 });
749
750 Ok(())
751 }
752
753 fn unsubscribe_book_deltas(
754 &mut self,
755 unsubscription: &UnsubscribeBookDeltas,
756 ) -> anyhow::Result<()> {
757 log::debug!(
758 "Unsubscribing from book deltas: {}",
759 unsubscription.instrument_id
760 );
761
762 let ws = self.ws_client.clone();
763 let instrument_id = unsubscription.instrument_id;
764
765 get_runtime().spawn(async move {
766 if let Err(e) = ws.unsubscribe_book(instrument_id).await {
767 log::error!("Failed to unsubscribe from book deltas: {e:?}");
768 }
769 });
770
771 Ok(())
772 }
773
774 fn subscribe_quotes(&mut self, subscription: &SubscribeQuotes) -> anyhow::Result<()> {
775 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
776
777 let ws = self.ws_client.clone();
778 let instrument_id = subscription.instrument_id;
779
780 get_runtime().spawn(async move {
781 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
782 log::error!("Failed to subscribe to quotes: {e:?}");
783 }
784 });
785
786 Ok(())
787 }
788
789 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
790 log::debug!(
791 "Unsubscribing from quotes: {}",
792 unsubscription.instrument_id
793 );
794
795 let ws = self.ws_client.clone();
796 let instrument_id = unsubscription.instrument_id;
797
798 get_runtime().spawn(async move {
799 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
800 log::error!("Failed to unsubscribe from quotes: {e:?}");
801 }
802 });
803
804 Ok(())
805 }
806
807 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
808 let ws = self.ws_client.clone();
809 let instrument_id = cmd.instrument_id;
810
811 get_runtime().spawn(async move {
812 if let Err(e) = ws.subscribe_mark_prices(instrument_id).await {
813 log::error!("Failed to subscribe to mark prices: {e:?}");
814 }
815 });
816
817 Ok(())
818 }
819
820 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
821 let ws = self.ws_client.clone();
822 let instrument_id = cmd.instrument_id;
823
824 get_runtime().spawn(async move {
825 if let Err(e) = ws.unsubscribe_mark_prices(instrument_id).await {
826 log::error!("Failed to unsubscribe from mark prices: {e:?}");
827 }
828 });
829
830 Ok(())
831 }
832
833 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
834 let ws = self.ws_client.clone();
835 let instrument_id = cmd.instrument_id;
836
837 get_runtime().spawn(async move {
838 if let Err(e) = ws.subscribe_index_prices(instrument_id).await {
839 log::error!("Failed to subscribe to index prices: {e:?}");
840 }
841 });
842
843 Ok(())
844 }
845
846 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
847 let ws = self.ws_client.clone();
848 let instrument_id = cmd.instrument_id;
849
850 get_runtime().spawn(async move {
851 if let Err(e) = ws.unsubscribe_index_prices(instrument_id).await {
852 log::error!("Failed to unsubscribe from index prices: {e:?}");
853 }
854 });
855
856 Ok(())
857 }
858
859 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
860 let ws = self.ws_client.clone();
861 let instrument_id = cmd.instrument_id;
862
863 get_runtime().spawn(async move {
864 if let Err(e) = ws.subscribe_funding_rates(instrument_id).await {
865 log::error!("Failed to subscribe to funding rates: {e:?}");
866 }
867 });
868
869 Ok(())
870 }
871
872 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
873 let ws = self.ws_client.clone();
874 let instrument_id = cmd.instrument_id;
875
876 get_runtime().spawn(async move {
877 if let Err(e) = ws.unsubscribe_funding_rates(instrument_id).await {
878 log::error!("Failed to unsubscribe from funding rates: {e:?}");
879 }
880 });
881
882 Ok(())
883 }
884
885 fn subscribe_bars(&mut self, subscription: &SubscribeBars) -> anyhow::Result<()> {
886 log::debug!("Subscribing to bars: {}", subscription.bar_type);
887
888 let instruments = self.instruments.read().unwrap();
889 let instrument_id = subscription.bar_type.instrument_id();
890 if !instruments.contains_key(&instrument_id) {
891 anyhow::bail!("Instrument {instrument_id} not found");
892 }
893
894 drop(instruments);
895
896 let bar_type = subscription.bar_type;
897 let ws = self.ws_client.clone();
898
899 get_runtime().spawn(async move {
900 if let Err(e) = ws.subscribe_bars(bar_type).await {
901 log::error!("Failed to subscribe to bars: {e:?}");
902 }
903 });
904
905 Ok(())
906 }
907
908 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
909 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
910
911 let bar_type = unsubscription.bar_type;
912 let ws = self.ws_client.clone();
913
914 get_runtime().spawn(async move {
915 if let Err(e) = ws.unsubscribe_bars(bar_type).await {
916 log::error!("Failed to unsubscribe from bars: {e:?}");
917 }
918 });
919
920 Ok(())
921 }
922}
923
924pub(crate) fn candle_to_bar(
925 candle: &HyperliquidCandle,
926 bar_type: BarType,
927 price_precision: u8,
928 size_precision: u8,
929) -> anyhow::Result<Bar> {
930 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
931 let ts_event = ts_init;
932
933 let open = candle.open.parse::<f64>().context("parse open price")?;
934 let high = candle.high.parse::<f64>().context("parse high price")?;
935 let low = candle.low.parse::<f64>().context("parse low price")?;
936 let close = candle.close.parse::<f64>().context("parse close price")?;
937 let volume = candle.volume.parse::<f64>().context("parse volume")?;
938
939 Ok(Bar::new(
940 bar_type,
941 Price::new(open, price_precision),
942 Price::new(high, price_precision),
943 Price::new(low, price_precision),
944 Price::new(close, price_precision),
945 Quantity::new(volume, size_precision),
946 ts_event,
947 ts_init,
948 ))
949}
950
951async fn request_bars_from_http(
953 http_client: HyperliquidHttpClient,
954 bar_type: BarType,
955 start: Option<DateTime<Utc>>,
956 end: Option<DateTime<Utc>>,
957 limit: Option<u32>,
958 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
959) -> anyhow::Result<Vec<Bar>> {
960 let instrument_id = bar_type.instrument_id();
962 let instrument = {
963 let guard = instruments.read().unwrap();
964 guard
965 .get(&instrument_id)
966 .cloned()
967 .context("instrument not found in cache")?
968 };
969
970 let price_precision = instrument.price_precision();
971 let size_precision = instrument.size_precision();
972 let raw_symbol = instrument.raw_symbol();
973 let coin = raw_symbol.as_str();
974
975 let interval = bar_type_to_interval(&bar_type)?;
976
977 let now = Utc::now();
979 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
980 let start_time = if let Some(start) = start {
981 start.timestamp_millis() as u64
982 } else {
983 let spec = bar_type.spec();
985 let step_ms = match spec.aggregation {
986 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
987 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
988 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
989 _ => 60_000,
990 };
991 end_time.saturating_sub(1000 * step_ms)
992 };
993
994 let candles = http_client
995 .info_candle_snapshot(coin, interval, start_time, end_time)
996 .await
997 .context("failed to fetch candle snapshot from Hyperliquid")?;
998
999 let mut bars: Vec<Bar> = candles
1000 .iter()
1001 .filter_map(|candle| {
1002 candle_to_bar(candle, bar_type, price_precision, size_precision)
1003 .map_err(|e| {
1004 log::warn!("Failed to convert candle to bar: {e}");
1005 e
1006 })
1007 .ok()
1008 })
1009 .collect();
1010
1011 if let Some(limit) = limit
1012 && bars.len() > limit as usize
1013 {
1014 bars = bars.into_iter().take(limit as usize).collect();
1015 }
1016
1017 log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1018 Ok(bars)
1019}