1use crate::{
2 dex_connector::{slippage_price, string_to_decimal, DexConnector},
3 dex_request::{DexError, DexRequest, HttpMethod},
4 dex_websocket::DexWebSocket,
5 BalanceResponse, CanceledOrder, CanceledOrdersResponse, CombinedBalanceResponse,
6 CreateOrderResponse, FilledOrder, FilledOrdersResponse, LastTradesResponse, OpenOrdersResponse,
7 OrderBookSnapshot, OrderSide, TickerResponse, TpSl, TriggerOrderStyle,
8};
9use async_trait::async_trait;
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use debot_utils::parse_to_decimal;
12use ethers::{signers::LocalWallet, types::H160};
13use futures::{
14 stream::{SplitSink, SplitStream},
15 SinkExt, StreamExt,
16};
17use hyperliquid_rust_sdk_fork::{
18 BaseUrl, ClientCancelRequest, ClientLimit, ClientOrder, ClientOrderRequest, ClientTrigger,
19 ExchangeClient, ExchangeDataStatus, ExchangeResponseStatus,
20};
21use reqwest::Client;
22use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
23use rust_decimal::Decimal;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::{
27 collections::HashMap,
28 str::FromStr,
29 sync::{
30 atomic::{AtomicBool, Ordering},
31 Arc,
32 },
33 time::Duration,
34};
35use tokio::{
36 net::TcpStream,
37 select,
38 signal::unix::{signal, SignalKind},
39 sync::{Mutex, RwLock},
40 task::JoinHandle,
41 time::sleep,
42};
43use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream};
44
/// Connector settings captured once in `HyperliquidConnector::new`.
struct Config {
    /// Address sent as `user` in the `userFills` / `orderUpdates` subscriptions.
    /// `new` stores the vault address here when one is supplied, otherwise the
    /// caller's EVM wallet address.
    evm_wallet_address: String,
    /// Symbols for which the per-coin channels (candle, activeAssetCtx, bbo,
    /// l2Book) are subscribed.
    symbol_list: Vec<String>,
}
49
/// One element of the `tokens` array in the `spotMeta` info response.
#[derive(Deserialize, Debug)]
struct SpotMetaToken {
    /// Token name; `new` uses it to build `BASE/QUOTE` pair names.
    #[serde(rename = "name")]
    _name: String,
    /// Size decimals; `new` copies this into `StaticMarketInfo::decimals` for
    /// pairs whose base token has this name.
    #[serde(rename = "szDecimals")]
    _sz_decimals: u32,
    /// Deserialized but not read anywhere in the visible code.
    #[serde(rename = "weiDecimals")]
    _wei_decimals: u32,
    /// Token index, referenced by the entries of `SpotMetaUniverse::_tokens`.
    #[serde(rename = "index")]
    _index: usize,
}
62
63#[derive(Deserialize, Debug, Clone)]
64struct SpotMetaUniverse {
65 #[serde(rename = "name")]
66 name: String,
67 #[serde(rename = "tokens")]
68 _tokens: Vec<usize>,
69 #[serde(rename = "index")]
70 index: usize,
71}
72
73#[derive(Deserialize, Debug)]
74struct SpotMetaResponse {
75 #[serde(rename = "tokens")]
76 _tokens: Vec<SpotMetaToken>,
77 #[serde(rename = "universe")]
78 universe: Vec<SpotMetaUniverse>,
79}
80
/// JSON body posted to the `/info` endpoint.
#[derive(Serialize, Debug)]
struct InfoRequest<'a> {
    /// Request kind, serialized under the key `type` (e.g. `"spotMeta"`).
    #[serde(rename = "type")]
    req_type: &'a str,
    /// Optional user address; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    user: Option<&'a str>,
}
88
/// Outcome recorded for an order: built either from a `userFills` fill or
/// from an `orderUpdates` entry whose status is `rejected`.
#[derive(Debug)]
struct TradeResult {
    /// `Short` when the fill's `side` is `"A"`, otherwise `Long`. Rejected
    /// entries always carry `Long` as a placeholder.
    pub filled_side: OrderSide,
    /// Filled quantity; zero for rejected orders.
    pub filled_size: Decimal,
    /// Filled size multiplied by the fill price; zero for rejected orders.
    pub filled_value: Decimal,
    /// Fee reported with the fill; zero for rejected orders.
    pub filled_fee: Decimal,
    /// Exchange order id (`oid`) rendered as a decimal string.
    order_id: String,
    /// `true` only for entries created from a `rejected` order update.
    pub is_rejected: bool,
}
98
/// Record of an order whose `orderUpdates` status was `canceled`.
#[derive(Debug, Clone)]
pub struct CancelEvent {
    /// Exchange order id (`oid`) rendered as a decimal string.
    pub order_id: String,
    /// The update's `statusTimestamp`, stored unchanged.
    /// NOTE(review): presumably epoch milliseconds — confirm against the API.
    pub timestamp: u64,
}
104
/// Latest per-market values received over the WebSocket. Every field starts
/// as `None` and is filled in by the handler of the corresponding channel.
#[derive(Default)]
struct DynamicMarketInfo {
    /// Best bid price, from the latest `bbo` or `l2Book` message.
    pub best_bid: Option<Decimal>,
    /// Best ask price, from the latest `bbo` or `l2Book` message.
    pub best_ask: Option<Decimal>,
    /// Mid price from the latest `allMids` message.
    pub market_price: Option<Decimal>,
    /// Tick size computed by `calculate_min_tick` whenever a mid price arrives.
    pub min_tick: Option<Decimal>,
    /// `v` field of the latest candle message.
    pub volume: Option<Decimal>,
    /// `n` field of the latest candle message.
    pub num_trades: Option<u64>,
    /// `openInterest` from the latest `activeAssetCtx` message.
    pub open_interest: Option<Decimal>,
    /// `funding` from the latest `activeAssetCtx` message.
    pub funding_rate: Option<Decimal>,
    /// `oraclePx` from the latest `activeAssetCtx` message.
    pub oracle_price: Option<Decimal>,
}
117
/// Per-market metadata that is loaded once during construction.
#[derive(Clone)]
struct StaticMarketInfo {
    /// Size decimals of the market; passed to `calculate_min_tick`. For spot
    /// pairs `new` takes this from the base token's `szDecimals`.
    pub decimals: u32,
    /// Maximum leverage; `new` stores 0 for spot pairs.
    pub _max_leverage: u32,
}
123
/// Cached result of the most recent status-page poll for scheduled maintenance.
#[derive(Clone)]
#[allow(dead_code)]
struct MaintenanceInfo {
    /// `scheduled_for` of the first upcoming maintenance, converted to UTC;
    /// `None` when none is listed or the value could not be parsed.
    next_start: Option<DateTime<Utc>>,
    /// Time of the last successful poll. `new` initialises it to one hour
    /// before construction.
    fetched_at: DateTime<Utc>,
}
130
131#[derive(Deserialize, Debug)]
132pub struct OrderUpdateDetail {
133 pub coin: String,
134 #[serde(rename = "oid")]
135 pub oid: u64,
136}
137
138#[derive(Deserialize, Debug)]
139pub struct OrderUpdate {
140 pub order: OrderUpdateDetail,
141 pub status: String,
142 #[serde(rename = "statusTimestamp")]
143 pub status_timestamp: u64,
144}
145
/// One price level as carried by the `bbo` and `l2Book` channels.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct WsLevel {
    /// Price as a decimal string; parsed with `Decimal::from_str` by the handler.
    px: String,
    /// Not read in the visible code — presumably the size resting at this level.
    sz: String,
    /// Not read in the visible code — presumably the number of orders at this level.
    n: u64,
}
153
/// Payload of a `bbo` (best bid/offer) WebSocket message.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct WsBbo {
    /// Coin the quote belongs to; spot markets may arrive as `@<index>`.
    coin: String,
    /// Not read in the visible code — presumably the message timestamp.
    time: u64,
    /// Index 0 is used as the best bid and index 1 as the best ask; either
    /// side may be absent.
    bbo: [Option<WsLevel>; 2],
}
161
/// Payload of an `l2Book` WebSocket message.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct WsBook {
    /// Coin the book belongs to; spot markets may arrive as `@<index>`.
    coin: String,
    /// Not read in the visible code — presumably the snapshot timestamp.
    time: u64,
    /// `levels[0]` is treated as the bid side and `levels[1]` as the ask side;
    /// the first element of each is taken as the best price.
    levels: [Vec<WsLevel>; 2],
}
169
/// Connector for the Hyperliquid exchange: REST requests, a WebSocket feed
/// with background reader task, and caches of the data the feed delivers.
pub struct HyperliquidConnector {
    /// Account address and subscribed symbols.
    config: Config,
    /// REST client used for `/info` requests.
    request: DexRequest,
    /// WebSocket endpoint wrapper; cloned and connected in `start_web_socket`.
    web_socket: DexWebSocket,
    /// `true` while the reader task should keep running.
    running: Arc<AtomicBool>,
    /// Read half of the WebSocket; `None` when disconnected.
    read_socket: Arc<Mutex<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
    /// Write half of the WebSocket; `None` when disconnected.
    write_socket:
        Arc<Mutex<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
    /// Handle of the task that reads and dispatches WebSocket messages.
    task_handle_read_message: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Handle of the task that clears `running` on SIGTERM.
    task_handle_read_sigterm: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Market key -> (trade id, or order id for rejections) -> result.
    trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
    /// Market key -> order id -> cancel event.
    canceled_results: Arc<RwLock<HashMap<String, HashMap<String, CancelEvent>>>>,
    /// Market key -> latest values received over the WebSocket.
    dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
    /// Market key -> metadata loaded during construction.
    static_market_info: HashMap<String, StaticMarketInfo>,
    /// Spot pair name -> spot universe index.
    spot_index_map: HashMap<String, usize>,
    /// Spot universe index -> pair name; resolves `@<index>` coin names.
    spot_reverse_map: Arc<HashMap<usize, String>>,
    /// Signing client from the Hyperliquid SDK.
    exchange_client: ExchangeClient,
    /// Cache written by the maintenance watcher task.
    maintenance: Arc<RwLock<MaintenanceInfo>>,
    /// Not used in the visible code — presumably the previous volume per market.
    last_volumes: Arc<Mutex<HashMap<String, Decimal>>>,
}
191
/// A decoded WebSocket frame. Deserialization is implemented by hand so the
/// `data` payload can be decoded according to the `channel` name.
#[derive(Debug)]
struct WebSocketMessage {
    /// Channel name exactly as received.
    _channel: String,
    /// Payload decoded into the variant that matches the channel.
    data: WebSocketData,
}
197
/// Typed payload of a WebSocket message, one variant per supported channel.
#[derive(Debug)]
enum WebSocketData {
    /// Channel `allMids`.
    AllMidsData(AllMidsData),
    /// Channel `userFills`.
    UserFillsData(UserFillsData),
    /// Channel `candle`.
    CandleData(CandleData),
    /// Channel `activeAssetCtx`.
    ActiveAssetCtxData(ActiveAssetCtxData),
    /// Channel `orderUpdates`.
    OrderUpdatesData(Vec<OrderUpdate>),
    /// Channel `bbo`.
    Bbo(WsBbo),
    /// Channel `l2Book`.
    L2Book(WsBook),
}
208
/// Payload of an `allMids` message.
#[derive(Deserialize, Debug)]
struct AllMidsData {
    /// Coin name (or `@<index>` for spot markets) -> mid price as a string.
    mids: HashMap<String, String>,
}
213
/// Payload of a `candle` message. Field names mirror the single-letter JSON
/// keys; only `s`, `v` and `n` are read in the visible code.
#[allow(dead_code, non_snake_case)]
#[derive(Deserialize, Debug)]
struct CandleData {
    // NOTE(review): t/T are presumably the candle open/close times — confirm.
    t: u64,
    T: u64,
    /// Coin the candle belongs to; spot markets may arrive as `@<index>`.
    s: String,
    // Presumably the candle interval (the subscription requests "1m").
    i: String,
    // Presumably open, close, high and low prices — not read here.
    o: Decimal,
    c: Decimal,
    h: Decimal,
    l: Decimal,
    /// Stored as `DynamicMarketInfo::volume`.
    v: Decimal,
    /// Stored as `DynamicMarketInfo::num_trades`.
    n: u64,
}
228
/// Payload of an `activeAssetCtx` message.
#[derive(Deserialize, Debug)]
pub struct ActiveAssetCtxData {
    /// Coin the context belongs to; spot markets may arrive as `@<index>`.
    pub coin: String,
    /// Context values for that coin.
    pub ctx: PerpsAssetCtx,
}
234
/// Asset context values delivered on the `activeAssetCtx` channel. Field names
/// mirror the camelCase JSON keys. Only `funding`, `openInterest` and
/// `oraclePx` are read in the visible code.
#[allow(dead_code, non_snake_case)]
#[derive(Deserialize, Debug)]
pub struct PerpsAssetCtx {
    // Not read here — presumably the day's notional volume.
    pub dayNtlVlm: Decimal,
    // Not read here — presumably the previous day's price.
    pub prevDayPx: Decimal,
    // Not read here — presumably the mark price.
    pub markPx: Decimal,
    // Not read here; optional in the payload.
    pub midPx: Option<Decimal>,
    /// Stored as `DynamicMarketInfo::funding_rate`.
    pub funding: Decimal,
    /// Stored as `DynamicMarketInfo::open_interest`.
    pub openInterest: Decimal,
    /// Stored as `DynamicMarketInfo::oracle_price`.
    pub oraclePx: Decimal,
}
246
/// Payload of a `userFills` message.
#[derive(Serialize, Deserialize, Debug)]
pub struct UserFillsData {
    /// Address the fills belong to.
    pub user: String,
    /// Fills carried by this message.
    pub fills: Vec<Fill>,
}
252
253#[derive(Serialize, Deserialize, Debug)]
254pub struct Fill {
255 pub coin: String,
256 pub px: Decimal,
257 pub sz: Decimal,
258 pub side: String,
259 pub dir: String,
260 #[serde(rename = "closedPnl")]
261 pub closed_pnl: Decimal,
262 pub oid: u64,
263 pub tid: u64,
264 pub fee: Decimal,
265}
266
267impl<'de> Deserialize<'de> for WebSocketMessage {
268 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
269 where
270 D: serde::Deserializer<'de>,
271 {
272 #[derive(Deserialize)]
273 struct Helper {
274 channel: String,
275 data: serde_json::Value,
276 }
277 let helper = Helper::deserialize(deserializer)?;
278 let data = match helper.channel.as_str() {
279 "allMids" => AllMidsData::deserialize(helper.data)
280 .map(WebSocketData::AllMidsData)
281 .map_err(serde::de::Error::custom)?,
282 "userFills" => UserFillsData::deserialize(helper.data)
283 .map(WebSocketData::UserFillsData)
284 .map_err(serde::de::Error::custom)?,
285 "orderUpdates" => Vec::<OrderUpdate>::deserialize(helper.data)
286 .map(WebSocketData::OrderUpdatesData)
287 .map_err(serde::de::Error::custom)?,
288 "candle" => CandleData::deserialize(helper.data)
289 .map(WebSocketData::CandleData)
290 .map_err(serde::de::Error::custom)?,
291 "activeAssetCtx" => ActiveAssetCtxData::deserialize(helper.data)
292 .map(WebSocketData::ActiveAssetCtxData)
293 .map_err(serde::de::Error::custom)?,
294 "bbo" => WsBbo::deserialize(helper.data)
295 .map(WebSocketData::Bbo)
296 .map_err(serde::de::Error::custom)?,
297 "l2Book" => WsBook::deserialize(helper.data)
298 .map(WebSocketData::L2Book)
299 .map_err(serde::de::Error::custom)?,
300 _ => return Err(serde::de::Error::custom("unknown channel type")),
301 };
302 Ok(WebSocketMessage {
303 _channel: helper.channel,
304 data,
305 })
306 }
307}
308
309fn is_buy_for_tpsl(position_side: OrderSide) -> bool {
313 matches!(position_side, OrderSide::Short)
314}
315
316impl HyperliquidConnector {
317 pub async fn new(
318 rest_endpoint: &str,
319 web_socket_endpoint: &str,
320 private_key: &str,
321 evm_wallet_address: &str,
322 vault_address: Option<String>,
323 use_agent: bool,
324 agent_name: Option<String>,
325 symbol_list: &[&str],
326 ) -> Result<Self, DexError> {
327 let request = DexRequest::new(rest_endpoint.to_owned()).await?;
328 let web_socket = DexWebSocket::new(web_socket_endpoint.to_owned());
329
330 let evm_wallet_address = vault_address
331 .clone()
332 .unwrap_or_else(|| evm_wallet_address.into());
333 let config = Config {
334 evm_wallet_address,
335 symbol_list: symbol_list.iter().map(|s| s.to_string()).collect(),
336 };
337
338 let vault_address: Option<H160> = vault_address
339 .as_deref()
340 .and_then(|v| H160::from_str(v).ok());
341
342 let mut local_wallet: LocalWallet = private_key.parse().unwrap();
343
344 if use_agent {
345 let ec_tmp =
346 ExchangeClient::new(None, local_wallet, Some(BaseUrl::Mainnet), None, None)
347 .await
348 .map_err(|e| DexError::Other(e.to_string()))?;
349
350 let (pk, status) = ec_tmp
351 .approve_agent(None, agent_name.clone())
352 .await
353 .map_err(|e| DexError::Other(e.to_string()))?;
354
355 match status {
356 ExchangeResponseStatus::Ok(_) => {
357 log::info!("approve_agent succeeded for {:?}", agent_name);
358 local_wallet = pk
359 .parse()
360 .map_err(|e| DexError::Other(format!("Failed to parse agent pk: {e}")))?;
361 }
362 ExchangeResponseStatus::Err(e) => {
363 log::error!("approve_agent failed: {e}");
364 return Err(DexError::Other(format!("approve_agent failed: {e}")));
365 }
366 }
367 }
368
369 let exchange_client = ExchangeClient::new(
370 None,
371 local_wallet,
372 Some(BaseUrl::Mainnet),
373 None,
374 vault_address,
375 )
376 .await
377 .map_err(|e| DexError::Other(e.to_string()))?;
378
379 let mut instance = HyperliquidConnector {
380 config,
381 request,
382 web_socket,
383 trade_results: Arc::new(RwLock::new(HashMap::new())),
384 canceled_results: Arc::new(RwLock::new(HashMap::new())),
385 running: Arc::new(AtomicBool::new(false)),
386 read_socket: Arc::new(Mutex::new(None)),
387 write_socket: Arc::new(Mutex::new(None)),
388 task_handle_read_message: Arc::new(Mutex::new(None)),
389 task_handle_read_sigterm: Arc::new(Mutex::new(None)),
390 dynamic_market_info: Arc::new(RwLock::new(HashMap::new())),
391 static_market_info: HashMap::new(),
392 spot_index_map: HashMap::new(),
393 spot_reverse_map: Arc::new(HashMap::new()),
394 exchange_client,
395 maintenance: Arc::new(RwLock::new(MaintenanceInfo {
396 next_start: None,
397 fetched_at: Utc::now() - ChronoDuration::hours(1),
398 })),
399 last_volumes: Arc::new(Mutex::new(HashMap::new())),
400 };
401
402 instance.spawn_maintenance_watcher();
403
404 instance.retrive_market_metadata().await?;
405
406 let info_payload = serde_json::to_string(&InfoRequest {
407 req_type: "spotMeta",
408 user: None,
409 })
410 .map_err(|e| DexError::Other(e.to_string()))?;
411
412 let spot_meta: SpotMetaResponse = instance
413 .request
414 .handle_request::<SpotMetaResponse, InfoRequest<'_>>(
415 HttpMethod::Post,
416 "/info".into(),
417 &HashMap::new(),
418 info_payload,
419 )
420 .await?;
421
422 let token_name_map: HashMap<usize, String> = spot_meta
424 ._tokens
425 .iter()
426 .map(|t| (t._index, t._name.clone()))
427 .collect();
428
429 let mut idx_from_pair = HashMap::<String, usize>::new();
430 let mut pair_from_idx = HashMap::<usize, String>::new();
431
432 for uni in &spot_meta.universe {
433 let pair = if !uni.name.starts_with('@') {
434 uni.name.clone()
435 } else if uni._tokens.len() == 2 {
436 format!(
437 "{}/{}",
438 token_name_map.get(&uni._tokens[0]).unwrap_or(&"?".into()),
439 token_name_map.get(&uni._tokens[1]).unwrap_or(&"?".into())
440 )
441 } else {
442 log::warn!(
443 "universe idx {} has unexpected token vec {:?}",
444 uni.index,
445 uni._tokens
446 );
447 uni.name.clone()
448 };
449
450 idx_from_pair.insert(pair.clone(), uni.index);
451 pair_from_idx.insert(uni.index, pair);
452 }
453
454 instance.spot_index_map = idx_from_pair;
455 instance.spot_reverse_map = Arc::new(pair_from_idx);
456
457 {
458 let token_decimals: HashMap<String, u32> = spot_meta
459 ._tokens
460 .iter()
461 .map(|t| (t._name.clone(), t._sz_decimals))
462 .collect();
463
464 let mut sm = std::mem::take(&mut instance.static_market_info);
465
466 for uni in &spot_meta.universe {
467 let pair = if !uni.name.starts_with('@') {
468 uni.name.clone()
469 } else if uni._tokens.len() == 2 {
470 format!(
471 "{}/{}",
472 spot_meta._tokens[uni._tokens[0]]._name,
473 spot_meta._tokens[uni._tokens[1]]._name
474 )
475 } else {
476 uni.name.clone()
477 };
478
479 let base = pair.split('/').next().unwrap();
480 let decimals = *token_decimals.get(base).unwrap_or(&0);
481
482 sm.insert(
483 pair.clone(),
484 StaticMarketInfo {
485 decimals,
486 _max_leverage: 0,
487 },
488 );
489 }
490
491 instance.static_market_info = sm;
492 }
493
494 Ok(instance)
495 }
496
497 fn spawn_maintenance_watcher(&self) {
498 let cache = self.maintenance.clone();
499 tokio::spawn(async move {
500 let client = Client::builder()
501 .timeout(std::time::Duration::from_secs(2))
502 .build()
503 .expect("reqwest client");
504
505 loop {
506 if let Ok(res) = client
507 .get("https://hyperliquid.statuspage.io/api/v2/scheduled-maintenances/upcoming.json")
508 .send()
509 .await
510 {
511 if let Ok(json) = res.json::<Value>().await {
512 let scheduled = json
513 .get("scheduled_maintenances")
514 .and_then(|v| v.as_array());
515 let scheduled_count = scheduled.map(|arr| arr.len()).unwrap_or(0);
516 let raw_next = scheduled
517 .and_then(|arr| arr.get(0))
518 .and_then(|v| v.get("scheduled_for"))
519 .and_then(|v| v.as_str());
520
521 log::debug!(
522 "Hyperliquid maintenance poll: count={} raw_next={:?}",
523 scheduled_count,
524 raw_next
525 );
526
527 let next = json
528 .get("scheduled_maintenances")
529 .and_then(|v| v.get(0))
530 .and_then(|v| v.get("scheduled_for"))
531 .and_then(|v| v.as_str())
532 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
533 .map(|dt| dt.with_timezone(&Utc));
534
535 if next.is_none() {
536 log::debug!(
537 "Hyperliquid maintenance parse result: no upcoming window parsed from {:?}",
538 raw_next
539 );
540 }
541
542 *cache.write().await = MaintenanceInfo {
543 next_start: next,
544 fetched_at: Utc::now(),
545 };
546 }
547 }
548 sleep(Duration::from_secs(600)).await;
549 }
550 });
551 }
552
553 pub async fn start_web_socket(&self) -> Result<(), DexError> {
554 log::info!("start_web_socket");
555
556 let (write, read) = self
557 .web_socket
558 .clone()
559 .connect()
560 .await
561 .map_err(|_| DexError::Other("Failed to connect to WebSocket".to_string()))?;
562
563 {
564 let mut read_lock = self.read_socket.lock().await;
565 *read_lock = Some(read);
566 }
567 {
568 let mut write_lock = self.write_socket.lock().await;
569 *write_lock = Some(write);
570 }
571
572 self.running.store(true, Ordering::SeqCst);
573 self.subscribe_to_channels(&self.config.evm_wallet_address)
574 .await?;
575
576 let running = self.running.clone();
577 let read_sock = self.read_socket.clone();
578 let write_sock = self.write_socket.clone();
579 let dmi = self.dynamic_market_info.clone();
580 let trs = self.trade_results.clone();
581 let rev_map = self.spot_reverse_map.clone();
582 let crs = self.canceled_results.clone();
583 let static_info = self.static_market_info.clone();
584
585 let reader_handle = tokio::spawn(async move {
586 let mut idle_counter = 0;
587 while running.load(Ordering::SeqCst) {
588 if let Some(stream) = read_sock.lock().await.as_mut() {
589 tokio::select! {
590 msg = stream.next() => match msg {
591 Some(Ok(Message::Text(txt))) => {
592 idle_counter = 0;
593 if txt == "{}" {
594 if let Some(w) = write_sock.lock().await.as_mut() {
595 let _ = w.send(Message::Text(txt)).await;
596 }
597 } else {
598 if let Err(e) = HyperliquidConnector::handle_websocket_message(
599 Message::Text(txt),
600 dmi.clone(),
601 trs.clone(),
602 rev_map.clone(),
603 crs.clone(),
604 static_info.clone(),
605 ).await {
606 log::error!("WebSocket handler error: {:?}", e);
607 break;
608 }
609 }
610 }
611 Some(Ok(_)) => {
612 }
613 Some(Err(err)) => {
614 log::error!("WebSocket read error: {:?}", err);
615 break;
616 }
617 None => {
618 log::info!("WebSocket stream closed");
619 break;
620 }
621 },
622 _ = tokio::time::sleep(Duration::from_secs(10)) => {
623 idle_counter += 1;
624 if idle_counter >= 10 {
625 log::error!("No WebSocket messages for 100s, shutting down reader");
626 break;
627 }
628 }
629 }
630 }
631 }
632 running.store(false, Ordering::SeqCst);
633 log::info!("WebSocket reader task ended");
634 });
635 *self.task_handle_read_message.lock().await = Some(reader_handle);
636
637 let running_for_sig = self.running.clone();
638 let sig_handle = tokio::spawn(async move {
639 let mut sigterm =
640 signal(SignalKind::terminate()).expect("Failed to bind SIGTERM handler");
641 loop {
642 select! {
643 _ = sigterm.recv() => {
644 log::info!("SIGTERM received, stopping WebSocket");
645 running_for_sig.store(false, Ordering::SeqCst);
646 break;
647 }
648 _ = tokio::time::sleep(Duration::from_secs(1)) => {
649 if !running_for_sig.load(Ordering::SeqCst) {
650 break;
651 }
652 }
653 }
654 }
655 });
656 *self.task_handle_read_sigterm.lock().await = Some(sig_handle);
657
658 Ok(())
659 }
660
661 pub async fn stop_web_socket(&self) -> Result<(), DexError> {
662 log::info!("stop_web_socket");
663 self.running.store(false, Ordering::SeqCst);
664
665 {
666 let mut write_guard = self.write_socket.lock().await;
667 if let Some(write_socket) = write_guard.as_mut() {
668 if let Err(e) = write_socket.send(Message::Close(None)).await {
669 log::error!("Failed to send WebSocket close message: {:?}", e);
670 }
671 }
672 *write_guard = None;
673 }
674
675 {
676 let mut read_guard = self.read_socket.lock().await;
677 *read_guard = None;
678 }
679
680 if let Some(handle) = self.task_handle_read_message.lock().await.take() {
681 let _ = handle.await;
682 }
683
684 if let Some(handle) = self.task_handle_read_sigterm.lock().await.take() {
685 let _ = handle.await;
686 }
687
688 drop(self.web_socket.clone());
689
690 Ok(())
691 }
692
693 async fn subscribe_to_channels(&self, user_address: &str) -> Result<(), DexError> {
694 let all_mids_subscription = serde_json::json!({
695 "method": "subscribe",
696 "subscription": {
697 "type": "allMids"
698 }
699 })
700 .to_string();
701
702 let user_fills_subscription = serde_json::json!({
703 "method": "subscribe",
704 "subscription": {
705 "type": "userFills",
706 "user": user_address
707 }
708 })
709 .to_string();
710
711 let order_updates_subscription = serde_json::json!({
712 "method": "subscribe",
713 "subscription": {
714 "type": "orderUpdates",
715 "user": user_address
716 }
717 })
718 .to_string();
719
720 let mut write_socket_lock = self.write_socket.lock().await;
721
722 if let Some(write_socket) = write_socket_lock.as_mut() {
723 if let Err(e) = write_socket
724 .send(Message::Text(all_mids_subscription))
725 .await
726 {
727 return Err(DexError::WebSocketError(format!(
728 "Failed to subscribe to allMids: {}",
729 e
730 )));
731 }
732
733 if let Err(e) = write_socket
734 .send(Message::Text(order_updates_subscription))
735 .await
736 {
737 return Err(DexError::WebSocketError(format!(
738 "Failed to subscribe to userFills: {}",
739 e
740 )));
741 }
742
743 if let Err(e) = write_socket
744 .send(Message::Text(user_fills_subscription))
745 .await
746 {
747 return Err(DexError::WebSocketError(format!(
748 "Failed to subscribe to userFills: {}",
749 e
750 )));
751 }
752
753 for symbol in &self.config.symbol_list {
754 let coin = resolve_coin(symbol, &self.spot_index_map);
755 let candle_subscription = serde_json::json!({
756 "method": "subscribe",
757 "subscription": {
758 "type": "candle",
759 "coin": coin,
760 "interval": "1m"
761 }
762 })
763 .to_string();
764 if let Err(e) = write_socket.send(Message::Text(candle_subscription)).await {
765 return Err(DexError::WebSocketError(format!(
766 "Failed to subscribe to candle for {}: {}",
767 symbol, e
768 )));
769 }
770
771 let active_asset_ctx_subscription = serde_json::json!({
772 "method": "subscribe",
773 "subscription": {
774 "type": "activeAssetCtx",
775 "coin": coin,
776 }
777 })
778 .to_string();
779 if let Err(e) = write_socket
780 .send(Message::Text(active_asset_ctx_subscription))
781 .await
782 {
783 return Err(DexError::WebSocketError(format!(
784 "Failed to subscribe to activeAssetCtx: {}",
785 e
786 )));
787 }
788
789 let bbo_subscription = serde_json::json!({
790 "method": "subscribe",
791 "subscription": {
792 "type": "bbo",
793 "coin": coin
794 }
795 })
796 .to_string();
797 if let Err(e) = write_socket.send(Message::Text(bbo_subscription)).await {
798 return Err(DexError::WebSocketError(format!(
799 "Failed to subscribe to bbo: {}",
800 e
801 )));
802 }
803
804 let l2_subscription = serde_json::json!({
805 "method": "subscribe",
806 "subscription": {
807 "type": "l2Book",
808 "coin": coin
809 }
810 })
811 .to_string();
812 if let Err(e) = write_socket.send(Message::Text(l2_subscription)).await {
813 return Err(DexError::WebSocketError(format!(
814 "Failed to subscribe to l2: {}",
815 e
816 )));
817 }
818 }
819 } else {
820 return Err(DexError::WebSocketError(
821 "Write socket is not available".to_string(),
822 ));
823 }
824
825 Ok(())
826 }
827
828 async fn handle_websocket_message(
829 msg: Message,
830 dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
831 trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
832 spot_reverse_map: Arc<HashMap<usize, String>>,
833 canceled_results: Arc<RwLock<HashMap<String, HashMap<String, CancelEvent>>>>,
834 static_market_info: HashMap<String, StaticMarketInfo>,
835 ) -> Result<(), DexError> {
836 if let Message::Text(text) = msg {
837 for line in text.split('\n') {
838 if line.is_empty() {
839 continue;
840 }
841 if let Ok(message) = serde_json::from_str::<WebSocketMessage>(line) {
842 log::trace!("[WebSocketMessage] channel = {}", message._channel);
843
844 match &message.data {
845 WebSocketData::Bbo(bbo) => {
846 log::trace!("[WebSocketMessage] BBO coin = {}", bbo.coin);
847 }
848 WebSocketData::L2Book(book) => {
849 log::trace!("[WebSocketMessage] L2Book coin = {}", book.coin);
850 }
851 WebSocketData::AllMidsData(data) => {
852 log::trace!("[WebSocketMessage] allMids keys = {:?}", data.mids.keys());
853 }
854 _ => {}
855 }
856
857 match message.data {
858 WebSocketData::AllMidsData(ref data) => {
859 Self::process_all_mids_message(
860 data,
861 dynamic_market_info.clone(),
862 spot_reverse_map.clone(),
863 &static_market_info,
864 )
865 .await;
866 }
867 WebSocketData::CandleData(ref data) => {
868 Self::process_candle_message(
869 data,
870 dynamic_market_info.clone(),
871 spot_reverse_map.clone(),
872 )
873 .await;
874 }
875 WebSocketData::UserFillsData(ref data) => {
876 Self::process_account_data(data, trade_results.clone()).await;
877 }
878 WebSocketData::ActiveAssetCtxData(ref data) => {
879 Self::process_active_asset_ctx_message(
880 data,
881 dynamic_market_info.clone(),
882 spot_reverse_map.clone(),
883 )
884 .await;
885 }
886 WebSocketData::OrderUpdatesData(ref orders) => {
887 Self::process_order_updates_message(
888 orders,
889 canceled_results.clone(),
890 trade_results.clone(),
891 )
892 .await;
893 }
894 WebSocketData::Bbo(ref bbo) => {
895 let idx = bbo
897 .coin
898 .strip_prefix('@')
899 .and_then(|s| s.parse::<usize>().ok());
900 let coin = idx
901 .and_then(|i| spot_reverse_map.get(&i).cloned())
902 .unwrap_or_else(|| bbo.coin.clone());
903 let market_key = if coin.contains('/') {
904 coin.clone()
905 } else {
906 format!("{}-USD", coin)
907 };
908 let mut info_map = dynamic_market_info.write().await;
909 let info = info_map.entry(market_key).or_default();
910 info.best_bid = bbo
911 .bbo
912 .get(0)
913 .and_then(|lvl| lvl.as_ref())
914 .map(|l| Decimal::from_str(&l.px).unwrap());
915 info.best_ask = bbo
916 .bbo
917 .get(1)
918 .and_then(|lvl| lvl.as_ref())
919 .map(|l| Decimal::from_str(&l.px).unwrap());
920 }
921 WebSocketData::L2Book(ref book) => {
922 let idx = book
923 .coin
924 .strip_prefix('@')
925 .and_then(|s| s.parse::<usize>().ok());
926 let coin = idx
927 .and_then(|i| spot_reverse_map.get(&i).cloned())
928 .unwrap_or_else(|| book.coin.clone());
929 let market_key = if coin.contains('/') {
930 coin.clone()
931 } else {
932 format!("{}-USD", coin)
933 };
934 let mut info_map = dynamic_market_info.write().await;
935 let info = info_map.entry(market_key).or_default();
936 info.best_bid = book.levels[0]
937 .get(0)
938 .map(|lvl| Decimal::from_str(&lvl.px).unwrap());
939 info.best_ask = book.levels[1]
940 .get(0)
941 .map(|lvl| Decimal::from_str(&lvl.px).unwrap());
942 }
943 }
944 }
945 }
946 }
947 Ok(())
948 }
949
950 async fn process_order_updates_message(
951 orders: &[OrderUpdate],
952 canceled_results: Arc<RwLock<HashMap<String, HashMap<String, CancelEvent>>>>,
953 trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
954 ) {
955 for upd in orders.iter() {
956 let symbol = if upd.order.coin.contains('/') || upd.order.coin.contains('-') {
957 upd.order.coin.clone()
958 } else {
959 format!("{}-USD", upd.order.coin)
960 };
961
962 match upd.status.as_str() {
963 "canceled" => {
964 log::debug!(
965 "🚫 [FILL_DETECTION] Order canceled: {} ({})",
966 upd.order.oid,
967 symbol
968 );
969 let evt = CancelEvent {
970 order_id: upd.order.oid.to_string(),
971 timestamp: upd.status_timestamp,
972 };
973 canceled_results
974 .write()
975 .await
976 .entry(symbol)
977 .or_default()
978 .insert(evt.order_id.clone(), evt);
979 }
980 "rejected" => {
981 log::debug!(
982 "❌ [FILL_DETECTION] Order rejected: {} ({})",
983 upd.order.oid,
984 symbol
985 );
986 let mut trs = trade_results.write().await;
987 let entry = trs.entry(symbol).or_default();
988 entry.insert(
989 upd.order.oid.to_string(),
990 TradeResult {
991 filled_side: OrderSide::Long,
992 filled_size: Decimal::ZERO,
993 filled_value: Decimal::ZERO,
994 filled_fee: Decimal::ZERO,
995 order_id: upd.order.oid.to_string(),
996 is_rejected: true,
997 },
998 );
999 }
1000 "filled" | "partiallyFilled" => {
1001 log::info!(
1002 "✅ [FILL_DETECTION] Order {} detected: {} ({})",
1003 upd.status,
1004 upd.order.oid,
1005 symbol
1006 );
1007 }
1009 _ => {
1010 log::debug!(
1011 "🔍 [FILL_DETECTION] Unknown order status: '{}' for order {} ({})",
1012 upd.status,
1013 upd.order.oid,
1014 symbol
1015 );
1016 }
1017 }
1018 }
1019 }
1020
1021 async fn process_all_mids_message(
1022 mids_data: &AllMidsData,
1023 dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
1024 spot_reverse_map: Arc<HashMap<usize, String>>,
1025 static_market_info: &HashMap<String, StaticMarketInfo>,
1026 ) {
1027 for (raw_coin, mid_price_str) in &mids_data.mids {
1028 let coin = if let Some(stripped) = raw_coin.strip_prefix('@') {
1029 let idx = stripped.parse::<usize>().unwrap_or_else(|_| {
1030 log::warn!("[resolve_coin] invalid @index: {}", stripped);
1031 0
1032 });
1033
1034 let mapped = spot_reverse_map.get(&idx).cloned();
1035 match mapped {
1036 Some(mapped) => mapped,
1037 None => {
1038 log::trace!(
1039 "[resolve_coin] spot_reverse_map missing: {} (index: {})",
1040 raw_coin,
1041 idx
1042 );
1043 raw_coin.clone()
1044 }
1045 }
1046 } else {
1047 raw_coin.clone()
1048 };
1049
1050 let market_key = if coin.contains('/') || coin.contains('-') {
1051 coin.clone() } else {
1053 format!("{}-USD", coin) };
1055
1056 if let Ok(mid) = string_to_decimal(Some(mid_price_str.clone())) {
1057 let mut guard = dynamic_market_info.write().await;
1058 let info = guard.entry(market_key.clone()).or_default();
1059 let sz_decimals = static_market_info
1060 .get(&market_key)
1061 .map(|m| m.decimals)
1062 .unwrap_or_else(|| {
1063 log::trace!("no static for {}, default 0", market_key);
1064 0
1065 });
1066 let is_spot = market_key.contains('/');
1067
1068 let base_tick = Self::calculate_min_tick(mid, sz_decimals, is_spot);
1069 info.min_tick = Some(base_tick);
1070 info.market_price = Some(mid);
1071 }
1072 }
1073 }
1074
1075 async fn process_candle_message(
1076 candle: &CandleData,
1077 dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
1078 spot_reverse_map: Arc<HashMap<usize, String>>,
1079 ) {
1080 let coin = if let Some(stripped) = candle.s.strip_prefix('@') {
1081 stripped
1082 .parse::<usize>()
1083 .ok()
1084 .and_then(|idx| spot_reverse_map.get(&idx).cloned())
1085 .unwrap_or_else(|| {
1086 log::trace!(
1087 "in spot_reverse_map: {} is missing (@{})",
1088 candle.s,
1089 stripped
1090 );
1091 candle.s.clone()
1092 })
1093 } else {
1094 candle.s.clone()
1095 };
1096
1097 let market_key = if coin.contains('/') || coin.contains('-') {
1098 coin.clone()
1099 } else {
1100 format!("{}-USD", coin)
1101 };
1102
1103 let mut guard = dynamic_market_info.write().await;
1104 let info = guard.entry(market_key.clone()).or_default();
1105 info.volume = Some(candle.v);
1106 info.num_trades = Some(candle.n);
1107 }
1108
1109 async fn process_active_asset_ctx_message(
1110 asset_data: &ActiveAssetCtxData,
1111 dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
1112 spot_reverse_map: Arc<HashMap<usize, String>>,
1113 ) {
1114 let coin = if let Some(stripped) = asset_data.coin.strip_prefix('@') {
1115 stripped
1116 .parse::<usize>()
1117 .ok()
1118 .and_then(|idx| spot_reverse_map.get(&idx).cloned())
1119 .unwrap_or_else(|| {
1120 log::trace!(
1121 "in spot_reverse_map {} is missing (@{})",
1122 asset_data.coin,
1123 stripped
1124 );
1125 asset_data.coin.clone()
1126 })
1127 } else {
1128 asset_data.coin.clone()
1129 };
1130
1131 let market_key = if coin.contains('/') || coin.contains('-') {
1132 coin.clone()
1133 } else {
1134 format!("{}-USD", coin)
1135 };
1136
1137 let mut guard = dynamic_market_info.write().await;
1138 let info = guard
1139 .entry(market_key.clone())
1140 .or_insert_with(DynamicMarketInfo::default);
1141 info.funding_rate = Some(asset_data.ctx.funding);
1142 info.open_interest = Some(asset_data.ctx.openInterest);
1143 info.oracle_price = Some(asset_data.ctx.oraclePx);
1144 }
1145
1146 async fn process_account_data(
1147 data: &UserFillsData,
1148 trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
1149 ) {
1150 for fill in &data.fills {
1151 log::debug!("{:?}", fill);
1152
1153 let filled_side = if fill.side == "A" {
1154 OrderSide::Short
1155 } else {
1156 OrderSide::Long
1157 };
1158
1159 let filled_size = fill.sz;
1160 let filled_price = fill.px;
1161 let filled_value = filled_size * filled_price;
1162 let filled_fee = fill.fee;
1163 let order_id = fill.oid;
1164 let trade_id = fill.tid;
1165
1166 let market_id = if fill.coin.contains('/') || fill.coin.contains('-') {
1167 fill.coin.clone()
1168 } else {
1169 format!("{}-USD", fill.coin)
1170 };
1171
1172 let trade_result = TradeResult {
1173 filled_side,
1174 filled_size,
1175 filled_value,
1176 filled_fee,
1177 order_id: order_id.to_string(),
1178 is_rejected: false,
1179 };
1180
1181 let mut trade_results_guard = trade_results.write().await;
1182 let market_map = trade_results_guard.entry(market_id.clone()).or_default();
1183 let key = trade_id.to_string();
1184
1185 if let Some(existing) = market_map.get_mut(&key) {
1186 existing.filled_size += trade_result.filled_size;
1187 existing.filled_value += trade_result.filled_value;
1188 existing.filled_fee += trade_result.filled_fee;
1189 } else {
1190 market_map.insert(key, trade_result);
1191 }
1192 }
1193 }
1194}
1195
/// Generic request body with a `type` and an optional `user`.
/// Its call sites are outside this section.
#[derive(Serialize, Debug, Clone)]
struct HyperliquidDefaultPayload {
    /// Request kind; the raw identifier serializes under the key `type`.
    r#type: String,
    /// Optional user address; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    user: Option<String>,
}
1202
/// Subset of the `clearinghouseState` response that `get_balance` reads.
#[derive(Deserialize, Debug)]
struct HyperliquidRetrieveUserStateResponse {
    /// Margin summary (JSON key `marginSummary`); balance code treats `None` as zero equity/balance.
    #[serde(rename = "marginSummary")]
    margin_summary: Option<HyperliquidMarginSummary>,
}
/// Margin figures of the account, kept as the raw strings found in the response.
#[derive(Deserialize, Debug)]
struct HyperliquidMarginSummary {
    /// JSON key `accountValue`; parsed to a decimal and used as the perp equity.
    #[serde(rename = "accountValue")]
    account_value: String,
    /// JSON key `totalRawUsd`; parsed to a decimal and used as the perp balance.
    #[serde(rename = "totalRawUsd")]
    total_rawusd: String,
}
1215
/// One element of the `openOrders` response; only the fields this connector uses.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserOpenOrder {
    /// Coin identifier as reported by the exchange; values of the form `@<index>`
    /// are mapped back to symbols through `spot_reverse_map` by the cancel paths.
    coin: String,
    /// Exchange order id.
    oid: u64,
}
1221
/// Subset of the `clearinghouseState` response that `get_positions` reads.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserPositionResponse {
    /// Position entries (JSON key `assetPositions`).
    #[serde(rename = "assetPositions")]
    asset_positions: Vec<HyperliquidRetriveUserPositionResponseBody>,
}
/// Wrapper object around a single position inside `assetPositions`.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserPositionResponseBody {
    /// The wrapped position.
    position: HyperliquidRetriveUserPosition,
}
/// A single open position as used by `close_all_positions`.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserPosition {
    /// Coin identifier of the position.
    coin: String,
    /// Signed position size: a negative value is closed with a buy order,
    /// any other value with a sell order; the absolute value is the close size.
    szi: Decimal,
}
1236
/// Subset of the `meta` response that `retrive_market_metadata` reads.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveMarketMetadataResponse {
    /// One metadata entry per listed market.
    universe: Vec<HyperliquidRetriveMarketMetadata>,
}
/// Static metadata of one market from the `meta` response.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveMarketMetadata {
    /// Coin name; stored under the key `"<name>-USD"` in `static_market_info`.
    name: String,
    /// JSON key `szDecimals`; number of decimal places applied to order sizes.
    #[serde(rename = "szDecimals")]
    decimals: u32,
    /// JSON key `maxLeverage`; stored but not otherwise read in the visible code.
    #[serde(rename = "maxLeverage")]
    max_leverage: u32,
}
1249
/// Subset of the `spotClearinghouseState` response that `get_balance` reads.
#[derive(Deserialize, Debug)]
struct HyperliquidSpotBalanceResponse {
    /// One entry per coin held in the spot account.
    balances: Vec<HyperliquidSpotBalance>,
}
1254
/// Spot balance of a single coin.
#[derive(Deserialize, Debug)]
struct HyperliquidSpotBalance {
    /// Coin name; `"USDC"` is treated as the cash balance by `get_balance`.
    coin: String,
    /// Total amount as a raw string; parsed to a decimal before use.
    total: String,
}
1260
1261#[async_trait]
1262impl DexConnector for HyperliquidConnector {
1263 async fn start(&self) -> Result<(), DexError> {
1264 self.start_web_socket().await?;
1265 sleep(Duration::from_secs(5)).await;
1266 self.wait_for_market_ready(60).await?;
1267 Ok(())
1268 }
1269
1270 async fn stop(&self) -> Result<(), DexError> {
1271 self.stop_web_socket().await?;
1272 Ok(())
1273 }
1274
1275 async fn restart(&self, max_retries: i32) -> Result<(), DexError> {
1276 log::info!("Restarting WebSocket connection...");
1277
1278 let mut retry_count = 0;
1279 let mut backoff_delay = Duration::from_secs(1);
1280
1281 while retry_count < max_retries {
1282 if let Err(e) = self.stop_web_socket().await {
1283 log::error!(
1284 "Failed to stop WebSocket on attempt {}: {:?}",
1285 retry_count + 1,
1286 e
1287 );
1288 } else {
1289 log::info!(
1290 "Successfully stopped WebSocket on attempt {}.",
1291 retry_count + 1
1292 );
1293 }
1294
1295 sleep(backoff_delay).await;
1296
1297 match self.start_web_socket().await {
1298 Ok(_) => {
1299 log::info!(
1300 "Successfully started WebSocket on attempt {}.",
1301 retry_count + 1
1302 );
1303 return Ok(());
1304 }
1305 Err(e) => {
1306 log::error!(
1307 "Failed to start WebSocket on attempt {}: {:?}",
1308 retry_count + 1,
1309 e
1310 );
1311 retry_count += 1;
1312 backoff_delay *= 2; }
1314 }
1315 }
1316
1317 log::error!(
1318 "Failed to restart WebSocket after {} attempts.",
1319 max_retries
1320 );
1321 Err(DexError::Other(format!(
1322 "Failed to restart WebSocket after {} attempts.",
1323 max_retries
1324 )))
1325 }
1326
1327 async fn set_leverage(&self, symbol: &str, leverage: u32) -> Result<(), DexError> {
1328 let asset = Self::extract_asset_name(symbol);
1329 self.exchange_client
1330 .update_leverage(leverage, asset, false, None)
1331 .await
1332 .map_err(|e| DexError::Other(e.to_string()))?;
1333 Ok(())
1334 }
1335
1336 async fn get_ticker(
1337 &self,
1338 symbol: &str,
1339 _test_price: Option<Decimal>,
1340 ) -> Result<TickerResponse, DexError> {
1341 if !self.running.load(Ordering::SeqCst) {
1342 return Err(DexError::NoConnection);
1343 }
1344
1345 let dynamic_info_guard = self.dynamic_market_info.read().await;
1346 let dynamic_info = dynamic_info_guard
1347 .get(symbol)
1348 .ok_or_else(|| DexError::Other("No dynamic market info available".to_string()))?;
1349 let price = dynamic_info
1350 .market_price
1351 .ok_or_else(|| DexError::Other("No price available".to_string()))?;
1352 let min_tick = dynamic_info.min_tick;
1353 let num_trades = dynamic_info.num_trades;
1354 let funding_rate = dynamic_info.funding_rate;
1355 let open_interest = dynamic_info.open_interest;
1356 let oracle_price = dynamic_info.oracle_price;
1357
1358 let cur_vol = dynamic_info.volume.unwrap_or(Decimal::ZERO);
1359 let mut lv = self.last_volumes.lock().await;
1360 let prev_vol = lv.get(symbol).cloned().unwrap_or(Decimal::ZERO);
1361 let delta_vol = if cur_vol >= prev_vol {
1363 cur_vol - prev_vol
1364 } else {
1365 cur_vol
1367 };
1368 lv.insert(symbol.to_string(), cur_vol);
1369
1370 Ok(TickerResponse {
1371 symbol: symbol.to_owned(),
1372 price,
1373 min_tick,
1374 min_order: None,
1375 volume: Some(delta_vol),
1376 num_trades,
1377 funding_rate,
1378 open_interest,
1379 oracle_price,
1380 })
1381 }
1382
1383 async fn get_filled_orders(&self, symbol: &str) -> Result<FilledOrdersResponse, DexError> {
1384 let mut response: Vec<FilledOrder> = vec![];
1385 let trade_results_guard = self.trade_results.read().await;
1386 let orders = match trade_results_guard.get(symbol) {
1387 Some(v) => v,
1388 None => return Ok(FilledOrdersResponse::default()),
1389 };
1390 for (trade_id, order) in orders.iter() {
1391 let filled_order = FilledOrder {
1392 order_id: order.order_id.clone(),
1393 trade_id: trade_id.clone(),
1394 is_rejected: order.is_rejected,
1395 filled_side: Some(order.filled_side.clone()),
1396 filled_size: Some(order.filled_size),
1397 filled_fee: Some(order.filled_fee),
1398 filled_value: Some(order.filled_value),
1399 };
1400 response.push(filled_order);
1401 }
1402
1403 Ok(FilledOrdersResponse { orders: response })
1404 }
1405
1406 async fn get_canceled_orders(&self, symbol: &str) -> Result<CanceledOrdersResponse, DexError> {
1407 let mut resp = Vec::new();
1408 let guard = self.canceled_results.read().await;
1409 if let Some(map) = guard.get(symbol) {
1410 for (_, evt) in map.iter() {
1411 resp.push(CanceledOrder {
1412 order_id: evt.order_id.clone(),
1413 canceled_timestamp: evt.timestamp,
1414 });
1415 }
1416 }
1417 Ok(CanceledOrdersResponse { orders: resp })
1418 }
1419
1420 async fn get_open_orders(&self, symbol: &str) -> Result<OpenOrdersResponse, DexError> {
1421 let all_orders = self.get_orders().await?;
1422
1423 let target_coin = if let Some(internal_id) = self.spot_index_map.get(symbol) {
1425 format!("@{}", internal_id)
1426 } else {
1427 symbol.to_string()
1428 };
1429
1430 let filtered_orders: Vec<crate::OpenOrder> = all_orders
1431 .into_iter()
1432 .filter(|order| order.coin == target_coin || order.coin == symbol)
1433 .map(|hyperliquid_order| crate::OpenOrder {
1434 order_id: hyperliquid_order.oid.to_string(),
1435 symbol: symbol.to_string(),
1436 side: OrderSide::Long, size: Decimal::ZERO, price: Decimal::ZERO, status: "open".to_string(),
1440 })
1441 .collect();
1442
1443 Ok(OpenOrdersResponse {
1444 orders: filtered_orders,
1445 })
1446 }
1447
1448 async fn get_balance(&self, symbol: Option<&str>) -> Result<BalanceResponse, DexError> {
1449 let spot_action = HyperliquidDefaultPayload {
1451 r#type: "spotClearinghouseState".into(),
1452 user: Some(self.config.evm_wallet_address.clone()),
1453 };
1454 let spot_res: HyperliquidSpotBalanceResponse = self
1455 .handle_request_with_action("/info".into(), &spot_action)
1456 .await?;
1457
1458 let perp_action = HyperliquidDefaultPayload {
1459 r#type: "clearinghouseState".into(),
1460 user: Some(self.config.evm_wallet_address.clone()),
1461 };
1462 let perp_res = self
1463 .handle_request_with_action::<HyperliquidRetrieveUserStateResponse, _>(
1464 "/info".into(),
1465 &perp_action,
1466 )
1467 .await?;
1468
1469 log::debug!("spot balances = {:?}", spot_res.balances);
1470 log::debug!("perp margin summary = {:?}", perp_res.margin_summary);
1471
1472 if let Some(pair) = symbol {
1473 let base_coin = pair.split('/').next().unwrap_or(pair);
1475
1476 let mut usdc_total = Decimal::ZERO;
1477 let mut base_total = Decimal::ZERO;
1478 for b in &spot_res.balances {
1479 match b.coin.as_str() {
1480 "USDC" => {
1481 usdc_total = parse_to_decimal(&b.total)?;
1482 log::debug!("USDC total = {}", usdc_total);
1483 }
1484 c if c == base_coin => {
1485 base_total = parse_to_decimal(&b.total)?;
1486 log::debug!("{} total = {}", c, base_total);
1487 }
1488 _ => {}
1489 }
1490 }
1491
1492 let price_key = pair.to_string();
1493 let px = self
1494 .get_market_price(&price_key)
1495 .await
1496 .unwrap_or(Decimal::ZERO);
1497
1498 log::debug!("price_key = {}, px = {}", price_key, px);
1499
1500 let (perp_equity, perp_balance) = if let Some(summary) = &perp_res.margin_summary {
1502 let equity = parse_to_decimal(&summary.account_value)?;
1503 let balance = parse_to_decimal(&summary.total_rawusd)?;
1504 log::debug!("perp equity = {}, perp balance = {}", equity, balance);
1505 (equity, balance)
1506 } else {
1507 (Decimal::ZERO, Decimal::ZERO)
1508 };
1509
1510 let spot_equity = base_total * px + usdc_total;
1512 let total_equity = spot_equity + perp_equity;
1513 let total_balance = usdc_total + perp_balance;
1514
1515 log::debug!(
1516 "final equity = {} (spot: {} + perp: {}), balance = {} (spot: {} + perp: {})",
1517 total_equity,
1518 spot_equity,
1519 perp_equity,
1520 total_balance,
1521 usdc_total,
1522 perp_balance
1523 );
1524
1525 return Ok(BalanceResponse {
1526 equity: total_equity,
1527 balance: total_balance,
1528 position_entry_price: None,
1529 position_sign: None,
1530 });
1531 }
1532
1533 let mut total_equity = Decimal::ZERO;
1535 let mut total_balance = Decimal::ZERO;
1536
1537 for b in &spot_res.balances {
1539 let balance = parse_to_decimal(&b.total)?;
1540 if b.coin == "USDC" {
1541 total_balance += balance;
1542 total_equity += balance;
1543 } else {
1544 let symbol_key = format!("{}/USDC", b.coin);
1546 if let Ok(px) = self.get_market_price(&symbol_key).await {
1547 total_equity += balance * px;
1548 }
1549 }
1550 }
1551
1552 if let Some(summary) = perp_res.margin_summary {
1554 let perp_equity = parse_to_decimal(&summary.account_value)?;
1555 let perp_balance = parse_to_decimal(&summary.total_rawusd)?;
1556 total_equity += perp_equity;
1557 total_balance += perp_balance;
1558 log::debug!(
1559 "perp equity = {}, perp balance = {}",
1560 perp_equity,
1561 perp_balance
1562 );
1563 }
1564
1565 log::debug!(
1566 "final total equity = {}, total balance = {}",
1567 total_equity,
1568 total_balance
1569 );
1570 Ok(BalanceResponse {
1571 equity: total_equity,
1572 balance: total_balance,
1573 position_entry_price: None,
1574 position_sign: None,
1575 })
1576 }
1577
1578 async fn get_combined_balance(&self) -> Result<CombinedBalanceResponse, DexError> {
1579 Err(DexError::Other(
1581 "get_combined_balance not implemented for HyperLiquid".to_string(),
1582 ))
1583 }
1584
1585 async fn get_last_trades(&self, _symbol: &str) -> Result<LastTradesResponse, DexError> {
1586 Err(DexError::Other(
1588 "get_last_trades not implemented for HyperLiquid".to_string(),
1589 ))
1590 }
1591
1592 async fn get_order_book(
1593 &self,
1594 _symbol: &str,
1595 _depth: usize,
1596 ) -> Result<OrderBookSnapshot, DexError> {
1597 Err(DexError::Other(
1598 "get_order_book not implemented for HyperLiquid".to_string(),
1599 ))
1600 }
1601
1602 async fn clear_filled_order(&self, symbol: &str, trade_id: &str) -> Result<(), DexError> {
1603 let mut m = self.trade_results.write().await;
1604 if let Some(map) = m.get_mut(symbol) {
1605 if map.remove(trade_id).is_some() {
1606 Ok(())
1607 } else {
1608 Err(DexError::Other(format!(
1609 "filled trade(trade_id:{}({})) does not exist",
1610 trade_id, symbol
1611 )))
1612 }
1613 } else {
1614 Err(DexError::Other(format!(
1615 "filled trade(symbol:{}({})) does not exist",
1616 symbol, trade_id
1617 )))
1618 }
1619 }
1620
1621 async fn clear_all_filled_orders(&self) -> Result<(), DexError> {
1622 let mut trade_results_guard = self.trade_results.write().await;
1623 trade_results_guard.clear();
1624 Ok(())
1625 }
1626
1627 async fn clear_canceled_order(&self, symbol: &str, order_id: &str) -> Result<(), DexError> {
1628 let mut guard = self.canceled_results.write().await;
1629 if let Some(map) = guard.get_mut(symbol) {
1630 if map.remove(order_id).is_some() {
1631 return Ok(());
1632 }
1633 }
1634 Err(DexError::Other(format!(
1635 "canceled order {} for {} not found",
1636 order_id, symbol
1637 )))
1638 }
1639
1640 async fn clear_all_canceled_orders(&self) -> Result<(), DexError> {
1641 self.canceled_results.write().await.clear();
1642 Ok(())
1643 }
1644
1645 async fn create_order(
1646 &self,
1647 symbol: &str,
1648 size: Decimal,
1649 side: OrderSide,
1650 price: Option<Decimal>,
1651 spread: Option<i64>,
1652 _expiry_secs: Option<u64>, ) -> Result<CreateOrderResponse, DexError> {
1654 let (price, time_in_force) = match price {
1655 Some(v) => {
1656 if spread.is_some() {
1657 let map = self.dynamic_market_info.read().await;
1658 let info = map
1659 .get(symbol)
1660 .ok_or_else(|| DexError::Other(format!("No market info for {}", symbol)))?;
1661 let bid = info
1662 .best_bid
1663 .ok_or_else(|| DexError::Other("No best_bid".into()))?;
1664 let ask = info
1665 .best_ask
1666 .ok_or_else(|| DexError::Other("No best_ask".into()))?;
1667 let mid = (bid + ask) * Decimal::new(5, 1);
1668 let tick = info
1669 .min_tick
1670 .ok_or_else(|| DexError::Other("No min_tick".into()))?;
1671 let spread = Decimal::from(spread.unwrap());
1672 log::debug!(
1673 "bid = {}, min = {}, ask = {}, tick = {}, spread = {}",
1674 bid,
1675 mid,
1676 ask,
1677 tick,
1678 spread
1679 );
1680 let calc = if side == OrderSide::Long {
1681 mid - tick * spread
1682 } else {
1683 mid + tick * spread
1684 };
1685 (calc, "Alo")
1686 } else {
1687 (v, "Alo")
1688 }
1689 }
1690 None => {
1691 let price = self.get_worst_price(symbol, &side).await?;
1692 (price, "Ioc")
1693 }
1694 };
1695
1696 let dynamic_market_info_guard = self.dynamic_market_info.read().await;
1697 let market_info = dynamic_market_info_guard
1698 .get(symbol)
1699 .ok_or_else(|| DexError::Other("Market info not found".to_string()))?;
1700 let min_tick = market_info
1701 .min_tick
1702 .ok_or_else(|| DexError::Other("Min tick not set for market".to_string()))?;
1703
1704 let rounded_price = Self::round_price(price, min_tick, side.clone());
1705 let rounded_size = self.floor_size(size, symbol);
1706
1707 log::info!(
1708 "[create_order] sym={} tif={} px={} size={} notional={} min_tick={} sz_decimals={}",
1709 symbol,
1710 time_in_force,
1711 rounded_price,
1712 rounded_size,
1713 rounded_price * rounded_size,
1714 min_tick,
1715 self.static_market_info
1716 .get(symbol)
1717 .map(|m| m.decimals)
1718 .unwrap_or(0),
1719 );
1720 let asset = resolve_coin(symbol, &self.spot_index_map);
1721
1722 let order = ClientOrderRequest {
1723 asset,
1724 is_buy: side == OrderSide::Long,
1725 reduce_only: false,
1726 limit_px: rounded_price
1727 .to_f64()
1728 .ok_or_else(|| DexError::Other("Conversion to f64 failed".to_string()))?,
1729 sz: rounded_size
1730 .to_f64()
1731 .ok_or_else(|| DexError::Other("Conversion to f64 failed".to_string()))?,
1732 cloid: None,
1733 order_type: ClientOrder::Limit(ClientLimit {
1734 tif: time_in_force.to_string(),
1735 }),
1736 };
1737
1738 let res = self.exchange_client.order(order, None).await.map_err(|e| {
1739 log::error!(
1740 "[create_order] order failed: symbol = {}, size = {}, error = {}",
1741 symbol,
1742 rounded_size,
1743 e
1744 );
1745 DexError::Other(e.to_string())
1746 })?;
1747
1748 let res = match res {
1749 ExchangeResponseStatus::Ok(exchange_response) => exchange_response,
1750 ExchangeResponseStatus::Err(e) => return Err(DexError::ServerResponse(e.to_string())),
1751 };
1752 let status = res.data.unwrap().statuses[0].clone();
1753 let order_id = match status {
1754 ExchangeDataStatus::Filled(order) => order.oid,
1755 ExchangeDataStatus::Resting(order) => order.oid,
1756 _ => {
1757 return Err(DexError::ServerResponse(
1758 "Unknown ExchangeDataStaus".to_owned(),
1759 ))
1760 }
1761 };
1762
1763 Ok(CreateOrderResponse {
1764 order_id: order_id.to_string(),
1765 ordered_price: rounded_price,
1766 ordered_size: rounded_size,
1767 })
1768 }
1769
1770 async fn create_advanced_trigger_order(
1771 &self,
1772 symbol: &str,
1773 size: Decimal,
1774 side: OrderSide,
1775 trigger_px: Decimal,
1776 limit_px: Option<Decimal>,
1777 order_style: TriggerOrderStyle,
1778 slippage_bps: Option<u32>,
1779 tpsl: TpSl,
1780 reduce_only: bool,
1781 expiry_secs: Option<u64>,
1782 ) -> Result<CreateOrderResponse, DexError> {
1783 if let Some(expiry) = expiry_secs {
1785 log::warn!(
1786 "🕐 [HYPERLIQUID_EXPIRY] expiry_secs={} specified but not directly supported by Hyperliquid trigger orders. Consider implementing auto-cancel mechanism.",
1787 expiry
1788 );
1789 }
1790
1791 let asset = resolve_coin(symbol, &self.spot_index_map);
1792
1793 let rounded_size = self.floor_size(size, symbol);
1795
1796 let trigger_price = trigger_px
1797 .to_f64()
1798 .ok_or_else(|| DexError::Other("Failed to convert trigger_px to f64".into()))?;
1799 let sz = rounded_size
1800 .to_f64()
1801 .ok_or_else(|| DexError::Other("Failed to convert size to f64".into()))?;
1802
1803 let (is_market, final_limit_price_opt) = match order_style {
1804 TriggerOrderStyle::Market => (true, None), TriggerOrderStyle::MarketWithSlippageControl => {
1806 if let Some(slippage) = slippage_bps {
1807 let slippage_factor = Decimal::new(slippage as i64, 4); let adjusted_price = match side {
1811 OrderSide::Long => trigger_px * (Decimal::ONE - slippage_factor),
1813 OrderSide::Short => trigger_px * (Decimal::ONE + slippage_factor),
1815 };
1816 let limit_price = adjusted_price.to_f64().ok_or_else(|| {
1817 DexError::Other("Failed to convert adjusted price to f64".into())
1818 })?;
1819 (false, Some(limit_price)) } else {
1821 (true, None) }
1823 }
1824 TriggerOrderStyle::Limit => {
1825 let limit_price = limit_px
1826 .ok_or_else(|| {
1827 DexError::Other("limit_px required for Limit order style".into())
1828 })?
1829 .to_f64()
1830 .ok_or_else(|| DexError::Other("Failed to convert limit_px to f64".into()))?;
1831
1832 match (side, tpsl) {
1835 (OrderSide::Long, TpSl::Sl) => {
1836 if limit_price < trigger_price {
1838 return Err(DexError::Other(
1839 "For Buy Stop Loss, limit_px must be >= trigger_px".into(),
1840 ));
1841 }
1842 }
1843 (OrderSide::Short, TpSl::Sl) => {
1844 if limit_price > trigger_price {
1846 return Err(DexError::Other(
1847 "For Sell Stop Loss, limit_px must be <= trigger_px".into(),
1848 ));
1849 }
1850 }
1851 (OrderSide::Long, TpSl::Tp) => {
1852 if limit_price > trigger_price {
1854 return Err(DexError::Other(
1855 "For Buy Take Profit, limit_px must be <= trigger_px".into(),
1856 ));
1857 }
1858 }
1859 (OrderSide::Short, TpSl::Tp) => {
1860 if limit_price < trigger_price {
1862 return Err(DexError::Other(
1863 "For Sell Take Profit, limit_px must be >= trigger_px".into(),
1864 ));
1865 }
1866 }
1867 }
1868 (false, Some(limit_price))
1869 }
1870 };
1871
1872 let is_buy = is_buy_for_tpsl(side);
1873
1874 let request = ClientOrderRequest {
1875 asset,
1876 is_buy,
1877 reduce_only, limit_px: final_limit_price_opt.unwrap_or(0.0),
1882 sz,
1883 cloid: None,
1884 order_type: ClientOrder::Trigger(ClientTrigger {
1885 is_market,
1886 trigger_px: trigger_price,
1887 tpsl: format!("{:?}", tpsl).to_lowercase(),
1888 }),
1889 };
1890
1891 let resp_status = self
1892 .exchange_client
1893 .order(request, None)
1894 .await
1895 .map_err(|e| {
1896 DexError::Other(format!("Advanced trigger order request failed: {}", e))
1897 })?;
1898
1899 let exchange_response = match resp_status {
1900 ExchangeResponseStatus::Ok(x) => x,
1901 ExchangeResponseStatus::Err(e) => return Err(DexError::ServerResponse(e.to_string())),
1902 };
1903
1904 let status = exchange_response
1905 .data
1906 .unwrap()
1907 .statuses
1908 .into_iter()
1909 .next()
1910 .ok_or_else(|| DexError::Other("No order status returned".into()))?;
1911
1912 let oid = match status {
1913 ExchangeDataStatus::Filled(o) => o.oid,
1914 ExchangeDataStatus::Resting(o) => o.oid,
1915 _ => {
1916 return Err(DexError::ServerResponse(
1917 "Unrecognized exchange status".into(),
1918 ))
1919 }
1920 };
1921
1922 let ordered_price = match final_limit_price_opt {
1924 Some(px) => Decimal::from_f64(px).unwrap_or(trigger_px),
1925 None => trigger_px, };
1927
1928 let order_id = oid.to_string();
1929
1930 Ok(CreateOrderResponse {
1940 order_id,
1941 ordered_price,
1942 ordered_size: rounded_size,
1943 })
1944 }
1945
1946 async fn cancel_order(&self, symbol: &str, order_id: &str) -> Result<(), DexError> {
1947 let asset = resolve_coin(symbol, &self.spot_index_map);
1948 let cancel = ClientCancelRequest {
1949 asset,
1950 oid: u64::from_str(order_id).unwrap_or_default(),
1951 };
1952
1953 self.exchange_client
1954 .cancel(cancel, None)
1955 .await
1956 .map_err(|e| DexError::Other(e.to_string()))?;
1957
1958 Ok(())
1959 }
1960
1961 async fn cancel_all_orders(&self, symbol: Option<String>) -> Result<(), DexError> {
1962 let open_orders = self.get_orders().await?;
1963 let order_ids: Vec<String> = open_orders
1964 .iter()
1965 .filter_map(|order| {
1966 let idx_opt = order.coin.strip_prefix('@').and_then(|s| s.parse::<usize>().ok());
1967 let external_sym = idx_opt
1968 .and_then(|idx| self.spot_reverse_map.get(&idx).cloned())
1969 .unwrap_or_else(|| format!("{}-USD", order.coin));
1970
1971 log::debug!(
1972 "cancel_all_orders: raw coin = {}, idx = {:?}, external_sym = {:?}, target = {:?}",
1973 order.coin, idx_opt, external_sym, symbol
1974 );
1975
1976 if symbol.as_deref().map_or(true, |s| s == &external_sym) {
1977 Some(order.oid.to_string())
1978 } else {
1979 None
1980 }
1981 })
1982 .collect();
1983 self.cancel_orders(symbol, order_ids).await
1984 }
1985
1986 async fn cancel_orders(
1987 &self,
1988 symbol: Option<String>,
1989 order_ids: Vec<String>,
1990 ) -> Result<(), DexError> {
1991 let open_orders = self.get_orders().await?;
1992 let mut cancels = Vec::new();
1993
1994 for order in open_orders {
1995 let idx_opt = order
1996 .coin
1997 .strip_prefix('@')
1998 .and_then(|s| s.parse::<usize>().ok());
1999 let external_sym = idx_opt
2000 .and_then(|idx| self.spot_reverse_map.get(&idx).cloned())
2001 .unwrap_or_else(|| format!("{}-USD", order.coin));
2002
2003 log::debug!(
2004 "cancel_orders: raw coin = {}, idx = {:?}, external_sym = {:?}, requested_ids = {:?}",
2005 order.coin, idx_opt, external_sym, order_ids
2006 );
2007
2008 if symbol.as_deref().map_or(true, |s| s == &external_sym)
2009 && order_ids.contains(&order.oid.to_string())
2010 {
2011 let asset = resolve_coin(&external_sym, &self.spot_index_map);
2012 cancels.push(ClientCancelRequest {
2013 asset,
2014 oid: order.oid,
2015 });
2016 }
2017 }
2018
2019 if !cancels.is_empty() {
2020 self.exchange_client
2021 .bulk_cancel(cancels, None)
2022 .await
2023 .map_err(|e| DexError::Other(e.to_string()))?;
2024 }
2025 Ok(())
2026 }
2027
2028 async fn close_all_positions(&self, symbol: Option<String>) -> Result<(), DexError> {
2029 let open_positions = self.get_positions().await?;
2030 for p in open_positions {
2031 let position = p.position;
2032 let idx_opt = position
2033 .coin
2034 .strip_prefix('@')
2035 .and_then(|s| s.parse::<usize>().ok());
2036 let external_sym = idx_opt
2037 .and_then(|idx| self.spot_reverse_map.get(&idx).cloned())
2038 .unwrap_or_else(|| format!("{}-USD", position.coin));
2039 if symbol.as_deref().map_or(true, |s| s == &external_sym) {
2040 let reversed_side = if position.szi.is_sign_negative() {
2041 OrderSide::Long
2042 } else {
2043 OrderSide::Short
2044 };
2045 let size = position.szi.abs();
2046 let _ = self
2047 .create_order(&external_sym, size, reversed_side, None, None, None) .await;
2049 }
2050 }
2051 Ok(())
2052 }
2053
/// No-op: this connector keeps no last-trades state, so this always succeeds.
async fn clear_last_trades(&self, _symbol: &str) -> Result<(), DexError> {
    Ok(())
}
2057
2058 async fn is_upcoming_maintenance(&self, hours_ahead: i64) -> bool {
2059 let info = self.maintenance.read().await;
2060 if let Some(start) = info.next_start {
2061 let now = Utc::now();
2062 let lead = ChronoDuration::hours(hours_ahead.max(0));
2063 let active_window = ChronoDuration::minutes(90);
2065 let upcoming = now <= start && (start - now) <= lead;
2066 let already_active = now >= start && (now - start) <= active_window;
2067 let result = upcoming || already_active;
2068 log::debug!(
2069 "Hyperliquid maintenance check: start={:?} now={:?} upcoming={} active={} result={}",
2070 start,
2071 now,
2072 upcoming,
2073 already_active,
2074 result
2075 );
2076 return result;
2077 }
2078 log::debug!("Hyperliquid maintenance check: no cached start");
2079 false
2080 }
2081
2082 async fn sign_evm_65b(&self, _message: &str) -> Result<String, DexError> {
2083 Err(DexError::Other(
2084 "65B EVM signature not supported for Hyperliquid".to_string(),
2085 ))
2086 }
2087
2088 async fn sign_evm_65b_with_eip191(&self, _message: &str) -> Result<String, DexError> {
2089 Err(DexError::Other(
2090 "65B EIP-191 signature not supported for Hyperliquid".to_string(),
2091 ))
2092 }
2093}
2094
2095impl HyperliquidConnector {
2096 async fn handle_request_with_action<T, U>(
2097 &self,
2098 request_url: String,
2099 action: &U,
2100 ) -> Result<T, DexError>
2101 where
2102 T: for<'de> Deserialize<'de>,
2103 U: Serialize + std::fmt::Debug + Clone,
2104 {
2105 let json_payload =
2106 serde_json::to_value(action).map_err(|e| DexError::Other(e.to_string()))?;
2107
2108 log::debug!("json_payload = {:?}", json_payload);
2109
2110 self.request
2111 .handle_request::<T, U>(
2112 HttpMethod::Post,
2113 request_url,
2114 &HashMap::new(),
2115 json_payload.to_string(),
2116 )
2117 .await
2118 .map_err(|e| DexError::Other(e.to_string()))
2119 }
2120
2121 async fn get_positions(
2122 &self,
2123 ) -> Result<Vec<HyperliquidRetriveUserPositionResponseBody>, DexError> {
2124 let request_url = "/info";
2125 let action = HyperliquidDefaultPayload {
2126 r#type: "clearinghouseState".to_owned(),
2127 user: Some(self.config.evm_wallet_address.clone()),
2128 };
2129 let res: HyperliquidRetriveUserPositionResponse = self
2130 .handle_request_with_action::<HyperliquidRetriveUserPositionResponse, HyperliquidDefaultPayload>(
2131 request_url.to_string(),
2132 &action,
2133 )
2134 .await?;
2135
2136 Ok(res.asset_positions)
2137 }
2138
2139 async fn get_orders(&self) -> Result<Vec<HyperliquidRetriveUserOpenOrder>, DexError> {
2140 let request_url = "/info";
2141 let action = HyperliquidDefaultPayload {
2142 r#type: "openOrders".to_owned(),
2143 user: Some(self.config.evm_wallet_address.clone()),
2144 };
2145 let res: Vec<HyperliquidRetriveUserOpenOrder> = self
2146 .handle_request_with_action::<Vec<HyperliquidRetriveUserOpenOrder>, HyperliquidDefaultPayload>(
2147 request_url.to_string(),
2148 &action,
2149 )
2150 .await?;
2151
2152 Ok(res)
2153 }
2154
2155 async fn retrive_market_metadata(&mut self) -> Result<(), DexError> {
2156 let request_url = "/info";
2157 let action = HyperliquidDefaultPayload {
2158 r#type: "meta".to_owned(),
2159 user: None,
2160 };
2161 let res = self
2162 .handle_request_with_action::<HyperliquidRetriveMarketMetadataResponse, HyperliquidDefaultPayload>(
2163 request_url.to_string(),
2164 &action,
2165 )
2166 .await?;
2167
2168 let mut static_market_info_update = HashMap::new();
2169 for metadata in res.universe.into_iter() {
2170 let market_id = format!("{}-USD", metadata.name);
2171 static_market_info_update.insert(
2172 market_id,
2173 StaticMarketInfo {
2174 decimals: metadata.decimals,
2175 _max_leverage: metadata.max_leverage,
2176 },
2177 );
2178 }
2179
2180 self.static_market_info = static_market_info_update;
2181
2182 Ok(())
2183 }
2184
2185 async fn get_worst_price(&self, symbol: &str, side: &OrderSide) -> Result<Decimal, DexError> {
2186 let market_price = self.get_market_price(symbol).await?;
2187
2188 let worst_price = slippage_price(market_price, *side == OrderSide::Long);
2189 Ok(worst_price)
2190 }
2191
2192 async fn get_market_price(&self, symbol: &str) -> Result<Decimal, DexError> {
2193 let market_info_guard = self.dynamic_market_info.read().await;
2194 match market_info_guard.get(symbol) {
2195 Some(v) => match v.market_price {
2196 Some(price) => Ok(price),
2197 None => Err(DexError::Other("Price is None".to_string())),
2198 },
2199 None => Err(DexError::Other("No price available".to_string())),
2200 }
2201 }
2202
2203 fn calculate_min_tick(price: Decimal, sz_decimals: u32, is_spot: bool) -> Decimal {
2204 log::trace!(
2205 "calculate_min_tick called: price={}, sz_decimals={}, is_spot={}",
2206 price,
2207 sz_decimals,
2208 is_spot
2209 );
2210
2211 let price_str = price.to_string();
2212 let integer_part = price_str.split('.').next().unwrap_or("");
2213 let integer_digits = if integer_part == "0" {
2214 0
2215 } else {
2216 integer_part.len()
2217 };
2218
2219 let scale_by_sig: u32 = if integer_digits >= 5 {
2220 0
2221 } else {
2222 (5 - integer_digits) as u32
2223 };
2224
2225 let max_decimals: u32 = if is_spot { 8u32 } else { 6u32 };
2226 let scale_by_dec: u32 = max_decimals.saturating_sub(sz_decimals);
2227 let scale: u32 = scale_by_sig.min(scale_by_dec);
2228
2229 log::trace!(
2230 "calculate_min_tick internals: integer_digits={}, scale_by_sig={}, max_decimals={}, scale_by_dec={}, scale={}",
2231 integer_digits,
2232 scale_by_sig,
2233 max_decimals,
2234 scale_by_dec,
2235 scale
2236 );
2237
2238 let min_tick = Decimal::new(1, scale);
2239
2240 log::trace!(
2241 "calculate_min_tick result: min_tick={}, (1e-{})",
2242 min_tick,
2243 scale
2244 );
2245
2246 min_tick
2247 }
2248
2249 fn round_price(price: Decimal, min_tick: Decimal, order_side: OrderSide) -> Decimal {
2250 if min_tick.is_zero() {
2251 log::error!("round_price: min_tick is zero");
2252 return price;
2253 }
2254
2255 match order_side {
2256 OrderSide::Long => (price / min_tick).floor() * min_tick,
2257 OrderSide::Short => (price / min_tick).ceil() * min_tick,
2258 }
2259 }
2260
2261 fn floor_size(&self, size: Decimal, symbol: &str) -> Decimal {
2262 let decimals = match self.static_market_info.get(symbol) {
2263 Some(v) => v.decimals,
2264 None => {
2265 log::error!("symbol meta is not available: {}", symbol);
2266 return size;
2267 }
2268 };
2269
2270 size.round_dp(decimals)
2271 }
2272
/// Returns the part of `symbol` before the first `-`, or the whole string
/// when it contains no `-`.
fn extract_asset_name(symbol: &str) -> &str {
    match symbol.split_once('-') {
        Some((asset, _rest)) => asset,
        None => symbol,
    }
}
2276
2277 async fn wait_for_market_ready(&self, timeout_secs: u64) -> Result<(), DexError> {
2279 use tokio::time::{sleep, Instant};
2280
2281 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
2282
2283 loop {
2284 let mut all_ready = true;
2285 {
2286 let map = self.dynamic_market_info.read().await;
2287 for symbol in &self.config.symbol_list {
2288 if let Some(info) = map.get(symbol) {
2289 log::warn!(
2290 "[wait_for_market_ready] symbol = {}, best_bid = {:?}, best_ask = {:?}",
2291 symbol,
2292 info.best_bid,
2293 info.best_ask
2294 );
2295 if info.best_bid.is_none() || info.best_ask.is_none() {
2296 log::info!("Waiting for best_bid/best_ask for symbol: {}", symbol);
2297 all_ready = false;
2298 break;
2299 }
2300 } else {
2301 log::info!("Market info not found yet for symbol: {}", symbol);
2302 all_ready = false;
2303 break;
2304 }
2305 }
2306 }
2307
2308 if all_ready {
2309 log::info!("All symbols are market-ready.");
2310 return Ok(());
2311 }
2312
2313 if Instant::now() >= deadline {
2314 return Err(DexError::Other(
2315 "Timed out waiting for market data".to_string(),
2316 ));
2317 }
2318
2319 sleep(Duration::from_millis(200)).await;
2320 }
2321 }
2322}
2323
2324fn resolve_coin(sym: &str, map: &HashMap<String, usize>) -> String {
2325 if sym.contains('/') {
2326 match map.get(sym) {
2328 Some(idx) => format!("@{}", idx),
2329 None => {
2330 log::warn!("resolve_coin: {} is not in spot_index_map", sym);
2331 sym.to_string()
2332 }
2333 }
2334 } else if let Some(base) = sym.strip_suffix("-USD") {
2335 base.to_string()
2337 } else {
2338 sym.to_string()
2339 }
2340}