dex_connector/
hyperliquid_connector.rs

1use crate::{
2    dex_connector::{slippage_price, string_to_decimal, DexConnector},
3    dex_request::{DexError, DexRequest, HttpMethod},
4    dex_websocket::DexWebSocket,
5    BalanceResponse, CanceledOrder, CanceledOrdersResponse, CombinedBalanceResponse,
6    CreateOrderResponse, FilledOrder, FilledOrdersResponse, LastTradesResponse, OpenOrdersResponse,
7    OrderBookSnapshot, OrderSide, TickerResponse, TpSl, TriggerOrderStyle,
8};
9use async_trait::async_trait;
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use debot_utils::parse_to_decimal;
12use ethers::{signers::LocalWallet, types::H160};
13use futures::{
14    stream::{SplitSink, SplitStream},
15    SinkExt, StreamExt,
16};
17use hyperliquid_rust_sdk_fork::{
18    BaseUrl, ClientCancelRequest, ClientLimit, ClientOrder, ClientOrderRequest, ClientTrigger,
19    ExchangeClient, ExchangeDataStatus, ExchangeResponseStatus,
20};
21use reqwest::Client;
22use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
23use rust_decimal::Decimal;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::{
27    collections::HashMap,
28    str::FromStr,
29    sync::{
30        atomic::{AtomicBool, Ordering},
31        Arc,
32    },
33    time::Duration,
34};
35use tokio::{
36    net::TcpStream,
37    select,
38    signal::unix::{signal, SignalKind},
39    sync::{Mutex, RwLock},
40    task::JoinHandle,
41    time::sleep,
42};
43use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream};
44
/// Settings captured when the connector is constructed.
struct Config {
    /// Address sent as the `user` of the account-scoped WebSocket subscriptions.
    /// `new` stores the vault address here when one is supplied, otherwise the
    /// wallet address it was given.
    evm_wallet_address: String,
    /// Symbols that receive per-coin subscriptions (candle, activeAssetCtx, bbo, l2Book).
    symbol_list: Vec<String>,
}
49
50// --- Spot metadata support ---
/// One entry of the `tokens` array in the `spotMeta` info response.
#[derive(Deserialize, Debug)]
struct SpotMetaToken {
    /// Token name (JSON `name`); used as the base/quote label when building pair names.
    #[serde(rename = "name")]
    _name: String,
    /// JSON `szDecimals`; `new` copies it into `StaticMarketInfo::decimals` for the base token.
    #[serde(rename = "szDecimals")]
    _sz_decimals: u32,
    /// JSON `weiDecimals`; deserialized but not read in the code visible here.
    #[serde(rename = "weiDecimals")]
    _wei_decimals: u32,
    /// JSON `index`; the value that `SpotMetaUniverse::_tokens` entries refer to.
    #[serde(rename = "index")]
    _index: usize,
}
62
/// One entry of the `universe` array in the `spotMeta` info response (a spot market).
#[derive(Deserialize, Debug, Clone)]
struct SpotMetaUniverse {
    /// Market name (JSON `name`). Names starting with `@` are replaced by a
    /// `BASE/QUOTE` label in `new`; other names are used as-is.
    #[serde(rename = "name")]
    name: String,
    /// Token indices of the market (JSON `tokens`); `new` expects exactly two
    /// (base, quote) for `@`-named markets.
    #[serde(rename = "tokens")]
    _tokens: Vec<usize>,
    /// Market index (JSON `index`); key/value of the spot index maps.
    #[serde(rename = "index")]
    index: usize,
}
72
/// Body of the `spotMeta` info response as consumed by `new`.
#[derive(Deserialize, Debug)]
struct SpotMetaResponse {
    /// All spot tokens (JSON `tokens`).
    #[serde(rename = "tokens")]
    _tokens: Vec<SpotMetaToken>,
    /// All spot markets (JSON `universe`).
    #[serde(rename = "universe")]
    universe: Vec<SpotMetaUniverse>,
}
80
/// JSON body posted to the `/info` endpoint.
#[derive(Serialize, Debug)]
struct InfoRequest<'a> {
    /// Request kind, serialized as `type` (e.g. `"spotMeta"`).
    #[serde(rename = "type")]
    req_type: &'a str,
    /// Optional user address; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    user: Option<&'a str>,
}
88
/// Outcome recorded for one order; stored in `trade_results` under symbol, then order id.
#[derive(Debug)]
struct TradeResult {
    /// Side of the fill.
    pub filled_side: OrderSide,
    /// Filled size.
    pub filled_size: Decimal,
    /// Filled value — presumably price × size; confirm against the code that populates it.
    pub filled_value: Decimal,
    /// Fee charged for the fill.
    pub filled_fee: Decimal,
    /// Identifier of the order this result belongs to.
    order_id: String,
    /// Whether the order was rejected.
    pub is_rejected: bool,
}
98
/// Record of a canceled order; stored in `canceled_results` under symbol, then order id.
#[derive(Debug, Clone)]
pub struct CancelEvent {
    /// Identifier of the canceled order.
    pub order_id: String,
    /// Time of the cancellation — presumably the exchange's `statusTimestamp`; unit not
    /// established by the code visible here (TODO confirm).
    pub timestamp: u64,
}
104
/// Latest market data known for one symbol. Every field is optional and starts as
/// `None` (via `Default`) until a value has been received.
#[derive(Default)]
struct DynamicMarketInfo {
    /// Best bid price.
    pub best_bid: Option<Decimal>,
    /// Best ask price.
    pub best_ask: Option<Decimal>,
    /// Current market price.
    pub market_price: Option<Decimal>,
    /// Minimum price increment.
    pub min_tick: Option<Decimal>,
    /// Traded volume — the aggregation window is not visible here (TODO confirm).
    pub volume: Option<Decimal>,
    /// Number of trades — the aggregation window is not visible here (TODO confirm).
    pub num_trades: Option<u64>,
    /// Open interest.
    pub open_interest: Option<Decimal>,
    /// Funding rate.
    pub funding_rate: Option<Decimal>,
    /// Oracle price.
    pub oracle_price: Option<Decimal>,
}
117
/// Per-market metadata that is loaded once at construction.
#[derive(Clone)]
struct StaticMarketInfo {
    /// Decimal places of the market; for spot pairs `new` fills this from the base
    /// token's `szDecimals` (0 when the token is unknown).
    pub decimals: u32,
    /// Maximum leverage; `new` sets it to 0 for spot pairs.
    pub _max_leverage: u32,
}
123
/// Cached result of the latest scheduled-maintenance poll.
#[derive(Clone)]
#[allow(dead_code)]
struct MaintenanceInfo {
    /// Start of the first upcoming maintenance window, or `None` when none was parsed.
    next_start: Option<DateTime<Utc>>,
    /// When this cache entry was written.
    fetched_at: DateTime<Utc>,
}
130
/// The `order` object nested inside an `orderUpdates` entry.
#[derive(Deserialize, Debug)]
pub struct OrderUpdateDetail {
    /// Coin the order is for (JSON `coin`).
    pub coin: String,
    /// Order id (JSON `oid`).
    #[serde(rename = "oid")]
    pub oid: u64,
}
137
/// One entry of the `orderUpdates` WebSocket channel payload.
#[derive(Deserialize, Debug)]
pub struct OrderUpdate {
    /// The order the update refers to.
    pub order: OrderUpdateDetail,
    /// Status string reported by the exchange.
    pub status: String,
    /// JSON `statusTimestamp`; unit not established by the code visible here.
    #[serde(rename = "statusTimestamp")]
    pub status_timestamp: u64,
}
145
/// One order-book level as delivered on the `bbo` and `l2Book` channels.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct WsLevel {
    /// JSON `px`, kept as a string — presumably the level price.
    px: String,
    /// JSON `sz`, kept as a string — presumably the level size.
    sz: String,
    /// JSON `n` — presumably the number of orders at this level (TODO confirm).
    n: u64,
}
153
/// Payload of the `bbo` (best bid/offer) WebSocket channel.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct WsBbo {
    /// Coin the quote belongs to; may be an `@<index>` spot identifier.
    coin: String,
    /// JSON `time`; unit not established by the code visible here.
    time: u64,
    /// Two slots, either of which may be absent.
    bbo: [Option<WsLevel>; 2], // [bestBid?, bestAsk?]
}
161
/// Payload of the `l2Book` WebSocket channel.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct WsBook {
    /// Coin the book belongs to.
    coin: String,
    /// JSON `time`; unit not established by the code visible here.
    time: u64,
    /// The two sides of the book.
    levels: [Vec<WsLevel>; 2], // [bids, asks]
}
169
/// Connector to the Hyperliquid exchange: REST requests, a WebSocket feed with its
/// background tasks, and the shared state those tasks update.
pub struct HyperliquidConnector {
    /// Wallet address and subscribed symbols.
    config: Config,
    /// REST client used for `/info` requests.
    request: DexRequest,
    /// WebSocket endpoint wrapper; `start_web_socket` connects through a clone of it.
    web_socket: DexWebSocket,
    /// Shared run flag: set by `start_web_socket`, cleared by `stop_web_socket`, by the
    /// reader task when it exits, and by the SIGTERM task.
    running: Arc<AtomicBool>,
    /// Read half of the WebSocket; `None` while disconnected.
    read_socket: Arc<Mutex<Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>>,
    /// Write half of the WebSocket; `None` while disconnected.
    write_socket:
        Arc<Mutex<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
    /// Handle of the task that reads and dispatches WebSocket messages.
    task_handle_read_message: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Handle of the task that waits for SIGTERM.
    task_handle_read_sigterm: Arc<Mutex<Option<JoinHandle<()>>>>,
    // 1st key = symbol, 2nd key = order_id
    trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
    /// Cancel events, keyed the same way as `trade_results`.
    canceled_results: Arc<RwLock<HashMap<String, HashMap<String, CancelEvent>>>>,
    /// Latest market data per symbol.
    dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
    /// Per-market metadata loaded in `new`.
    static_market_info: HashMap<String, StaticMarketInfo>,
    /// Spot pair name → universe index.
    spot_index_map: HashMap<String, usize>,
    /// Universe index → spot pair name; shared with the reader task.
    spot_reverse_map: Arc<HashMap<usize, String>>,
    /// Signed exchange client (wallet or approved agent, optionally acting for a vault).
    exchange_client: ExchangeClient,
    /// Cache refreshed by the maintenance watcher task.
    maintenance: Arc<RwLock<MaintenanceInfo>>,
    /// Last volume seen per key — presumably per symbol; the writer is not visible here.
    last_volumes: Arc<Mutex<HashMap<String, Decimal>>>,
}
191
/// A decoded WebSocket envelope: the channel name plus its typed payload.
/// Deserialization is hand-written because the payload type depends on the channel.
#[derive(Debug)]
struct WebSocketMessage {
    /// Value of the envelope's `channel` field.
    _channel: String,
    /// Payload decoded according to `_channel`.
    data: WebSocketData,
}
197
/// Typed payload of a WebSocket message, one variant per supported channel.
#[derive(Debug)]
enum WebSocketData {
    /// `allMids` channel.
    AllMidsData(AllMidsData),
    /// `userFills` channel.
    UserFillsData(UserFillsData),
    /// `candle` channel.
    CandleData(CandleData),
    /// `activeAssetCtx` channel.
    ActiveAssetCtxData(ActiveAssetCtxData),
    /// `orderUpdates` channel.
    OrderUpdatesData(Vec<OrderUpdate>),
    /// `bbo` channel.
    Bbo(WsBbo),
    /// `l2Book` channel.
    L2Book(WsBook),
}
208
/// Payload of the `allMids` channel.
#[derive(Deserialize, Debug)]
struct AllMidsData {
    /// String-to-string map from JSON `mids` — presumably coin → mid price (TODO confirm).
    mids: HashMap<String, String>,
}
213
/// Payload of the `candle` channel. Field names mirror the single-letter JSON keys,
/// hence the `non_snake_case` allowance for `T`.
#[allow(dead_code, non_snake_case)]
#[derive(Deserialize, Debug)]
struct CandleData {
    t: u64,     // Open time (milliseconds)
    T: u64,     // Close time (milliseconds)
    s: String,  // Symbol
    i: String,  // Interval
    o: Decimal, // Open price
    c: Decimal, // Close price
    h: Decimal, // High price
    l: Decimal, // Low price
    v: Decimal, // Volume
    n: u64,     // Number of trades
}
228
/// Payload of the `activeAssetCtx` channel.
#[derive(Deserialize, Debug)]
pub struct ActiveAssetCtxData {
    pub coin: String,       // The asset symbol (e.g., BTC-USD)
    pub ctx: PerpsAssetCtx, // The asset context containing market details
}
234
/// Asset context carried by `ActiveAssetCtxData`. Field names mirror the camelCase
/// JSON keys, hence the `non_snake_case` allowance.
#[allow(dead_code, non_snake_case)]
#[derive(Deserialize, Debug)]
pub struct PerpsAssetCtx {
    pub dayNtlVlm: Decimal,     // Daily notional volume
    pub prevDayPx: Decimal,     // Previous day's price
    pub markPx: Decimal,        // Mark price
    pub midPx: Option<Decimal>, // Mid price (optional)
    pub funding: Decimal,       // Funding rate
    pub openInterest: Decimal,  // Open interest
    pub oraclePx: Decimal,      // Oracle price
}
246
/// Payload of the `userFills` channel.
#[derive(Serialize, Deserialize, Debug)]
pub struct UserFillsData {
    /// User the fills belong to (JSON `user`).
    pub user: String,
    /// Fills contained in this message.
    pub fills: Vec<Fill>,
}
252
/// A single fill reported on the `userFills` channel.
#[derive(Serialize, Deserialize, Debug)]
pub struct Fill {
    /// Coin that was traded.
    pub coin: String,
    /// JSON `px` — presumably the fill price.
    pub px: Decimal,
    /// JSON `sz` — presumably the fill size.
    pub sz: Decimal,
    /// Side string as sent by the exchange; its encoding is not shown here.
    pub side: String,
    /// Direction string as sent by the exchange; its encoding is not shown here.
    pub dir: String,
    /// JSON `closedPnl`.
    #[serde(rename = "closedPnl")]
    pub closed_pnl: Decimal,
    /// Order id the fill belongs to.
    pub oid: u64,
    /// JSON `tid` — presumably the trade id.
    pub tid: u64,
    /// Fee charged for the fill.
    pub fee: Decimal,
}
266
267impl<'de> Deserialize<'de> for WebSocketMessage {
268    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
269    where
270        D: serde::Deserializer<'de>,
271    {
272        #[derive(Deserialize)]
273        struct Helper {
274            channel: String,
275            data: serde_json::Value,
276        }
277        let helper = Helper::deserialize(deserializer)?;
278        let data = match helper.channel.as_str() {
279            "allMids" => AllMidsData::deserialize(helper.data)
280                .map(WebSocketData::AllMidsData)
281                .map_err(serde::de::Error::custom)?,
282            "userFills" => UserFillsData::deserialize(helper.data)
283                .map(WebSocketData::UserFillsData)
284                .map_err(serde::de::Error::custom)?,
285            "orderUpdates" => Vec::<OrderUpdate>::deserialize(helper.data)
286                .map(WebSocketData::OrderUpdatesData)
287                .map_err(serde::de::Error::custom)?,
288            "candle" => CandleData::deserialize(helper.data)
289                .map(WebSocketData::CandleData)
290                .map_err(serde::de::Error::custom)?,
291            "activeAssetCtx" => ActiveAssetCtxData::deserialize(helper.data)
292                .map(WebSocketData::ActiveAssetCtxData)
293                .map_err(serde::de::Error::custom)?,
294            "bbo" => WsBbo::deserialize(helper.data)
295                .map(WebSocketData::Bbo)
296                .map_err(serde::de::Error::custom)?,
297            "l2Book" => WsBook::deserialize(helper.data)
298                .map(WebSocketData::L2Book)
299                .map_err(serde::de::Error::custom)?,
300            _ => return Err(serde::de::Error::custom("unknown channel type")),
301        };
302        Ok(WebSocketMessage {
303            _channel: helper.channel,
304            data,
305        })
306    }
307}
308
309/// Determine buy/sell direction for SL/TP orders based on position direction
310/// - Long position SL/TP = Sell order (close position)
311/// - Short position SL/TP = Buy order (close position)
312fn is_buy_for_tpsl(position_side: OrderSide) -> bool {
313    matches!(position_side, OrderSide::Short)
314}
315
316impl HyperliquidConnector {
317    pub async fn new(
318        rest_endpoint: &str,
319        web_socket_endpoint: &str,
320        private_key: &str,
321        evm_wallet_address: &str,
322        vault_address: Option<String>,
323        use_agent: bool,
324        agent_name: Option<String>,
325        symbol_list: &[&str],
326    ) -> Result<Self, DexError> {
327        let request = DexRequest::new(rest_endpoint.to_owned()).await?;
328        let web_socket = DexWebSocket::new(web_socket_endpoint.to_owned());
329
330        let evm_wallet_address = vault_address
331            .clone()
332            .unwrap_or_else(|| evm_wallet_address.into());
333        let config = Config {
334            evm_wallet_address,
335            symbol_list: symbol_list.iter().map(|s| s.to_string()).collect(),
336        };
337
338        let vault_address: Option<H160> = vault_address
339            .as_deref()
340            .and_then(|v| H160::from_str(v).ok());
341
342        let mut local_wallet: LocalWallet = private_key.parse().unwrap();
343
344        if use_agent {
345            let ec_tmp =
346                ExchangeClient::new(None, local_wallet, Some(BaseUrl::Mainnet), None, None)
347                    .await
348                    .map_err(|e| DexError::Other(e.to_string()))?;
349
350            let (pk, status) = ec_tmp
351                .approve_agent(None, agent_name.clone())
352                .await
353                .map_err(|e| DexError::Other(e.to_string()))?;
354
355            match status {
356                ExchangeResponseStatus::Ok(_) => {
357                    log::info!("approve_agent succeeded for {:?}", agent_name);
358                    local_wallet = pk
359                        .parse()
360                        .map_err(|e| DexError::Other(format!("Failed to parse agent pk: {e}")))?;
361                }
362                ExchangeResponseStatus::Err(e) => {
363                    log::error!("approve_agent failed: {e}");
364                    return Err(DexError::Other(format!("approve_agent failed: {e}")));
365                }
366            }
367        }
368
369        let exchange_client = ExchangeClient::new(
370            None,
371            local_wallet,
372            Some(BaseUrl::Mainnet),
373            None,
374            vault_address,
375        )
376        .await
377        .map_err(|e| DexError::Other(e.to_string()))?;
378
379        let mut instance = HyperliquidConnector {
380            config,
381            request,
382            web_socket,
383            trade_results: Arc::new(RwLock::new(HashMap::new())),
384            canceled_results: Arc::new(RwLock::new(HashMap::new())),
385            running: Arc::new(AtomicBool::new(false)),
386            read_socket: Arc::new(Mutex::new(None)),
387            write_socket: Arc::new(Mutex::new(None)),
388            task_handle_read_message: Arc::new(Mutex::new(None)),
389            task_handle_read_sigterm: Arc::new(Mutex::new(None)),
390            dynamic_market_info: Arc::new(RwLock::new(HashMap::new())),
391            static_market_info: HashMap::new(),
392            spot_index_map: HashMap::new(),
393            spot_reverse_map: Arc::new(HashMap::new()),
394            exchange_client,
395            maintenance: Arc::new(RwLock::new(MaintenanceInfo {
396                next_start: None,
397                fetched_at: Utc::now() - ChronoDuration::hours(1),
398            })),
399            last_volumes: Arc::new(Mutex::new(HashMap::new())),
400        };
401
402        instance.spawn_maintenance_watcher();
403
404        instance.retrive_market_metadata().await?;
405
406        let info_payload = serde_json::to_string(&InfoRequest {
407            req_type: "spotMeta",
408            user: None,
409        })
410        .map_err(|e| DexError::Other(e.to_string()))?;
411
412        let spot_meta: SpotMetaResponse = instance
413            .request
414            .handle_request::<SpotMetaResponse, InfoRequest<'_>>(
415                HttpMethod::Post,
416                "/info".into(),
417                &HashMap::new(),
418                info_payload,
419            )
420            .await?;
421
422        // index → token_name
423        let token_name_map: HashMap<usize, String> = spot_meta
424            ._tokens
425            .iter()
426            .map(|t| (t._index, t._name.clone()))
427            .collect();
428
429        let mut idx_from_pair = HashMap::<String, usize>::new();
430        let mut pair_from_idx = HashMap::<usize, String>::new();
431
432        for uni in &spot_meta.universe {
433            let pair = if !uni.name.starts_with('@') {
434                uni.name.clone()
435            } else if uni._tokens.len() == 2 {
436                format!(
437                    "{}/{}",
438                    token_name_map.get(&uni._tokens[0]).unwrap_or(&"?".into()),
439                    token_name_map.get(&uni._tokens[1]).unwrap_or(&"?".into())
440                )
441            } else {
442                log::warn!(
443                    "universe idx {} has unexpected token vec {:?}",
444                    uni.index,
445                    uni._tokens
446                );
447                uni.name.clone()
448            };
449
450            idx_from_pair.insert(pair.clone(), uni.index);
451            pair_from_idx.insert(uni.index, pair);
452        }
453
454        instance.spot_index_map = idx_from_pair;
455        instance.spot_reverse_map = Arc::new(pair_from_idx);
456
457        {
458            let token_decimals: HashMap<String, u32> = spot_meta
459                ._tokens
460                .iter()
461                .map(|t| (t._name.clone(), t._sz_decimals))
462                .collect();
463
464            let mut sm = std::mem::take(&mut instance.static_market_info);
465
466            for uni in &spot_meta.universe {
467                let pair = if !uni.name.starts_with('@') {
468                    uni.name.clone()
469                } else if uni._tokens.len() == 2 {
470                    format!(
471                        "{}/{}",
472                        spot_meta._tokens[uni._tokens[0]]._name,
473                        spot_meta._tokens[uni._tokens[1]]._name
474                    )
475                } else {
476                    uni.name.clone()
477                };
478
479                let base = pair.split('/').next().unwrap();
480                let decimals = *token_decimals.get(base).unwrap_or(&0);
481
482                sm.insert(
483                    pair.clone(),
484                    StaticMarketInfo {
485                        decimals,
486                        _max_leverage: 0,
487                    },
488                );
489            }
490
491            instance.static_market_info = sm;
492        }
493
494        Ok(instance)
495    }
496
497    fn spawn_maintenance_watcher(&self) {
498        let cache = self.maintenance.clone();
499        tokio::spawn(async move {
500            let client = Client::builder()
501                .timeout(std::time::Duration::from_secs(2))
502                .build()
503                .expect("reqwest client");
504
505            loop {
506                if let Ok(res) = client
507                    .get("https://hyperliquid.statuspage.io/api/v2/scheduled-maintenances/upcoming.json")
508                    .send()
509                    .await
510                {
511                    if let Ok(json) = res.json::<Value>().await {
512                        let scheduled = json
513                            .get("scheduled_maintenances")
514                            .and_then(|v| v.as_array());
515                        let scheduled_count = scheduled.map(|arr| arr.len()).unwrap_or(0);
516                        let raw_next = scheduled
517                            .and_then(|arr| arr.get(0))
518                            .and_then(|v| v.get("scheduled_for"))
519                            .and_then(|v| v.as_str());
520
521                        log::debug!(
522                            "Hyperliquid maintenance poll: count={} raw_next={:?}",
523                            scheduled_count,
524                            raw_next
525                        );
526
527                        let next = json
528                            .get("scheduled_maintenances")
529                            .and_then(|v| v.get(0))
530                            .and_then(|v| v.get("scheduled_for"))
531                            .and_then(|v| v.as_str())
532                            .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
533                            .map(|dt| dt.with_timezone(&Utc));
534
535                        if next.is_none() {
536                            log::debug!(
537                                "Hyperliquid maintenance parse result: no upcoming window parsed from {:?}",
538                                raw_next
539                            );
540                        }
541
542                        *cache.write().await = MaintenanceInfo {
543                            next_start: next,
544                            fetched_at: Utc::now(),
545                        };
546                    }
547                }
548                sleep(Duration::from_secs(600)).await;
549            }
550        });
551    }
552
    /// Connects the WebSocket, subscribes to all channels and spawns two background
    /// tasks: a reader that dispatches incoming messages and a SIGTERM watcher.
    ///
    /// # Errors
    /// Returns `DexError` when the connection cannot be established or a subscription
    /// request cannot be sent.
    pub async fn start_web_socket(&self) -> Result<(), DexError> {
        log::info!("start_web_socket");

        let (write, read) = self
            .web_socket
            .clone()
            .connect()
            .await
            .map_err(|_| DexError::Other("Failed to connect to WebSocket".to_string()))?;

        // Each half is stored under its own short-lived lock.
        {
            let mut read_lock = self.read_socket.lock().await;
            *read_lock = Some(read);
        }
        {
            let mut write_lock = self.write_socket.lock().await;
            *write_lock = Some(write);
        }

        // NOTE(review): `running` is set before subscribing, so it stays `true` if the
        // subscription below fails and this function returns early.
        self.running.store(true, Ordering::SeqCst);
        self.subscribe_to_channels(&self.config.evm_wallet_address)
            .await?;

        // Clone every piece of shared state the reader task needs to own.
        let running = self.running.clone();
        let read_sock = self.read_socket.clone();
        let write_sock = self.write_socket.clone();
        let dmi = self.dynamic_market_info.clone();
        let trs = self.trade_results.clone();
        let rev_map = self.spot_reverse_map.clone();
        let crs = self.canceled_results.clone();
        let static_info = self.static_market_info.clone();

        let reader_handle = tokio::spawn(async move {
            // Counts consecutive 10-second windows without a text message.
            let mut idle_counter = 0;
            while running.load(Ordering::SeqCst) {
                // The read lock is held for the whole `select!`, i.e. up to 10 seconds.
                if let Some(stream) = read_sock.lock().await.as_mut() {
                    tokio::select! {
                        msg = stream.next() => match msg {
                            Some(Ok(Message::Text(txt))) => {
                                idle_counter = 0;
                                // An empty JSON object is echoed straight back.
                                if txt == "{}" {
                                    if let Some(w) = write_sock.lock().await.as_mut() {
                                        let _ = w.send(Message::Text(txt)).await;
                                    }
                                } else {
                                    if let Err(e) = HyperliquidConnector::handle_websocket_message(
                                        Message::Text(txt),
                                        dmi.clone(),
                                        trs.clone(),
                                        rev_map.clone(),
                                        crs.clone(),
                                        static_info.clone(),
                                    ).await {
                                        log::error!("WebSocket handler error: {:?}", e);
                                        break;
                                    }
                                }
                            }
                            // Non-text frames are ignored and do not reset the idle counter.
                            Some(Ok(_)) => {
                            }
                            Some(Err(err)) => {
                                log::error!("WebSocket read error: {:?}", err);
                                break;
                            }
                            None => {
                                log::info!("WebSocket stream closed");
                                break;
                            }
                        },
                        _ = tokio::time::sleep(Duration::from_secs(10)) => {
                            idle_counter += 1;
                            // Ten silent windows in a row (about 100 s) end the reader.
                            if idle_counter >= 10 {
                                log::error!("No WebSocket messages for 100s, shutting down reader");
                                break;
                            }
                        }
                    }
                }
            }
            // Whatever ended the loop, mark the connector as stopped.
            running.store(false, Ordering::SeqCst);
            log::info!("WebSocket reader task ended");
        });
        *self.task_handle_read_message.lock().await = Some(reader_handle);

        let running_for_sig = self.running.clone();
        let sig_handle = tokio::spawn(async move {
            let mut sigterm =
                signal(SignalKind::terminate()).expect("Failed to bind SIGTERM handler");
            loop {
                select! {
                    _ = sigterm.recv() => {
                        log::info!("SIGTERM received, stopping WebSocket");
                        running_for_sig.store(false, Ordering::SeqCst);
                        break;
                    }
                    // Wake once per second so the task ends soon after `running` is cleared.
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        if !running_for_sig.load(Ordering::SeqCst) {
                            break;
                        }
                    }
                }
            }
        });
        *self.task_handle_read_sigterm.lock().await = Some(sig_handle);

        Ok(())
    }
660
661    pub async fn stop_web_socket(&self) -> Result<(), DexError> {
662        log::info!("stop_web_socket");
663        self.running.store(false, Ordering::SeqCst);
664
665        {
666            let mut write_guard = self.write_socket.lock().await;
667            if let Some(write_socket) = write_guard.as_mut() {
668                if let Err(e) = write_socket.send(Message::Close(None)).await {
669                    log::error!("Failed to send WebSocket close message: {:?}", e);
670                }
671            }
672            *write_guard = None;
673        }
674
675        {
676            let mut read_guard = self.read_socket.lock().await;
677            *read_guard = None;
678        }
679
680        if let Some(handle) = self.task_handle_read_message.lock().await.take() {
681            let _ = handle.await;
682        }
683
684        if let Some(handle) = self.task_handle_read_sigterm.lock().await.take() {
685            let _ = handle.await;
686        }
687
688        drop(self.web_socket.clone());
689
690        Ok(())
691    }
692
693    async fn subscribe_to_channels(&self, user_address: &str) -> Result<(), DexError> {
694        let all_mids_subscription = serde_json::json!({
695            "method": "subscribe",
696            "subscription": {
697                "type": "allMids"
698            }
699        })
700        .to_string();
701
702        let user_fills_subscription = serde_json::json!({
703            "method": "subscribe",
704            "subscription": {
705                "type": "userFills",
706                "user": user_address
707            }
708        })
709        .to_string();
710
711        let order_updates_subscription = serde_json::json!({
712            "method": "subscribe",
713            "subscription": {
714                "type": "orderUpdates",
715                "user": user_address
716            }
717        })
718        .to_string();
719
720        let mut write_socket_lock = self.write_socket.lock().await;
721
722        if let Some(write_socket) = write_socket_lock.as_mut() {
723            if let Err(e) = write_socket
724                .send(Message::Text(all_mids_subscription))
725                .await
726            {
727                return Err(DexError::WebSocketError(format!(
728                    "Failed to subscribe to allMids: {}",
729                    e
730                )));
731            }
732
733            if let Err(e) = write_socket
734                .send(Message::Text(order_updates_subscription))
735                .await
736            {
737                return Err(DexError::WebSocketError(format!(
738                    "Failed to subscribe to userFills: {}",
739                    e
740                )));
741            }
742
743            if let Err(e) = write_socket
744                .send(Message::Text(user_fills_subscription))
745                .await
746            {
747                return Err(DexError::WebSocketError(format!(
748                    "Failed to subscribe to userFills: {}",
749                    e
750                )));
751            }
752
753            for symbol in &self.config.symbol_list {
754                let coin = resolve_coin(symbol, &self.spot_index_map);
755                let candle_subscription = serde_json::json!({
756                    "method": "subscribe",
757                    "subscription": {
758                        "type": "candle",
759                        "coin": coin,
760                        "interval": "1m"
761                    }
762                })
763                .to_string();
764                if let Err(e) = write_socket.send(Message::Text(candle_subscription)).await {
765                    return Err(DexError::WebSocketError(format!(
766                        "Failed to subscribe to candle for {}: {}",
767                        symbol, e
768                    )));
769                }
770
771                let active_asset_ctx_subscription = serde_json::json!({
772                    "method": "subscribe",
773                    "subscription": {
774                        "type": "activeAssetCtx",
775                        "coin": coin,
776                    }
777                })
778                .to_string();
779                if let Err(e) = write_socket
780                    .send(Message::Text(active_asset_ctx_subscription))
781                    .await
782                {
783                    return Err(DexError::WebSocketError(format!(
784                        "Failed to subscribe to activeAssetCtx: {}",
785                        e
786                    )));
787                }
788
789                let bbo_subscription = serde_json::json!({
790                    "method": "subscribe",
791                    "subscription": {
792                        "type": "bbo",
793                        "coin": coin
794                    }
795                })
796                .to_string();
797                if let Err(e) = write_socket.send(Message::Text(bbo_subscription)).await {
798                    return Err(DexError::WebSocketError(format!(
799                        "Failed to subscribe to bbo: {}",
800                        e
801                    )));
802                }
803
804                let l2_subscription = serde_json::json!({
805                    "method": "subscribe",
806                    "subscription": {
807                        "type": "l2Book",
808                        "coin": coin
809                    }
810                })
811                .to_string();
812                if let Err(e) = write_socket.send(Message::Text(l2_subscription)).await {
813                    return Err(DexError::WebSocketError(format!(
814                        "Failed to subscribe to l2: {}",
815                        e
816                    )));
817                }
818            }
819        } else {
820            return Err(DexError::WebSocketError(
821                "Write socket is not available".to_string(),
822            ));
823        }
824
825        Ok(())
826    }
827
828    async fn handle_websocket_message(
829        msg: Message,
830        dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
831        trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
832        spot_reverse_map: Arc<HashMap<usize, String>>,
833        canceled_results: Arc<RwLock<HashMap<String, HashMap<String, CancelEvent>>>>,
834        static_market_info: HashMap<String, StaticMarketInfo>,
835    ) -> Result<(), DexError> {
836        if let Message::Text(text) = msg {
837            for line in text.split('\n') {
838                if line.is_empty() {
839                    continue;
840                }
841                if let Ok(message) = serde_json::from_str::<WebSocketMessage>(line) {
842                    log::trace!("[WebSocketMessage] channel = {}", message._channel);
843
844                    match &message.data {
845                        WebSocketData::Bbo(bbo) => {
846                            log::trace!("[WebSocketMessage] BBO coin = {}", bbo.coin);
847                        }
848                        WebSocketData::L2Book(book) => {
849                            log::trace!("[WebSocketMessage] L2Book coin = {}", book.coin);
850                        }
851                        WebSocketData::AllMidsData(data) => {
852                            log::trace!("[WebSocketMessage] allMids keys = {:?}", data.mids.keys());
853                        }
854                        _ => {}
855                    }
856
857                    match message.data {
858                        WebSocketData::AllMidsData(ref data) => {
859                            Self::process_all_mids_message(
860                                data,
861                                dynamic_market_info.clone(),
862                                spot_reverse_map.clone(),
863                                &static_market_info,
864                            )
865                            .await;
866                        }
867                        WebSocketData::CandleData(ref data) => {
868                            Self::process_candle_message(
869                                data,
870                                dynamic_market_info.clone(),
871                                spot_reverse_map.clone(),
872                            )
873                            .await;
874                        }
875                        WebSocketData::UserFillsData(ref data) => {
876                            Self::process_account_data(data, trade_results.clone()).await;
877                        }
878                        WebSocketData::ActiveAssetCtxData(ref data) => {
879                            Self::process_active_asset_ctx_message(
880                                data,
881                                dynamic_market_info.clone(),
882                                spot_reverse_map.clone(),
883                            )
884                            .await;
885                        }
886                        WebSocketData::OrderUpdatesData(ref orders) => {
887                            Self::process_order_updates_message(
888                                orders,
889                                canceled_results.clone(),
890                                trade_results.clone(),
891                            )
892                            .await;
893                        }
894                        WebSocketData::Bbo(ref bbo) => {
895                            // "@123" → 123 → "UBTC/USDC"
896                            let idx = bbo
897                                .coin
898                                .strip_prefix('@')
899                                .and_then(|s| s.parse::<usize>().ok());
900                            let coin = idx
901                                .and_then(|i| spot_reverse_map.get(&i).cloned())
902                                .unwrap_or_else(|| bbo.coin.clone());
903                            let market_key = if coin.contains('/') {
904                                coin.clone()
905                            } else {
906                                format!("{}-USD", coin)
907                            };
908                            let mut info_map = dynamic_market_info.write().await;
909                            let info = info_map.entry(market_key).or_default();
910                            info.best_bid = bbo
911                                .bbo
912                                .get(0)
913                                .and_then(|lvl| lvl.as_ref())
914                                .map(|l| Decimal::from_str(&l.px).unwrap());
915                            info.best_ask = bbo
916                                .bbo
917                                .get(1)
918                                .and_then(|lvl| lvl.as_ref())
919                                .map(|l| Decimal::from_str(&l.px).unwrap());
920                        }
921                        WebSocketData::L2Book(ref book) => {
922                            let idx = book
923                                .coin
924                                .strip_prefix('@')
925                                .and_then(|s| s.parse::<usize>().ok());
926                            let coin = idx
927                                .and_then(|i| spot_reverse_map.get(&i).cloned())
928                                .unwrap_or_else(|| book.coin.clone());
929                            let market_key = if coin.contains('/') {
930                                coin.clone()
931                            } else {
932                                format!("{}-USD", coin)
933                            };
934                            let mut info_map = dynamic_market_info.write().await;
935                            let info = info_map.entry(market_key).or_default();
936                            info.best_bid = book.levels[0]
937                                .get(0)
938                                .map(|lvl| Decimal::from_str(&lvl.px).unwrap());
939                            info.best_ask = book.levels[1]
940                                .get(0)
941                                .map(|lvl| Decimal::from_str(&lvl.px).unwrap());
942                        }
943                    }
944                }
945            }
946        }
947        Ok(())
948    }
949
950    async fn process_order_updates_message(
951        orders: &[OrderUpdate],
952        canceled_results: Arc<RwLock<HashMap<String, HashMap<String, CancelEvent>>>>,
953        trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
954    ) {
955        for upd in orders.iter() {
956            let symbol = if upd.order.coin.contains('/') || upd.order.coin.contains('-') {
957                upd.order.coin.clone()
958            } else {
959                format!("{}-USD", upd.order.coin)
960            };
961
962            match upd.status.as_str() {
963                "canceled" => {
964                    log::debug!(
965                        "🚫 [FILL_DETECTION] Order canceled: {} ({})",
966                        upd.order.oid,
967                        symbol
968                    );
969                    let evt = CancelEvent {
970                        order_id: upd.order.oid.to_string(),
971                        timestamp: upd.status_timestamp,
972                    };
973                    canceled_results
974                        .write()
975                        .await
976                        .entry(symbol)
977                        .or_default()
978                        .insert(evt.order_id.clone(), evt);
979                }
980                "rejected" => {
981                    log::debug!(
982                        "❌ [FILL_DETECTION] Order rejected: {} ({})",
983                        upd.order.oid,
984                        symbol
985                    );
986                    let mut trs = trade_results.write().await;
987                    let entry = trs.entry(symbol).or_default();
988                    entry.insert(
989                        upd.order.oid.to_string(),
990                        TradeResult {
991                            filled_side: OrderSide::Long,
992                            filled_size: Decimal::ZERO,
993                            filled_value: Decimal::ZERO,
994                            filled_fee: Decimal::ZERO,
995                            order_id: upd.order.oid.to_string(),
996                            is_rejected: true,
997                        },
998                    );
999                }
1000                "filled" | "partiallyFilled" => {
1001                    log::info!(
1002                        "✅ [FILL_DETECTION] Order {} detected: {} ({})",
1003                        upd.status,
1004                        upd.order.oid,
1005                        symbol
1006                    );
1007                    // TODO: Add fill event processing here
1008                }
1009                _ => {
1010                    log::debug!(
1011                        "🔍 [FILL_DETECTION] Unknown order status: '{}' for order {} ({})",
1012                        upd.status,
1013                        upd.order.oid,
1014                        symbol
1015                    );
1016                }
1017            }
1018        }
1019    }
1020
1021    async fn process_all_mids_message(
1022        mids_data: &AllMidsData,
1023        dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
1024        spot_reverse_map: Arc<HashMap<usize, String>>,
1025        static_market_info: &HashMap<String, StaticMarketInfo>,
1026    ) {
1027        for (raw_coin, mid_price_str) in &mids_data.mids {
1028            let coin = if let Some(stripped) = raw_coin.strip_prefix('@') {
1029                let idx = stripped.parse::<usize>().unwrap_or_else(|_| {
1030                    log::warn!("[resolve_coin] invalid @index: {}", stripped);
1031                    0
1032                });
1033
1034                let mapped = spot_reverse_map.get(&idx).cloned();
1035                match mapped {
1036                    Some(mapped) => mapped,
1037                    None => {
1038                        log::trace!(
1039                            "[resolve_coin] spot_reverse_map missing: {} (index: {})",
1040                            raw_coin,
1041                            idx
1042                        );
1043                        raw_coin.clone()
1044                    }
1045                }
1046            } else {
1047                raw_coin.clone()
1048            };
1049
1050            let market_key = if coin.contains('/') || coin.contains('-') {
1051                coin.clone() // Spot: UBTC/USDC,  etc.
1052            } else {
1053                format!("{}-USD", coin) // Perp: BTC-USD, etc.
1054            };
1055
1056            if let Ok(mid) = string_to_decimal(Some(mid_price_str.clone())) {
1057                let mut guard = dynamic_market_info.write().await;
1058                let info = guard.entry(market_key.clone()).or_default();
1059                let sz_decimals = static_market_info
1060                    .get(&market_key)
1061                    .map(|m| m.decimals)
1062                    .unwrap_or_else(|| {
1063                        log::trace!("no static for {}, default 0", market_key);
1064                        0
1065                    });
1066                let is_spot = market_key.contains('/');
1067
1068                let base_tick = Self::calculate_min_tick(mid, sz_decimals, is_spot);
1069                info.min_tick = Some(base_tick);
1070                info.market_price = Some(mid);
1071            }
1072        }
1073    }
1074
1075    async fn process_candle_message(
1076        candle: &CandleData,
1077        dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
1078        spot_reverse_map: Arc<HashMap<usize, String>>,
1079    ) {
1080        let coin = if let Some(stripped) = candle.s.strip_prefix('@') {
1081            stripped
1082                .parse::<usize>()
1083                .ok()
1084                .and_then(|idx| spot_reverse_map.get(&idx).cloned())
1085                .unwrap_or_else(|| {
1086                    log::trace!(
1087                        "in spot_reverse_map: {} is missing (@{})",
1088                        candle.s,
1089                        stripped
1090                    );
1091                    candle.s.clone()
1092                })
1093        } else {
1094            candle.s.clone()
1095        };
1096
1097        let market_key = if coin.contains('/') || coin.contains('-') {
1098            coin.clone()
1099        } else {
1100            format!("{}-USD", coin)
1101        };
1102
1103        let mut guard = dynamic_market_info.write().await;
1104        let info = guard.entry(market_key.clone()).or_default();
1105        info.volume = Some(candle.v);
1106        info.num_trades = Some(candle.n);
1107    }
1108
1109    async fn process_active_asset_ctx_message(
1110        asset_data: &ActiveAssetCtxData,
1111        dynamic_market_info: Arc<RwLock<HashMap<String, DynamicMarketInfo>>>,
1112        spot_reverse_map: Arc<HashMap<usize, String>>,
1113    ) {
1114        let coin = if let Some(stripped) = asset_data.coin.strip_prefix('@') {
1115            stripped
1116                .parse::<usize>()
1117                .ok()
1118                .and_then(|idx| spot_reverse_map.get(&idx).cloned())
1119                .unwrap_or_else(|| {
1120                    log::trace!(
1121                        "in spot_reverse_map {} is missing (@{})",
1122                        asset_data.coin,
1123                        stripped
1124                    );
1125                    asset_data.coin.clone()
1126                })
1127        } else {
1128            asset_data.coin.clone()
1129        };
1130
1131        let market_key = if coin.contains('/') || coin.contains('-') {
1132            coin.clone()
1133        } else {
1134            format!("{}-USD", coin)
1135        };
1136
1137        let mut guard = dynamic_market_info.write().await;
1138        let info = guard
1139            .entry(market_key.clone())
1140            .or_insert_with(DynamicMarketInfo::default);
1141        info.funding_rate = Some(asset_data.ctx.funding);
1142        info.open_interest = Some(asset_data.ctx.openInterest);
1143        info.oracle_price = Some(asset_data.ctx.oraclePx);
1144    }
1145
1146    async fn process_account_data(
1147        data: &UserFillsData,
1148        trade_results: Arc<RwLock<HashMap<String, HashMap<String, TradeResult>>>>,
1149    ) {
1150        for fill in &data.fills {
1151            log::debug!("{:?}", fill);
1152
1153            let filled_side = if fill.side == "A" {
1154                OrderSide::Short
1155            } else {
1156                OrderSide::Long
1157            };
1158
1159            let filled_size = fill.sz;
1160            let filled_price = fill.px;
1161            let filled_value = filled_size * filled_price;
1162            let filled_fee = fill.fee;
1163            let order_id = fill.oid;
1164            let trade_id = fill.tid;
1165
1166            let market_id = if fill.coin.contains('/') || fill.coin.contains('-') {
1167                fill.coin.clone()
1168            } else {
1169                format!("{}-USD", fill.coin)
1170            };
1171
1172            let trade_result = TradeResult {
1173                filled_side,
1174                filled_size,
1175                filled_value,
1176                filled_fee,
1177                order_id: order_id.to_string(),
1178                is_rejected: false,
1179            };
1180
1181            let mut trade_results_guard = trade_results.write().await;
1182            let market_map = trade_results_guard.entry(market_id.clone()).or_default();
1183            let key = trade_id.to_string();
1184
1185            if let Some(existing) = market_map.get_mut(&key) {
1186                existing.filled_size += trade_result.filled_size;
1187                existing.filled_value += trade_result.filled_value;
1188                existing.filled_fee += trade_result.filled_fee;
1189            } else {
1190                market_map.insert(key, trade_result);
1191            }
1192        }
1193    }
1194}
1195
/// Request body sent to the `/info` endpoint.
///
/// `type` selects the query (this file uses `"spotClearinghouseState"` and
/// `"clearinghouseState"`); `user` carries the wallet address and is left out of the
/// serialized JSON when it is `None`.
#[derive(Serialize, Debug, Clone)]
struct HyperliquidDefaultPayload {
    r#type: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    user: Option<String>,
}
1202
/// Response to the `"clearinghouseState"` info query, as consumed by `get_balance`.
///
/// Only the `marginSummary` object is read; it is optional.
#[derive(Deserialize, Debug)]
struct HyperliquidRetrieveUserStateResponse {
    #[serde(rename = "marginSummary")]
    margin_summary: Option<HyperliquidMarginSummary>,
}
/// Margin summary of the perp account.
///
/// Both values arrive as strings and are converted with `parse_to_decimal` by
/// `get_balance`, which uses `accountValue` as the perp equity and `totalRawUsd` as the
/// perp balance.
#[derive(Deserialize, Debug)]
struct HyperliquidMarginSummary {
    #[serde(rename = "accountValue")]
    account_value: String,
    #[serde(rename = "totalRawUsd")]
    total_rawusd: String,
}
1215
/// One open order: the coin it is on and its numeric order id.
// NOTE(review): presumably the element type returned by `get_orders` — confirm.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserOpenOrder {
    coin: String,
    oid: u64,
}
1221
/// Response wrapper exposing the `assetPositions` array of a user-state query.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserPositionResponse {
    #[serde(rename = "assetPositions")]
    asset_positions: Vec<HyperliquidRetriveUserPositionResponseBody>,
}
/// One element of `assetPositions`; only its nested `position` object is read.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserPositionResponseBody {
    position: HyperliquidRetriveUserPosition,
}
/// A single position: the coin and its `szi` value.
// NOTE(review): `szi` is presumably the signed position size — confirm against the API docs.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveUserPosition {
    coin: String,
    szi: Decimal,
}
1236
/// Market metadata response: the `universe` list of listed markets.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveMarketMetadataResponse {
    universe: Vec<HyperliquidRetriveMarketMetadata>,
}
/// Metadata of one market in the `universe` list.
///
/// `decimals` is read from the `szDecimals` JSON field and `max_leverage` from
/// `maxLeverage`.
#[derive(Deserialize, Debug)]
struct HyperliquidRetriveMarketMetadata {
    name: String,
    #[serde(rename = "szDecimals")]
    decimals: u32,
    #[serde(rename = "maxLeverage")]
    max_leverage: u32,
}
1249
/// Response to the `"spotClearinghouseState"` info query: one entry per spot coin held.
#[derive(Deserialize, Debug)]
struct HyperliquidSpotBalanceResponse {
    balances: Vec<HyperliquidSpotBalance>,
}
1254
/// Spot balance of a single coin; `total` is a decimal string parsed by `get_balance`.
#[derive(Deserialize, Debug)]
struct HyperliquidSpotBalance {
    coin: String,
    total: String,
}
1260
1261#[async_trait]
1262impl DexConnector for HyperliquidConnector {
    /// Starts the connector: opens the WebSocket, sleeps for five seconds, then calls
    /// `wait_for_market_ready(60)`.
    ///
    /// Any error from starting the socket or from the readiness wait is returned.
    async fn start(&self) -> Result<(), DexError> {
        self.start_web_socket().await?;
        // Fixed pause before the readiness check.
        sleep(Duration::from_secs(5)).await;
        // NOTE(review): 60 is presumably a timeout in seconds — confirm in wait_for_market_ready.
        self.wait_for_market_ready(60).await?;
        Ok(())
    }
1269
    /// Stops the connector by delegating to `stop_web_socket`, propagating its error.
    async fn stop(&self) -> Result<(), DexError> {
        self.stop_web_socket().await?;
        Ok(())
    }
1274
1275    async fn restart(&self, max_retries: i32) -> Result<(), DexError> {
1276        log::info!("Restarting WebSocket connection...");
1277
1278        let mut retry_count = 0;
1279        let mut backoff_delay = Duration::from_secs(1);
1280
1281        while retry_count < max_retries {
1282            if let Err(e) = self.stop_web_socket().await {
1283                log::error!(
1284                    "Failed to stop WebSocket on attempt {}: {:?}",
1285                    retry_count + 1,
1286                    e
1287                );
1288            } else {
1289                log::info!(
1290                    "Successfully stopped WebSocket on attempt {}.",
1291                    retry_count + 1
1292                );
1293            }
1294
1295            sleep(backoff_delay).await;
1296
1297            match self.start_web_socket().await {
1298                Ok(_) => {
1299                    log::info!(
1300                        "Successfully started WebSocket on attempt {}.",
1301                        retry_count + 1
1302                    );
1303                    return Ok(());
1304                }
1305                Err(e) => {
1306                    log::error!(
1307                        "Failed to start WebSocket on attempt {}: {:?}",
1308                        retry_count + 1,
1309                        e
1310                    );
1311                    retry_count += 1;
1312                    backoff_delay *= 2; // Exponential backoff
1313                }
1314            }
1315        }
1316
1317        log::error!(
1318            "Failed to restart WebSocket after {} attempts.",
1319            max_retries
1320        );
1321        Err(DexError::Other(format!(
1322            "Failed to restart WebSocket after {} attempts.",
1323            max_retries
1324        )))
1325    }
1326
1327    async fn set_leverage(&self, symbol: &str, leverage: u32) -> Result<(), DexError> {
1328        let asset = Self::extract_asset_name(symbol);
1329        self.exchange_client
1330            .update_leverage(leverage, asset, false, None)
1331            .await
1332            .map_err(|e| DexError::Other(e.to_string()))?;
1333        Ok(())
1334    }
1335
1336    async fn get_ticker(
1337        &self,
1338        symbol: &str,
1339        _test_price: Option<Decimal>,
1340    ) -> Result<TickerResponse, DexError> {
1341        if !self.running.load(Ordering::SeqCst) {
1342            return Err(DexError::NoConnection);
1343        }
1344
1345        let dynamic_info_guard = self.dynamic_market_info.read().await;
1346        let dynamic_info = dynamic_info_guard
1347            .get(symbol)
1348            .ok_or_else(|| DexError::Other("No dynamic market info available".to_string()))?;
1349        let price = dynamic_info
1350            .market_price
1351            .ok_or_else(|| DexError::Other("No price available".to_string()))?;
1352        let min_tick = dynamic_info.min_tick;
1353        let num_trades = dynamic_info.num_trades;
1354        let funding_rate = dynamic_info.funding_rate;
1355        let open_interest = dynamic_info.open_interest;
1356        let oracle_price = dynamic_info.oracle_price;
1357
1358        let cur_vol = dynamic_info.volume.unwrap_or(Decimal::ZERO);
1359        let mut lv = self.last_volumes.lock().await;
1360        let prev_vol = lv.get(symbol).cloned().unwrap_or(Decimal::ZERO);
1361        // make sure we never return a negative delta if cur_vol resets each candle
1362        let delta_vol = if cur_vol >= prev_vol {
1363            cur_vol - prev_vol
1364        } else {
1365            // volume counter has rolled over/reset at candle boundary
1366            cur_vol
1367        };
1368        lv.insert(symbol.to_string(), cur_vol);
1369
1370        Ok(TickerResponse {
1371            symbol: symbol.to_owned(),
1372            price,
1373            min_tick,
1374            min_order: None,
1375            volume: Some(delta_vol),
1376            num_trades,
1377            funding_rate,
1378            open_interest,
1379            oracle_price,
1380        })
1381    }
1382
1383    async fn get_filled_orders(&self, symbol: &str) -> Result<FilledOrdersResponse, DexError> {
1384        let mut response: Vec<FilledOrder> = vec![];
1385        let trade_results_guard = self.trade_results.read().await;
1386        let orders = match trade_results_guard.get(symbol) {
1387            Some(v) => v,
1388            None => return Ok(FilledOrdersResponse::default()),
1389        };
1390        for (trade_id, order) in orders.iter() {
1391            let filled_order = FilledOrder {
1392                order_id: order.order_id.clone(),
1393                trade_id: trade_id.clone(),
1394                is_rejected: order.is_rejected,
1395                filled_side: Some(order.filled_side.clone()),
1396                filled_size: Some(order.filled_size),
1397                filled_fee: Some(order.filled_fee),
1398                filled_value: Some(order.filled_value),
1399            };
1400            response.push(filled_order);
1401        }
1402
1403        Ok(FilledOrdersResponse { orders: response })
1404    }
1405
1406    async fn get_canceled_orders(&self, symbol: &str) -> Result<CanceledOrdersResponse, DexError> {
1407        let mut resp = Vec::new();
1408        let guard = self.canceled_results.read().await;
1409        if let Some(map) = guard.get(symbol) {
1410            for (_, evt) in map.iter() {
1411                resp.push(CanceledOrder {
1412                    order_id: evt.order_id.clone(),
1413                    canceled_timestamp: evt.timestamp,
1414                });
1415            }
1416        }
1417        Ok(CanceledOrdersResponse { orders: resp })
1418    }
1419
1420    async fn get_open_orders(&self, symbol: &str) -> Result<OpenOrdersResponse, DexError> {
1421        let all_orders = self.get_orders().await?;
1422
1423        // Convert symbol to internal representation if needed
1424        let target_coin = if let Some(internal_id) = self.spot_index_map.get(symbol) {
1425            format!("@{}", internal_id)
1426        } else {
1427            symbol.to_string()
1428        };
1429
1430        let filtered_orders: Vec<crate::OpenOrder> = all_orders
1431            .into_iter()
1432            .filter(|order| order.coin == target_coin || order.coin == symbol)
1433            .map(|hyperliquid_order| crate::OpenOrder {
1434                order_id: hyperliquid_order.oid.to_string(),
1435                symbol: symbol.to_string(),
1436                side: OrderSide::Long, // Default, we need more detailed API to get actual side
1437                size: Decimal::ZERO,   // Default, we need more detailed API to get actual size
1438                price: Decimal::ZERO,  // Default, we need more detailed API to get actual price
1439                status: "open".to_string(),
1440            })
1441            .collect();
1442
1443        Ok(OpenOrdersResponse {
1444            orders: filtered_orders,
1445        })
1446    }
1447
1448    async fn get_balance(&self, symbol: Option<&str>) -> Result<BalanceResponse, DexError> {
1449        // Always get both spot and perp balances
1450        let spot_action = HyperliquidDefaultPayload {
1451            r#type: "spotClearinghouseState".into(),
1452            user: Some(self.config.evm_wallet_address.clone()),
1453        };
1454        let spot_res: HyperliquidSpotBalanceResponse = self
1455            .handle_request_with_action("/info".into(), &spot_action)
1456            .await?;
1457
1458        let perp_action = HyperliquidDefaultPayload {
1459            r#type: "clearinghouseState".into(),
1460            user: Some(self.config.evm_wallet_address.clone()),
1461        };
1462        let perp_res = self
1463            .handle_request_with_action::<HyperliquidRetrieveUserStateResponse, _>(
1464                "/info".into(),
1465                &perp_action,
1466            )
1467            .await?;
1468
1469        log::debug!("spot balances = {:?}", spot_res.balances);
1470        log::debug!("perp margin summary = {:?}", perp_res.margin_summary);
1471
1472        if let Some(pair) = symbol {
1473            // "UBTC/USDC" → "UBTC"
1474            let base_coin = pair.split('/').next().unwrap_or(pair);
1475
1476            let mut usdc_total = Decimal::ZERO;
1477            let mut base_total = Decimal::ZERO;
1478            for b in &spot_res.balances {
1479                match b.coin.as_str() {
1480                    "USDC" => {
1481                        usdc_total = parse_to_decimal(&b.total)?;
1482                        log::debug!("USDC total = {}", usdc_total);
1483                    }
1484                    c if c == base_coin => {
1485                        base_total = parse_to_decimal(&b.total)?;
1486                        log::debug!("{} total = {}", c, base_total);
1487                    }
1488                    _ => {}
1489                }
1490            }
1491
1492            let price_key = pair.to_string();
1493            let px = self
1494                .get_market_price(&price_key)
1495                .await
1496                .unwrap_or(Decimal::ZERO);
1497
1498            log::debug!("price_key = {}, px = {}", price_key, px);
1499
1500            // Get perp balance as well
1501            let (perp_equity, perp_balance) = if let Some(summary) = &perp_res.margin_summary {
1502                let equity = parse_to_decimal(&summary.account_value)?;
1503                let balance = parse_to_decimal(&summary.total_rawusd)?;
1504                log::debug!("perp equity = {}, perp balance = {}", equity, balance);
1505                (equity, balance)
1506            } else {
1507                (Decimal::ZERO, Decimal::ZERO)
1508            };
1509
1510            // Combine spot and perp balances
1511            let spot_equity = base_total * px + usdc_total;
1512            let total_equity = spot_equity + perp_equity;
1513            let total_balance = usdc_total + perp_balance;
1514
1515            log::debug!(
1516                "final equity = {} (spot: {} + perp: {}), balance = {} (spot: {} + perp: {})",
1517                total_equity,
1518                spot_equity,
1519                perp_equity,
1520                total_balance,
1521                usdc_total,
1522                perp_balance
1523            );
1524
1525            return Ok(BalanceResponse {
1526                equity: total_equity,
1527                balance: total_balance,
1528                position_entry_price: None,
1529                position_sign: None,
1530            });
1531        }
1532
1533        // When no symbol is specified, return total balance from both spot and perp
1534        let mut total_equity = Decimal::ZERO;
1535        let mut total_balance = Decimal::ZERO;
1536
1537        // Add spot balances
1538        for b in &spot_res.balances {
1539            let balance = parse_to_decimal(&b.total)?;
1540            if b.coin == "USDC" {
1541                total_balance += balance;
1542                total_equity += balance;
1543            } else {
1544                // For non-USDC tokens, try to get their price and add to equity
1545                let symbol_key = format!("{}/USDC", b.coin);
1546                if let Ok(px) = self.get_market_price(&symbol_key).await {
1547                    total_equity += balance * px;
1548                }
1549            }
1550        }
1551
1552        // Add perp balances
1553        if let Some(summary) = perp_res.margin_summary {
1554            let perp_equity = parse_to_decimal(&summary.account_value)?;
1555            let perp_balance = parse_to_decimal(&summary.total_rawusd)?;
1556            total_equity += perp_equity;
1557            total_balance += perp_balance;
1558            log::debug!(
1559                "perp equity = {}, perp balance = {}",
1560                perp_equity,
1561                perp_balance
1562            );
1563        }
1564
1565        log::debug!(
1566            "final total equity = {}, total balance = {}",
1567            total_equity,
1568            total_balance
1569        );
1570        Ok(BalanceResponse {
1571            equity: total_equity,
1572            balance: total_balance,
1573            position_entry_price: None,
1574            position_sign: None,
1575        })
1576    }
1577
    /// Not implemented for Hyperliquid: always returns `DexError::Other`.
    async fn get_combined_balance(&self) -> Result<CombinedBalanceResponse, DexError> {
        // Hyperliquid connector: minimal implementation for compilation only
        Err(DexError::Other(
            "get_combined_balance not implemented for HyperLiquid".to_string(),
        ))
    }
1584
    /// Not implemented for Hyperliquid: always returns `DexError::Other`; `_symbol` is unused.
    async fn get_last_trades(&self, _symbol: &str) -> Result<LastTradesResponse, DexError> {
        // TODO: Implement HyperLiquid last trades functionality
        Err(DexError::Other(
            "get_last_trades not implemented for HyperLiquid".to_string(),
        ))
    }
1591
    /// Not implemented for Hyperliquid: always returns `DexError::Other`; both arguments are
    /// unused.
    async fn get_order_book(
        &self,
        _symbol: &str,
        _depth: usize,
    ) -> Result<OrderBookSnapshot, DexError> {
        Err(DexError::Other(
            "get_order_book not implemented for HyperLiquid".to_string(),
        ))
    }
1601
1602    async fn clear_filled_order(&self, symbol: &str, trade_id: &str) -> Result<(), DexError> {
1603        let mut m = self.trade_results.write().await;
1604        if let Some(map) = m.get_mut(symbol) {
1605            if map.remove(trade_id).is_some() {
1606                Ok(())
1607            } else {
1608                Err(DexError::Other(format!(
1609                    "filled trade(trade_id:{}({})) does not exist",
1610                    trade_id, symbol
1611                )))
1612            }
1613        } else {
1614            Err(DexError::Other(format!(
1615                "filled trade(symbol:{}({})) does not exist",
1616                symbol, trade_id
1617            )))
1618        }
1619    }
1620
1621    async fn clear_all_filled_orders(&self) -> Result<(), DexError> {
1622        let mut trade_results_guard = self.trade_results.write().await;
1623        trade_results_guard.clear();
1624        Ok(())
1625    }
1626
1627    async fn clear_canceled_order(&self, symbol: &str, order_id: &str) -> Result<(), DexError> {
1628        let mut guard = self.canceled_results.write().await;
1629        if let Some(map) = guard.get_mut(symbol) {
1630            if map.remove(order_id).is_some() {
1631                return Ok(());
1632            }
1633        }
1634        Err(DexError::Other(format!(
1635            "canceled order {} for {} not found",
1636            order_id, symbol
1637        )))
1638    }
1639
1640    async fn clear_all_canceled_orders(&self) -> Result<(), DexError> {
1641        self.canceled_results.write().await.clear();
1642        Ok(())
1643    }
1644
1645    async fn create_order(
1646        &self,
1647        symbol: &str,
1648        size: Decimal,
1649        side: OrderSide,
1650        price: Option<Decimal>,
1651        spread: Option<i64>,
1652        _expiry_secs: Option<u64>, // Ignored for Hyperliquid
1653    ) -> Result<CreateOrderResponse, DexError> {
1654        let (price, time_in_force) = match price {
1655            Some(v) => {
1656                if spread.is_some() {
1657                    let map = self.dynamic_market_info.read().await;
1658                    let info = map
1659                        .get(symbol)
1660                        .ok_or_else(|| DexError::Other(format!("No market info for {}", symbol)))?;
1661                    let bid = info
1662                        .best_bid
1663                        .ok_or_else(|| DexError::Other("No best_bid".into()))?;
1664                    let ask = info
1665                        .best_ask
1666                        .ok_or_else(|| DexError::Other("No best_ask".into()))?;
1667                    let mid = (bid + ask) * Decimal::new(5, 1);
1668                    let tick = info
1669                        .min_tick
1670                        .ok_or_else(|| DexError::Other("No min_tick".into()))?;
1671                    let spread = Decimal::from(spread.unwrap());
1672                    log::debug!(
1673                        "bid = {}, min = {}, ask = {}, tick = {}, spread = {}",
1674                        bid,
1675                        mid,
1676                        ask,
1677                        tick,
1678                        spread
1679                    );
1680                    let calc = if side == OrderSide::Long {
1681                        mid - tick * spread
1682                    } else {
1683                        mid + tick * spread
1684                    };
1685                    (calc, "Alo")
1686                } else {
1687                    (v, "Alo")
1688                }
1689            }
1690            None => {
1691                let price = self.get_worst_price(symbol, &side).await?;
1692                (price, "Ioc")
1693            }
1694        };
1695
1696        let dynamic_market_info_guard = self.dynamic_market_info.read().await;
1697        let market_info = dynamic_market_info_guard
1698            .get(symbol)
1699            .ok_or_else(|| DexError::Other("Market info not found".to_string()))?;
1700        let min_tick = market_info
1701            .min_tick
1702            .ok_or_else(|| DexError::Other("Min tick not set for market".to_string()))?;
1703
1704        let rounded_price = Self::round_price(price, min_tick, side.clone());
1705        let rounded_size = self.floor_size(size, symbol);
1706
1707        log::info!(
1708            "[create_order] sym={} tif={} px={} size={} notional={} min_tick={} sz_decimals={}",
1709            symbol,
1710            time_in_force,
1711            rounded_price,
1712            rounded_size,
1713            rounded_price * rounded_size,
1714            min_tick,
1715            self.static_market_info
1716                .get(symbol)
1717                .map(|m| m.decimals)
1718                .unwrap_or(0),
1719        );
1720        let asset = resolve_coin(symbol, &self.spot_index_map);
1721
1722        let order = ClientOrderRequest {
1723            asset,
1724            is_buy: side == OrderSide::Long,
1725            reduce_only: false,
1726            limit_px: rounded_price
1727                .to_f64()
1728                .ok_or_else(|| DexError::Other("Conversion to f64 failed".to_string()))?,
1729            sz: rounded_size
1730                .to_f64()
1731                .ok_or_else(|| DexError::Other("Conversion to f64 failed".to_string()))?,
1732            cloid: None,
1733            order_type: ClientOrder::Limit(ClientLimit {
1734                tif: time_in_force.to_string(),
1735            }),
1736        };
1737
1738        let res = self.exchange_client.order(order, None).await.map_err(|e| {
1739            log::error!(
1740                "[create_order] order failed: symbol = {}, size = {}, error = {}",
1741                symbol,
1742                rounded_size,
1743                e
1744            );
1745            DexError::Other(e.to_string())
1746        })?;
1747
1748        let res = match res {
1749            ExchangeResponseStatus::Ok(exchange_response) => exchange_response,
1750            ExchangeResponseStatus::Err(e) => return Err(DexError::ServerResponse(e.to_string())),
1751        };
1752        let status = res.data.unwrap().statuses[0].clone();
1753        let order_id = match status {
1754            ExchangeDataStatus::Filled(order) => order.oid,
1755            ExchangeDataStatus::Resting(order) => order.oid,
1756            _ => {
1757                return Err(DexError::ServerResponse(
1758                    "Unknown ExchangeDataStaus".to_owned(),
1759                ))
1760            }
1761        };
1762
1763        Ok(CreateOrderResponse {
1764            order_id: order_id.to_string(),
1765            ordered_price: rounded_price,
1766            ordered_size: rounded_size,
1767        })
1768    }
1769
1770    async fn create_advanced_trigger_order(
1771        &self,
1772        symbol: &str,
1773        size: Decimal,
1774        side: OrderSide,
1775        trigger_px: Decimal,
1776        limit_px: Option<Decimal>,
1777        order_style: TriggerOrderStyle,
1778        slippage_bps: Option<u32>,
1779        tpsl: TpSl,
1780        reduce_only: bool,
1781        expiry_secs: Option<u64>,
1782    ) -> Result<CreateOrderResponse, DexError> {
1783        // Log expiry_secs if provided
1784        if let Some(expiry) = expiry_secs {
1785            log::warn!(
1786                "🕐 [HYPERLIQUID_EXPIRY] expiry_secs={} specified but not directly supported by Hyperliquid trigger orders. Consider implementing auto-cancel mechanism.",
1787                expiry
1788            );
1789        }
1790
1791        let asset = resolve_coin(symbol, &self.spot_index_map);
1792
1793        // Round size to proper decimals for this symbol
1794        let rounded_size = self.floor_size(size, symbol);
1795
1796        let trigger_price = trigger_px
1797            .to_f64()
1798            .ok_or_else(|| DexError::Other("Failed to convert trigger_px to f64".into()))?;
1799        let sz = rounded_size
1800            .to_f64()
1801            .ok_or_else(|| DexError::Other("Failed to convert size to f64".into()))?;
1802
1803        let (is_market, final_limit_price_opt) = match order_style {
1804            TriggerOrderStyle::Market => (true, None), // Pure market order, no limit price
1805            TriggerOrderStyle::MarketWithSlippageControl => {
1806                if let Some(slippage) = slippage_bps {
1807                    // MarketWithSlippageControl: prioritize execution over price
1808                    // Calculate slippage-controlled price toward "worse" direction for guaranteed fills
1809                    let slippage_factor = Decimal::new(slippage as i64, 4); // convert bps to decimal
1810                    let adjusted_price = match side {
1811                        // Long SL/TP = Sell order: worse price = lower (less favorable)
1812                        OrderSide::Long => trigger_px * (Decimal::ONE - slippage_factor),
1813                        // Short SL/TP = Buy order: worse price = higher (less favorable)
1814                        OrderSide::Short => trigger_px * (Decimal::ONE + slippage_factor),
1815                    };
1816                    let limit_price = adjusted_price.to_f64().ok_or_else(|| {
1817                        DexError::Other("Failed to convert adjusted price to f64".into())
1818                    })?;
1819                    (false, Some(limit_price)) // Use limit order with calculated price
1820                } else {
1821                    (true, None) // Fallback to pure market
1822                }
1823            }
1824            TriggerOrderStyle::Limit => {
1825                let limit_price = limit_px
1826                    .ok_or_else(|| {
1827                        DexError::Other("limit_px required for Limit order style".into())
1828                    })?
1829                    .to_f64()
1830                    .ok_or_else(|| DexError::Other("Failed to convert limit_px to f64".into()))?;
1831
1832                // Validate that limit price is appropriate for the order side and TPSL type
1833                // The validation should be based on order execution direction, not position side
1834                match (side, tpsl) {
1835                    (OrderSide::Long, TpSl::Sl) => {
1836                        // Buy stop loss: limit should be >= trigger (worse price for buying)
1837                        if limit_price < trigger_price {
1838                            return Err(DexError::Other(
1839                                "For Buy Stop Loss, limit_px must be >= trigger_px".into(),
1840                            ));
1841                        }
1842                    }
1843                    (OrderSide::Short, TpSl::Sl) => {
1844                        // Sell stop loss: limit should be <= trigger (worse price for selling)
1845                        if limit_price > trigger_price {
1846                            return Err(DexError::Other(
1847                                "For Sell Stop Loss, limit_px must be <= trigger_px".into(),
1848                            ));
1849                        }
1850                    }
1851                    (OrderSide::Long, TpSl::Tp) => {
1852                        // Buy take profit: limit should be <= trigger (better price for buying)
1853                        if limit_price > trigger_price {
1854                            return Err(DexError::Other(
1855                                "For Buy Take Profit, limit_px must be <= trigger_px".into(),
1856                            ));
1857                        }
1858                    }
1859                    (OrderSide::Short, TpSl::Tp) => {
1860                        // Sell take profit: limit should be >= trigger (better price for selling)
1861                        if limit_price < trigger_price {
1862                            return Err(DexError::Other(
1863                                "For Sell Take Profit, limit_px must be >= trigger_px".into(),
1864                            ));
1865                        }
1866                    }
1867                }
1868                (false, Some(limit_price))
1869            }
1870        };
1871
1872        let is_buy = is_buy_for_tpsl(side);
1873
1874        let request = ClientOrderRequest {
1875            asset,
1876            is_buy,
1877            reduce_only, // Use the passed reduce_only argument
1878            // NOTE: is_market=true の場合、HL側で limit_px は無視される実装前提。
1879            // もし将来のAPIで解釈されるなら 0.0 は危険なのでOption化を検討。
1880            // Market order failures should be logged for debugging if 0.0 causes issues.
1881            limit_px: final_limit_price_opt.unwrap_or(0.0),
1882            sz,
1883            cloid: None,
1884            order_type: ClientOrder::Trigger(ClientTrigger {
1885                is_market,
1886                trigger_px: trigger_price,
1887                tpsl: format!("{:?}", tpsl).to_lowercase(),
1888            }),
1889        };
1890
1891        let resp_status = self
1892            .exchange_client
1893            .order(request, None)
1894            .await
1895            .map_err(|e| {
1896                DexError::Other(format!("Advanced trigger order request failed: {}", e))
1897            })?;
1898
1899        let exchange_response = match resp_status {
1900            ExchangeResponseStatus::Ok(x) => x,
1901            ExchangeResponseStatus::Err(e) => return Err(DexError::ServerResponse(e.to_string())),
1902        };
1903
1904        let status = exchange_response
1905            .data
1906            .unwrap()
1907            .statuses
1908            .into_iter()
1909            .next()
1910            .ok_or_else(|| DexError::Other("No order status returned".into()))?;
1911
1912        let oid = match status {
1913            ExchangeDataStatus::Filled(o) => o.oid,
1914            ExchangeDataStatus::Resting(o) => o.oid,
1915            _ => {
1916                return Err(DexError::ServerResponse(
1917                    "Unrecognized exchange status".into(),
1918                ))
1919            }
1920        };
1921
1922        // Calculate the actual ordered price based on what was sent to the exchange
1923        let ordered_price = match final_limit_price_opt {
1924            Some(px) => Decimal::from_f64(px).unwrap_or(trigger_px),
1925            None => trigger_px, // Market系:暫定的にtrigger参照(型追加できるなら Option に)
1926        };
1927
1928        let order_id = oid.to_string();
1929
1930        // TODO: Implement auto-cancel mechanism for expiry_secs
1931        // if let Some(expiry) = expiry_secs {
1932        //     tokio::spawn(async move {
1933        //         tokio::time::sleep(tokio::time::Duration::from_secs(expiry)).await;
1934        //         // Cancel order with order_id
1935        //         log::info!("Auto-canceling order {} after {} seconds", order_id, expiry);
1936        //     });
1937        // }
1938
1939        Ok(CreateOrderResponse {
1940            order_id,
1941            ordered_price,
1942            ordered_size: rounded_size,
1943        })
1944    }
1945
1946    async fn cancel_order(&self, symbol: &str, order_id: &str) -> Result<(), DexError> {
1947        let asset = resolve_coin(symbol, &self.spot_index_map);
1948        let cancel = ClientCancelRequest {
1949            asset,
1950            oid: u64::from_str(order_id).unwrap_or_default(),
1951        };
1952
1953        self.exchange_client
1954            .cancel(cancel, None)
1955            .await
1956            .map_err(|e| DexError::Other(e.to_string()))?;
1957
1958        Ok(())
1959    }
1960
1961    async fn cancel_all_orders(&self, symbol: Option<String>) -> Result<(), DexError> {
1962        let open_orders = self.get_orders().await?;
1963        let order_ids: Vec<String> = open_orders
1964            .iter()
1965            .filter_map(|order| {
1966                let idx_opt = order.coin.strip_prefix('@').and_then(|s| s.parse::<usize>().ok());
1967                let external_sym = idx_opt
1968                    .and_then(|idx| self.spot_reverse_map.get(&idx).cloned())
1969                    .unwrap_or_else(|| format!("{}-USD", order.coin));
1970
1971                    log::debug!(
1972                        "cancel_all_orders: raw coin = {}, idx = {:?}, external_sym = {:?}, target = {:?}",
1973                        order.coin, idx_opt, external_sym, symbol
1974                    );
1975
1976                if symbol.as_deref().map_or(true, |s| s == &external_sym) {
1977                    Some(order.oid.to_string())
1978                } else {
1979                    None
1980                }
1981            })
1982            .collect();
1983        self.cancel_orders(symbol, order_ids).await
1984    }
1985
1986    async fn cancel_orders(
1987        &self,
1988        symbol: Option<String>,
1989        order_ids: Vec<String>,
1990    ) -> Result<(), DexError> {
1991        let open_orders = self.get_orders().await?;
1992        let mut cancels = Vec::new();
1993
1994        for order in open_orders {
1995            let idx_opt = order
1996                .coin
1997                .strip_prefix('@')
1998                .and_then(|s| s.parse::<usize>().ok());
1999            let external_sym = idx_opt
2000                .and_then(|idx| self.spot_reverse_map.get(&idx).cloned())
2001                .unwrap_or_else(|| format!("{}-USD", order.coin));
2002
2003            log::debug!(
2004                    "cancel_orders: raw coin = {}, idx = {:?}, external_sym = {:?}, requested_ids = {:?}",
2005                    order.coin, idx_opt, external_sym, order_ids
2006                );
2007
2008            if symbol.as_deref().map_or(true, |s| s == &external_sym)
2009                && order_ids.contains(&order.oid.to_string())
2010            {
2011                let asset = resolve_coin(&external_sym, &self.spot_index_map);
2012                cancels.push(ClientCancelRequest {
2013                    asset,
2014                    oid: order.oid,
2015                });
2016            }
2017        }
2018
2019        if !cancels.is_empty() {
2020            self.exchange_client
2021                .bulk_cancel(cancels, None)
2022                .await
2023                .map_err(|e| DexError::Other(e.to_string()))?;
2024        }
2025        Ok(())
2026    }
2027
2028    async fn close_all_positions(&self, symbol: Option<String>) -> Result<(), DexError> {
2029        let open_positions = self.get_positions().await?;
2030        for p in open_positions {
2031            let position = p.position;
2032            let idx_opt = position
2033                .coin
2034                .strip_prefix('@')
2035                .and_then(|s| s.parse::<usize>().ok());
2036            let external_sym = idx_opt
2037                .and_then(|idx| self.spot_reverse_map.get(&idx).cloned())
2038                .unwrap_or_else(|| format!("{}-USD", position.coin));
2039            if symbol.as_deref().map_or(true, |s| s == &external_sym) {
2040                let reversed_side = if position.szi.is_sign_negative() {
2041                    OrderSide::Long
2042                } else {
2043                    OrderSide::Short
2044                };
2045                let size = position.szi.abs();
2046                let _ = self
2047                    .create_order(&external_sym, size, reversed_side, None, None, None) // No expiry for position closing
2048                    .await;
2049            }
2050        }
2051        Ok(())
2052    }
2053
2054    async fn clear_last_trades(&self, _symbol: &str) -> Result<(), DexError> {
2055        Ok(())
2056    }
2057
2058    async fn is_upcoming_maintenance(&self, hours_ahead: i64) -> bool {
2059        let info = self.maintenance.read().await;
2060        if let Some(start) = info.next_start {
2061            let now = Utc::now();
2062            let lead = ChronoDuration::hours(hours_ahead.max(0));
2063            // Treat as maintenance if scheduled within lead time OR already started within a 90-minute window.
2064            let active_window = ChronoDuration::minutes(90);
2065            let upcoming = now <= start && (start - now) <= lead;
2066            let already_active = now >= start && (now - start) <= active_window;
2067            let result = upcoming || already_active;
2068            log::debug!(
2069                "Hyperliquid maintenance check: start={:?} now={:?} upcoming={} active={} result={}",
2070                start,
2071                now,
2072                upcoming,
2073                already_active,
2074                result
2075            );
2076            return result;
2077        }
2078        log::debug!("Hyperliquid maintenance check: no cached start");
2079        false
2080    }
2081
2082    async fn sign_evm_65b(&self, _message: &str) -> Result<String, DexError> {
2083        Err(DexError::Other(
2084            "65B EVM signature not supported for Hyperliquid".to_string(),
2085        ))
2086    }
2087
2088    async fn sign_evm_65b_with_eip191(&self, _message: &str) -> Result<String, DexError> {
2089        Err(DexError::Other(
2090            "65B EIP-191 signature not supported for Hyperliquid".to_string(),
2091        ))
2092    }
2093}
2094
2095impl HyperliquidConnector {
2096    async fn handle_request_with_action<T, U>(
2097        &self,
2098        request_url: String,
2099        action: &U,
2100    ) -> Result<T, DexError>
2101    where
2102        T: for<'de> Deserialize<'de>,
2103        U: Serialize + std::fmt::Debug + Clone,
2104    {
2105        let json_payload =
2106            serde_json::to_value(action).map_err(|e| DexError::Other(e.to_string()))?;
2107
2108        log::debug!("json_payload = {:?}", json_payload);
2109
2110        self.request
2111            .handle_request::<T, U>(
2112                HttpMethod::Post,
2113                request_url,
2114                &HashMap::new(),
2115                json_payload.to_string(),
2116            )
2117            .await
2118            .map_err(|e| DexError::Other(e.to_string()))
2119    }
2120
2121    async fn get_positions(
2122        &self,
2123    ) -> Result<Vec<HyperliquidRetriveUserPositionResponseBody>, DexError> {
2124        let request_url = "/info";
2125        let action = HyperliquidDefaultPayload {
2126            r#type: "clearinghouseState".to_owned(),
2127            user: Some(self.config.evm_wallet_address.clone()),
2128        };
2129        let res: HyperliquidRetriveUserPositionResponse = self
2130            .handle_request_with_action::<HyperliquidRetriveUserPositionResponse, HyperliquidDefaultPayload>(
2131                request_url.to_string(),
2132                &action,
2133            )
2134            .await?;
2135
2136        Ok(res.asset_positions)
2137    }
2138
2139    async fn get_orders(&self) -> Result<Vec<HyperliquidRetriveUserOpenOrder>, DexError> {
2140        let request_url = "/info";
2141        let action = HyperliquidDefaultPayload {
2142            r#type: "openOrders".to_owned(),
2143            user: Some(self.config.evm_wallet_address.clone()),
2144        };
2145        let res: Vec<HyperliquidRetriveUserOpenOrder> = self
2146            .handle_request_with_action::<Vec<HyperliquidRetriveUserOpenOrder>, HyperliquidDefaultPayload>(
2147                request_url.to_string(),
2148                &action,
2149            )
2150            .await?;
2151
2152        Ok(res)
2153    }
2154
2155    async fn retrive_market_metadata(&mut self) -> Result<(), DexError> {
2156        let request_url = "/info";
2157        let action = HyperliquidDefaultPayload {
2158            r#type: "meta".to_owned(),
2159            user: None,
2160        };
2161        let res = self
2162            .handle_request_with_action::<HyperliquidRetriveMarketMetadataResponse, HyperliquidDefaultPayload>(
2163                request_url.to_string(),
2164                &action,
2165            )
2166            .await?;
2167
2168        let mut static_market_info_update = HashMap::new();
2169        for metadata in res.universe.into_iter() {
2170            let market_id = format!("{}-USD", metadata.name);
2171            static_market_info_update.insert(
2172                market_id,
2173                StaticMarketInfo {
2174                    decimals: metadata.decimals,
2175                    _max_leverage: metadata.max_leverage,
2176                },
2177            );
2178        }
2179
2180        self.static_market_info = static_market_info_update;
2181
2182        Ok(())
2183    }
2184
2185    async fn get_worst_price(&self, symbol: &str, side: &OrderSide) -> Result<Decimal, DexError> {
2186        let market_price = self.get_market_price(symbol).await?;
2187
2188        let worst_price = slippage_price(market_price, *side == OrderSide::Long);
2189        Ok(worst_price)
2190    }
2191
2192    async fn get_market_price(&self, symbol: &str) -> Result<Decimal, DexError> {
2193        let market_info_guard = self.dynamic_market_info.read().await;
2194        match market_info_guard.get(symbol) {
2195            Some(v) => match v.market_price {
2196                Some(price) => Ok(price),
2197                None => Err(DexError::Other("Price is None".to_string())),
2198            },
2199            None => Err(DexError::Other("No price available".to_string())),
2200        }
2201    }
2202
2203    fn calculate_min_tick(price: Decimal, sz_decimals: u32, is_spot: bool) -> Decimal {
2204        log::trace!(
2205            "calculate_min_tick called: price={}, sz_decimals={}, is_spot={}",
2206            price,
2207            sz_decimals,
2208            is_spot
2209        );
2210
2211        let price_str = price.to_string();
2212        let integer_part = price_str.split('.').next().unwrap_or("");
2213        let integer_digits = if integer_part == "0" {
2214            0
2215        } else {
2216            integer_part.len()
2217        };
2218
2219        let scale_by_sig: u32 = if integer_digits >= 5 {
2220            0
2221        } else {
2222            (5 - integer_digits) as u32
2223        };
2224
2225        let max_decimals: u32 = if is_spot { 8u32 } else { 6u32 };
2226        let scale_by_dec: u32 = max_decimals.saturating_sub(sz_decimals);
2227        let scale: u32 = scale_by_sig.min(scale_by_dec);
2228
2229        log::trace!(
2230            "calculate_min_tick internals: integer_digits={}, scale_by_sig={}, max_decimals={}, scale_by_dec={}, scale={}",
2231            integer_digits,
2232            scale_by_sig,
2233            max_decimals,
2234            scale_by_dec,
2235            scale
2236        );
2237
2238        let min_tick = Decimal::new(1, scale);
2239
2240        log::trace!(
2241            "calculate_min_tick result: min_tick={}, (1e-{})",
2242            min_tick,
2243            scale
2244        );
2245
2246        min_tick
2247    }
2248
2249    fn round_price(price: Decimal, min_tick: Decimal, order_side: OrderSide) -> Decimal {
2250        if min_tick.is_zero() {
2251            log::error!("round_price: min_tick is zero");
2252            return price;
2253        }
2254
2255        match order_side {
2256            OrderSide::Long => (price / min_tick).floor() * min_tick,
2257            OrderSide::Short => (price / min_tick).ceil() * min_tick,
2258        }
2259    }
2260
2261    fn floor_size(&self, size: Decimal, symbol: &str) -> Decimal {
2262        let decimals = match self.static_market_info.get(symbol) {
2263            Some(v) => v.decimals,
2264            None => {
2265                log::error!("symbol meta is not available: {}", symbol);
2266                return size;
2267            }
2268        };
2269
2270        size.round_dp(decimals)
2271    }
2272
2273    fn extract_asset_name(symbol: &str) -> &str {
2274        symbol.split('-').next().unwrap_or(symbol)
2275    }
2276
2277    /// Wait until best_bid and best_ask are available for all configured symbols
2278    async fn wait_for_market_ready(&self, timeout_secs: u64) -> Result<(), DexError> {
2279        use tokio::time::{sleep, Instant};
2280
2281        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
2282
2283        loop {
2284            let mut all_ready = true;
2285            {
2286                let map = self.dynamic_market_info.read().await;
2287                for symbol in &self.config.symbol_list {
2288                    if let Some(info) = map.get(symbol) {
2289                        log::warn!(
2290                            "[wait_for_market_ready] symbol = {}, best_bid = {:?}, best_ask = {:?}",
2291                            symbol,
2292                            info.best_bid,
2293                            info.best_ask
2294                        );
2295                        if info.best_bid.is_none() || info.best_ask.is_none() {
2296                            log::info!("Waiting for best_bid/best_ask for symbol: {}", symbol);
2297                            all_ready = false;
2298                            break;
2299                        }
2300                    } else {
2301                        log::info!("Market info not found yet for symbol: {}", symbol);
2302                        all_ready = false;
2303                        break;
2304                    }
2305                }
2306            }
2307
2308            if all_ready {
2309                log::info!("All symbols are market-ready.");
2310                return Ok(());
2311            }
2312
2313            if Instant::now() >= deadline {
2314                return Err(DexError::Other(
2315                    "Timed out waiting for market data".to_string(),
2316                ));
2317            }
2318
2319            sleep(Duration::from_millis(200)).await;
2320        }
2321    }
2322}
2323
2324fn resolve_coin(sym: &str, map: &HashMap<String, usize>) -> String {
2325    if sym.contains('/') {
2326        // ---- Spot ----
2327        match map.get(sym) {
2328            Some(idx) => format!("@{}", idx),
2329            None => {
2330                log::warn!("resolve_coin: {} is not in spot_index_map", sym);
2331                sym.to_string()
2332            }
2333        }
2334    } else if let Some(base) = sym.strip_suffix("-USD") {
2335        // ---- Perp ----
2336        base.to_string()
2337    } else {
2338        sym.to_string()
2339    }
2340}