1use std::{
17 str::FromStr,
18 sync::{
19 Arc, Mutex,
20 atomic::{AtomicBool, Ordering},
21 },
22};
23
24use ahash::AHashMap;
25use anyhow::Context;
26use chrono::{DateTime, Utc};
27use nautilus_common::{
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 messages::{
31 DataEvent,
32 data::{
33 BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
34 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeCustomData, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes,
38 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
39 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
40 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43};
44use nautilus_core::{
45 AtomicMap, MUTEX_POISONED, Params, UnixNanos,
46 datetime::datetime_to_unix_nanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50 data::{Bar, BarType, BookOrder, Data, DataType, FundingRateUpdate, OrderBookDeltas_API},
51 enums::{BarAggregation, BookType, OrderSide},
52 identifiers::{ClientId, InstrumentId, Venue},
53 instruments::{Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 types::{Price, Quantity},
56};
57use rust_decimal::Decimal;
58use tokio::task::JoinHandle;
59use tokio_util::sync::CancellationToken;
60use ustr::Ustr;
61
62use crate::{
63 common::{
64 consts::HYPERLIQUID_VENUE,
65 credential::{Secrets, credential_env_vars},
66 parse::bar_type_to_interval,
67 },
68 config::HyperliquidDataClientConfig,
69 data_types::register_hyperliquid_custom_data,
70 http::{
71 client::HyperliquidHttpClient,
72 models::{HyperliquidCandle, HyperliquidFundingHistoryEntry, HyperliquidL2Book},
73 },
74 websocket::{client::HyperliquidWebSocketClient, messages::NautilusWsMessage},
75};
76
/// Data client for the Hyperliquid venue.
///
/// Bundles the HTTP and WebSocket clients with the instrument caches and the handles
/// of the background tasks spawned by this client.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Clock used to timestamp data responses.
    clock: &'static AtomicTime,
    /// Identifier returned by `client_id()` and used as the default client ID on responses.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: HyperliquidDataClientConfig,
    /// HTTP client used for instrument bootstrap and request/response data.
    http_client: HyperliquidHttpClient,
    /// WebSocket client used for live subscriptions.
    ws_client: HyperliquidWebSocketClient,
    /// Connection flag: set by `connect`, cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token that ends the WebSocket consumption loop when cancelled.
    cancellation_token: CancellationToken,
    /// Handle of the WebSocket consumption task, if one has been spawned.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// Handles of tasks started through `spawn_task` that may still be running.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Channel on which data events and responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Mapping from an instrument's raw symbol to its instrument ID.
    coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
}
92
93impl HyperliquidDataClient {
94 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
100 let clock = get_atomic_clock_realtime();
101 let data_sender = get_data_event_sender();
102
103 let (pk_var, _) = credential_env_vars(config.environment);
106 let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
107
108 let mut http_client = if has_credentials {
109 let secrets =
110 Secrets::resolve(config.private_key.as_deref(), None, config.environment)?;
111 HyperliquidHttpClient::with_secrets(
112 &secrets,
113 config.http_timeout_secs,
114 config.proxy_url.clone(),
115 )?
116 } else {
117 HyperliquidHttpClient::new(
118 config.environment,
119 config.http_timeout_secs,
120 config.proxy_url.clone(),
121 )?
122 };
123
124 if let Some(url) = &config.base_url_http {
125 http_client.set_base_info_url(url.clone());
126 }
127
128 let ws_url = config.base_url_ws.clone();
129 let ws_client = HyperliquidWebSocketClient::new(
130 ws_url,
131 config.environment,
132 None,
133 config.transport_backend,
134 config.proxy_url.clone(),
135 );
136
137 Ok(Self {
138 clock,
139 client_id,
140 config,
141 http_client,
142 ws_client,
143 is_connected: AtomicBool::new(false),
144 cancellation_token: CancellationToken::new(),
145 ws_stream_handle: Mutex::new(None),
146 pending_tasks: Mutex::new(Vec::new()),
147 data_sender,
148 instruments: Arc::new(AtomicMap::new()),
149 coin_to_instrument_id: Arc::new(AtomicMap::new()),
150 })
151 }
152
153 fn spawn_task<F>(&self, description: &'static str, fut: F)
154 where
155 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
156 {
157 let runtime = get_runtime();
158 let handle = runtime.spawn(async move {
159 if let Err(e) = fut.await {
160 log::warn!("{description} failed: {e:?}");
161 }
162 });
163
164 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
165 tasks.retain(|handle| !handle.is_finished());
166 tasks.push(handle);
167 }
168
169 fn abort_pending_tasks(&self) {
170 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
171 for handle in tasks.drain(..) {
172 handle.abort();
173 }
174 }
175
176 fn venue(&self) -> Venue {
177 *HYPERLIQUID_VENUE
178 }
179
180 fn custom_instrument_id(data_type: &DataType) -> anyhow::Result<Option<InstrumentId>> {
181 let Some(raw_instrument_id) = data_type
182 .metadata()
183 .and_then(|m| m.get("instrument_id"))
184 .and_then(|v| v.as_str())
185 .map(str::trim)
186 .filter(|value| !value.is_empty())
187 else {
188 return Ok(None);
189 };
190
191 let instrument_id = InstrumentId::from_str(raw_instrument_id)
192 .with_context(|| format!("invalid instrument_id metadata `{raw_instrument_id}`"))?;
193
194 Ok(Some(instrument_id))
195 }
196
197 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
198 let instruments = self
199 .http_client
200 .request_instruments()
201 .await
202 .context("failed to fetch instruments during bootstrap")?;
203
204 self.instruments.rcu(|m| {
205 for instrument in &instruments {
206 m.insert(instrument.id(), instrument.clone());
207 }
208 });
209
210 self.coin_to_instrument_id.rcu(|m| {
211 for instrument in &instruments {
212 m.insert(instrument.raw_symbol().inner(), instrument.id());
213 }
214 });
215
216 for instrument in &instruments {
217 self.http_client.cache_instrument(instrument);
218 self.ws_client.cache_instrument(instrument.clone());
219 }
220
221 match self
222 .http_client
223 .build_all_dex_asset_ctxs_instrument_ids()
224 .await
225 {
226 Ok(mapping) => {
227 let mapping = mapping
228 .into_iter()
229 .map(|(dex, instrument_ids)| (Ustr::from(dex.as_str()), instrument_ids))
230 .collect();
231 self.ws_client
232 .cache_all_dex_asset_ctxs_instrument_ids(mapping);
233 }
234 Err(e) => {
235 log::warn!("Failed to build Hyperliquid allDexsAssetCtxs mapping: {e}");
236 }
237 }
238
239 log::info!(
240 "Bootstrapped {} instruments with {} coin mappings",
241 self.instruments.len(),
242 self.coin_to_instrument_id.len()
243 );
244 Ok(instruments)
245 }
246
247 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
248 let mut ws_client = self.ws_client.clone();
250
251 ws_client
252 .connect()
253 .await
254 .context("failed to connect to Hyperliquid WebSocket")?;
255
256 if let Some(handle) = ws_client.take_task_handle() {
258 self.ws_client.set_task_handle(handle);
259 }
260
261 let data_sender = self.data_sender.clone();
262 let cancellation_token = self.cancellation_token.clone();
263
264 let task = get_runtime().spawn(async move {
265 log::info!("Hyperliquid WebSocket consumption loop started");
266
267 loop {
268 tokio::select! {
269 () = cancellation_token.cancelled() => {
270 log::info!("WebSocket consumption loop cancelled");
271 break;
272 }
273 msg_opt = ws_client.next_event() => {
274 if let Some(msg) = msg_opt {
275 match msg {
276 NautilusWsMessage::Trades(trades) => {
277 for trade in trades {
278 if let Err(e) = data_sender
279 .send(DataEvent::Data(Data::Trade(trade)))
280 {
281 log::error!("Failed to send trade tick: {e}");
282 }
283 }
284 }
285 NautilusWsMessage::Quote(quote) => {
286 if let Err(e) = data_sender
287 .send(DataEvent::Data(Data::Quote(quote)))
288 {
289 log::error!("Failed to send quote tick: {e}");
290 }
291 }
292 NautilusWsMessage::Deltas(deltas) => {
293 if let Err(e) = data_sender
294 .send(DataEvent::Data(Data::Deltas(
295 OrderBookDeltas_API::new(deltas),
296 )))
297 {
298 log::error!("Failed to send order book deltas: {e}");
299 }
300 }
301 NautilusWsMessage::Depth10(depth) => {
302 if let Err(e) =
303 data_sender.send(DataEvent::Data(Data::Depth10(depth)))
304 {
305 log::error!("Failed to send order book depth10: {e}");
306 }
307 }
308 NautilusWsMessage::Candle(bar) => {
309 if let Err(e) = data_sender
310 .send(DataEvent::Data(Data::Bar(bar)))
311 {
312 log::error!("Failed to send bar: {e}");
313 }
314 }
315 NautilusWsMessage::MarkPrice(update) => {
316 if let Err(e) = data_sender
317 .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
318 {
319 log::error!("Failed to send mark price update: {e}");
320 }
321 }
322 NautilusWsMessage::IndexPrice(update) => {
323 if let Err(e) = data_sender
324 .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
325 {
326 log::error!("Failed to send index price update: {e}");
327 }
328 }
329 NautilusWsMessage::FundingRate(update) => {
330 if let Err(e) = data_sender
331 .send(DataEvent::FundingRate(update))
332 {
333 log::error!("Failed to send funding rate update: {e}");
334 }
335 }
336 NautilusWsMessage::CustomData(data) => {
337 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
338 log::error!("Failed to send custom data: {e}");
339 }
340 }
341 NautilusWsMessage::Reconnected => {
342 log::info!("WebSocket reconnected");
343 }
344 NautilusWsMessage::Error(e) => {
345 log::error!("WebSocket error: {e}");
346 }
347 NautilusWsMessage::ExecutionReports(_) => {
348 }
350 }
351 } else {
352 log::debug!("WebSocket next_event returned None, stream closed");
354 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
355 }
356 }
357 }
358 }
359
360 log::info!("Hyperliquid WebSocket consumption loop finished");
361 });
362
363 let mut slot = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
364 *slot = Some(task);
365 log::info!("WebSocket consumption task spawned");
366
367 Ok(())
368 }
369}
370
371#[async_trait::async_trait(?Send)]
372impl DataClient for HyperliquidDataClient {
373 fn client_id(&self) -> ClientId {
374 self.client_id
375 }
376
377 fn venue(&self) -> Option<Venue> {
378 Some(self.venue())
379 }
380
381 fn start(&mut self) -> anyhow::Result<()> {
382 log::info!(
383 "Starting Hyperliquid data client: client_id={}, environment={:?}, proxy_url={:?}",
384 self.client_id,
385 self.config.environment,
386 self.config.proxy_url,
387 );
388 Ok(())
389 }
390
391 fn stop(&mut self) -> anyhow::Result<()> {
392 log::info!("Stopping Hyperliquid data client {}", self.client_id);
393 self.cancellation_token.cancel();
394 self.is_connected.store(false, Ordering::Relaxed);
395 Ok(())
396 }
397
398 fn reset(&mut self) -> anyhow::Result<()> {
399 log::debug!("Resetting Hyperliquid data client {}", self.client_id);
400 self.is_connected.store(false, Ordering::Relaxed);
401 self.cancellation_token = CancellationToken::new();
402 self.abort_pending_tasks();
403
404 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
405 handle.abort();
406 }
407 Ok(())
408 }
409
410 fn dispose(&mut self) -> anyhow::Result<()> {
411 log::debug!("Disposing Hyperliquid data client {}", self.client_id);
412 self.stop()
413 }
414
415 fn is_connected(&self) -> bool {
416 self.is_connected.load(Ordering::Acquire)
417 }
418
419 fn is_disconnected(&self) -> bool {
420 !self.is_connected()
421 }
422
423 async fn connect(&mut self) -> anyhow::Result<()> {
424 if self.is_connected() {
425 return Ok(());
426 }
427
428 if self.cancellation_token.is_cancelled() {
429 self.cancellation_token = CancellationToken::new();
430 }
431
432 register_hyperliquid_custom_data();
433
434 let instruments = self
435 .bootstrap_instruments()
436 .await
437 .context("failed to bootstrap instruments")?;
438
439 for instrument in instruments {
440 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
441 log::warn!("Failed to send instrument: {e}");
442 }
443 }
444
445 self.spawn_ws()
446 .await
447 .context("failed to spawn WebSocket client")?;
448
449 self.is_connected.store(true, Ordering::Relaxed);
450 log::info!("Connected: client_id={}", self.client_id);
451
452 Ok(())
453 }
454
455 async fn disconnect(&mut self) -> anyhow::Result<()> {
456 if !self.is_connected() {
457 return Ok(());
458 }
459
460 self.cancellation_token.cancel();
461
462 let ws_stream_handle = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take();
463 if let Some(handle) = ws_stream_handle
464 && let Err(e) = handle.await
465 {
466 log::error!("Error waiting for WebSocket stream task: {e}");
467 }
468
469 self.abort_pending_tasks();
470
471 if let Err(e) = self.ws_client.disconnect().await {
472 log::error!("Error disconnecting WebSocket client: {e}");
473 }
474
475 self.instruments.store(AHashMap::new());
476
477 self.is_connected.store(false, Ordering::Relaxed);
478 log::info!("Disconnected: client_id={}", self.client_id);
479
480 Ok(())
481 }
482
483 fn subscribe(&mut self, cmd: SubscribeCustomData) -> anyhow::Result<()> {
484 let data_type = cmd.data_type.type_name();
485
486 if data_type == "HyperliquidAllMids" {
487 let ws = self.ws_client.clone();
488 let dex = cmd
489 .data_type
490 .metadata()
491 .as_ref()
492 .and_then(|m| m.get("dex"))
493 .and_then(|v| v.as_str())
494 .map(str::trim)
495 .filter(|value| !value.is_empty())
496 .map(ToString::to_string);
497
498 log::debug!("Subscribing to all mids (dex: {:?})", dex.as_deref());
499
500 self.spawn_task("subscribe_all_mids", async move {
501 ws.subscribe_all_mids_with_dex(dex.as_deref()).await
502 });
503
504 return Ok(());
505 }
506
507 if data_type == "HyperliquidAllDexsAssetCtxs" {
508 let ws = self.ws_client.clone();
509
510 self.spawn_task("subscribe_all_dexs_asset_ctxs", async move {
511 ws.subscribe_all_dexs_asset_ctxs().await
512 });
513
514 return Ok(());
515 }
516
517 if data_type == "HyperliquidOpenInterest" {
518 let ws = self.ws_client.clone();
519 let instrument_id = Self::custom_instrument_id(&cmd.data_type)?.context(
520 "HyperliquidOpenInterest subscriptions require metadata['instrument_id']",
521 )?;
522
523 self.spawn_task("subscribe_open_interest", async move {
524 ws.subscribe_open_interest(instrument_id).await
525 });
526
527 return Ok(());
528 }
529
530 log::warn!("Unsupported custom data subscription: {data_type}");
531 Ok(())
532 }
533
534 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
535 let data_type = cmd.data_type.type_name();
536
537 if data_type == "HyperliquidAllMids" {
538 let ws = self.ws_client.clone();
539 let dex = cmd
540 .data_type
541 .metadata()
542 .as_ref()
543 .and_then(|m| m.get("dex"))
544 .and_then(|v| v.as_str())
545 .map(str::trim)
546 .filter(|value| !value.is_empty())
547 .map(ToString::to_string);
548
549 log::debug!("Unsubscribing from all mids (dex: {:?})", dex.as_deref());
550
551 self.spawn_task("unsubscribe_all_mids", async move {
552 ws.unsubscribe_all_mids_with_dex(dex.as_deref()).await
553 });
554
555 return Ok(());
556 }
557
558 if data_type == "HyperliquidAllDexsAssetCtxs" {
559 let ws = self.ws_client.clone();
560
561 self.spawn_task("unsubscribe_all_dexs_asset_ctxs", async move {
562 ws.unsubscribe_all_dexs_asset_ctxs().await
563 });
564
565 return Ok(());
566 }
567
568 if data_type == "HyperliquidOpenInterest" {
569 let ws = self.ws_client.clone();
570 let instrument_id = Self::custom_instrument_id(&cmd.data_type)?.context(
571 "HyperliquidOpenInterest unsubscriptions require metadata['instrument_id']",
572 )?;
573
574 self.spawn_task("unsubscribe_open_interest", async move {
575 ws.unsubscribe_open_interest(instrument_id).await
576 });
577
578 return Ok(());
579 }
580
581 log::warn!("Unsupported custom data unsubscription: {data_type}");
582 Ok(())
583 }
584
585 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
586 let instruments = self.instruments.load();
587 if let Some(instrument) = instruments.get(&cmd.instrument_id) {
588 if let Err(e) = self
589 .data_sender
590 .send(DataEvent::Instrument(instrument.clone()))
591 {
592 log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
593 }
594 } else {
595 log::warn!("Instrument {} not found in cache", cmd.instrument_id);
596 }
597 Ok(())
598 }
599
600 fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
601 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
602
603 if subscription.book_type != BookType::L2_MBP {
604 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
605 }
606
607 let ws = self.ws_client.clone();
608 let instrument_id = subscription.instrument_id;
609 let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
610
611 self.spawn_task("subscribe_book_deltas", async move {
612 ws.subscribe_book_with_options(instrument_id, n_sig_figs, mantissa)
613 .await
614 });
615
616 Ok(())
617 }
618
619 fn subscribe_book_depth10(&mut self, subscription: SubscribeBookDepth10) -> anyhow::Result<()> {
620 log::debug!(
621 "Subscribing to book depth10: {}",
622 subscription.instrument_id
623 );
624
625 if subscription.book_type != BookType::L2_MBP {
626 anyhow::bail!("Hyperliquid only supports L2_MBP order book depth10");
627 }
628
629 let ws = self.ws_client.clone();
630 let instrument_id = subscription.instrument_id;
631 let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
632
633 self.spawn_task("subscribe_book_depth10", async move {
634 ws.subscribe_book_depth10_with_options(instrument_id, n_sig_figs, mantissa)
635 .await
636 });
637
638 Ok(())
639 }
640
641 fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
642 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
643
644 let ws = self.ws_client.clone();
645 let instrument_id = subscription.instrument_id;
646
647 self.spawn_task("subscribe_quotes", async move {
648 ws.subscribe_quotes(instrument_id).await
649 });
650
651 Ok(())
652 }
653
654 fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
655 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
656
657 let ws = self.ws_client.clone();
658 let instrument_id = subscription.instrument_id;
659
660 self.spawn_task("subscribe_trades", async move {
661 ws.subscribe_trades(instrument_id).await
662 });
663
664 Ok(())
665 }
666
667 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
668 let ws = self.ws_client.clone();
669 let instrument_id = cmd.instrument_id;
670
671 self.spawn_task("subscribe_mark_prices", async move {
672 ws.subscribe_mark_prices(instrument_id).await
673 });
674
675 Ok(())
676 }
677
678 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
679 let ws = self.ws_client.clone();
680 let instrument_id = cmd.instrument_id;
681
682 self.spawn_task("subscribe_index_prices", async move {
683 ws.subscribe_index_prices(instrument_id).await
684 });
685
686 Ok(())
687 }
688
689 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
690 let ws = self.ws_client.clone();
691 let instrument_id = cmd.instrument_id;
692
693 self.spawn_task("subscribe_funding_rates", async move {
694 ws.subscribe_funding_rates(instrument_id).await
695 });
696
697 Ok(())
698 }
699
700 fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
701 log::debug!("Subscribing to bars: {}", subscription.bar_type);
702
703 let instrument_id = subscription.bar_type.instrument_id();
704 if !self.instruments.contains_key(&instrument_id) {
705 anyhow::bail!("Instrument {instrument_id} not found");
706 }
707
708 let bar_type = subscription.bar_type;
709 let ws = self.ws_client.clone();
710
711 self.spawn_task("subscribe_bars", async move {
712 ws.subscribe_bars(bar_type).await
713 });
714
715 Ok(())
716 }
717
718 fn unsubscribe_book_deltas(
719 &mut self,
720 unsubscription: &UnsubscribeBookDeltas,
721 ) -> anyhow::Result<()> {
722 log::debug!(
723 "Unsubscribing from book deltas: {}",
724 unsubscription.instrument_id
725 );
726
727 let ws = self.ws_client.clone();
728 let instrument_id = unsubscription.instrument_id;
729
730 self.spawn_task("unsubscribe_book_deltas", async move {
731 ws.unsubscribe_book(instrument_id).await
732 });
733
734 Ok(())
735 }
736
737 fn unsubscribe_book_depth10(
738 &mut self,
739 unsubscription: &UnsubscribeBookDepth10,
740 ) -> anyhow::Result<()> {
741 log::debug!(
742 "Unsubscribing from book depth10: {}",
743 unsubscription.instrument_id
744 );
745
746 let ws = self.ws_client.clone();
747 let instrument_id = unsubscription.instrument_id;
748
749 self.spawn_task("unsubscribe_book_depth10", async move {
750 ws.unsubscribe_book_depth10(instrument_id).await
751 });
752
753 Ok(())
754 }
755
756 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
757 log::debug!(
758 "Unsubscribing from quotes: {}",
759 unsubscription.instrument_id
760 );
761
762 let ws = self.ws_client.clone();
763 let instrument_id = unsubscription.instrument_id;
764
765 self.spawn_task("unsubscribe_quotes", async move {
766 ws.unsubscribe_quotes(instrument_id).await
767 });
768
769 Ok(())
770 }
771
772 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
773 log::debug!(
774 "Unsubscribing from trades: {}",
775 unsubscription.instrument_id
776 );
777
778 let ws = self.ws_client.clone();
779 let instrument_id = unsubscription.instrument_id;
780
781 self.spawn_task("unsubscribe_trades", async move {
782 ws.unsubscribe_trades(instrument_id).await
783 });
784
785 Ok(())
786 }
787
788 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
789 let ws = self.ws_client.clone();
790 let instrument_id = cmd.instrument_id;
791
792 self.spawn_task("unsubscribe_mark_prices", async move {
793 ws.unsubscribe_mark_prices(instrument_id).await
794 });
795
796 Ok(())
797 }
798
799 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
800 let ws = self.ws_client.clone();
801 let instrument_id = cmd.instrument_id;
802
803 self.spawn_task("unsubscribe_index_prices", async move {
804 ws.unsubscribe_index_prices(instrument_id).await
805 });
806
807 Ok(())
808 }
809
810 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
811 let ws = self.ws_client.clone();
812 let instrument_id = cmd.instrument_id;
813
814 self.spawn_task("unsubscribe_funding_rates", async move {
815 ws.unsubscribe_funding_rates(instrument_id).await
816 });
817
818 Ok(())
819 }
820
821 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
822 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
823
824 let bar_type = unsubscription.bar_type;
825 let ws = self.ws_client.clone();
826
827 self.spawn_task("unsubscribe_bars", async move {
828 ws.unsubscribe_bars(bar_type).await
829 });
830
831 Ok(())
832 }
833
834 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
835 log::debug!("Requesting all instruments");
836
837 let http = self.http_client.clone();
838 let sender = self.data_sender.clone();
839 let instruments_cache = self.instruments.clone();
840 let coin_map = self.coin_to_instrument_id.clone();
841 let ws_instruments = self.ws_client.instruments_cache();
842 let request_id = request.request_id;
843 let client_id = request.client_id.unwrap_or(self.client_id);
844 let venue = self.venue();
845 let start_nanos = datetime_to_unix_nanos(request.start);
846 let end_nanos = datetime_to_unix_nanos(request.end);
847 let params = request.params;
848 let clock = self.clock;
849
850 self.spawn_task("request_instruments", async move {
851 let instruments = http
852 .request_instruments()
853 .await
854 .context("failed to fetch instruments from Hyperliquid")?;
855
856 instruments_cache.rcu(|instruments_map| {
857 coin_map.rcu(|coin_to_id| {
858 for instrument in &instruments {
859 let instrument_id = instrument.id();
860 instruments_map.insert(instrument_id, instrument.clone());
861 let coin = instrument.raw_symbol().inner();
862 coin_to_id.insert(coin, instrument_id);
863 ws_instruments.insert(coin, instrument.clone());
864 }
865 });
866 });
867
868 let response = DataResponse::Instruments(InstrumentsResponse::new(
869 request_id,
870 client_id,
871 venue,
872 instruments,
873 start_nanos,
874 end_nanos,
875 clock.get_time_ns(),
876 params,
877 ));
878
879 if let Err(e) = sender.send(DataEvent::Response(response)) {
880 log::error!("Failed to send instruments response: {e}");
881 }
882 Ok(())
883 });
884
885 Ok(())
886 }
887
888 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
889 log::debug!("Requesting instrument: {}", request.instrument_id);
890
891 let http = self.http_client.clone();
892 let sender = self.data_sender.clone();
893 let instruments_cache = self.instruments.clone();
894 let coin_map = self.coin_to_instrument_id.clone();
895 let ws_instruments = self.ws_client.instruments_cache();
896 let instrument_id = request.instrument_id;
897 let request_id = request.request_id;
898 let client_id = request.client_id.unwrap_or(self.client_id);
899 let start_nanos = datetime_to_unix_nanos(request.start);
900 let end_nanos = datetime_to_unix_nanos(request.end);
901 let params = request.params;
902 let clock = self.clock;
903
904 self.spawn_task("request_instrument", async move {
905 let all_instruments = http
906 .request_instruments()
907 .await
908 .context("failed to fetch instruments from Hyperliquid")?;
909
910 instruments_cache.rcu(|instruments_map| {
911 coin_map.rcu(|coin_to_id| {
912 for instrument in &all_instruments {
913 let id = instrument.id();
914 instruments_map.insert(id, instrument.clone());
915 let coin = instrument.raw_symbol().inner();
916 coin_to_id.insert(coin, id);
917 ws_instruments.insert(coin, instrument.clone());
918 }
919 });
920 });
921
922 if let Some(instrument) = all_instruments
923 .into_iter()
924 .find(|i| i.id() == instrument_id)
925 {
926 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
927 request_id,
928 client_id,
929 instrument.id(),
930 instrument,
931 start_nanos,
932 end_nanos,
933 clock.get_time_ns(),
934 params,
935 )));
936
937 if let Err(e) = sender.send(DataEvent::Response(response)) {
938 log::error!("Failed to send instrument response: {e}");
939 }
940 } else {
941 log::error!("Instrument not found: {instrument_id}");
942 }
943 Ok(())
944 });
945
946 Ok(())
947 }
948
949 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
950 log::debug!("Requesting bars for {}", request.bar_type);
951
952 let http = self.http_client.clone();
953 let sender = self.data_sender.clone();
954 let bar_type = request.bar_type;
955 let start = request.start;
956 let end = request.end;
957 let limit = request.limit.map(|n| n.get() as u32);
958 let request_id = request.request_id;
959 let client_id = request.client_id.unwrap_or(self.client_id);
960 let params = request.params;
961 let clock = self.clock;
962 let start_nanos = datetime_to_unix_nanos(start);
963 let end_nanos = datetime_to_unix_nanos(end);
964 let instruments = Arc::clone(&self.instruments);
965
966 self.spawn_task("request_bars", async move {
967 let bars = request_bars_from_http(http, bar_type, start, end, limit, instruments)
968 .await
969 .context("bar request failed")?;
970
971 let response = DataResponse::Bars(BarsResponse::new(
972 request_id,
973 client_id,
974 bar_type,
975 bars,
976 start_nanos,
977 end_nanos,
978 clock.get_time_ns(),
979 params,
980 ));
981
982 if let Err(e) = sender.send(DataEvent::Response(response)) {
983 log::error!("Failed to send bars response: {e}");
984 }
985 Ok(())
986 });
987
988 Ok(())
989 }
990
991 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
992 anyhow::bail!(
997 "Historical trade requests are not supported by Hyperliquid for {}; \
998 subscribe to trades via WebSocket for live trade ticks",
999 request.instrument_id,
1000 )
1001 }
1002
1003 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1004 let instrument_id = request.instrument_id;
1005 log::debug!("Requesting funding rates for {instrument_id}");
1006
1007 let instruments = self.instruments.load();
1008 let instrument = instruments
1009 .get(&instrument_id)
1010 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1011
1012 if !matches!(instrument, InstrumentAny::CryptoPerpetual(_)) {
1013 anyhow::bail!("Funding rates are only available for perpetual instruments");
1014 }
1015
1016 let coin = instrument.raw_symbol().to_string();
1017 let http = self.http_client.clone();
1018 let sender = self.data_sender.clone();
1019 let client_id = request.client_id.unwrap_or(self.client_id);
1020 let request_id = request.request_id;
1021 let params = request.params;
1022 let clock = self.clock;
1023 let limit = request.limit.map(|n| n.get());
1024 let start_dt = request.start;
1025 let end_dt = request.end;
1026 let start_nanos = datetime_to_unix_nanos(start_dt);
1027 let end_nanos = datetime_to_unix_nanos(end_dt);
1028
1029 let now_ms = Utc::now().timestamp_millis() as u64;
1030
1031 let default_lookback_ms: u64 = 7 * 86_400_000;
1033 let start_ms = match start_dt {
1034 Some(dt) => dt.timestamp_millis().max(0) as u64,
1035 None => now_ms.saturating_sub(default_lookback_ms),
1036 };
1037 let end_ms = end_dt.map(|dt| dt.timestamp_millis().max(0) as u64);
1038
1039 self.spawn_task("request_funding_rates", async move {
1040 let entries = http
1041 .info_funding_history(&coin, start_ms, end_ms)
1042 .await
1043 .with_context(|| format!("funding rates request failed for {instrument_id}"))?;
1044
1045 let mut funding_rates: Vec<FundingRateUpdate> = entries
1046 .iter()
1047 .filter_map(
1048 |entry| match funding_entry_to_update(entry, instrument_id) {
1049 Ok(update) => Some(update),
1050 Err(e) => {
1051 log::warn!("Skipping funding history entry for {instrument_id}: {e}",);
1052 None
1053 }
1054 },
1055 )
1056 .collect();
1057
1058 if let Some(limit) = limit
1059 && funding_rates.len() > limit
1060 {
1061 funding_rates.truncate(limit);
1062 }
1063
1064 log::debug!(
1065 "Fetched {} funding rates for {instrument_id}",
1066 funding_rates.len(),
1067 );
1068
1069 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1070 request_id,
1071 client_id,
1072 instrument_id,
1073 funding_rates,
1074 start_nanos,
1075 end_nanos,
1076 clock.get_time_ns(),
1077 params,
1078 ));
1079
1080 if let Err(e) = sender.send(DataEvent::Response(response)) {
1081 log::error!("Failed to send funding rates response: {e}");
1082 }
1083 Ok(())
1084 });
1085
1086 Ok(())
1087 }
1088
1089 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1090 let instrument_id = request.instrument_id;
1091 let instruments = self.instruments.load();
1092 let instrument = instruments
1093 .get(&instrument_id)
1094 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1095
1096 let raw_symbol = instrument.raw_symbol().to_string();
1097 let price_precision = instrument.price_precision();
1098 let size_precision = instrument.size_precision();
1099 let depth = request.depth.map(|d| d.get());
1100
1101 let http = self.http_client.clone();
1102 let sender = self.data_sender.clone();
1103 let client_id = request.client_id.unwrap_or(self.client_id);
1104 let request_id = request.request_id;
1105 let params = request.params;
1106 let clock = self.clock;
1107
1108 self.spawn_task("request_book_snapshot", async move {
1109 let l2_book = http
1110 .info_l2_book(&raw_symbol)
1111 .await
1112 .with_context(|| format!("book snapshot request failed for {instrument_id}"))?;
1113
1114 let book = parse_l2_book_snapshot(
1115 &l2_book,
1116 instrument_id,
1117 price_precision,
1118 size_precision,
1119 depth,
1120 );
1121
1122 let response = DataResponse::Book(BookResponse::new(
1123 request_id,
1124 client_id,
1125 instrument_id,
1126 book,
1127 None,
1128 None,
1129 clock.get_time_ns(),
1130 params,
1131 ));
1132
1133 if let Err(e) = sender.send(DataEvent::Response(response)) {
1134 log::error!("Failed to send book snapshot response: {e}");
1135 }
1136 Ok(())
1137 });
1138
1139 Ok(())
1140 }
1141}
1142
1143pub(crate) fn parse_l2_book_snapshot(
1147 l2_book: &HyperliquidL2Book,
1148 instrument_id: InstrumentId,
1149 price_precision: u8,
1150 size_precision: u8,
1151 depth: Option<usize>,
1152) -> OrderBook {
1153 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1154 let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
1155
1156 let all_bids = l2_book
1157 .levels
1158 .first()
1159 .map_or([].as_slice(), |v| v.as_slice());
1160 let all_asks = l2_book
1161 .levels
1162 .get(1)
1163 .map_or([].as_slice(), |v| v.as_slice());
1164
1165 let bids = match depth {
1166 Some(d) if d < all_bids.len() => &all_bids[..d],
1167 _ => all_bids,
1168 };
1169 let asks = match depth {
1170 Some(d) if d < all_asks.len() => &all_asks[..d],
1171 _ => all_asks,
1172 };
1173
1174 for (i, level) in bids.iter().enumerate() {
1175 let Ok(px) = level.px.parse::<f64>() else {
1176 continue;
1177 };
1178 let Ok(sz) = level.sz.parse::<f64>() else {
1179 continue;
1180 };
1181
1182 if sz > 0.0 {
1183 let price = Price::new(px, price_precision);
1184 let size = Quantity::new(sz, size_precision);
1185 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1186 book.add(order, 0, i as u64, ts_event);
1187 }
1188 }
1189
1190 let bids_len = bids.len();
1191
1192 for (i, level) in asks.iter().enumerate() {
1193 let Ok(px) = level.px.parse::<f64>() else {
1194 continue;
1195 };
1196 let Ok(sz) = level.sz.parse::<f64>() else {
1197 continue;
1198 };
1199
1200 if sz > 0.0 {
1201 let price = Price::new(px, price_precision);
1202 let size = Quantity::new(sz, size_precision);
1203 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1204 book.add(order, 0, (bids_len + i) as u64, ts_event);
1205 }
1206 }
1207
1208 log::info!(
1209 "Built order book for {instrument_id} with {} bids and {} asks",
1210 bids.len(),
1211 asks.len(),
1212 );
1213
1214 book
1215}
1216
1217pub(crate) fn parse_book_precision_params(
1220 params: Option<&Params>,
1221) -> anyhow::Result<(Option<u32>, Option<u32>)> {
1222 let Some(params) = params else {
1223 return Ok((None, None));
1224 };
1225
1226 let read_u32 = |key: &str| -> anyhow::Result<Option<u32>> {
1227 match params.get(key) {
1228 None => Ok(None),
1229 Some(v) => v
1230 .as_u64()
1231 .and_then(|n| u32::try_from(n).ok())
1232 .ok_or_else(|| anyhow::anyhow!("`{key}` must be a positive u32"))
1233 .map(Some),
1234 }
1235 };
1236
1237 Ok((read_u32("n_sig_figs")?, read_u32("mantissa")?))
1238}
1239
1240pub(crate) fn funding_entry_to_update(
1243 entry: &HyperliquidFundingHistoryEntry,
1244 instrument_id: InstrumentId,
1245) -> anyhow::Result<FundingRateUpdate> {
1246 let rate: Decimal = entry
1247 .funding_rate
1248 .parse()
1249 .with_context(|| format!("invalid fundingRate '{}'", entry.funding_rate))?;
1250 let ts = UnixNanos::from(entry.time * 1_000_000);
1251 Ok(FundingRateUpdate::new(
1252 instrument_id,
1253 rate,
1254 Some(60),
1255 None,
1256 ts,
1257 ts,
1258 ))
1259}
1260
1261pub(crate) fn candle_to_bar(
1262 candle: &HyperliquidCandle,
1263 bar_type: BarType,
1264 price_precision: u8,
1265 size_precision: u8,
1266) -> anyhow::Result<Bar> {
1267 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1268 let ts_event = ts_init;
1269
1270 let open = candle.open.parse::<f64>().context("parse open price")?;
1271 let high = candle.high.parse::<f64>().context("parse high price")?;
1272 let low = candle.low.parse::<f64>().context("parse low price")?;
1273 let close = candle.close.parse::<f64>().context("parse close price")?;
1274 let volume = candle.volume.parse::<f64>().context("parse volume")?;
1275
1276 Ok(Bar::new(
1277 bar_type,
1278 Price::new(open, price_precision),
1279 Price::new(high, price_precision),
1280 Price::new(low, price_precision),
1281 Price::new(close, price_precision),
1282 Quantity::new(volume, size_precision),
1283 ts_event,
1284 ts_init,
1285 ))
1286}
1287
1288async fn request_bars_from_http(
1290 http_client: HyperliquidHttpClient,
1291 bar_type: BarType,
1292 start: Option<DateTime<Utc>>,
1293 end: Option<DateTime<Utc>>,
1294 limit: Option<u32>,
1295 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1296) -> anyhow::Result<Vec<Bar>> {
1297 let instrument_id = bar_type.instrument_id();
1299 let instrument = instruments
1300 .load()
1301 .get(&instrument_id)
1302 .cloned()
1303 .context("instrument not found in cache")?;
1304
1305 let price_precision = instrument.price_precision();
1306 let size_precision = instrument.size_precision();
1307 let raw_symbol = instrument.raw_symbol();
1308 let coin = raw_symbol.as_str();
1309
1310 let interval = bar_type_to_interval(&bar_type)?;
1311
1312 let now = Utc::now();
1314 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1315 let start_time = if let Some(start) = start {
1316 start.timestamp_millis() as u64
1317 } else {
1318 let spec = bar_type.spec();
1320 let step_ms = match spec.aggregation {
1321 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1322 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1323 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1324 _ => 60_000,
1325 };
1326 end_time.saturating_sub(1000 * step_ms)
1327 };
1328
1329 let candles = http_client
1330 .info_candle_snapshot(coin, interval, start_time, end_time)
1331 .await
1332 .context("failed to fetch candle snapshot from Hyperliquid")?;
1333
1334 let mut bars: Vec<Bar> = candles
1335 .iter()
1336 .filter_map(|candle| {
1337 candle_to_bar(candle, bar_type, price_precision, size_precision)
1338 .map_err(|e| {
1339 log::warn!("Failed to convert candle to bar: {e}");
1340 e
1341 })
1342 .ok()
1343 })
1344 .collect();
1345
1346 if let Some(limit) = limit
1347 && bars.len() > limit as usize
1348 {
1349 bars = bars.into_iter().take(limit as usize).collect();
1350 }
1351
1352 log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1353 Ok(bars)
1354}
1355
#[cfg(test)]
mod tests {
    //! Unit tests for the free parsing helpers in this file: funding entries, book precision
    //! params and L2 book snapshots.

    use rstest::rstest;
    use rust_decimal_macros::dec;
    use ustr::Ustr;

    use super::*;
    use crate::common::testing::load_test_data;

    /// Instrument ID shared by the tests below.
    fn btc_perp_id() -> InstrumentId {
        InstrumentId::from("BTC-PERP.HYPERLIQUID")
    }

    #[rstest]
    fn test_funding_entry_to_update_parses_positive_rate() {
        let entry = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: "0.0000125".to_string(),
            premium: Some("0.00029005".to_string()),
            time: 1769908800000,
        };
        let instrument_id = btc_perp_id();

        let update = funding_entry_to_update(&entry, instrument_id).unwrap();

        assert_eq!(update.instrument_id, instrument_id);
        assert_eq!(update.rate, dec!(0.0000125));
        assert_eq!(update.interval, Some(60));
        assert!(update.next_funding_ns.is_none());
        assert_eq!(update.ts_event, UnixNanos::from(1769908800000 * 1_000_000));
        assert_eq!(update.ts_init, update.ts_event);
    }

    #[rstest]
    fn test_funding_entry_to_update_handles_negative_rate() {
        let entry = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: "-0.0000081".to_string(),
            premium: None,
            time: 1769912400000,
        };
        let update = funding_entry_to_update(&entry, btc_perp_id()).unwrap();
        assert_eq!(update.rate, dec!(-0.0000081));
    }

    #[rstest]
    fn test_funding_entry_to_update_rejects_invalid_rate() {
        let entry = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: "not-a-number".to_string(),
            premium: None,
            time: 1769912400000,
        };
        let result = funding_entry_to_update(&entry, btc_perp_id());
        assert!(result.is_err());
    }

    #[rstest]
    fn test_parse_book_precision_params_none() {
        let (n, m) = parse_book_precision_params(None).unwrap();
        assert_eq!(n, None);
        assert_eq!(m, None);
    }

    /// Builds a `Params` value from a JSON object literal.
    fn make_params(json: serde_json::Value) -> Params {
        serde_json::from_value(json).expect("valid params payload")
    }

    #[rstest]
    fn test_parse_book_precision_params_only_n_sig_figs() {
        let params = make_params(serde_json::json!({"n_sig_figs": 4}));
        let (n, m) = parse_book_precision_params(Some(&params)).unwrap();
        assert_eq!(n, Some(4));
        assert_eq!(m, None);
    }

    #[rstest]
    fn test_parse_book_precision_params_both() {
        let params = make_params(serde_json::json!({"n_sig_figs": 5, "mantissa": 2}));
        let (n, m) = parse_book_precision_params(Some(&params)).unwrap();
        assert_eq!(n, Some(5));
        assert_eq!(m, Some(2));
    }

    #[rstest]
    fn test_parse_book_precision_params_rejects_negative() {
        let params = make_params(serde_json::json!({"n_sig_figs": -1}));
        let err = parse_book_precision_params(Some(&params)).unwrap_err();
        assert!(err.to_string().contains("n_sig_figs"));
    }

    #[rstest]
    fn test_parse_book_precision_params_rejects_value_exceeding_u32() {
        // One past `u32::MAX`: readable as u64 but must not be narrowed silently
        let params = make_params(serde_json::json!({"mantissa": 4_294_967_296_u64}));
        let err = parse_book_precision_params(Some(&params)).unwrap_err();
        assert!(err.to_string().contains("mantissa"));
    }

    #[rstest]
    fn test_parse_book_precision_params_rejects_non_integer() {
        let params = make_params(serde_json::json!({"n_sig_figs": "5"}));
        let err = parse_book_precision_params(Some(&params)).unwrap_err();
        assert!(err.to_string().contains("n_sig_figs"));
    }

    #[rstest]
    fn test_funding_history_fixture_parses() {
        let entries: Vec<HyperliquidFundingHistoryEntry> =
            load_test_data("http_funding_history.json");
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].coin.as_str(), "BTC");
        assert_eq!(entries[0].funding_rate, "0.0000125");
        assert_eq!(entries[0].premium.as_deref(), Some("0.00029005"));
        assert!(entries[2].premium.is_none());

        let updates: Vec<FundingRateUpdate> = entries
            .iter()
            .map(|e| funding_entry_to_update(e, btc_perp_id()).unwrap())
            .collect();
        assert_eq!(updates.len(), 3);
        assert_eq!(updates[0].rate, dec!(0.0000125));
        assert_eq!(updates[1].rate, dec!(-0.0000081));
        assert_eq!(updates[2].rate, dec!(0.0000033));
    }

    /// Builds a single book level from price and size strings.
    fn level(px: &str, sz: &str) -> crate::http::models::HyperliquidLevel {
        crate::http::models::HyperliquidLevel {
            px: px.to_string(),
            sz: sz.to_string(),
        }
    }

    /// Three bid levels and three ask levels around 98450.
    fn sample_l2_book() -> HyperliquidL2Book {
        HyperliquidL2Book {
            coin: Ustr::from("BTC"),
            levels: vec![
                vec![
                    level("98450.50", "2.5"),
                    level("98449.00", "1.2"),
                    level("98448.00", "0.8"),
                ],
                vec![
                    level("98451.00", "1.5"),
                    level("98452.00", "2.0"),
                    level("98453.00", "0.5"),
                ],
            ],
            time: 1769908800000,
        }
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_populates_both_sides() {
        let book_data = sample_l2_book();
        let instrument_id = btc_perp_id();
        let book = parse_l2_book_snapshot(&book_data, instrument_id, 2, 4, None);

        assert_eq!(book.instrument_id, instrument_id);
        assert_eq!(book.book_type, BookType::L2_MBP);
        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
        assert_eq!(book.best_bid_size(), Some(Quantity::new(2.5, 4)));
        assert_eq!(book.best_ask_size(), Some(Quantity::new(1.5, 4)));
        assert_eq!(book.update_count, 6);
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_truncates_to_depth() {
        let book_data = sample_l2_book();
        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, Some(1));

        // One level per side survives the depth cut
        assert_eq!(book.update_count, 2);
        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_depth_beyond_levels_keeps_all() {
        let book_data = sample_l2_book();
        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, Some(10));

        assert_eq!(book.update_count, 6);
        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(book.best_ask_price(), Some(Price::new(98451.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_uses_venue_time_as_ts_event() {
        let book_data = sample_l2_book();
        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);
        let expected_ts = UnixNanos::from(1769908800000_u64 * 1_000_000);

        assert_eq!(book.ts_last, expected_ts);
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_skips_non_positive_size() {
        let book_data = HyperliquidL2Book {
            coin: Ustr::from("BTC"),
            levels: vec![
                vec![level("98450.50", "2.5"), level("98449.00", "0")],
                vec![level("98451.00", "0"), level("98452.00", "1.5")],
            ],
            time: 1769908800000,
        };
        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);

        assert_eq!(book.update_count, 2, "zero-sized levels must be skipped");
        assert_eq!(book.best_bid_price(), Some(Price::new(98450.50, 2)));
        assert_eq!(book.best_ask_price(), Some(Price::new(98452.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_skips_unparsable_levels() {
        let book_data = HyperliquidL2Book {
            coin: Ustr::from("BTC"),
            levels: vec![
                vec![level("not-a-number", "1.0"), level("98449.00", "1.2")],
                vec![level("98451.00", "garbage"), level("98452.00", "1.5")],
            ],
            time: 1769908800000,
        };
        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);

        // Only the second level on each side is well-formed
        assert_eq!(book.update_count, 2);
        assert_eq!(book.best_bid_price(), Some(Price::new(98449.00, 2)));
        assert_eq!(book.best_ask_price(), Some(Price::new(98452.00, 2)));
    }

    #[rstest]
    fn test_parse_l2_book_snapshot_empty_levels_yields_empty_book() {
        let book_data = HyperliquidL2Book {
            coin: Ustr::from("BTC"),
            levels: vec![],
            time: 1769908800000,
        };
        let book = parse_l2_book_snapshot(&book_data, btc_perp_id(), 2, 4, None);

        assert_eq!(book.update_count, 0);
        assert!(book.best_bid_price().is_none());
        assert!(book.best_ask_price().is_none());
    }
}