1use std::sync::{
17 Arc, Mutex,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 clients::DataClient,
26 live::{runner::get_data_event_sender, runtime::get_runtime},
27 messages::{
28 DataEvent,
29 data::{
30 BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
31 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
32 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
33 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeCustomData, SubscribeFundingRates,
34 SubscribeIndexPrices, SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes,
35 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
36 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
37 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38 },
39 },
40};
41use nautilus_core::{
42 AtomicMap, MUTEX_POISONED, Params, UnixNanos,
43 datetime::datetime_to_unix_nanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47 data::{Bar, BarType, BookOrder, Data, FundingRateUpdate, OrderBookDeltas_API},
48 enums::{BarAggregation, BookType, OrderSide},
49 identifiers::{ClientId, InstrumentId, Venue},
50 instruments::{Instrument, InstrumentAny},
51 orderbook::OrderBook,
52 types::{Price, Quantity},
53};
54use rust_decimal::Decimal;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60 common::{
61 consts::HYPERLIQUID_VENUE,
62 credential::{Secrets, credential_env_vars},
63 parse::bar_type_to_interval,
64 },
65 config::HyperliquidDataClientConfig,
66 data_types::register_hyperliquid_custom_data,
67 http::{
68 client::HyperliquidHttpClient,
69 models::{HyperliquidCandle, HyperliquidFundingHistoryEntry, HyperliquidL2Book},
70 },
71 websocket::{
72 client::HyperliquidWebSocketClient,
73 messages::{HyperliquidWsMessage, NautilusWsMessage},
74 parse::{
75 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
76 },
77 },
78};
79
80#[derive(Debug)]
81pub struct HyperliquidDataClient {
82 clock: &'static AtomicTime,
83 client_id: ClientId,
84 #[allow(dead_code)]
85 config: HyperliquidDataClientConfig,
86 http_client: HyperliquidHttpClient,
87 ws_client: HyperliquidWebSocketClient,
88 is_connected: AtomicBool,
89 cancellation_token: CancellationToken,
90 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
91 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
92 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
93 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
94 coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
95}
96
97impl HyperliquidDataClient {
98 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
104 let clock = get_atomic_clock_realtime();
105 let data_sender = get_data_event_sender();
106
107 let (pk_var, _) = credential_env_vars(config.environment);
110 let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
111
112 let mut http_client = if has_credentials {
113 let secrets =
114 Secrets::resolve(config.private_key.as_deref(), None, config.environment)?;
115 HyperliquidHttpClient::with_secrets(
116 &secrets,
117 config.http_timeout_secs,
118 config.proxy_url.clone(),
119 )?
120 } else {
121 HyperliquidHttpClient::new(
122 config.environment,
123 config.http_timeout_secs,
124 config.proxy_url.clone(),
125 )?
126 };
127
128 if let Some(url) = &config.base_url_http {
129 http_client.set_base_info_url(url.clone());
130 }
131
132 let ws_url = config.base_url_ws.clone();
133 let ws_client = HyperliquidWebSocketClient::new(
134 ws_url,
135 config.environment,
136 None,
137 config.transport_backend,
138 config.proxy_url.clone(),
139 );
140
141 Ok(Self {
142 clock,
143 client_id,
144 config,
145 http_client,
146 ws_client,
147 is_connected: AtomicBool::new(false),
148 cancellation_token: CancellationToken::new(),
149 ws_stream_handle: Mutex::new(None),
150 pending_tasks: Mutex::new(Vec::new()),
151 data_sender,
152 instruments: Arc::new(AtomicMap::new()),
153 coin_to_instrument_id: Arc::new(AtomicMap::new()),
154 })
155 }
156
157 fn spawn_task<F>(&self, description: &'static str, fut: F)
158 where
159 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
160 {
161 let runtime = get_runtime();
162 let handle = runtime.spawn(async move {
163 if let Err(e) = fut.await {
164 log::warn!("{description} failed: {e:?}");
165 }
166 });
167
168 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
169 tasks.retain(|handle| !handle.is_finished());
170 tasks.push(handle);
171 }
172
173 fn abort_pending_tasks(&self) {
174 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
175 for handle in tasks.drain(..) {
176 handle.abort();
177 }
178 }
179
180 fn venue(&self) -> Venue {
181 *HYPERLIQUID_VENUE
182 }
183
184 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
185 let instruments = self
186 .http_client
187 .request_instruments()
188 .await
189 .context("failed to fetch instruments during bootstrap")?;
190
191 self.instruments.rcu(|m| {
192 for instrument in &instruments {
193 m.insert(instrument.id(), instrument.clone());
194 }
195 });
196
197 self.coin_to_instrument_id.rcu(|m| {
198 for instrument in &instruments {
199 m.insert(instrument.raw_symbol().inner(), instrument.id());
200 }
201 });
202
203 for instrument in &instruments {
204 self.ws_client.cache_instrument(instrument.clone());
205 }
206
207 log::info!(
208 "Bootstrapped {} instruments with {} coin mappings",
209 self.instruments.len(),
210 self.coin_to_instrument_id.len()
211 );
212 Ok(instruments)
213 }
214
215 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
216 let mut ws_client = self.ws_client.clone();
218
219 ws_client
220 .connect()
221 .await
222 .context("failed to connect to Hyperliquid WebSocket")?;
223
224 if let Some(handle) = ws_client.take_task_handle() {
226 self.ws_client.set_task_handle(handle);
227 }
228
229 let data_sender = self.data_sender.clone();
230 let cancellation_token = self.cancellation_token.clone();
231
232 let task = get_runtime().spawn(async move {
233 log::info!("Hyperliquid WebSocket consumption loop started");
234
235 loop {
236 tokio::select! {
237 () = cancellation_token.cancelled() => {
238 log::info!("WebSocket consumption loop cancelled");
239 break;
240 }
241 msg_opt = ws_client.next_event() => {
242 if let Some(msg) = msg_opt {
243 match msg {
244 NautilusWsMessage::Trades(trades) => {
245 for trade in trades {
246 if let Err(e) = data_sender
247 .send(DataEvent::Data(Data::Trade(trade)))
248 {
249 log::error!("Failed to send trade tick: {e}");
250 }
251 }
252 }
253 NautilusWsMessage::Quote(quote) => {
254 if let Err(e) = data_sender
255 .send(DataEvent::Data(Data::Quote(quote)))
256 {
257 log::error!("Failed to send quote tick: {e}");
258 }
259 }
260 NautilusWsMessage::Deltas(deltas) => {
261 if let Err(e) = data_sender
262 .send(DataEvent::Data(Data::Deltas(
263 OrderBookDeltas_API::new(deltas),
264 )))
265 {
266 log::error!("Failed to send order book deltas: {e}");
267 }
268 }
269 NautilusWsMessage::Depth10(depth) => {
270 if let Err(e) =
271 data_sender.send(DataEvent::Data(Data::Depth10(depth)))
272 {
273 log::error!("Failed to send order book depth10: {e}");
274 }
275 }
276 NautilusWsMessage::Candle(bar) => {
277 if let Err(e) = data_sender
278 .send(DataEvent::Data(Data::Bar(bar)))
279 {
280 log::error!("Failed to send bar: {e}");
281 }
282 }
283 NautilusWsMessage::MarkPrice(update) => {
284 if let Err(e) = data_sender
285 .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
286 {
287 log::error!("Failed to send mark price update: {e}");
288 }
289 }
290 NautilusWsMessage::IndexPrice(update) => {
291 if let Err(e) = data_sender
292 .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
293 {
294 log::error!("Failed to send index price update: {e}");
295 }
296 }
297 NautilusWsMessage::FundingRate(update) => {
298 if let Err(e) = data_sender
299 .send(DataEvent::FundingRate(update))
300 {
301 log::error!("Failed to send funding rate update: {e}");
302 }
303 }
304 NautilusWsMessage::CustomData(data) => {
305 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
306 log::error!("Failed to send custom data: {e}");
307 }
308 }
309 NautilusWsMessage::Reconnected => {
310 log::info!("WebSocket reconnected");
311 }
312 NautilusWsMessage::Error(e) => {
313 log::error!("WebSocket error: {e}");
314 }
315 NautilusWsMessage::ExecutionReports(_) => {
316 }
318 }
319 } else {
320 log::debug!("WebSocket next_event returned None, stream closed");
322 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
323 }
324 }
325 }
326 }
327
328 log::info!("Hyperliquid WebSocket consumption loop finished");
329 });
330
331 let mut slot = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
332 *slot = Some(task);
333 log::info!("WebSocket consumption task spawned");
334
335 Ok(())
336 }
337
338 #[allow(dead_code)]
339 fn handle_ws_message(
340 msg: HyperliquidWsMessage,
341 ws_client: &HyperliquidWebSocketClient,
342 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
343 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
344 coin_to_instrument_id: &Arc<AtomicMap<Ustr, InstrumentId>>,
345 _venue: Venue,
346 clock: &'static AtomicTime,
347 ) {
348 match msg {
349 HyperliquidWsMessage::Bbo { data } => {
350 let coin = data.coin;
351 log::debug!("Received BBO message for coin: {coin}");
352
353 let coin_map = coin_to_instrument_id.load();
354 let instrument_id = coin_map.get(&data.coin);
355
356 if let Some(&instrument_id) = instrument_id {
357 let instruments_map = instruments.load();
358 if let Some(instrument) = instruments_map.get(&instrument_id) {
359 let ts_init = clock.get_time_ns();
360
361 match parse_ws_quote_tick(&data, instrument, ts_init) {
362 Ok(quote_tick) => {
363 log::debug!(
364 "Parsed quote tick for {}: bid={}, ask={}",
365 data.coin,
366 quote_tick.bid_price,
367 quote_tick.ask_price
368 );
369
370 if let Err(e) =
371 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
372 {
373 log::error!("Failed to send quote tick: {e}");
374 }
375 }
376 Err(e) => {
377 log::error!("Failed to parse quote tick for {}: {e}", data.coin);
378 }
379 }
380 }
381 } else {
382 log::warn!(
383 "Received BBO for unknown coin: {} (no matching instrument found)",
384 data.coin
385 );
386 }
387 }
388 HyperliquidWsMessage::Trades { data } => {
389 let count = data.len();
390 log::debug!("Received {count} trade(s)");
391
392 for trade_data in data {
393 let coin = trade_data.coin;
394 let coin_map = coin_to_instrument_id.load();
395
396 if let Some(&instrument_id) = coin_map.get(&coin) {
397 let instruments_map = instruments.load();
398 if let Some(instrument) = instruments_map.get(&instrument_id) {
399 let ts_init = clock.get_time_ns();
400
401 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
402 Ok(trade_tick) => {
403 if let Err(e) =
404 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
405 {
406 log::error!("Failed to send trade tick: {e}");
407 }
408 }
409 Err(e) => {
410 log::error!("Failed to parse trade tick for {coin}: {e}");
411 }
412 }
413 }
414 } else {
415 log::warn!("Received trade for unknown coin: {coin}");
416 }
417 }
418 }
419 HyperliquidWsMessage::L2Book { data } => {
420 let coin = data.coin;
421 log::debug!("Received L2 book update for coin: {coin}");
422
423 let coin_map = coin_to_instrument_id.load();
424 if let Some(&instrument_id) = coin_map.get(&data.coin) {
425 let instruments_map = instruments.load();
426 if let Some(instrument) = instruments_map.get(&instrument_id) {
427 let ts_init = clock.get_time_ns();
428
429 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
430 Ok(deltas) => {
431 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
432 OrderBookDeltas_API::new(deltas),
433 ))) {
434 log::error!("Failed to send order book deltas: {e}");
435 }
436 }
437 Err(e) => {
438 log::error!(
439 "Failed to parse order book deltas for {}: {e}",
440 data.coin
441 );
442 }
443 }
444 }
445 } else {
446 log::warn!("Received L2 book for unknown coin: {coin}");
447 }
448 }
449 HyperliquidWsMessage::Candle { data } => {
450 let coin = &data.s;
451 let interval = &data.i;
452 log::debug!("Received candle for {coin}:{interval}");
453
454 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
455 let coin = Ustr::from(&data.s);
456 let coin_map = coin_to_instrument_id.load();
457
458 if let Some(&instrument_id) = coin_map.get(&coin) {
459 let instruments_map = instruments.load();
460 if let Some(instrument) = instruments_map.get(&instrument_id) {
461 let ts_init = clock.get_time_ns();
462
463 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
464 Ok(bar) => {
465 if let Err(e) =
466 data_sender.send(DataEvent::Data(Data::Bar(bar)))
467 {
468 log::error!("Failed to send bar data: {e}");
469 }
470 }
471 Err(e) => {
472 log::error!("Failed to parse candle for {coin}: {e}");
473 }
474 }
475 }
476 } else {
477 log::warn!("Received candle for unknown coin: {coin}");
478 }
479 } else {
480 log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
481 }
482 }
483 _ => {
484 log::trace!("Received unhandled WebSocket message: {msg:?}");
485 }
486 }
487 }
488}
489
490impl HyperliquidDataClient {
491 #[allow(dead_code)]
492 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
493 if let Err(e) = sender.send(DataEvent::Data(data)) {
494 log::error!("Failed to emit data event: {e}");
495 }
496 }
497}
498
499#[async_trait::async_trait(?Send)]
500impl DataClient for HyperliquidDataClient {
501 fn client_id(&self) -> ClientId {
502 self.client_id
503 }
504
505 fn venue(&self) -> Option<Venue> {
506 Some(self.venue())
507 }
508
509 fn start(&mut self) -> anyhow::Result<()> {
510 log::info!(
511 "Starting Hyperliquid data client: client_id={}, environment={:?}, proxy_url={:?}",
512 self.client_id,
513 self.config.environment,
514 self.config.proxy_url,
515 );
516 Ok(())
517 }
518
519 fn stop(&mut self) -> anyhow::Result<()> {
520 log::info!("Stopping Hyperliquid data client {}", self.client_id);
521 self.cancellation_token.cancel();
522 self.is_connected.store(false, Ordering::Relaxed);
523 Ok(())
524 }
525
526 fn reset(&mut self) -> anyhow::Result<()> {
527 log::debug!("Resetting Hyperliquid data client {}", self.client_id);
528 self.is_connected.store(false, Ordering::Relaxed);
529 self.cancellation_token = CancellationToken::new();
530 self.abort_pending_tasks();
531
532 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
533 handle.abort();
534 }
535 Ok(())
536 }
537
538 fn dispose(&mut self) -> anyhow::Result<()> {
539 log::debug!("Disposing Hyperliquid data client {}", self.client_id);
540 self.stop()
541 }
542
543 fn is_connected(&self) -> bool {
544 self.is_connected.load(Ordering::Acquire)
545 }
546
547 fn is_disconnected(&self) -> bool {
548 !self.is_connected()
549 }
550
551 async fn connect(&mut self) -> anyhow::Result<()> {
552 if self.is_connected() {
553 return Ok(());
554 }
555
556 if self.cancellation_token.is_cancelled() {
557 self.cancellation_token = CancellationToken::new();
558 }
559
560 register_hyperliquid_custom_data();
561
562 let instruments = self
563 .bootstrap_instruments()
564 .await
565 .context("failed to bootstrap instruments")?;
566
567 for instrument in instruments {
568 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
569 log::warn!("Failed to send instrument: {e}");
570 }
571 }
572
573 self.spawn_ws()
574 .await
575 .context("failed to spawn WebSocket client")?;
576
577 self.is_connected.store(true, Ordering::Relaxed);
578 log::info!("Connected: client_id={}", self.client_id);
579
580 Ok(())
581 }
582
583 async fn disconnect(&mut self) -> anyhow::Result<()> {
584 if !self.is_connected() {
585 return Ok(());
586 }
587
588 self.cancellation_token.cancel();
589
590 let ws_stream_handle = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take();
591 if let Some(handle) = ws_stream_handle
592 && let Err(e) = handle.await
593 {
594 log::error!("Error waiting for WebSocket stream task: {e}");
595 }
596
597 self.abort_pending_tasks();
598
599 if let Err(e) = self.ws_client.disconnect().await {
600 log::error!("Error disconnecting WebSocket client: {e}");
601 }
602
603 self.instruments.store(AHashMap::new());
604
605 self.is_connected.store(false, Ordering::Relaxed);
606 log::info!("Disconnected: client_id={}", self.client_id);
607
608 Ok(())
609 }
610
611 fn subscribe(&mut self, cmd: SubscribeCustomData) -> anyhow::Result<()> {
612 let data_type = cmd.data_type.type_name();
613
614 if data_type == "HyperliquidAllMids" {
615 let ws = self.ws_client.clone();
616 let dex = cmd
617 .data_type
618 .metadata()
619 .as_ref()
620 .and_then(|m| m.get("dex"))
621 .and_then(|v| v.as_str())
622 .map(str::trim)
623 .filter(|value| !value.is_empty())
624 .map(ToString::to_string);
625
626 log::debug!("Subscribing to all mids (dex: {:?})", dex.as_deref());
627
628 self.spawn_task("subscribe_all_mids", async move {
629 ws.subscribe_all_mids_with_dex(dex.as_deref()).await
630 });
631
632 return Ok(());
633 }
634
635 log::warn!("Unsupported custom data subscription: {data_type}");
636 Ok(())
637 }
638
639 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
640 let data_type = cmd.data_type.type_name();
641
642 if data_type == "HyperliquidAllMids" {
643 let ws = self.ws_client.clone();
644 let dex = cmd
645 .data_type
646 .metadata()
647 .as_ref()
648 .and_then(|m| m.get("dex"))
649 .and_then(|v| v.as_str())
650 .map(str::trim)
651 .filter(|value| !value.is_empty())
652 .map(ToString::to_string);
653
654 log::debug!("Unsubscribing from all mids (dex: {:?})", dex.as_deref());
655
656 self.spawn_task("unsubscribe_all_mids", async move {
657 ws.unsubscribe_all_mids_with_dex(dex.as_deref()).await
658 });
659
660 return Ok(());
661 }
662
663 log::warn!("Unsupported custom data unsubscription: {data_type}");
664 Ok(())
665 }
666
667 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
668 let instruments = self.instruments.load();
669 if let Some(instrument) = instruments.get(&cmd.instrument_id) {
670 if let Err(e) = self
671 .data_sender
672 .send(DataEvent::Instrument(instrument.clone()))
673 {
674 log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
675 }
676 } else {
677 log::warn!("Instrument {} not found in cache", cmd.instrument_id);
678 }
679 Ok(())
680 }
681
682 fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
683 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
684
685 if subscription.book_type != BookType::L2_MBP {
686 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
687 }
688
689 let ws = self.ws_client.clone();
690 let instrument_id = subscription.instrument_id;
691 let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
692
693 self.spawn_task("subscribe_book_deltas", async move {
694 ws.subscribe_book_with_options(instrument_id, n_sig_figs, mantissa)
695 .await
696 });
697
698 Ok(())
699 }
700
701 fn subscribe_book_depth10(&mut self, subscription: SubscribeBookDepth10) -> anyhow::Result<()> {
702 log::debug!(
703 "Subscribing to book depth10: {}",
704 subscription.instrument_id
705 );
706
707 if subscription.book_type != BookType::L2_MBP {
708 anyhow::bail!("Hyperliquid only supports L2_MBP order book depth10");
709 }
710
711 let ws = self.ws_client.clone();
712 let instrument_id = subscription.instrument_id;
713 let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
714
715 self.spawn_task("subscribe_book_depth10", async move {
716 ws.subscribe_book_depth10_with_options(instrument_id, n_sig_figs, mantissa)
717 .await
718 });
719
720 Ok(())
721 }
722
723 fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
724 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
725
726 let ws = self.ws_client.clone();
727 let instrument_id = subscription.instrument_id;
728
729 self.spawn_task("subscribe_quotes", async move {
730 ws.subscribe_quotes(instrument_id).await
731 });
732
733 Ok(())
734 }
735
736 fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
737 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
738
739 let ws = self.ws_client.clone();
740 let instrument_id = subscription.instrument_id;
741
742 self.spawn_task("subscribe_trades", async move {
743 ws.subscribe_trades(instrument_id).await
744 });
745
746 Ok(())
747 }
748
749 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
750 let ws = self.ws_client.clone();
751 let instrument_id = cmd.instrument_id;
752
753 self.spawn_task("subscribe_mark_prices", async move {
754 ws.subscribe_mark_prices(instrument_id).await
755 });
756
757 Ok(())
758 }
759
760 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
761 let ws = self.ws_client.clone();
762 let instrument_id = cmd.instrument_id;
763
764 self.spawn_task("subscribe_index_prices", async move {
765 ws.subscribe_index_prices(instrument_id).await
766 });
767
768 Ok(())
769 }
770
771 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
772 let ws = self.ws_client.clone();
773 let instrument_id = cmd.instrument_id;
774
775 self.spawn_task("subscribe_funding_rates", async move {
776 ws.subscribe_funding_rates(instrument_id).await
777 });
778
779 Ok(())
780 }
781
782 fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
783 log::debug!("Subscribing to bars: {}", subscription.bar_type);
784
785 let instrument_id = subscription.bar_type.instrument_id();
786 if !self.instruments.contains_key(&instrument_id) {
787 anyhow::bail!("Instrument {instrument_id} not found");
788 }
789
790 let bar_type = subscription.bar_type;
791 let ws = self.ws_client.clone();
792
793 self.spawn_task("subscribe_bars", async move {
794 ws.subscribe_bars(bar_type).await
795 });
796
797 Ok(())
798 }
799
800 fn unsubscribe_book_deltas(
801 &mut self,
802 unsubscription: &UnsubscribeBookDeltas,
803 ) -> anyhow::Result<()> {
804 log::debug!(
805 "Unsubscribing from book deltas: {}",
806 unsubscription.instrument_id
807 );
808
809 let ws = self.ws_client.clone();
810 let instrument_id = unsubscription.instrument_id;
811
812 self.spawn_task("unsubscribe_book_deltas", async move {
813 ws.unsubscribe_book(instrument_id).await
814 });
815
816 Ok(())
817 }
818
819 fn unsubscribe_book_depth10(
820 &mut self,
821 unsubscription: &UnsubscribeBookDepth10,
822 ) -> anyhow::Result<()> {
823 log::debug!(
824 "Unsubscribing from book depth10: {}",
825 unsubscription.instrument_id
826 );
827
828 let ws = self.ws_client.clone();
829 let instrument_id = unsubscription.instrument_id;
830
831 self.spawn_task("unsubscribe_book_depth10", async move {
832 ws.unsubscribe_book_depth10(instrument_id).await
833 });
834
835 Ok(())
836 }
837
838 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
839 log::debug!(
840 "Unsubscribing from quotes: {}",
841 unsubscription.instrument_id
842 );
843
844 let ws = self.ws_client.clone();
845 let instrument_id = unsubscription.instrument_id;
846
847 self.spawn_task("unsubscribe_quotes", async move {
848 ws.unsubscribe_quotes(instrument_id).await
849 });
850
851 Ok(())
852 }
853
854 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
855 log::debug!(
856 "Unsubscribing from trades: {}",
857 unsubscription.instrument_id
858 );
859
860 let ws = self.ws_client.clone();
861 let instrument_id = unsubscription.instrument_id;
862
863 self.spawn_task("unsubscribe_trades", async move {
864 ws.unsubscribe_trades(instrument_id).await
865 });
866
867 Ok(())
868 }
869
870 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
871 let ws = self.ws_client.clone();
872 let instrument_id = cmd.instrument_id;
873
874 self.spawn_task("unsubscribe_mark_prices", async move {
875 ws.unsubscribe_mark_prices(instrument_id).await
876 });
877
878 Ok(())
879 }
880
881 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
882 let ws = self.ws_client.clone();
883 let instrument_id = cmd.instrument_id;
884
885 self.spawn_task("unsubscribe_index_prices", async move {
886 ws.unsubscribe_index_prices(instrument_id).await
887 });
888
889 Ok(())
890 }
891
892 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
893 let ws = self.ws_client.clone();
894 let instrument_id = cmd.instrument_id;
895
896 self.spawn_task("unsubscribe_funding_rates", async move {
897 ws.unsubscribe_funding_rates(instrument_id).await
898 });
899
900 Ok(())
901 }
902
903 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
904 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
905
906 let bar_type = unsubscription.bar_type;
907 let ws = self.ws_client.clone();
908
909 self.spawn_task("unsubscribe_bars", async move {
910 ws.unsubscribe_bars(bar_type).await
911 });
912
913 Ok(())
914 }
915
916 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
917 log::debug!("Requesting all instruments");
918
919 let http = self.http_client.clone();
920 let sender = self.data_sender.clone();
921 let instruments_cache = self.instruments.clone();
922 let coin_map = self.coin_to_instrument_id.clone();
923 let ws_instruments = self.ws_client.instruments_cache();
924 let request_id = request.request_id;
925 let client_id = request.client_id.unwrap_or(self.client_id);
926 let venue = self.venue();
927 let start_nanos = datetime_to_unix_nanos(request.start);
928 let end_nanos = datetime_to_unix_nanos(request.end);
929 let params = request.params;
930 let clock = self.clock;
931
932 self.spawn_task("request_instruments", async move {
933 let instruments = http
934 .request_instruments()
935 .await
936 .context("failed to fetch instruments from Hyperliquid")?;
937
938 instruments_cache.rcu(|instruments_map| {
939 coin_map.rcu(|coin_to_id| {
940 for instrument in &instruments {
941 let instrument_id = instrument.id();
942 instruments_map.insert(instrument_id, instrument.clone());
943 let coin = instrument.raw_symbol().inner();
944 coin_to_id.insert(coin, instrument_id);
945 ws_instruments.insert(coin, instrument.clone());
946 }
947 });
948 });
949
950 let response = DataResponse::Instruments(InstrumentsResponse::new(
951 request_id,
952 client_id,
953 venue,
954 instruments,
955 start_nanos,
956 end_nanos,
957 clock.get_time_ns(),
958 params,
959 ));
960
961 if let Err(e) = sender.send(DataEvent::Response(response)) {
962 log::error!("Failed to send instruments response: {e}");
963 }
964 Ok(())
965 });
966
967 Ok(())
968 }
969
970 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
971 log::debug!("Requesting instrument: {}", request.instrument_id);
972
973 let http = self.http_client.clone();
974 let sender = self.data_sender.clone();
975 let instruments_cache = self.instruments.clone();
976 let coin_map = self.coin_to_instrument_id.clone();
977 let ws_instruments = self.ws_client.instruments_cache();
978 let instrument_id = request.instrument_id;
979 let request_id = request.request_id;
980 let client_id = request.client_id.unwrap_or(self.client_id);
981 let start_nanos = datetime_to_unix_nanos(request.start);
982 let end_nanos = datetime_to_unix_nanos(request.end);
983 let params = request.params;
984 let clock = self.clock;
985
986 self.spawn_task("request_instrument", async move {
987 let all_instruments = http
988 .request_instruments()
989 .await
990 .context("failed to fetch instruments from Hyperliquid")?;
991
992 instruments_cache.rcu(|instruments_map| {
993 coin_map.rcu(|coin_to_id| {
994 for instrument in &all_instruments {
995 let id = instrument.id();
996 instruments_map.insert(id, instrument.clone());
997 let coin = instrument.raw_symbol().inner();
998 coin_to_id.insert(coin, id);
999 ws_instruments.insert(coin, instrument.clone());
1000 }
1001 });
1002 });
1003
1004 if let Some(instrument) = all_instruments
1005 .into_iter()
1006 .find(|i| i.id() == instrument_id)
1007 {
1008 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1009 request_id,
1010 client_id,
1011 instrument.id(),
1012 instrument,
1013 start_nanos,
1014 end_nanos,
1015 clock.get_time_ns(),
1016 params,
1017 )));
1018
1019 if let Err(e) = sender.send(DataEvent::Response(response)) {
1020 log::error!("Failed to send instrument response: {e}");
1021 }
1022 } else {
1023 log::error!("Instrument not found: {instrument_id}");
1024 }
1025 Ok(())
1026 });
1027
1028 Ok(())
1029 }
1030
1031 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1032 log::debug!("Requesting bars for {}", request.bar_type);
1033
1034 let http = self.http_client.clone();
1035 let sender = self.data_sender.clone();
1036 let bar_type = request.bar_type;
1037 let start = request.start;
1038 let end = request.end;
1039 let limit = request.limit.map(|n| n.get() as u32);
1040 let request_id = request.request_id;
1041 let client_id = request.client_id.unwrap_or(self.client_id);
1042 let params = request.params;
1043 let clock = self.clock;
1044 let start_nanos = datetime_to_unix_nanos(start);
1045 let end_nanos = datetime_to_unix_nanos(end);
1046 let instruments = Arc::clone(&self.instruments);
1047
1048 self.spawn_task("request_bars", async move {
1049 let bars = request_bars_from_http(http, bar_type, start, end, limit, instruments)
1050 .await
1051 .context("bar request failed")?;
1052
1053 let response = DataResponse::Bars(BarsResponse::new(
1054 request_id,
1055 client_id,
1056 bar_type,
1057 bars,
1058 start_nanos,
1059 end_nanos,
1060 clock.get_time_ns(),
1061 params,
1062 ));
1063
1064 if let Err(e) = sender.send(DataEvent::Response(response)) {
1065 log::error!("Failed to send bars response: {e}");
1066 }
1067 Ok(())
1068 });
1069
1070 Ok(())
1071 }
1072
1073 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1074 anyhow::bail!(
1079 "Historical trade requests are not supported by Hyperliquid for {}; \
1080 subscribe to trades via WebSocket for live trade ticks",
1081 request.instrument_id,
1082 )
1083 }
1084
1085 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1086 let instrument_id = request.instrument_id;
1087 log::debug!("Requesting funding rates for {instrument_id}");
1088
1089 let instruments = self.instruments.load();
1090 let instrument = instruments
1091 .get(&instrument_id)
1092 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1093
1094 if !matches!(instrument, InstrumentAny::CryptoPerpetual(_)) {
1095 anyhow::bail!("Funding rates are only available for perpetual instruments");
1096 }
1097
1098 let coin = instrument.raw_symbol().to_string();
1099 let http = self.http_client.clone();
1100 let sender = self.data_sender.clone();
1101 let client_id = request.client_id.unwrap_or(self.client_id);
1102 let request_id = request.request_id;
1103 let params = request.params;
1104 let clock = self.clock;
1105 let limit = request.limit.map(|n| n.get());
1106 let start_dt = request.start;
1107 let end_dt = request.end;
1108 let start_nanos = datetime_to_unix_nanos(start_dt);
1109 let end_nanos = datetime_to_unix_nanos(end_dt);
1110
1111 let now_ms = Utc::now().timestamp_millis() as u64;
1112
1113 let default_lookback_ms: u64 = 7 * 86_400_000;
1115 let start_ms = match start_dt {
1116 Some(dt) => dt.timestamp_millis().max(0) as u64,
1117 None => now_ms.saturating_sub(default_lookback_ms),
1118 };
1119 let end_ms = end_dt.map(|dt| dt.timestamp_millis().max(0) as u64);
1120
1121 self.spawn_task("request_funding_rates", async move {
1122 let entries = http
1123 .info_funding_history(&coin, start_ms, end_ms)
1124 .await
1125 .with_context(|| format!("funding rates request failed for {instrument_id}"))?;
1126
1127 let mut funding_rates: Vec<FundingRateUpdate> = entries
1128 .iter()
1129 .filter_map(
1130 |entry| match funding_entry_to_update(entry, instrument_id) {
1131 Ok(update) => Some(update),
1132 Err(e) => {
1133 log::warn!("Skipping funding history entry for {instrument_id}: {e}",);
1134 None
1135 }
1136 },
1137 )
1138 .collect();
1139
1140 if let Some(limit) = limit
1141 && funding_rates.len() > limit
1142 {
1143 funding_rates.truncate(limit);
1144 }
1145
1146 log::debug!(
1147 "Fetched {} funding rates for {instrument_id}",
1148 funding_rates.len(),
1149 );
1150
1151 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1152 request_id,
1153 client_id,
1154 instrument_id,
1155 funding_rates,
1156 start_nanos,
1157 end_nanos,
1158 clock.get_time_ns(),
1159 params,
1160 ));
1161
1162 if let Err(e) = sender.send(DataEvent::Response(response)) {
1163 log::error!("Failed to send funding rates response: {e}");
1164 }
1165 Ok(())
1166 });
1167
1168 Ok(())
1169 }
1170
1171 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1172 let instrument_id = request.instrument_id;
1173 let instruments = self.instruments.load();
1174 let instrument = instruments
1175 .get(&instrument_id)
1176 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1177
1178 let raw_symbol = instrument.raw_symbol().to_string();
1179 let price_precision = instrument.price_precision();
1180 let size_precision = instrument.size_precision();
1181 let depth = request.depth.map(|d| d.get());
1182
1183 let http = self.http_client.clone();
1184 let sender = self.data_sender.clone();
1185 let client_id = request.client_id.unwrap_or(self.client_id);
1186 let request_id = request.request_id;
1187 let params = request.params;
1188 let clock = self.clock;
1189
1190 self.spawn_task("request_book_snapshot", async move {
1191 let l2_book = http
1192 .info_l2_book(&raw_symbol)
1193 .await
1194 .with_context(|| format!("book snapshot request failed for {instrument_id}"))?;
1195
1196 let book = parse_l2_book_snapshot(
1197 &l2_book,
1198 instrument_id,
1199 price_precision,
1200 size_precision,
1201 depth,
1202 );
1203
1204 let response = DataResponse::Book(BookResponse::new(
1205 request_id,
1206 client_id,
1207 instrument_id,
1208 book,
1209 None,
1210 None,
1211 clock.get_time_ns(),
1212 params,
1213 ));
1214
1215 if let Err(e) = sender.send(DataEvent::Response(response)) {
1216 log::error!("Failed to send book snapshot response: {e}");
1217 }
1218 Ok(())
1219 });
1220
1221 Ok(())
1222 }
1223}
1224
1225pub(crate) fn parse_l2_book_snapshot(
1229 l2_book: &HyperliquidL2Book,
1230 instrument_id: InstrumentId,
1231 price_precision: u8,
1232 size_precision: u8,
1233 depth: Option<usize>,
1234) -> OrderBook {
1235 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1236 let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
1237
1238 let all_bids = l2_book
1239 .levels
1240 .first()
1241 .map_or([].as_slice(), |v| v.as_slice());
1242 let all_asks = l2_book
1243 .levels
1244 .get(1)
1245 .map_or([].as_slice(), |v| v.as_slice());
1246
1247 let bids = match depth {
1248 Some(d) if d < all_bids.len() => &all_bids[..d],
1249 _ => all_bids,
1250 };
1251 let asks = match depth {
1252 Some(d) if d < all_asks.len() => &all_asks[..d],
1253 _ => all_asks,
1254 };
1255
1256 for (i, level) in bids.iter().enumerate() {
1257 let Ok(px) = level.px.parse::<f64>() else {
1258 continue;
1259 };
1260 let Ok(sz) = level.sz.parse::<f64>() else {
1261 continue;
1262 };
1263
1264 if sz > 0.0 {
1265 let price = Price::new(px, price_precision);
1266 let size = Quantity::new(sz, size_precision);
1267 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1268 book.add(order, 0, i as u64, ts_event);
1269 }
1270 }
1271
1272 let bids_len = bids.len();
1273
1274 for (i, level) in asks.iter().enumerate() {
1275 let Ok(px) = level.px.parse::<f64>() else {
1276 continue;
1277 };
1278 let Ok(sz) = level.sz.parse::<f64>() else {
1279 continue;
1280 };
1281
1282 if sz > 0.0 {
1283 let price = Price::new(px, price_precision);
1284 let size = Quantity::new(sz, size_precision);
1285 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1286 book.add(order, 0, (bids_len + i) as u64, ts_event);
1287 }
1288 }
1289
1290 log::info!(
1291 "Built order book for {instrument_id} with {} bids and {} asks",
1292 bids.len(),
1293 asks.len(),
1294 );
1295
1296 book
1297}
1298
1299pub(crate) fn parse_book_precision_params(
1302 params: Option<&Params>,
1303) -> anyhow::Result<(Option<u32>, Option<u32>)> {
1304 let Some(params) = params else {
1305 return Ok((None, None));
1306 };
1307
1308 let read_u32 = |key: &str| -> anyhow::Result<Option<u32>> {
1309 match params.get(key) {
1310 None => Ok(None),
1311 Some(v) => v
1312 .as_u64()
1313 .and_then(|n| u32::try_from(n).ok())
1314 .ok_or_else(|| anyhow::anyhow!("`{key}` must be a positive u32"))
1315 .map(Some),
1316 }
1317 };
1318
1319 Ok((read_u32("n_sig_figs")?, read_u32("mantissa")?))
1320}
1321
1322pub(crate) fn funding_entry_to_update(
1325 entry: &HyperliquidFundingHistoryEntry,
1326 instrument_id: InstrumentId,
1327) -> anyhow::Result<FundingRateUpdate> {
1328 let rate: Decimal = entry
1329 .funding_rate
1330 .parse()
1331 .with_context(|| format!("invalid fundingRate '{}'", entry.funding_rate))?;
1332 let ts = UnixNanos::from(entry.time * 1_000_000);
1333 Ok(FundingRateUpdate::new(
1334 instrument_id,
1335 rate,
1336 Some(60),
1337 None,
1338 ts,
1339 ts,
1340 ))
1341}
1342
1343pub(crate) fn candle_to_bar(
1344 candle: &HyperliquidCandle,
1345 bar_type: BarType,
1346 price_precision: u8,
1347 size_precision: u8,
1348) -> anyhow::Result<Bar> {
1349 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1350 let ts_event = ts_init;
1351
1352 let open = candle.open.parse::<f64>().context("parse open price")?;
1353 let high = candle.high.parse::<f64>().context("parse high price")?;
1354 let low = candle.low.parse::<f64>().context("parse low price")?;
1355 let close = candle.close.parse::<f64>().context("parse close price")?;
1356 let volume = candle.volume.parse::<f64>().context("parse volume")?;
1357
1358 Ok(Bar::new(
1359 bar_type,
1360 Price::new(open, price_precision),
1361 Price::new(high, price_precision),
1362 Price::new(low, price_precision),
1363 Price::new(close, price_precision),
1364 Quantity::new(volume, size_precision),
1365 ts_event,
1366 ts_init,
1367 ))
1368}
1369
1370async fn request_bars_from_http(
1372 http_client: HyperliquidHttpClient,
1373 bar_type: BarType,
1374 start: Option<DateTime<Utc>>,
1375 end: Option<DateTime<Utc>>,
1376 limit: Option<u32>,
1377 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1378) -> anyhow::Result<Vec<Bar>> {
1379 let instrument_id = bar_type.instrument_id();
1381 let instrument = instruments
1382 .load()
1383 .get(&instrument_id)
1384 .cloned()
1385 .context("instrument not found in cache")?;
1386
1387 let price_precision = instrument.price_precision();
1388 let size_precision = instrument.size_precision();
1389 let raw_symbol = instrument.raw_symbol();
1390 let coin = raw_symbol.as_str();
1391
1392 let interval = bar_type_to_interval(&bar_type)?;
1393
1394 let now = Utc::now();
1396 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1397 let start_time = if let Some(start) = start {
1398 start.timestamp_millis() as u64
1399 } else {
1400 let spec = bar_type.spec();
1402 let step_ms = match spec.aggregation {
1403 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1404 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1405 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1406 _ => 60_000,
1407 };
1408 end_time.saturating_sub(1000 * step_ms)
1409 };
1410
1411 let candles = http_client
1412 .info_candle_snapshot(coin, interval, start_time, end_time)
1413 .await
1414 .context("failed to fetch candle snapshot from Hyperliquid")?;
1415
1416 let mut bars: Vec<Bar> = candles
1417 .iter()
1418 .filter_map(|candle| {
1419 candle_to_bar(candle, bar_type, price_precision, size_precision)
1420 .map_err(|e| {
1421 log::warn!("Failed to convert candle to bar: {e}");
1422 e
1423 })
1424 .ok()
1425 })
1426 .collect();
1427
1428 if let Some(limit) = limit
1429 && bars.len() > limit as usize
1430 {
1431 bars = bars.into_iter().take(limit as usize).collect();
1432 }
1433
1434 log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1435 Ok(bars)
1436}
1437
1438#[cfg(test)]
1439mod tests {
1440 use rstest::rstest;
1441 use rust_decimal_macros::dec;
1442 use ustr::Ustr;
1443
1444 use super::*;
1445 use crate::common::testing::load_test_data;
1446
1447 fn btc_perp_id() -> InstrumentId {
1448 InstrumentId::from("BTC-PERP.HYPERLIQUID")
1449 }
1450
1451 #[rstest]
1452 fn test_funding_entry_to_update_parses_positive_rate() {
1453 let entry = HyperliquidFundingHistoryEntry {
1454 coin: Ustr::from("BTC"),
1455 funding_rate: "0.0000125".to_string(),
1456 premium: Some("0.00029005".to_string()),
1457 time: 1769908800000,
1458 };
1459 let instrument_id = btc_perp_id();
1460
1461 let update = funding_entry_to_update(&entry, instrument_id).unwrap();
1462
1463 assert_eq!(update.instrument_id, instrument_id);
1464 assert_eq!(update.rate, dec!(0.0000125));
1465 assert_eq!(update.interval, Some(60));
1466 assert!(update.next_funding_ns.is_none());
1467 assert_eq!(update.ts_event, UnixNanos::from(1769908800000 * 1_000_000));
1468 assert_eq!(update.ts_init, update.ts_event);
1469 }
1470
1471 #[rstest]
1472 fn test_funding_entry_to_update_handles_negative_rate() {
1473 let entry = HyperliquidFundingHistoryEntry {
1474 coin: Ustr::from("BTC"),
1475 funding_rate: "-0.0000081".to_string(),
1476 premium: None,
1477 time: 1769912400000,
1478 };
1479 let update = funding_entry_to_update(&entry, btc_perp_id()).unwrap();
1480 assert_eq!(update.rate, dec!(-0.0000081));
1481 }
1482
1483 #[rstest]
1484 fn test_funding_entry_to_update_rejects_invalid_rate() {
1485 let entry = HyperliquidFundingHistoryEntry {
1486 coin: Ustr::from("BTC"),
1487 funding_rate: "not-a-number".to_string(),
1488 premium: None,
1489 time: 1769912400000,
1490 };
1491 let result = funding_entry_to_update(&entry, btc_perp_id());
1492 assert!(result.is_err());
1493 }
1494
1495 #[rstest]
1496 fn test_parse_book_precision_params_none() {
1497 let (n, m) = parse_book_precision_params(None).unwrap();
1498 assert_eq!(n, None);
1499 assert_eq!(m, None);
1500 }
1501
1502 fn make_params(json: serde_json::Value) -> Params {
1503 serde_json::from_value(json).expect("valid params payload")
1504 }
1505
1506 #[rstest]
1507 fn test_parse_book_precision_params_only_n_sig_figs() {
1508 let params = make_params(serde_json::json!({"n_sig_figs": 4}));
1509 let (n, m) = parse_book_precision_params(Some(¶ms)).unwrap();
1510 assert_eq!(n, Some(4));
1511 assert_eq!(m, None);
1512 }
1513
1514 #[rstest]
1515 fn test_parse_book_precision_params_both() {
1516 let params = make_params(serde_json::json!({"n_sig_figs": 5, "mantissa": 2}));
1517 let (n, m) = parse_book_precision_params(Some(¶ms)).unwrap();
1518 assert_eq!(n, Some(5));
1519 assert_eq!(m, Some(2));
1520 }
1521
1522 #[rstest]
1523 fn test_parse_book_precision_params_rejects_negative() {
1524 let params = make_params(serde_json::json!({"n_sig_figs": -1}));
1525 let err = parse_book_precision_params(Some(¶ms)).unwrap_err();
1526 assert!(err.to_string().contains("n_sig_figs"));
1527 }
1528
1529 #[rstest]
1530 fn test_funding_history_fixture_parses() {
1531 let entries: Vec<HyperliquidFundingHistoryEntry> =
1532 load_test_data("http_funding_history.json");
1533 assert_eq!(entries.len(), 3);
1534 assert_eq!(entries[0].coin.as_str(), "BTC");
1535 assert_eq!(entries[0].funding_rate, "0.0000125");
1536 assert_eq!(entries[0].premium.as_deref(), Some("0.00029005"));
1537 assert!(entries[2].premium.is_none());
1538
1539 let updates: Vec<FundingRateUpdate> = entries
1540 .iter()
1541 .map(|e| funding_entry_to_update(e, btc_perp_id()).unwrap())
1542 .collect();
1543 assert_eq!(updates.len(), 3);
1544 assert_eq!(updates[0].rate, dec!(0.0000125));
1545 assert_eq!(updates[1].rate, dec!(-0.0000081));
1546 assert_eq!(updates[2].rate, dec!(0.0000033));
1547 }
1548
1549 fn level(px: &str, sz: &str) -> crate::http::models::HyperliquidLevel {
1550 crate::http::models::HyperliquidLevel {
1551 px: px.to_string(),
1552 sz: sz.to_string(),
1553 }
1554 }
1555
1556 fn sample_l2_book() -> HyperliquidL2Book {
1557 HyperliquidL2Book {
1558 coin: Ustr::from("BTC"),
1559 levels: vec![
1560 vec![
1561 level("98450.50", "2.5"),
1562 level("98449.00", "1.2"),
1563 level("98448.00", "0.8"),
1564 ],
1565 vec![
1566 level("98451.00", "1.5"),
1567 level("98452.00", "2.0"),
1568 level("98453.00", "0.5"),
1569 ],
1570 ],
1571 time: 1769908800000,
1572 }
1573 }
1574
1575 #[rstest]
1576 fn test_parse_l2_book_snapshot_populates_both_sides() {
1577 let book_data = sample_l2_book();
1578 let instrument_id = btc_perp_id();
1579 let book = parse_l2_book_snapshot(&book_data, instrument_id, 2, 4, None);
1580
1581 assert_eq!(book.instrument_id, instrument_id);
1582 assert_eq!(book.book_type, BookType::L2_MBP);
1583 assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
1584 assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
1585 assert_eq!(book.best_bid_size(), Some(Quantity::new(2.5, 4)));
1586 assert_eq!(book.best_ask_size(), Some(Quantity::new(1.5, 4)));
1587 assert_eq!(book.update_count, 6);
1588 }
1589
1590 #[rstest]
1591 fn test_parse_l2_book_snapshot_truncates_to_depth() {
1592 let book_data = sample_l2_book();
1593 let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, Some(1));
1594
1595 assert_eq!(book.update_count, 2);
1597 assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
1598 assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
1599 }
1600
1601 #[rstest]
1602 fn test_parse_l2_book_snapshot_uses_venue_time_as_ts_event() {
1603 let book_data = sample_l2_book();
1604 let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1605 let expected_ts = UnixNanos::from(1769908800000_u64 * 1_000_000);
1606
1607 assert_eq!(book.ts_last, expected_ts);
1610 }
1611
1612 #[rstest]
1613 fn test_parse_l2_book_snapshot_skips_non_positive_size() {
1614 let book_data = HyperliquidL2Book {
1615 coin: Ustr::from("BTC"),
1616 levels: vec![
1617 vec![level("98450.50", "2.5"), level("98449.00", "0")],
1618 vec![level("98451.00", "0"), level("98452.00", "1.5")],
1619 ],
1620 time: 1769908800000,
1621 };
1622 let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1623
1624 assert_eq!(book.update_count, 2, "zero-sized levels must be skipped");
1625 assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
1626 assert_eq!(book.best_ask_price(), Some(Price::new(98452.00, 2)));
1627 }
1628
1629 #[rstest]
1630 fn test_parse_l2_book_snapshot_skips_unparsable_levels() {
1631 let book_data = HyperliquidL2Book {
1632 coin: Ustr::from("BTC"),
1633 levels: vec![
1634 vec![level("not-a-number", "1.0"), level("98449.00", "1.2")],
1635 vec![level("98451.00", "garbage"), level("98452.00", "1.5")],
1636 ],
1637 time: 1769908800000,
1638 };
1639 let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1640
1641 assert_eq!(book.update_count, 2);
1643 assert_eq!(book.best_bid_price(), Some(Price::new(98449.00, 2)));
1644 assert_eq!(book.best_ask_price(), Some(Price::new(98452.00, 2)));
1645 }
1646
1647 #[rstest]
1648 fn test_parse_l2_book_snapshot_empty_levels_yields_empty_book() {
1649 let book_data = HyperliquidL2Book {
1650 coin: Ustr::from("BTC"),
1651 levels: vec![],
1652 time: 1769908800000,
1653 };
1654 let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
1655
1656 assert_eq!(book.update_count, 0);
1657 assert!(book.best_bid_price().is_none());
1658 assert!(book.best_ask_price().is_none());
1659 }
1660}