1use std::{
17 str::FromStr,
18 sync::{
19 Arc, Mutex,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22 time::Duration,
23};
24
25use ahash::{AHashMap, AHashSet};
26use anyhow::Context;
27use arc_swap::ArcSwap;
28use dashmap::DashMap;
29use nautilus_common::{cache::fifo::FifoCacheMap, live::get_runtime};
30use nautilus_core::{AtomicMap, MUTEX_POISONED};
31use nautilus_model::{
32 data::BarType,
33 enums::{OrderSide, OrderType, TimeInForce},
34 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
35 instruments::{Instrument, InstrumentAny},
36 orders::{Order, OrderAny},
37 types::{Price, Quantity},
38};
39use nautilus_network::{
40 mode::ConnectionMode,
41 websocket::{
42 AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
43 channel_message_handler,
44 },
45};
46use rust_decimal::Decimal;
47use ustr::Ustr;
48
49use crate::{
50 common::{
51 consts::{HTTP_TIMEOUT, ws_url},
52 enums::{HyperliquidBarInterval, HyperliquidEnvironment},
53 parse::{
54 bar_type_to_interval, clamp_price_to_precision, derive_limit_from_trigger,
55 determine_order_list_grouping, extract_error_message, extract_inner_error,
56 extract_inner_errors, normalize_price,
57 order_to_hyperliquid_request_with_asset_and_cloid, round_to_sig_figs,
58 time_in_force_to_hyperliquid_tif,
59 },
60 },
61 http::{
62 client::HyperliquidHttpClient,
63 error::{Error as HyperliquidError, Result as HyperliquidResult},
64 models::{
65 HyperliquidExchangeResponse, HyperliquidExecAction,
66 HyperliquidExecCancelByCloidRequest, HyperliquidExecCancelOrderRequest,
67 HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecModifyOrderRequest,
68 HyperliquidExecOrderKind, HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
69 HyperliquidExecTpSl, HyperliquidExecTriggerParams, RESPONSE_STATUS_OK,
70 },
71 rate_limits::{WeightedLimiter, exec_action_weight},
72 },
73 websocket::{
74 enums::HyperliquidWsChannel,
75 handler::{FeedHandler, HandlerCommand},
76 messages::{
77 NautilusWsMessage, PostRequest, PostResponse, PostResponsePayload, SubscriptionRequest,
78 },
79 post::{PostIds, PostRouter},
80 },
81};
82
/// Text payload used as the heartbeat message; `connect` hands it to
/// `WebSocketConfig::heartbeat_msg`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;

/// Const-generic capacity parameter of the FIFO map behind [`CloidCache`].
pub(super) const CLOID_CACHE_CAPACITY: usize = 10_000;

/// Shared, mutex-guarded FIFO map from a cloid key (the client stores the hex
/// form of the cloid) to the [`ClientOrderId`] it was derived from.
pub(super) type CloidCache = Arc<Mutex<FifoCacheMap<Ustr, ClientOrderId, CLOID_CACHE_CAPACITY>>>;
91
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
94pub(super) enum AssetContextDataType {
95 MarkPrice,
96 IndexPrice,
97 FundingRate,
98 OpenInterest,
99}
100
/// WebSocket client for Hyperliquid.
///
/// Owns the command channel to a spawned feed-handler task and the shared
/// caches (instruments, bar types, cloid mappings) that the handler and the
/// public API both use. Clones share all `Arc`-backed state but never carry the
/// outbound receiver or the task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.adapters.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint this client connects to.
    url: String,
    /// Swappable handle to the connection-mode atomic; `connect` replaces it
    /// with the transport client's own atomic, `abort` resets it to `Closed`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag handed to the feed handler; set by `abort` and `disconnect`.
    signal: Arc<AtomicBool>,
    /// Sender for handler commands; swapped for a live channel on each `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task (`None` before
    /// `connect` and in every clone).
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker (only constructed and cloned in the visible part
    /// of this file).
    auth_tracker: AuthTracker,
    /// Tracked subscriptions; their topics are replayed after a reconnect.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by each instrument's raw symbol.
    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Bar types looked up by `candle:{coin}:{interval}` keys.
    bar_types: Arc<AtomicMap<String, BarType>>,
    /// Requested asset-context data types per key.
    /// NOTE(review): key is presumably the coin symbol; confirm against callers.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Per-dex instrument id lists, forwarded to the handler on `connect`.
    all_dex_asset_ctxs_instrument_ids: Arc<AtomicMap<Ustr, Vec<Option<InstrumentId>>>>,
    /// Cloid -> client order id mappings shared with the handler.
    cloid_cache: CloidCache,
    /// Registers post requests by id and delivers their responses.
    post_router: Arc<PostRouter>,
    /// Source of ids for post requests.
    post_ids: Arc<PostIds>,
    /// Weighted limiter acquired before every signed post action.
    post_limiter: Arc<WeightedLimiter>,
    /// Default timeout applied to post requests.
    post_timeout: Duration,
    /// Handle of the spawned handler task (`None` in clones).
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional account id handed to the feed handler.
    account_id: Option<AccountId>,
    /// Transport backend used for the WebSocket connection.
    transport_backend: TransportBackend,
    /// Optional proxy URL used for the WebSocket connection.
    proxy_url: Option<String>,
}
139
140impl Clone for HyperliquidWebSocketClient {
141 fn clone(&self) -> Self {
142 Self {
143 url: self.url.clone(),
144 connection_mode: Arc::clone(&self.connection_mode),
145 signal: Arc::clone(&self.signal),
146 cmd_tx: Arc::clone(&self.cmd_tx),
147 out_rx: None,
148 auth_tracker: self.auth_tracker.clone(),
149 subscriptions: self.subscriptions.clone(),
150 instruments: Arc::clone(&self.instruments),
151 bar_types: Arc::clone(&self.bar_types),
152 asset_context_subs: Arc::clone(&self.asset_context_subs),
153 all_dex_asset_ctxs_instrument_ids: Arc::clone(&self.all_dex_asset_ctxs_instrument_ids),
154 cloid_cache: Arc::clone(&self.cloid_cache),
155 post_router: Arc::clone(&self.post_router),
156 post_ids: Arc::clone(&self.post_ids),
157 post_limiter: Arc::clone(&self.post_limiter),
158 post_timeout: self.post_timeout,
159 task_handle: None,
160 account_id: self.account_id,
161 transport_backend: self.transport_backend,
162 proxy_url: self.proxy_url.clone(),
163 }
164 }
165}
166
167impl HyperliquidWebSocketClient {
168 pub fn new(
176 url: Option<String>,
177 environment: HyperliquidEnvironment,
178 account_id: Option<AccountId>,
179 transport_backend: TransportBackend,
180 proxy_url: Option<String>,
181 ) -> Self {
182 let url = url.unwrap_or_else(|| ws_url(environment).to_string());
183 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
184 ConnectionMode::Closed as u8,
185 ))));
186 Self {
187 url,
188 connection_mode,
189 signal: Arc::new(AtomicBool::new(false)),
190 auth_tracker: AuthTracker::new(),
191 subscriptions: SubscriptionState::new(':'),
192 instruments: Arc::new(AtomicMap::new()),
193 bar_types: Arc::new(AtomicMap::new()),
194 asset_context_subs: Arc::new(DashMap::new()),
195 all_dex_asset_ctxs_instrument_ids: Arc::new(AtomicMap::new()),
196 cloid_cache: Arc::new(Mutex::new(FifoCacheMap::new())),
197 post_router: PostRouter::new(),
198 post_ids: Arc::new(PostIds::new(1)),
199 post_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
200 post_timeout: HTTP_TIMEOUT,
201 cmd_tx: {
202 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
204 Arc::new(tokio::sync::RwLock::new(tx))
205 },
206 out_rx: None,
207 task_handle: None,
208 account_id,
209 transport_backend,
210 proxy_url,
211 }
212 }
213
214 pub async fn connect(&mut self) -> anyhow::Result<()> {
216 if self.is_active() {
217 log::warn!("WebSocket already connected");
218 return Ok(());
219 }
220 let (message_handler, raw_rx) = channel_message_handler();
221 let cfg = WebSocketConfig {
222 url: self.url.clone(),
223 headers: vec![],
224 heartbeat: Some(30),
225 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
226 reconnect_timeout_ms: Some(15_000),
227 reconnect_delay_initial_ms: Some(250),
228 reconnect_delay_max_ms: Some(5_000),
229 reconnect_backoff_factor: Some(2.0),
230 reconnect_jitter_ms: Some(200),
231 reconnect_max_attempts: None,
232 idle_timeout_ms: None,
233 backend: self.transport_backend,
234 proxy_url: self.proxy_url.clone(),
235 };
236 let client =
237 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
238
239 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
241 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
242
243 *self.cmd_tx.write().await = cmd_tx.clone();
246 self.out_rx = Some(out_rx);
247
248 self.connection_mode.store(client.connection_mode_atomic());
249 log::info!("Hyperliquid WebSocket connected: {}", self.url);
250
251 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
253 anyhow::bail!("Failed to send SetClient command: {e}");
254 }
255
256 let instruments_vec: Vec<InstrumentAny> =
258 self.instruments.load().values().cloned().collect();
259
260 if !instruments_vec.is_empty()
261 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
262 {
263 log::error!("Failed to send InitializeInstruments: {e}");
264 }
265
266 let all_dex_asset_ctxs_instrument_ids = self
267 .all_dex_asset_ctxs_instrument_ids
268 .load()
269 .iter()
270 .map(|(dex, instrument_ids)| (*dex, instrument_ids.clone()))
271 .collect();
272
273 if let Err(e) = cmd_tx.send(HandlerCommand::CacheAllDexAssetCtxsInstrumentIds(
274 all_dex_asset_ctxs_instrument_ids,
275 )) {
276 log::error!("Failed to send CacheAllDexAssetCtxsInstrumentIds: {e}");
277 }
278
279 let signal = Arc::clone(&self.signal);
281 let account_id = self.account_id;
282 let subscriptions = self.subscriptions.clone();
283 let cmd_tx_for_reconnect = cmd_tx.clone();
284 let cloid_cache = Arc::clone(&self.cloid_cache);
285 let post_router = Arc::clone(&self.post_router);
286
287 let stream_handle = get_runtime().spawn(async move {
288 let mut handler = FeedHandler::new(
289 signal,
290 cmd_rx,
291 raw_rx,
292 out_tx,
293 account_id,
294 subscriptions.clone(),
295 cloid_cache,
296 post_router,
297 );
298
299 let resubscribe_all = || {
300 let topics = subscriptions.all_topics();
301 if topics.is_empty() {
302 log::debug!("No active subscriptions to restore after reconnection");
303 return;
304 }
305
306 log::info!(
307 "Resubscribing to {} active subscriptions after reconnection",
308 topics.len()
309 );
310
311 for topic in topics {
312 match subscription_from_topic(&topic) {
313 Ok(subscription) => {
314 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
315 subscriptions: vec![subscription],
316 }) {
317 log::error!("Failed to send resubscribe command: {e}");
318 }
319 }
320 Err(e) => {
321 log::error!(
322 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
323 );
324 }
325 }
326 }
327 };
328
329 loop {
330 match handler.next().await {
331 Some(NautilusWsMessage::Reconnected) => {
332 log::info!("WebSocket reconnected");
333 resubscribe_all();
334 }
335 Some(msg) => {
336 if handler.send(msg).is_err() {
337 log::error!("Failed to send message (receiver dropped)");
338 break;
339 }
340 }
341 None => {
342 if handler.is_stopped() {
343 log::debug!("Stop signal received, ending message processing");
344 break;
345 }
346 log::warn!("WebSocket stream ended unexpectedly");
347 break;
348 }
349 }
350 }
351 log::debug!("Handler task completed");
352 });
353 self.task_handle = Some(stream_handle);
354 Ok(())
355 }
356
357 pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
360 self.task_handle.take()
361 }
362
363 pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
364 self.task_handle = Some(handle);
365 }
366
367 pub fn set_post_timeout(&mut self, timeout: Duration) {
368 self.post_timeout = timeout;
369 }
370
371 pub(crate) fn abort(&mut self) {
374 self.signal.store(true, Ordering::Relaxed);
375 self.connection_mode
376 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
377
378 if let Some(handle) = self.task_handle.take() {
379 handle.abort();
380 }
381 }
382
383 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
385 log::info!("Disconnecting Hyperliquid WebSocket");
386 self.signal.store(true, Ordering::Relaxed);
387
388 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
389 log::debug!(
390 "Failed to send disconnect command (handler may already be shut down): {e}"
391 );
392 }
393
394 if let Some(handle) = self.task_handle.take() {
395 log::debug!("Waiting for task handle to complete");
396 let abort_handle = handle.abort_handle();
397 tokio::select! {
398 result = handle => {
399 match result {
400 Ok(()) => log::debug!("Task handle completed successfully"),
401 Err(e) if e.is_cancelled() => {
402 log::debug!("Task was cancelled");
403 }
404 Err(e) => log::error!("Task handle encountered an error: {e:?}"),
405 }
406 }
407 () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
408 log::warn!("Timeout waiting for task handle, aborting task");
409 abort_handle.abort();
410 }
411 }
412 } else {
413 log::debug!("No task handle to await");
414 }
415 log::debug!("Disconnected");
416 Ok(())
417 }
418
419 pub async fn post_action_exec(
425 &self,
426 signer: &HyperliquidHttpClient,
427 action: &HyperliquidExecAction,
428 ) -> HyperliquidResult<HyperliquidExchangeResponse> {
429 self.post_action_exec_with_timeout(signer, action, self.post_timeout, None)
430 .await
431 }
432
433 pub async fn post_action_exec_with_timeout(
435 &self,
436 signer: &HyperliquidHttpClient,
437 action: &HyperliquidExecAction,
438 timeout: Duration,
439 expires_after: Option<u64>,
440 ) -> HyperliquidResult<HyperliquidExchangeResponse> {
441 let weight = exec_action_weight(action);
442 self.post_limiter.acquire(weight).await;
443
444 let payload = signer.sign_action_exec_request(action, expires_after)?;
445 let response = self
446 .send_post_request(PostRequest::Action { payload }, timeout)
447 .await?;
448
449 match response.response {
450 PostResponsePayload::Action { payload } => {
451 let parsed: HyperliquidExchangeResponse =
452 serde_json::from_value(payload).map_err(HyperliquidError::Serde)?;
453
454 match &parsed {
455 HyperliquidExchangeResponse::Status {
456 status,
457 response: response_data,
458 } if status != RESPONSE_STATUS_OK => {
459 let error_msg = response_data
460 .as_str()
461 .map_or_else(|| response_data.to_string(), |s| s.to_string());
462 Err(HyperliquidError::bad_request(format!(
463 "API error: {error_msg}"
464 )))
465 }
466 HyperliquidExchangeResponse::Error { error } => {
467 Err(HyperliquidError::bad_request(format!("API error: {error}")))
468 }
469 _ => Ok(parsed),
470 }
471 }
472 PostResponsePayload::Error { payload } => Err(map_post_payload_error(payload, weight)),
473 PostResponsePayload::Info { payload } => Err(HyperliquidError::decode(format!(
474 "expected action post response, received info payload: {payload}"
475 ))),
476 }
477 }
478
479 #[allow(
484 clippy::too_many_arguments,
485 reason = "matches the Python and HTTP order submit surface"
486 )]
487 pub async fn submit_order(
488 &self,
489 signer: &HyperliquidHttpClient,
490 instrument_id: InstrumentId,
491 client_order_id: ClientOrderId,
492 order_side: OrderSide,
493 order_type: OrderType,
494 quantity: Quantity,
495 time_in_force: TimeInForce,
496 price: Option<Price>,
497 trigger_price: Option<Price>,
498 post_only: bool,
499 reduce_only: bool,
500 ) -> HyperliquidResult<()> {
501 let symbol = instrument_id.symbol.inner();
502 let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
503 HyperliquidError::bad_request(format!(
504 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
505 ))
506 })?;
507 let is_buy = matches!(order_side, OrderSide::Buy);
508 let price_precision = signer.get_price_precision_for_symbol(symbol).unwrap_or(2);
509
510 let price_decimal = match price {
511 Some(px) if signer.normalize_prices() => {
512 normalize_price(px.as_decimal(), price_precision).normalize()
513 }
514 Some(px) => px.as_decimal().normalize(),
515 None if matches!(order_type, OrderType::Market) => Decimal::ZERO,
516 None if matches!(
517 order_type,
518 OrderType::StopMarket | OrderType::MarketIfTouched
519 ) =>
520 {
521 match trigger_price {
522 Some(tp) => {
523 let derived = derive_limit_from_trigger(
524 tp.as_decimal().normalize(),
525 is_buy,
526 signer.market_order_slippage_bps(),
527 );
528 let sig_rounded = round_to_sig_figs(derived, 5);
529 clamp_price_to_precision(sig_rounded, price_precision, is_buy).normalize()
530 }
531 None => Decimal::ZERO,
532 }
533 }
534 None => {
535 return Err(HyperliquidError::bad_request(
536 "Limit orders require a price",
537 ));
538 }
539 };
540
541 let size_decimal = quantity.as_decimal().normalize();
542 let kind = hyperliquid_order_kind(
543 order_type,
544 time_in_force,
545 post_only,
546 trigger_price,
547 signer.normalize_prices(),
548 price_precision,
549 )?;
550
551 let order = HyperliquidExecPlaceOrderRequest {
552 asset,
553 is_buy,
554 price: price_decimal,
555 size: size_decimal,
556 reduce_only,
557 kind,
558 cloid: Some(signer.get_or_generate_client_order_id_cloid(client_order_id)),
559 };
560
561 if let Some(cloid) = order.cloid {
562 self.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
563 }
564 let action = HyperliquidExecAction::Order {
565 orders: vec![order],
566 grouping: HyperliquidExecGrouping::Na,
567 builder: signer.builder_attribution(),
568 };
569 let response = self.post_action_exec(signer, &action).await?;
570
571 ensure_ws_action_accepted(&response, "Order submission")
572 }
573
574 pub async fn submit_orders(
576 &self,
577 signer: &HyperliquidHttpClient,
578 orders: &[&OrderAny],
579 ) -> HyperliquidResult<()> {
580 let mut hyperliquid_orders = Vec::with_capacity(orders.len());
581 let mut client_order_ids = Vec::with_capacity(orders.len());
582
583 for order in orders {
584 let instrument_id = order.instrument_id();
585 let symbol = instrument_id.symbol.inner();
586 let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
587 HyperliquidError::bad_request(format!(
588 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
589 ))
590 })?;
591 let price_decimals = signer.get_price_precision_for_symbol(symbol).unwrap_or(2);
592 let request = order_to_hyperliquid_request_with_asset_and_cloid(
593 order,
594 asset,
595 price_decimals,
596 signer.normalize_prices(),
597 signer.market_order_slippage_bps(),
598 None,
599 )
600 .map_err(|e| HyperliquidError::bad_request(format!("Failed to convert order: {e}")))?;
601 client_order_ids.push(order.client_order_id());
602 hyperliquid_orders.push(request);
603 }
604
605 for (request, client_order_id) in hyperliquid_orders.iter_mut().zip(client_order_ids) {
606 let cloid = signer.get_or_generate_client_order_id_cloid(client_order_id);
607 request.cloid = Some(cloid);
608 self.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
609 }
610
611 let grouping =
612 determine_order_list_grouping(&orders.iter().copied().cloned().collect::<Vec<_>>());
613 let action = HyperliquidExecAction::Order {
614 orders: hyperliquid_orders,
615 grouping,
616 builder: signer.builder_attribution(),
617 };
618 let response = self.post_action_exec(signer, &action).await?;
619
620 ensure_ws_action_accepted(&response, "Order list submission")
621 }
622
623 pub async fn cancel_order(
625 &self,
626 signer: &HyperliquidHttpClient,
627 instrument_id: InstrumentId,
628 client_order_id: Option<ClientOrderId>,
629 venue_order_id: Option<VenueOrderId>,
630 ) -> HyperliquidResult<()> {
631 let symbol = instrument_id.symbol.inner();
632 let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
633 HyperliquidError::bad_request(format!(
634 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
635 ))
636 })?;
637 let action = if let Some(client_order_id) = client_order_id {
638 if let Some(cloid) = signer.cached_client_order_id_cloid(&client_order_id) {
639 HyperliquidExecAction::CancelByCloid {
640 cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
641 }
642 } else if let Some(oid) = venue_order_id {
643 let oid = oid
644 .as_str()
645 .parse::<u64>()
646 .map_err(|_| HyperliquidError::bad_request("Invalid venue order ID format"))?;
647 HyperliquidExecAction::Cancel {
648 cancels: vec![HyperliquidExecCancelOrderRequest { asset, oid }],
649 }
650 } else {
651 let cloid = signer.get_or_generate_client_order_id_cloid(client_order_id);
652 HyperliquidExecAction::CancelByCloid {
653 cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
654 }
655 }
656 } else if let Some(oid) = venue_order_id {
657 let oid = oid
658 .as_str()
659 .parse::<u64>()
660 .map_err(|_| HyperliquidError::bad_request("Invalid venue order ID format"))?;
661 HyperliquidExecAction::Cancel {
662 cancels: vec![HyperliquidExecCancelOrderRequest { asset, oid }],
663 }
664 } else {
665 return Err(HyperliquidError::bad_request(
666 "Either client_order_id or venue_order_id must be provided",
667 ));
668 };
669 let response = self.post_action_exec(signer, &action).await?;
670
671 ensure_ws_action_accepted(&response, "Cancel order")
672 }
673
674 pub async fn cancel_orders(
676 &self,
677 signer: &HyperliquidHttpClient,
678 cancels: &[(InstrumentId, ClientOrderId, Option<VenueOrderId>)],
679 ) -> HyperliquidResult<Vec<Option<String>>> {
680 let mut cloid_requests = Vec::new();
681 let mut cloid_indices = Vec::new();
682 let mut oid_requests = Vec::new();
683 let mut oid_indices = Vec::new();
684 let mut results = vec![None; cancels.len()];
685
686 for (index, (instrument_id, client_order_id, venue_order_id)) in cancels.iter().enumerate()
687 {
688 let symbol = instrument_id.symbol.inner();
689 let Some(asset) = signer.get_asset_index_for_symbol(symbol) else {
690 results[index] = Some(format!(
691 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
692 ));
693 continue;
694 };
695
696 if let Some(cloid) = signer.cached_client_order_id_cloid(client_order_id) {
697 cloid_requests.push(HyperliquidExecCancelByCloidRequest { asset, cloid });
698 cloid_indices.push(index);
699 } else if let Some(venue_order_id) = venue_order_id {
700 match venue_order_id.as_str().parse::<u64>() {
701 Ok(oid) => {
702 oid_requests.push(HyperliquidExecCancelOrderRequest { asset, oid });
703 oid_indices.push(index);
704 }
705 Err(_) => {
706 results[index] = Some("Invalid venue order ID format".to_string());
707 }
708 }
709 } else {
710 let cloid = signer.get_or_generate_client_order_id_cloid(*client_order_id);
711 cloid_requests.push(HyperliquidExecCancelByCloidRequest { asset, cloid });
712 cloid_indices.push(index);
713 }
714 }
715
716 if cloid_requests.is_empty() && oid_requests.is_empty() {
717 return Ok(results);
718 }
719
720 if !cloid_requests.is_empty() {
721 let action = HyperliquidExecAction::CancelByCloid {
722 cancels: cloid_requests,
723 };
724 let errors = self
725 .post_cancel_action_errors(signer, &action, cloid_indices.len())
726 .await?;
727
728 for (index, error) in cloid_indices.into_iter().zip(errors) {
729 results[index] = error;
730 }
731 }
732
733 if !oid_requests.is_empty() {
734 let action = HyperliquidExecAction::Cancel {
735 cancels: oid_requests,
736 };
737 let errors = self
738 .post_cancel_action_errors(signer, &action, oid_indices.len())
739 .await?;
740
741 for (index, error) in oid_indices.into_iter().zip(errors) {
742 results[index] = error;
743 }
744 }
745
746 Ok(results)
747 }
748
749 async fn post_cancel_action_errors(
750 &self,
751 signer: &HyperliquidHttpClient,
752 action: &HyperliquidExecAction,
753 request_count: usize,
754 ) -> HyperliquidResult<Vec<Option<String>>> {
755 match self.post_cancel_action(signer, action).await {
756 Ok(response) if response.is_ok() => {
757 match cancel_errors_for_requests(extract_inner_errors(&response), request_count) {
758 Ok(errors) => Ok(errors),
759 Err(e) => Ok(vec![Some(e.to_string()); request_count]),
760 }
761 }
762 Ok(response) => Ok(vec![
763 Some(format!(
764 "Cancel orders failed: {}",
765 extract_error_message(&response)
766 ));
767 request_count
768 ]),
769 Err(e) => Err(e),
770 }
771 }
772
773 async fn post_cancel_action(
774 &self,
775 signer: &HyperliquidHttpClient,
776 action: &HyperliquidExecAction,
777 ) -> HyperliquidResult<HyperliquidExchangeResponse> {
778 let weight = exec_action_weight(action);
779 self.post_limiter.acquire(weight).await;
780
781 let payload = signer.sign_action_exec_request(action, None)?;
782 let response = self
783 .send_post_request(PostRequest::Action { payload }, self.post_timeout)
784 .await?;
785
786 match response.response {
787 PostResponsePayload::Action { payload } => {
788 serde_json::from_value(payload).map_err(HyperliquidError::Serde)
789 }
790 PostResponsePayload::Error { payload } => Err(map_post_payload_error(payload, weight)),
791 PostResponsePayload::Info { payload } => Err(HyperliquidError::decode(format!(
792 "expected action post response, received info payload: {payload}"
793 ))),
794 }
795 }
796
797 #[allow(
799 clippy::too_many_arguments,
800 reason = "matches the Python and HTTP order modify surface"
801 )]
802 pub async fn modify_order(
803 &self,
804 signer: &HyperliquidHttpClient,
805 instrument_id: InstrumentId,
806 venue_order_id: VenueOrderId,
807 order_side: OrderSide,
808 order_type: OrderType,
809 price: Price,
810 quantity: Quantity,
811 trigger_price: Option<Price>,
812 reduce_only: bool,
813 post_only: bool,
814 time_in_force: TimeInForce,
815 client_order_id: Option<ClientOrderId>,
816 ) -> HyperliquidResult<()> {
817 let symbol = instrument_id.symbol.inner();
818 let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
819 HyperliquidError::bad_request(format!(
820 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
821 ))
822 })?;
823 let oid = venue_order_id
824 .as_str()
825 .parse::<u64>()
826 .map_err(|_| HyperliquidError::bad_request("Invalid venue order ID format"))?;
827 let is_buy = matches!(order_side, OrderSide::Buy);
828 let price_decimals = signer.get_price_precision_for_symbol(symbol).unwrap_or(2);
829 let price = if signer.normalize_prices() {
830 normalize_price(price.as_decimal(), price_decimals).normalize()
831 } else {
832 price.as_decimal().normalize()
833 };
834 let kind = hyperliquid_order_kind(
835 order_type,
836 time_in_force,
837 post_only,
838 trigger_price,
839 signer.normalize_prices(),
840 price_decimals,
841 )?;
842 let cloid =
843 client_order_id.map(|id| (id, signer.get_or_generate_client_order_id_cloid(id)));
844 let order = HyperliquidExecPlaceOrderRequest {
845 asset,
846 is_buy,
847 price,
848 size: quantity.as_decimal().normalize(),
849 reduce_only,
850 kind,
851 cloid: cloid.map(|(_, cloid)| cloid),
852 };
853
854 if let Some((client_order_id, cloid)) = cloid {
855 self.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
856 }
857 let action = HyperliquidExecAction::Modify {
858 modify: HyperliquidExecModifyOrderRequest { oid, order },
859 };
860 let response = self.post_action_exec(signer, &action).await?;
861
862 ensure_ws_action_accepted(&response, "Modify order")
863 }
864
865 async fn send_post_request(
866 &self,
867 request: PostRequest,
868 timeout: Duration,
869 ) -> HyperliquidResult<PostResponse> {
870 let id = self.post_ids.next();
871
872 match tokio::time::timeout(timeout, async {
873 let rx = self.post_router.register(id).await?;
874
875 let send_result = self
876 .cmd_tx
877 .read()
878 .await
879 .send(HandlerCommand::Post { id, request });
880
881 if let Err(e) = send_result {
882 self.post_router.cancel(id).await;
883 return Err(HyperliquidError::transport(format!(
884 "post command channel closed: {e}"
885 )));
886 }
887
888 self.post_router.await_with_timeout(id, rx, timeout).await
889 })
890 .await
891 {
892 Ok(result) => result,
893 Err(_elapsed) => {
894 self.post_router.cancel(id).await;
895 Err(HyperliquidError::Timeout)
896 }
897 }
898 }
899
900 pub fn is_active(&self) -> bool {
902 let mode = self.connection_mode.load();
903 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
904 }
905
906 pub fn url(&self) -> &str {
908 &self.url
909 }
910
911 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
918 let mut map = AHashMap::new();
919
920 for inst in instruments {
921 let coin = inst.raw_symbol().inner();
922 map.insert(coin, inst);
923 }
924 let count = map.len();
925 self.instruments.store(map);
926 log::info!("Hyperliquid instrument cache initialized with {count} instruments");
927 }
928
929 pub fn cache_instrument(&self, instrument: InstrumentAny) {
933 let coin = instrument.raw_symbol().inner();
934 self.instruments.insert(coin, instrument.clone());
935
936 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
939 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
940 }
941 }
942
943 #[must_use]
945 pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
946 self.instruments.clone()
947 }
948
949 pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
955 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
956 let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
957 }
958 }
959
960 #[allow(
968 clippy::missing_panics_doc,
969 reason = "cloid cache mutex poisoning is not expected"
970 )]
971 pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
972 log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
973 self.cloid_cache
974 .lock()
975 .expect(MUTEX_POISONED)
976 .insert(cloid, client_order_id);
977 }
978
979 #[allow(
984 clippy::missing_panics_doc,
985 reason = "cloid cache mutex poisoning is not expected"
986 )]
987 pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
988 if self
989 .cloid_cache
990 .lock()
991 .expect(MUTEX_POISONED)
992 .remove(cloid)
993 .is_some()
994 {
995 log::debug!("Removed cloid mapping: {cloid}");
996 }
997 }
998
999 #[allow(
1003 clippy::missing_panics_doc,
1004 reason = "cloid cache mutex poisoning is not expected"
1005 )]
1006 pub fn clear_cloid_cache(&self) {
1007 let mut cache = self.cloid_cache.lock().expect(MUTEX_POISONED);
1008 let count = cache.len();
1009 cache.clear();
1010
1011 if count > 0 {
1012 log::debug!("Cleared {count} cloid mappings from cache");
1013 }
1014 }
1015
1016 #[must_use]
1018 #[allow(
1019 clippy::missing_panics_doc,
1020 reason = "cloid cache mutex poisoning is not expected"
1021 )]
1022 pub fn cloid_cache_len(&self) -> usize {
1023 self.cloid_cache.lock().expect(MUTEX_POISONED).len()
1024 }
1025
1026 #[must_use]
1030 #[allow(
1031 clippy::missing_panics_doc,
1032 reason = "cloid cache mutex poisoning is not expected"
1033 )]
1034 pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
1035 self.cloid_cache
1036 .lock()
1037 .expect(MUTEX_POISONED)
1038 .get(cloid)
1039 .copied()
1040 }
1041
1042 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
1046 self.instruments
1047 .load()
1048 .values()
1049 .find(|inst| inst.id() == *id)
1050 .cloned()
1051 }
1052
1053 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1055 self.instruments.get_cloned(symbol)
1056 }
1057
1058 pub fn subscription_count(&self) -> usize {
1060 self.subscriptions.len()
1061 }
1062
1063 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
1067 let key = format!("candle:{coin}:{interval}");
1069 self.bar_types.load().get(&key).copied()
1070 }
1071
1072 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1074 self.subscribe_book_with_options(instrument_id, None, None)
1075 .await
1076 }
1077
1078 pub async fn subscribe_book_with_options(
1081 &self,
1082 instrument_id: InstrumentId,
1083 n_sig_figs: Option<u32>,
1084 mantissa: Option<u32>,
1085 ) -> anyhow::Result<()> {
1086 let instrument = self
1087 .get_instrument(&instrument_id)
1088 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1089 let coin = instrument.raw_symbol().inner();
1090
1091 let cmd_tx = self.cmd_tx.read().await;
1092
1093 cmd_tx
1095 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1096 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1097
1098 let subscription = SubscriptionRequest::L2Book {
1099 coin,
1100 mantissa,
1101 n_sig_figs,
1102 };
1103
1104 cmd_tx
1105 .send(HandlerCommand::Subscribe {
1106 subscriptions: vec![subscription],
1107 })
1108 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1109 Ok(())
1110 }
1111
1112 pub async fn subscribe_book_depth10(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1118 self.subscribe_book_depth10_with_options(instrument_id, None, None)
1119 .await
1120 }
1121
1122 pub async fn subscribe_book_depth10_with_options(
1125 &self,
1126 instrument_id: InstrumentId,
1127 n_sig_figs: Option<u32>,
1128 mantissa: Option<u32>,
1129 ) -> anyhow::Result<()> {
1130 let instrument = self
1131 .get_instrument(&instrument_id)
1132 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1133 let coin = instrument.raw_symbol().inner();
1134
1135 let cmd_tx = self.cmd_tx.read().await;
1136
1137 cmd_tx
1138 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1139 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1140
1141 cmd_tx
1142 .send(HandlerCommand::SetDepth10Sub {
1143 coin,
1144 subscribed: true,
1145 })
1146 .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
1147
1148 let subscription = SubscriptionRequest::L2Book {
1149 coin,
1150 mantissa,
1151 n_sig_figs,
1152 };
1153
1154 cmd_tx
1155 .send(HandlerCommand::Subscribe {
1156 subscriptions: vec![subscription],
1157 })
1158 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1159 Ok(())
1160 }
1161
1162 pub async fn unsubscribe_book_depth10(
1169 &self,
1170 instrument_id: InstrumentId,
1171 ) -> anyhow::Result<()> {
1172 let instrument = self
1173 .get_instrument(&instrument_id)
1174 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1175 let coin = instrument.raw_symbol().inner();
1176
1177 self.cmd_tx
1178 .read()
1179 .await
1180 .send(HandlerCommand::SetDepth10Sub {
1181 coin,
1182 subscribed: false,
1183 })
1184 .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
1185 Ok(())
1186 }
1187
1188 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1190 let instrument = self
1191 .get_instrument(&instrument_id)
1192 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1193 let coin = instrument.raw_symbol().inner();
1194
1195 let cmd_tx = self.cmd_tx.read().await;
1196
1197 cmd_tx
1199 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1200 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1201
1202 let subscription = SubscriptionRequest::Bbo { coin };
1203
1204 cmd_tx
1205 .send(HandlerCommand::Subscribe {
1206 subscriptions: vec![subscription],
1207 })
1208 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1209 Ok(())
1210 }
1211
1212 pub async fn subscribe_all_mids(&self) -> anyhow::Result<()> {
1214 self.subscribe_all_mids_with_dex(None).await
1215 }
1216
1217 pub async fn subscribe_all_dexs_asset_ctxs(&self) -> anyhow::Result<()> {
1219 self.cmd_tx
1220 .read()
1221 .await
1222 .send(HandlerCommand::Subscribe {
1223 subscriptions: vec![SubscriptionRequest::AllDexsAssetCtxs],
1224 })
1225 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1226 Ok(())
1227 }
1228
1229 pub async fn subscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
1231 let cmd_tx = self.cmd_tx.read().await;
1232
1233 let subscription = SubscriptionRequest::AllMids {
1234 dex: dex.map(ToString::to_string),
1235 };
1236
1237 cmd_tx
1238 .send(HandlerCommand::Subscribe {
1239 subscriptions: vec![subscription],
1240 })
1241 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1242 Ok(())
1243 }
1244
1245 pub async fn unsubscribe_all_mids(&self) -> anyhow::Result<()> {
1247 self.unsubscribe_all_mids_with_dex(None).await
1248 }
1249
1250 pub async fn unsubscribe_all_dexs_asset_ctxs(&self) -> anyhow::Result<()> {
1252 self.cmd_tx
1253 .read()
1254 .await
1255 .send(HandlerCommand::Unsubscribe {
1256 subscriptions: vec![SubscriptionRequest::AllDexsAssetCtxs],
1257 })
1258 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1259 Ok(())
1260 }
1261
1262 pub async fn unsubscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
1264 let cmd_tx = self.cmd_tx.read().await;
1265
1266 let subscription = SubscriptionRequest::AllMids {
1267 dex: dex.map(ToString::to_string),
1268 };
1269
1270 cmd_tx
1271 .send(HandlerCommand::Unsubscribe {
1272 subscriptions: vec![subscription],
1273 })
1274 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1275 Ok(())
1276 }
1277
1278 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1280 let instrument = self
1281 .get_instrument(&instrument_id)
1282 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1283 let coin = instrument.raw_symbol().inner();
1284
1285 let cmd_tx = self.cmd_tx.read().await;
1286
1287 cmd_tx
1289 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1290 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1291
1292 let subscription = SubscriptionRequest::Trades { coin };
1293
1294 cmd_tx
1295 .send(HandlerCommand::Subscribe {
1296 subscriptions: vec![subscription],
1297 })
1298 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1299 Ok(())
1300 }
1301
1302 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1304 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
1305 .await
1306 }
1307
1308 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1310 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
1311 .await
1312 }
1313
1314 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
1316 let instrument = self
1318 .get_instrument(&bar_type.instrument_id())
1319 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
1320 let coin = instrument.raw_symbol().inner();
1321 let interval = bar_type_to_interval(&bar_type)?;
1322 let subscription = SubscriptionRequest::Candle { coin, interval };
1323
1324 let key = format!("candle:{coin}:{interval}");
1326 self.bar_types.insert(key.clone(), bar_type);
1327
1328 let cmd_tx = self.cmd_tx.read().await;
1329
1330 cmd_tx
1331 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1332 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1333
1334 cmd_tx
1335 .send(HandlerCommand::AddBarType { key, bar_type })
1336 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
1337
1338 cmd_tx
1339 .send(HandlerCommand::Subscribe {
1340 subscriptions: vec![subscription],
1341 })
1342 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1343 Ok(())
1344 }
1345
1346 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1348 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
1349 .await
1350 }
1351
1352 pub async fn subscribe_open_interest(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1354 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::OpenInterest)
1355 .await
1356 }
1357
1358 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
1360 let subscription = SubscriptionRequest::OrderUpdates {
1361 user: user.to_string(),
1362 };
1363 self.cmd_tx
1364 .read()
1365 .await
1366 .send(HandlerCommand::Subscribe {
1367 subscriptions: vec![subscription],
1368 })
1369 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1370 Ok(())
1371 }
1372
1373 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
1375 let subscription = SubscriptionRequest::UserEvents {
1376 user: user.to_string(),
1377 };
1378 self.cmd_tx
1379 .read()
1380 .await
1381 .send(HandlerCommand::Subscribe {
1382 subscriptions: vec![subscription],
1383 })
1384 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1385 Ok(())
1386 }
1387
1388 pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
1393 let subscription = SubscriptionRequest::UserFills {
1394 user: user.to_string(),
1395 aggregate_by_time: None,
1396 };
1397 self.cmd_tx
1398 .read()
1399 .await
1400 .send(HandlerCommand::Subscribe {
1401 subscriptions: vec![subscription],
1402 })
1403 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1404 Ok(())
1405 }
1406
1407 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
1412 self.subscribe_order_updates(user).await?;
1413 self.subscribe_user_events(user).await?;
1414 Ok(())
1415 }
1416
1417 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1419 let instrument = self
1420 .get_instrument(&instrument_id)
1421 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1422 let coin = instrument.raw_symbol().inner();
1423
1424 let subscription = SubscriptionRequest::L2Book {
1425 coin,
1426 mantissa: None,
1427 n_sig_figs: None,
1428 };
1429
1430 self.cmd_tx
1431 .read()
1432 .await
1433 .send(HandlerCommand::Unsubscribe {
1434 subscriptions: vec![subscription],
1435 })
1436 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1437 Ok(())
1438 }
1439
1440 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1442 let instrument = self
1443 .get_instrument(&instrument_id)
1444 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1445 let coin = instrument.raw_symbol().inner();
1446
1447 let subscription = SubscriptionRequest::Bbo { coin };
1448
1449 self.cmd_tx
1450 .read()
1451 .await
1452 .send(HandlerCommand::Unsubscribe {
1453 subscriptions: vec![subscription],
1454 })
1455 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1456 Ok(())
1457 }
1458
1459 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1461 let instrument = self
1462 .get_instrument(&instrument_id)
1463 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1464 let coin = instrument.raw_symbol().inner();
1465
1466 let subscription = SubscriptionRequest::Trades { coin };
1467
1468 self.cmd_tx
1469 .read()
1470 .await
1471 .send(HandlerCommand::Unsubscribe {
1472 subscriptions: vec![subscription],
1473 })
1474 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1475 Ok(())
1476 }
1477
1478 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1480 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
1481 .await
1482 }
1483
1484 pub async fn unsubscribe_index_prices(
1486 &self,
1487 instrument_id: InstrumentId,
1488 ) -> anyhow::Result<()> {
1489 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
1490 .await
1491 }
1492
1493 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
1495 let instrument = self
1497 .get_instrument(&bar_type.instrument_id())
1498 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
1499 let coin = instrument.raw_symbol().inner();
1500 let interval = bar_type_to_interval(&bar_type)?;
1501 let subscription = SubscriptionRequest::Candle { coin, interval };
1502
1503 let key = format!("candle:{coin}:{interval}");
1504 self.bar_types.remove(&key);
1505
1506 let cmd_tx = self.cmd_tx.read().await;
1507
1508 cmd_tx
1509 .send(HandlerCommand::RemoveBarType { key })
1510 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
1511
1512 cmd_tx
1513 .send(HandlerCommand::Unsubscribe {
1514 subscriptions: vec![subscription],
1515 })
1516 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1517 Ok(())
1518 }
1519
1520 pub async fn unsubscribe_funding_rates(
1522 &self,
1523 instrument_id: InstrumentId,
1524 ) -> anyhow::Result<()> {
1525 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
1526 .await
1527 }
1528
1529 pub async fn unsubscribe_open_interest(
1531 &self,
1532 instrument_id: InstrumentId,
1533 ) -> anyhow::Result<()> {
1534 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::OpenInterest)
1535 .await
1536 }
1537
1538 pub fn cache_all_dex_asset_ctxs_instrument_ids(
1540 &self,
1541 mapping: AHashMap<Ustr, Vec<Option<InstrumentId>>>,
1542 ) {
1543 self.all_dex_asset_ctxs_instrument_ids
1544 .store(mapping.clone());
1545
1546 if let Ok(cmd_tx) = self.cmd_tx.try_read()
1547 && let Err(e) = cmd_tx.send(HandlerCommand::CacheAllDexAssetCtxsInstrumentIds(mapping))
1548 {
1549 log::debug!(
1550 "Failed to send CacheAllDexAssetCtxsInstrumentIds command (handler may not be connected yet): {e}"
1551 );
1552 }
1553 }
1554
1555 async fn subscribe_asset_context_data(
1556 &self,
1557 instrument_id: InstrumentId,
1558 data_type: AssetContextDataType,
1559 ) -> anyhow::Result<()> {
1560 let instrument = self
1561 .get_instrument(&instrument_id)
1562 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1563 let coin = instrument.raw_symbol().inner();
1564
1565 let mut entry = self.asset_context_subs.entry(coin).or_default();
1566 let is_first_subscription = entry.is_empty();
1567 entry.insert(data_type);
1568 let data_types = entry.clone();
1569 drop(entry);
1570
1571 let cmd_tx = self.cmd_tx.read().await;
1572
1573 cmd_tx
1574 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
1575 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
1576
1577 if is_first_subscription {
1578 log::debug!(
1579 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
1580 );
1581 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
1582
1583 cmd_tx
1584 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1585 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1586
1587 cmd_tx
1588 .send(HandlerCommand::Subscribe {
1589 subscriptions: vec![subscription],
1590 })
1591 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1592 } else {
1593 log::debug!(
1594 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
1595 );
1596 }
1597
1598 Ok(())
1599 }
1600
1601 async fn unsubscribe_asset_context_data(
1602 &self,
1603 instrument_id: InstrumentId,
1604 data_type: AssetContextDataType,
1605 ) -> anyhow::Result<()> {
1606 let instrument = self
1607 .get_instrument(&instrument_id)
1608 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1609 let coin = instrument.raw_symbol().inner();
1610
1611 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
1612 entry.remove(&data_type);
1613 let should_unsubscribe = entry.is_empty();
1614 let data_types = entry.clone();
1615 drop(entry);
1616
1617 let cmd_tx = self.cmd_tx.read().await;
1618
1619 if should_unsubscribe {
1620 self.asset_context_subs.remove(&coin);
1621
1622 log::debug!(
1623 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
1624 );
1625 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
1626
1627 cmd_tx
1628 .send(HandlerCommand::UpdateAssetContextSubs {
1629 coin,
1630 data_types: AHashSet::new(),
1631 })
1632 .map_err(|e| {
1633 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1634 })?;
1635
1636 cmd_tx
1637 .send(HandlerCommand::Unsubscribe {
1638 subscriptions: vec![subscription],
1639 })
1640 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1641 } else {
1642 log::debug!(
1643 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
1644 );
1645
1646 cmd_tx
1647 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
1648 .map_err(|e| {
1649 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1650 })?;
1651 }
1652 }
1653
1654 Ok(())
1655 }
1656
1657 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
1661 if let Some(ref mut rx) = self.out_rx {
1662 rx.recv().await
1663 } else {
1664 None
1665 }
1666 }
1667}
1668
1669fn cancel_errors_for_requests(
1670 errors: Vec<Option<String>>,
1671 request_count: usize,
1672) -> HyperliquidResult<Vec<Option<String>>> {
1673 if errors.is_empty() {
1674 return Ok(vec![None; request_count]);
1675 }
1676
1677 if errors.len() != request_count {
1678 return Err(HyperliquidError::exchange(format!(
1679 "Cancel orders returned {} statuses for {request_count} cancels",
1680 errors.len()
1681 )));
1682 }
1683
1684 Ok(errors)
1685}
1686
1687fn map_post_payload_error(payload: String, weight: u32) -> HyperliquidError {
1688 let lower = payload.to_ascii_lowercase();
1689 let message = format!("WebSocket post error: {payload}");
1690
1691 if starts_with_status(&lower, &["429"])
1692 || lower.contains("too many requests")
1693 || lower.contains("rate limit")
1694 {
1695 HyperliquidError::rate_limit("exchange", weight, None)
1696 } else if starts_with_status(&lower, &["401", "403"])
1697 || lower.contains("unauthorized")
1698 || lower.contains("forbidden")
1699 || lower.contains("authentication")
1700 || lower.contains("authorization")
1701 || lower.contains("invalid signature")
1702 || contains_word(&lower, "auth")
1703 {
1704 HyperliquidError::auth(message)
1705 } else if starts_with_status(&lower, &["400"]) || lower.contains("bad request") {
1706 HyperliquidError::bad_request(message)
1707 } else if starts_with_status(&lower, &["500", "502", "503", "504"]) {
1708 HyperliquidError::exchange(message)
1709 } else {
1710 HyperliquidError::exchange(payload)
1711 }
1712}
1713
1714fn hyperliquid_order_kind(
1715 order_type: OrderType,
1716 time_in_force: TimeInForce,
1717 post_only: bool,
1718 trigger_price: Option<Price>,
1719 normalize_prices_enabled: bool,
1720 price_precision: u8,
1721) -> HyperliquidResult<HyperliquidExecOrderKind> {
1722 match order_type {
1723 OrderType::Market => Ok(HyperliquidExecOrderKind::Limit {
1724 limit: HyperliquidExecLimitParams {
1725 tif: HyperliquidExecTif::Ioc,
1726 },
1727 }),
1728 OrderType::Limit => {
1729 let tif = time_in_force_to_hyperliquid_tif(time_in_force, post_only)
1730 .map_err(|e| HyperliquidError::bad_request(format!("{e}")))?;
1731 Ok(HyperliquidExecOrderKind::Limit {
1732 limit: HyperliquidExecLimitParams { tif },
1733 })
1734 }
1735 OrderType::StopMarket
1736 | OrderType::StopLimit
1737 | OrderType::MarketIfTouched
1738 | OrderType::LimitIfTouched => {
1739 let trigger_price = trigger_price.ok_or_else(|| {
1740 HyperliquidError::bad_request("Trigger orders require a trigger price")
1741 })?;
1742 let trigger_px = if normalize_prices_enabled {
1743 normalize_price(trigger_price.as_decimal(), price_precision).normalize()
1744 } else {
1745 trigger_price.as_decimal().normalize()
1746 };
1747 let tpsl = match order_type {
1748 OrderType::StopMarket | OrderType::StopLimit => HyperliquidExecTpSl::Sl,
1749 OrderType::MarketIfTouched | OrderType::LimitIfTouched => HyperliquidExecTpSl::Tp,
1750 _ => unreachable!(),
1751 };
1752 let is_market = matches!(
1753 order_type,
1754 OrderType::StopMarket | OrderType::MarketIfTouched
1755 );
1756
1757 Ok(HyperliquidExecOrderKind::Trigger {
1758 trigger: HyperliquidExecTriggerParams {
1759 is_market,
1760 trigger_px,
1761 tpsl,
1762 },
1763 })
1764 }
1765 _ => Err(HyperliquidError::bad_request(format!(
1766 "Order type {order_type:?} not supported"
1767 ))),
1768 }
1769}
1770
1771fn ensure_ws_action_accepted(
1772 response: &HyperliquidExchangeResponse,
1773 action_name: &str,
1774) -> HyperliquidResult<()> {
1775 if response.is_ok() {
1776 if let Some(error_msg) = extract_inner_errors(response).into_iter().flatten().next() {
1777 return Err(HyperliquidError::bad_request(format!(
1778 "{action_name} rejected: {error_msg}"
1779 )));
1780 }
1781
1782 if let Some(error_msg) = extract_inner_error(response) {
1783 return Err(HyperliquidError::bad_request(format!(
1784 "{action_name} rejected: {error_msg}"
1785 )));
1786 }
1787
1788 return Ok(());
1789 }
1790
1791 Err(HyperliquidError::bad_request(format!(
1792 "{action_name} failed: {}",
1793 extract_error_message(response)
1794 )))
1795}
1796
1797fn starts_with_status(payload: &str, statuses: &[&str]) -> bool {
1798 let trimmed = payload.trim_start();
1799 statuses
1800 .iter()
1801 .any(|status| starts_with_status_token(trimmed, status))
1802 || trimmed.strip_prefix("http").is_some_and(|rest| {
1803 let rest = rest
1804 .trim_start_matches(|c: char| c.is_ascii_whitespace() || matches!(c, ':' | '/'));
1805 statuses
1806 .iter()
1807 .any(|status| starts_with_status_token(rest, status))
1808 })
1809}
1810
/// Returns `true` when `payload` starts with `status` and the character right after
/// it, if any, is not ASCII alphanumeric.
///
/// This keeps `"429"` from matching a longer token such as `"429001"`.
fn starts_with_status_token(payload: &str, status: &str) -> bool {
    let Some(remainder) = payload.strip_prefix(status) else {
        return false;
    };

    // An empty remainder has no leading character, so the token ends cleanly.
    !remainder.starts_with(|c: char| c.is_ascii_alphanumeric())
}
1818
/// Returns `true` when `word` appears in `payload` as a whole token.
///
/// Tokens are the pieces of `payload` between characters that are not ASCII
/// alphanumeric; the comparison is exact and case-sensitive.
fn contains_word(payload: &str, word: &str) -> bool {
    let is_separator = |c: char| !c.is_ascii_alphanumeric();

    for token in payload.split(is_separator) {
        if token == word {
            return true;
        }
    }

    false
}
1824
1825fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
1828 let (kind, rest) = topic
1829 .split_once(':')
1830 .map_or((topic, None), |(k, r)| (k, Some(r)));
1831
1832 let channel = HyperliquidWsChannel::from_wire_str(kind)
1833 .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
1834
1835 match channel {
1836 HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
1837 dex: rest.map(|s| s.to_string()),
1838 }),
1839 HyperliquidWsChannel::AllDexsAssetCtxs => Ok(SubscriptionRequest::AllDexsAssetCtxs),
1840 HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
1841 user: rest.context("Missing user")?.to_string(),
1842 }),
1843 HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
1844 user: rest.context("Missing user")?.to_string(),
1845 }),
1846 HyperliquidWsChannel::Candle => {
1847 let rest = rest.context("Missing candle params")?;
1849 let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
1850 let interval = HyperliquidBarInterval::from_str(interval_str)?;
1851 Ok(SubscriptionRequest::Candle {
1852 coin: Ustr::from(coin),
1853 interval,
1854 })
1855 }
1856 HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
1857 coin: Ustr::from(rest.context("Missing coin")?),
1858 mantissa: None,
1859 n_sig_figs: None,
1860 }),
1861 HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
1862 coin: Ustr::from(rest.context("Missing coin")?),
1863 }),
1864 HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
1865 user: rest.context("Missing user")?.to_string(),
1866 }),
1867 HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
1868 user: rest.context("Missing user")?.to_string(),
1869 }),
1870 HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
1871 user: rest.context("Missing user")?.to_string(),
1872 aggregate_by_time: None,
1873 }),
1874 HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
1875 user: rest.context("Missing user")?.to_string(),
1876 }),
1877 HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
1878 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
1879 user: rest.context("Missing user")?.to_string(),
1880 })
1881 }
1882 HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
1883 coin: Ustr::from(rest.context("Missing coin")?),
1884 }),
1885 HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
1886 coin: Ustr::from(rest.context("Missing coin")?),
1887 }),
1888 HyperliquidWsChannel::ActiveAssetData => {
1889 let rest = rest.context("Missing params")?;
1891 let (user, coin) = rest.split_once(':').context("Missing coin")?;
1892 Ok(SubscriptionRequest::ActiveAssetData {
1893 user: user.to_string(),
1894 coin: coin.to_string(),
1895 })
1896 }
1897 HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
1898 user: rest.context("Missing user")?.to_string(),
1899 }),
1900 HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
1901 user: rest.context("Missing user")?.to_string(),
1902 }),
1903 HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
1904 coin: Ustr::from(rest.context("Missing coin")?),
1905 }),
1906
1907 HyperliquidWsChannel::SubscriptionResponse
1909 | HyperliquidWsChannel::User
1910 | HyperliquidWsChannel::Post
1911 | HyperliquidWsChannel::Pong
1912 | HyperliquidWsChannel::Error => {
1913 anyhow::bail!("Not a subscription channel: {kind}")
1914 }
1915 }
1916}
1917
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        common::{consts::INFLIGHT_MAX, enums::HyperliquidBarInterval},
        websocket::handler::subscription_to_key,
    };

    /// Topic key for `sub`, as produced by `subscription_to_key`.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        subscription_to_key(sub)
    }

    /// Builds a testnet client with every optional constructor argument left as
    /// `None` and the default transport backend.
    fn testnet_client() -> HyperliquidWebSocketClient {
        HyperliquidWebSocketClient::new(
            None,
            HyperliquidEnvironment::Testnet,
            None,
            TransportBackend::default(),
            None,
        )
    }

    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_topic(&subscription);

        assert_eq!(actual_topic, expected_topic);
    }

    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    // Round trip: topic -> request -> topic must be stable, including coins that
    // contain a ':' themselves.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);

        let rebuilt = subscription_from_topic(&original_topic).expect("Failed to reconstruct");

        assert_eq!(subscription_topic(&rebuilt), original_topic);
    }

    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }

    #[rstest]
    fn set_post_timeout_updates_client_and_clone() {
        let mut client = testnet_client();
        let expected = Duration::from_secs(7);

        client.set_post_timeout(expected);

        assert_eq!(client.post_timeout, expected);
        assert_eq!(client.clone().post_timeout, expected);
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn send_post_request_times_out_while_waiting_for_inflight_slot() {
        let client = testnet_client();

        // Register INFLIGHT_MAX requests and keep their receivers alive so the
        // registrations stay in place for the request below.
        let mut held_receivers = Vec::with_capacity(INFLIGHT_MAX);
        for offset in 0..INFLIGHT_MAX {
            let request_id = 10_000 + offset as u64;
            let receiver = client.post_router.register(request_id).await.unwrap();
            held_receivers.push(receiver);
        }

        let request = PostRequest::Info {
            payload: serde_json::json!({"type": "clearinghouseState", "user": "0x0"}),
        };
        let err = client
            .send_post_request(request, Duration::from_millis(25))
            .await
            .expect_err("request should timeout before acquiring an inflight slot");

        assert!(matches!(err, HyperliquidError::Timeout));
        assert_eq!(held_receivers.len(), INFLIGHT_MAX);
    }

    #[rstest]
    fn cancel_errors_for_requests_accepts_empty_as_success() {
        let slots = cancel_errors_for_requests(Vec::new(), 2).unwrap();

        assert_eq!(slots, vec![None, None]);
    }

    #[rstest]
    fn cancel_errors_for_requests_rejects_status_count_mismatch() {
        let err = cancel_errors_for_requests(vec![None], 2).expect_err("mismatch should fail");
        let rendered = err.to_string();

        assert!(rendered.contains("returned 1 statuses for 2 cancels"));
    }

    #[rstest]
    fn test_post_payload_error_maps_rate_limit() {
        let err = map_post_payload_error("429 Too Many Requests".to_string(), 3);

        assert!(matches!(
            err,
            HyperliquidError::RateLimit {
                scope: "exchange",
                weight: 3,
                retry_after_ms: None,
            }
        ));
    }

    #[rstest]
    #[case("401 Unauthorized")]
    #[case("HTTP 403: forbidden")]
    #[case("invalid signature")]
    #[case("authentication failed")]
    fn test_post_payload_error_maps_auth(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::Auth(_)));
    }

    #[rstest]
    #[case("400 Bad Request")]
    #[case("HTTP 400: malformed payload")]
    #[case("bad request: missing action")]
    fn test_post_payload_error_maps_bad_request(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::BadRequest(_)));
    }

    #[rstest]
    #[case("500 Internal Server Error")]
    #[case("HTTP 503: service unavailable")]
    fn test_post_payload_error_maps_exchange_status(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::Exchange(_)));
    }

    // Status codes and the word "auth" must only match as standalone tokens.
    #[rstest]
    #[case("order 429001 rejected")]
    #[case("asset 5001 is not tradable")]
    #[case("authoritative nonce window exceeded")]
    fn test_post_payload_error_does_not_match_embedded_codes_or_words(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::Exchange(_)));
    }
}