Skip to main content

guilder_client_hyperliquid/
client.rs

1use alloy_primitives::Address;
2use guilder_abstraction::{self, L2Update, Fill, AssetContext, Liquidation, BoxStream, Side, OrderSide, OrderStatus, OrderType, TimeInForce, OrderPlacement, Position, OpenOrder, UserFill, OrderUpdate, FundingPayment, Deposit, Withdrawal};
3use futures_util::{stream, SinkExt, StreamExt};
4use reqwest::Client;
5use rust_decimal::Decimal;
6use serde::Deserialize;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::str::FromStr;
10use tokio_tungstenite::{connect_async, tungstenite::Message};
11
12const HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
13const HYPERLIQUID_EXCHANGE_URL: &str = "https://api.hyperliquid.xyz/exchange";
14const HYPERLIQUID_WS_URL: &str = "wss://api.hyperliquid.xyz/ws";
15
/// Client for the Hyperliquid REST and WebSocket APIs.
///
/// Built with [`HyperliquidClient::new`] for unauthenticated use, or with
/// [`HyperliquidClient::with_auth`] when account queries and signed exchange
/// actions are needed.
pub struct HyperliquidClient {
    /// HTTP client reused for every REST request.
    client: Client,
    /// Account address sent as `user` in account-scoped requests; `None` when unauthenticated.
    user_address: Option<Address>,
    /// Hex-encoded private key used to sign exchange actions; `None` when unauthenticated.
    private_key: Option<String>,
}
21
22impl HyperliquidClient {
23    pub fn new() -> Self {
24        HyperliquidClient { client: Client::new(), user_address: None, private_key: None }
25    }
26
27    pub fn with_auth(user_address: Address, private_key: String) -> Self {
28        HyperliquidClient { client: Client::new(), user_address: Some(user_address), private_key: Some(private_key) }
29    }
30
31    fn require_user_address(&self) -> Result<String, String> {
32        self.user_address
33            .map(|a| format!("{:#x}", a))
34            .ok_or_else(|| "user address required: use HyperliquidClient::with_auth".to_string())
35    }
36
37    fn require_private_key(&self) -> Result<&str, String> {
38        self.private_key.as_deref().ok_or_else(|| "private key required: use HyperliquidClient::with_auth".to_string())
39    }
40
41    async fn get_asset_index(&self, symbol: &str) -> Result<usize, String> {
42        let resp = self.client
43            .post(HYPERLIQUID_INFO_URL)
44            .json(&serde_json::json!({"type": "meta"}))
45            .send()
46            .await
47            .map_err(|e| e.to_string())?;
48        let meta: MetaResponse = resp.json().await.map_err(|e| e.to_string())?;
49        meta.universe.iter()
50            .position(|a| a.name == symbol)
51            .ok_or_else(|| format!("symbol {} not found", symbol))
52    }
53
54    async fn submit_signed_action(&self, action: Value, vault_address: Option<&str>) -> Result<Value, String> {
55        let private_key = self.require_private_key()?;
56        let nonce = std::time::SystemTime::now()
57            .duration_since(std::time::UNIX_EPOCH)
58            .unwrap()
59            .as_millis() as u64;
60
61        let (r, s, v) = sign_action(private_key, &action, vault_address, nonce)?;
62
63        let payload = serde_json::json!({
64            "action": action,
65            "nonce": nonce,
66            "signature": {"r": r, "s": s, "v": v},
67            "vaultAddress": null
68        });
69
70        let resp = self.client
71            .post(HYPERLIQUID_EXCHANGE_URL)
72            .json(&payload)
73            .send()
74            .await
75            .map_err(|e| e.to_string())?;
76
77        let body: Value = resp.json().await.map_err(|e| e.to_string())?;
78        if body["status"].as_str() == Some("err") {
79            return Err(body["response"].as_str().unwrap_or("unknown error").to_string());
80        }
81        Ok(body)
82    }
83}
84
85// --- REST deserialization types ---
86
87#[derive(Deserialize)]
88struct MetaResponse {
89    universe: Vec<AssetInfo>,
90}
91
92#[derive(Deserialize)]
93struct AssetInfo {
94    name: String,
95}
96
97type MetaAndAssetCtxsResponse = (MetaResponse, Vec<RestAssetCtx>);
98
99#[derive(Deserialize)]
100#[serde(rename_all = "camelCase")]
101#[allow(dead_code)]
102struct RestAssetCtx {
103    open_interest: String,
104    funding: String,
105    mark_px: String,
106    day_ntl_vlm: String,
107}
108
109#[derive(Deserialize)]
110#[serde(rename_all = "camelCase")]
111struct ClearinghouseStateResponse {
112    margin_summary: MarginSummary,
113    asset_positions: Vec<AssetPosition>,
114}
115
116#[derive(Deserialize)]
117#[serde(rename_all = "camelCase")]
118struct MarginSummary {
119    account_value: String,
120}
121
122#[derive(Deserialize)]
123struct AssetPosition {
124    position: PositionDetail,
125}
126
127#[derive(Deserialize)]
128#[serde(rename_all = "camelCase")]
129struct PositionDetail {
130    coin: String,
131    /// positive = long, negative = short
132    szi: String,
133    entry_px: Option<String>,
134}
135
136#[derive(Deserialize)]
137#[serde(rename_all = "camelCase")]
138struct RestOpenOrder {
139    coin: String,
140    side: String,
141    limit_px: String,
142    sz: String,
143    oid: i64,
144    orig_sz: String,
145}
146
147// --- WebSocket envelope and data shapes ---
148
/// Outer frame of every WebSocket message handled by this client.
#[derive(Deserialize)]
struct WsEnvelope {
    /// Channel name; compared against the subscription type to filter messages.
    channel: String,
    /// Channel-specific payload, decoded in a second step.
    data: Value,
}

/// Payload of an `l2Book` message.
#[derive(Deserialize)]
struct WsBook {
    coin: String,
    /// Price levels grouped per book side; the first two entries are read.
    levels: Vec<Vec<WsLevel>>,
    /// Message time; reused as the `sequence` of emitted updates.
    time: i64,
}

/// One price level of an order book side.
#[derive(Deserialize)]
struct WsLevel {
    /// Price as a decimal string.
    px: String,
    /// Size as a decimal string.
    sz: String,
}
167
168#[derive(Deserialize)]
169#[serde(rename_all = "camelCase")]
170struct WsAssetCtx {
171    coin: String,
172    ctx: WsPerpsCtx,
173}
174
175#[derive(Deserialize)]
176#[serde(rename_all = "camelCase")]
177struct WsPerpsCtx {
178    open_interest: String,
179    funding: String,
180    mark_px: String,
181    day_ntl_vlm: String,
182}
183
/// Payload of a `userEvents` message; every part is optional when decoding.
#[derive(Deserialize)]
struct WsUserEvent {
    liquidation: Option<WsLiquidation>,
    fills: Option<Vec<WsUserFill>>,
    funding: Option<WsFunding>,
}

/// Liquidation details inside a user event. Field names are decoded as written
/// here (snake_case), without renaming.
#[derive(Deserialize)]
struct WsLiquidation {
    liquidated_user: String,
    /// Decimal string.
    liquidated_ntl_pos: String,
    /// Decimal string.
    liquidated_account_value: String,
}

/// One of the user's own fills inside a user event.
#[derive(Deserialize)]
struct WsUserFill {
    coin: String,
    /// Price as a decimal string.
    px: String,
    /// Size as a decimal string.
    sz: String,
    /// `"B"` is treated as buy by this client; any other value as sell.
    side: String,
    time: i64,
    oid: i64,
    /// Fee as a decimal string.
    fee: String,
}

/// Funding details inside a user event.
#[derive(Deserialize)]
struct WsFunding {
    time: i64,
    coin: String,
    usdc: String,
}
215
216#[derive(Deserialize)]
217struct WsTrade {
218    coin: String,
219    side: String,
220    px: String,
221    sz: String,
222    time: i64,
223    tid: i64,
224}
225
226#[derive(Deserialize)]
227struct WsOrderUpdate {
228    order: WsOrderInfo,
229    status: String,
230    #[serde(rename = "statusTimestamp")]
231    status_timestamp: i64,
232}
233
234#[derive(Deserialize)]
235#[serde(rename_all = "camelCase")]
236struct WsOrderInfo {
237    coin: String,
238    side: String,
239    limit_px: String,
240    sz: String,
241    oid: i64,
242    orig_sz: String,
243}
244
245// --- WebSocket ledger update shapes (deposits / withdrawals) ---
246
/// Payload of a ledger-updates message: a batch of ledger entries.
#[derive(Deserialize)]
struct WsLedgerUpdates {
    updates: Vec<WsLedgerEntry>,
}

/// One ledger entry: a timestamp and the change it describes.
#[derive(Deserialize)]
struct WsLedgerEntry {
    time: i64,
    delta: WsLedgerDelta,
}

/// The change described by a ledger entry.
#[derive(Deserialize)]
struct WsLedgerDelta {
    /// Entry kind, read from the JSON field named `type`.
    #[serde(rename = "type")]
    kind: String,
    /// Amount as a string; optional when decoding.
    usdc: Option<String>,
}
264
265// --- Helpers ---
266
267fn parse_decimal(s: &str) -> Option<Decimal> {
268    Decimal::from_str(s).ok()
269}
270
271fn keccak256(data: &[u8]) -> [u8; 32] {
272    use sha3::{Digest, Keccak256};
273    Keccak256::digest(data).into()
274}
275
276/// EIP-712 domain separator for Hyperliquid mainnet (Arbitrum, chainId=42161).
277fn hyperliquid_domain_separator() -> [u8; 32] {
278    let type_hash = keccak256(b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)");
279    let name_hash = keccak256(b"Exchange");
280    let version_hash = keccak256(b"1");
281    let mut chain_id = [0u8; 32];
282    chain_id[28..32].copy_from_slice(&42161u32.to_be_bytes());
283    let verifying_contract = [0u8; 32];
284
285    let mut data = [0u8; 160];
286    data[..32].copy_from_slice(&type_hash);
287    data[32..64].copy_from_slice(&name_hash);
288    data[64..96].copy_from_slice(&version_hash);
289    data[96..128].copy_from_slice(&chain_id);
290    data[128..160].copy_from_slice(&verifying_contract);
291    keccak256(&data)
292}
293
294/// Signs a Hyperliquid exchange action using EIP-712.
295/// Returns (r, s, v) where r and s are "0x"-prefixed hex strings and v is 27 or 28.
296fn sign_action(private_key: &str, action: &Value, vault_address: Option<&str>, nonce: u64) -> Result<(String, String, u8), String> {
297    use k256::ecdsa::SigningKey;
298
299    // Step 1: msgpack-encode the action, append nonce + vault flag
300    let msgpack_bytes = rmp_serde::to_vec(action).map_err(|e| e.to_string())?;
301    let mut data = msgpack_bytes;
302    data.extend_from_slice(&nonce.to_be_bytes());
303    match vault_address {
304        None => data.push(0u8),
305        Some(addr) => {
306            data.push(1u8);
307            let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
308                .map_err(|e| format!("invalid vault address: {}", e))?;
309            data.extend_from_slice(&addr_bytes);
310        }
311    }
312    let connection_id = keccak256(&data);
313
314    // Step 2: hash the Agent struct
315    let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
316    let source_hash = keccak256(b"a"); // "a" = mainnet
317    let mut struct_data = [0u8; 96];
318    struct_data[..32].copy_from_slice(&agent_type_hash);
319    struct_data[32..64].copy_from_slice(&source_hash);
320    struct_data[64..96].copy_from_slice(&connection_id);
321    let struct_hash = keccak256(&struct_data);
322
323    // Step 3: EIP-712 final hash
324    let domain_sep = hyperliquid_domain_separator();
325    let mut final_data = Vec::with_capacity(66);
326    final_data.extend_from_slice(b"\x19\x01");
327    final_data.extend_from_slice(&domain_sep);
328    final_data.extend_from_slice(&struct_hash);
329    let final_hash = keccak256(&final_data);
330
331    // Step 4: sign with secp256k1
332    let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
333        .map_err(|e| format!("invalid private key: {}", e))?;
334    let signing_key = SigningKey::from_bytes(key_bytes.as_slice().into())
335        .map_err(|e| e.to_string())?;
336    let (sig, recovery_id) = signing_key.sign_prehash_recoverable(&final_hash)
337        .map_err(|e| e.to_string())?;
338
339    let sig_bytes = sig.to_bytes();
340    let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
341    let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
342    let v = 27u8 + recovery_id.to_byte();
343
344    Ok((r, s, v))
345}
346
347// --- Trait implementations ---
348
349#[allow(async_fn_in_trait)]
350impl guilder_abstraction::TestServer for HyperliquidClient {
351    /// Sends a lightweight allMids request; returns true if the server responds 200 OK.
352    async fn ping(&self) -> Result<bool, String> {
353        self.client
354            .post(HYPERLIQUID_INFO_URL)
355            .json(&serde_json::json!({"type": "allMids"}))
356            .send()
357            .await
358            .map(|r| r.status().is_success())
359            .map_err(|e| e.to_string())
360    }
361
362    /// Hyperliquid has no dedicated server-time endpoint; returns local UTC ms.
363    async fn get_server_time(&self) -> Result<i64, String> {
364        Ok(std::time::SystemTime::now()
365            .duration_since(std::time::UNIX_EPOCH)
366            .map(|d| d.as_millis() as i64)
367            .unwrap_or(0))
368    }
369}
370
371#[allow(async_fn_in_trait)]
372impl guilder_abstraction::GetMarketData for HyperliquidClient {
373    /// Returns all perpetual asset names from Hyperliquid's meta endpoint.
374    async fn get_symbol(&self) -> Result<Vec<String>, String> {
375        let resp = self.client
376            .post(HYPERLIQUID_INFO_URL)
377            .json(&serde_json::json!({"type": "meta"}))
378            .send()
379            .await
380            .map_err(|e| e.to_string())?;
381        resp.json::<MetaResponse>()
382            .await
383            .map(|r| r.universe.into_iter().map(|a| a.name).collect())
384            .map_err(|e| e.to_string())
385    }
386
387    /// Returns the current open interest for `symbol` from metaAndAssetCtxs.
388    async fn get_open_interest(&self, symbol: String) -> Result<Decimal, String> {
389        let resp = self.client
390            .post(HYPERLIQUID_INFO_URL)
391            .json(&serde_json::json!({"type": "metaAndAssetCtxs"}))
392            .send()
393            .await
394            .map_err(|e| e.to_string())?;
395        let (meta, ctxs) = resp.json::<MetaAndAssetCtxsResponse>()
396            .await
397            .map_err(|e| e.to_string())?;
398        meta.universe.iter()
399            .position(|a| a.name == symbol)
400            .and_then(|i| ctxs.get(i))
401            .and_then(|ctx| parse_decimal(&ctx.open_interest))
402            .ok_or_else(|| format!("symbol {} not found", symbol))
403    }
404
405    /// Returns a full AssetContext snapshot for `symbol` from metaAndAssetCtxs.
406    async fn get_asset_context(&self, symbol: String) -> Result<AssetContext, String> {
407        let resp = self.client
408            .post(HYPERLIQUID_INFO_URL)
409            .json(&serde_json::json!({"type": "metaAndAssetCtxs"}))
410            .send()
411            .await
412            .map_err(|e| e.to_string())?;
413        let (meta, ctxs) = resp.json::<MetaAndAssetCtxsResponse>()
414            .await
415            .map_err(|e| e.to_string())?;
416        let idx = meta.universe.iter()
417            .position(|a| a.name == symbol)
418            .ok_or_else(|| format!("symbol {} not found", symbol))?;
419        let ctx = ctxs.get(idx).ok_or_else(|| format!("symbol {} not found", symbol))?;
420        Ok(AssetContext {
421            symbol,
422            open_interest: parse_decimal(&ctx.open_interest).ok_or("invalid open_interest")?,
423            funding_rate: parse_decimal(&ctx.funding).ok_or("invalid funding")?,
424            mark_price: parse_decimal(&ctx.mark_px).ok_or("invalid mark_px")?,
425            day_volume: parse_decimal(&ctx.day_ntl_vlm).ok_or("invalid day_ntl_vlm")?,
426        })
427    }
428
429    /// Returns the mid-price of `symbol` (e.g. "BTC") from allMids.
430    async fn get_price(&self, symbol: String) -> Result<Decimal, String> {
431        let resp = self.client
432            .post(HYPERLIQUID_INFO_URL)
433            .json(&serde_json::json!({"type": "allMids"}))
434            .send()
435            .await
436            .map_err(|e| e.to_string())?;
437        resp.json::<HashMap<String, String>>()
438            .await
439            .map_err(|e| e.to_string())?
440            .get(&symbol)
441            .and_then(|s| parse_decimal(s))
442            .ok_or_else(|| format!("symbol {} not found", symbol))
443    }
444}
445
446#[allow(async_fn_in_trait)]
447impl guilder_abstraction::ManageOrder for HyperliquidClient {
448    /// Places an order on Hyperliquid. Requires `with_auth`. Returns an `OrderPlacement` with
449    /// the exchange-assigned order ID. Market orders are submitted as aggressive limit orders (IOC).
450    async fn place_order(&self, symbol: String, side: OrderSide, price: Decimal, volume: Decimal, order_type: OrderType, time_in_force: TimeInForce) -> Result<OrderPlacement, String> {
451        let asset_idx = self.get_asset_index(&symbol).await?;
452        let is_buy = matches!(side, OrderSide::Buy);
453
454        let tif_str = match time_in_force {
455            TimeInForce::Gtc => "Gtc",
456            TimeInForce::Ioc => "Ioc",
457            TimeInForce::Fok => "Fok",
458        };
459        // Market orders are IOC limit orders at a wide price
460        let order_type_val = match order_type {
461            OrderType::Limit => serde_json::json!({"limit": {"tif": tif_str}}),
462            OrderType::Market => serde_json::json!({"limit": {"tif": "Ioc"}}),
463        };
464
465        let action = serde_json::json!({
466            "type": "order",
467            "orders": [{
468                "a": asset_idx,
469                "b": is_buy,
470                "p": price.to_string(),
471                "s": volume.to_string(),
472                "r": false,
473                "t": order_type_val
474            }],
475            "grouping": "na"
476        });
477
478        let resp = self.submit_signed_action(action, None).await?;
479        let oid = resp["response"]["data"]["statuses"][0]["resting"]["oid"]
480            .as_i64()
481            .or_else(|| resp["response"]["data"]["statuses"][0]["filled"]["oid"].as_i64())
482            .ok_or_else(|| format!("unexpected response: {}", resp))?;
483
484        let timestamp_ms = std::time::SystemTime::now()
485            .duration_since(std::time::UNIX_EPOCH)
486            .unwrap()
487            .as_millis() as i64;
488
489        Ok(OrderPlacement { order_id: oid, symbol, side, price, quantity: volume, timestamp_ms })
490    }
491
492    /// Modifies price and size of an existing order by its order ID. Requires `with_auth`.
493    /// Fetches the order's current coin and side before submitting the modify action.
494    async fn change_order_by_cloid(&self, cloid: i64, price: Decimal, volume: Decimal) -> Result<i64, String> {
495        let user = self.require_user_address()?;
496
497        let resp = self.client
498            .post(HYPERLIQUID_INFO_URL)
499            .json(&serde_json::json!({"type": "openOrders", "user": user}))
500            .send()
501            .await
502            .map_err(|e| e.to_string())?;
503        let orders: Vec<RestOpenOrder> = resp.json().await.map_err(|e| e.to_string())?;
504        let order = orders.iter()
505            .find(|o| o.oid == cloid)
506            .ok_or_else(|| format!("order {} not found", cloid))?;
507
508        let asset_idx = self.get_asset_index(&order.coin).await?;
509        let is_buy = order.side == "B";
510
511        let action = serde_json::json!({
512            "type": "batchModify",
513            "modifies": [{
514                "oid": cloid,
515                "order": {
516                    "a": asset_idx,
517                    "b": is_buy,
518                    "p": price.to_string(),
519                    "s": volume.to_string(),
520                    "r": false,
521                    "t": {"limit": {"tif": "Gtc"}}
522                }
523            }]
524        });
525
526        self.submit_signed_action(action, None).await?;
527        Ok(cloid)
528    }
529
530    /// Cancels a single order by its order ID. Requires `with_auth`.
531    /// Fetches open orders to resolve the coin/asset before cancelling.
532    async fn cancel_order(&self, cloid: i64) -> Result<i64, String> {
533        let user = self.require_user_address()?;
534
535        let resp = self.client
536            .post(HYPERLIQUID_INFO_URL)
537            .json(&serde_json::json!({"type": "openOrders", "user": user}))
538            .send()
539            .await
540            .map_err(|e| e.to_string())?;
541        let orders: Vec<RestOpenOrder> = resp.json().await.map_err(|e| e.to_string())?;
542        let order = orders.iter()
543            .find(|o| o.oid == cloid)
544            .ok_or_else(|| format!("order {} not found", cloid))?;
545
546        let asset_idx = self.get_asset_index(&order.coin).await?;
547        let action = serde_json::json!({
548            "type": "cancel",
549            "cancels": [{"a": asset_idx, "o": cloid}]
550        });
551
552        self.submit_signed_action(action, None).await?;
553        Ok(cloid)
554    }
555
556    /// Cancels all open orders. Requires `with_auth`.
557    /// Fetches all open orders and submits a batch cancel in a single signed request.
558    async fn cancel_all_order(&self) -> Result<bool, String> {
559        let user = self.require_user_address()?;
560
561        let resp = self.client
562            .post(HYPERLIQUID_INFO_URL)
563            .json(&serde_json::json!({"type": "openOrders", "user": user}))
564            .send()
565            .await
566            .map_err(|e| e.to_string())?;
567        let orders: Vec<RestOpenOrder> = resp.json().await.map_err(|e| e.to_string())?;
568        if orders.is_empty() {
569            return Ok(true);
570        }
571
572        let meta_resp = self.client
573            .post(HYPERLIQUID_INFO_URL)
574            .json(&serde_json::json!({"type": "meta"}))
575            .send()
576            .await
577            .map_err(|e| e.to_string())?;
578        let meta: MetaResponse = meta_resp.json().await.map_err(|e| e.to_string())?;
579
580        let cancels: Vec<Value> = orders.iter()
581            .filter_map(|o| {
582                let asset_idx = meta.universe.iter().position(|a| a.name == o.coin)?;
583                Some(serde_json::json!({"a": asset_idx, "o": o.oid}))
584            })
585            .collect();
586
587        let action = serde_json::json!({"type": "cancel", "cancels": cancels});
588        self.submit_signed_action(action, None).await?;
589        Ok(true)
590    }
591}
592
593#[allow(async_fn_in_trait)]
594impl guilder_abstraction::SubscribeMarketData for HyperliquidClient {
595    /// Streams L2 orderbook updates for `symbol`. Each message from Hyperliquid is a
596    /// full-depth snapshot; every level is emitted as an individual `L2Update` event.
597    /// All levels in the same snapshot share the same `sequence` value.
598    fn subscribe_l2_update(&self, symbol: String) -> BoxStream<L2Update> {
599        Box::pin(async_stream::stream! {
600            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
601            let sub = serde_json::json!({
602                "method": "subscribe",
603                "subscription": {"type": "l2Book", "coin": symbol}
604            });
605            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
606
607            while let Some(Ok(Message::Text(text))) = ws.next().await {
608                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
609                if env.channel != "l2Book" { continue; }
610                let Ok(book) = serde_json::from_value::<WsBook>(env.data) else { continue; };
611
612                for level in book.levels.first().into_iter().flatten() {
613                    if let (Some(price), Some(volume)) = (parse_decimal(&level.px), parse_decimal(&level.sz)) {
614                        yield L2Update { symbol: book.coin.clone(), price, volume, side: Side::Ask, sequence: book.time };
615                    }
616                }
617                for level in book.levels.get(1).into_iter().flatten() {
618                    if let (Some(price), Some(volume)) = (parse_decimal(&level.px), parse_decimal(&level.sz)) {
619                        yield L2Update { symbol: book.coin.clone(), price, volume, side: Side::Bid, sequence: book.time };
620                    }
621                }
622            }
623        })
624    }
625
626    /// Streams asset context updates for `symbol` via Hyperliquid's `activeAssetCtx` subscription.
627    fn subscribe_asset_context(&self, symbol: String) -> BoxStream<AssetContext> {
628        Box::pin(async_stream::stream! {
629            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
630            let sub = serde_json::json!({
631                "method": "subscribe",
632                "subscription": {"type": "activeAssetCtx", "coin": symbol}
633            });
634            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
635
636            while let Some(Ok(Message::Text(text))) = ws.next().await {
637                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
638                if env.channel != "activeAssetCtx" { continue; }
639                let Ok(update) = serde_json::from_value::<WsAssetCtx>(env.data) else { continue; };
640                let ctx = &update.ctx;
641                if let (Some(open_interest), Some(funding_rate), Some(mark_price), Some(day_volume)) = (
642                    parse_decimal(&ctx.open_interest),
643                    parse_decimal(&ctx.funding),
644                    parse_decimal(&ctx.mark_px),
645                    parse_decimal(&ctx.day_ntl_vlm),
646                ) {
647                    yield AssetContext { symbol: update.coin, open_interest, funding_rate, mark_price, day_volume };
648                }
649            }
650        })
651    }
652
653    /// Streams liquidation events for a user address via Hyperliquid's `userEvents` subscription.
654    /// Hyperliquid's liquidation event is account-level; `symbol` is empty and `side` is `Sell`.
655    fn subscribe_liquidation(&self, user: String) -> BoxStream<Liquidation> {
656        Box::pin(async_stream::stream! {
657            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
658            let sub = serde_json::json!({
659                "method": "subscribe",
660                "subscription": {"type": "userEvents", "user": user}
661            });
662            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
663
664            while let Some(Ok(Message::Text(text))) = ws.next().await {
665                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
666                if env.channel != "userEvents" { continue; }
667                let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else { continue; };
668                let Some(liq) = event.liquidation else { continue; };
669                if let (Some(notional_position), Some(account_value)) = (
670                    parse_decimal(&liq.liquidated_ntl_pos),
671                    parse_decimal(&liq.liquidated_account_value),
672                ) {
673                    yield Liquidation {
674                        symbol: String::new(),
675                        side: OrderSide::Sell,
676                        liquidated_user: liq.liquidated_user,
677                        notional_position,
678                        account_value,
679                    };
680                }
681            }
682        })
683    }
684
685    /// Streams public trade fills for `symbol`. "B" (buyer aggressor) → Buy, otherwise → Sell.
686    fn subscribe_fill(&self, symbol: String) -> BoxStream<Fill> {
687        Box::pin(async_stream::stream! {
688            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
689            let sub = serde_json::json!({
690                "method": "subscribe",
691                "subscription": {"type": "trades", "coin": symbol}
692            });
693            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
694
695            while let Some(Ok(Message::Text(text))) = ws.next().await {
696                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
697                if env.channel != "trades" { continue; }
698                let Ok(trades) = serde_json::from_value::<Vec<WsTrade>>(env.data) else { continue; };
699
700                for trade in trades {
701                    let side = if trade.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
702                    if let (Some(price), Some(volume)) = (parse_decimal(&trade.px), parse_decimal(&trade.sz)) {
703                        yield Fill { symbol: trade.coin, price, volume, side, timestamp_ms: trade.time, trade_id: trade.tid };
704                    }
705                }
706            }
707        })
708    }
709}
710
711#[allow(async_fn_in_trait)]
712impl guilder_abstraction::GetAccountSnapshot for HyperliquidClient {
713    /// Returns open positions from `clearinghouseState`. Requires `with_auth`.
714    /// Zero-size positions are filtered out. Positive `szi` = long, negative = short.
715    async fn get_positions(&self) -> Result<Vec<Position>, String> {
716        let user = self.require_user_address()?;
717        let resp = self.client
718            .post(HYPERLIQUID_INFO_URL)
719            .json(&serde_json::json!({"type": "clearinghouseState", "user": user}))
720            .send()
721            .await
722            .map_err(|e| e.to_string())?;
723        let state: ClearinghouseStateResponse = resp.json().await.map_err(|e| e.to_string())?;
724
725        Ok(state.asset_positions.into_iter()
726            .filter_map(|ap| {
727                let p = ap.position;
728                let size = parse_decimal(&p.szi)?;
729                if size.is_zero() { return None; }
730                let entry_price = p.entry_px.as_deref().and_then(parse_decimal).unwrap_or_default();
731                let side = if size > Decimal::ZERO { OrderSide::Buy } else { OrderSide::Sell };
732                Some(Position { symbol: p.coin, side, size: size.abs(), entry_price })
733            })
734            .collect())
735    }
736
737    /// Returns resting orders from Hyperliquid's `openOrders` endpoint. Requires `with_auth`.
738    /// `filled_quantity` is derived as `origSz - sz` (original size minus remaining size).
739    async fn get_open_orders(&self) -> Result<Vec<OpenOrder>, String> {
740        let user = self.require_user_address()?;
741        let resp = self.client
742            .post(HYPERLIQUID_INFO_URL)
743            .json(&serde_json::json!({"type": "openOrders", "user": user}))
744            .send()
745            .await
746            .map_err(|e| e.to_string())?;
747        let orders: Vec<RestOpenOrder> = resp.json().await.map_err(|e| e.to_string())?;
748
749        Ok(orders.into_iter()
750            .filter_map(|o| {
751                let price = parse_decimal(&o.limit_px)?;
752                let quantity = parse_decimal(&o.orig_sz)?;
753                let remaining = parse_decimal(&o.sz)?;
754                let filled_quantity = quantity - remaining;
755                let side = if o.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
756                Some(OpenOrder { order_id: o.oid, symbol: o.coin, side, price, quantity, filled_quantity })
757            })
758            .collect())
759    }
760
761    /// Returns total account value (collateral) from `clearinghouseState`. Requires `with_auth`.
762    async fn get_collateral(&self) -> Result<Decimal, String> {
763        let user = self.require_user_address()?;
764        let resp = self.client
765            .post(HYPERLIQUID_INFO_URL)
766            .json(&serde_json::json!({"type": "clearinghouseState", "user": user}))
767            .send()
768            .await
769            .map_err(|e| e.to_string())?;
770        let state: ClearinghouseStateResponse = resp.json().await.map_err(|e| e.to_string())?;
771        parse_decimal(&state.margin_summary.account_value)
772            .ok_or_else(|| "invalid account value".to_string())
773    }
774}
775
776#[allow(async_fn_in_trait)]
777impl guilder_abstraction::SubscribeUserEvents for HyperliquidClient {
778    /// Streams the user's own order executions via the `userEvents` WS subscription.
779    /// Requires `with_auth` (streams are empty if no user address is set).
780    fn subscribe_user_fills(&self) -> BoxStream<UserFill> {
781        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
782        let user = format!("{:#x}", addr);
783        Box::pin(async_stream::stream! {
784            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
785            let sub = serde_json::json!({
786                "method": "subscribe",
787                "subscription": {"type": "userEvents", "user": user}
788            });
789            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
790
791            while let Some(Ok(Message::Text(text))) = ws.next().await {
792                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
793                if env.channel != "userEvents" { continue; }
794                let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else { continue; };
795                for fill in event.fills.unwrap_or_default() {
796                    let side = if fill.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
797                    if let (Some(price), Some(quantity), Some(fee_usd)) = (
798                        parse_decimal(&fill.px),
799                        parse_decimal(&fill.sz),
800                        parse_decimal(&fill.fee),
801                    ) {
802                        yield UserFill { order_id: fill.oid, symbol: fill.coin, side, price, quantity, fee_usd, timestamp_ms: fill.time };
803                    }
804                }
805            }
806        })
807    }
808
809    /// Streams order lifecycle events via the `orderUpdates` WS subscription.
810    /// Requires `with_auth`.
811    fn subscribe_order_updates(&self) -> BoxStream<OrderUpdate> {
812        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
813        let user = format!("{:#x}", addr);
814        Box::pin(async_stream::stream! {
815            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
816            let sub = serde_json::json!({
817                "method": "subscribe",
818                "subscription": {"type": "orderUpdates", "user": user}
819            });
820            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
821
822            while let Some(Ok(Message::Text(text))) = ws.next().await {
823                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
824                if env.channel != "orderUpdates" { continue; }
825                let Ok(updates) = serde_json::from_value::<Vec<WsOrderUpdate>>(env.data) else { continue; };
826                for upd in updates {
827                    let status = match upd.status.as_str() {
828                        "open" => OrderStatus::Placed,
829                        "filled" => OrderStatus::Filled,
830                        "canceled" | "cancelled" => OrderStatus::Cancelled,
831                        _ => OrderStatus::PartiallyFilled,
832                    };
833                    let side = if upd.order.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
834                    yield OrderUpdate {
835                        order_id: upd.order.oid,
836                        symbol: upd.order.coin,
837                        status,
838                        side: Some(side),
839                        price: parse_decimal(&upd.order.limit_px),
840                        quantity: parse_decimal(&upd.order.orig_sz),
841                        remaining_quantity: parse_decimal(&upd.order.sz),
842                        timestamp_ms: upd.status_timestamp,
843                    };
844                }
845            }
846        })
847    }
848
849    /// Streams funding payments applied to positions via the `userEvents` WS subscription.
850    /// Requires `with_auth`.
851    fn subscribe_funding_payments(&self) -> BoxStream<FundingPayment> {
852        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
853        let user = format!("{:#x}", addr);
854        Box::pin(async_stream::stream! {
855            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
856            let sub = serde_json::json!({
857                "method": "subscribe",
858                "subscription": {"type": "userEvents", "user": user}
859            });
860            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
861
862            while let Some(Ok(Message::Text(text))) = ws.next().await {
863                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
864                if env.channel != "userEvents" { continue; }
865                let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else { continue; };
866                let Some(funding) = event.funding else { continue; };
867                if let Some(amount_usd) = parse_decimal(&funding.usdc) {
868                    yield FundingPayment { symbol: funding.coin, amount_usd, timestamp_ms: funding.time };
869                }
870            }
871        })
872    }
873
874    fn subscribe_deposits(&self) -> BoxStream<Deposit> {
875        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
876        let user = format!("{:#x}", addr);
877        Box::pin(async_stream::stream! {
878            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
879            let sub = serde_json::json!({
880                "method": "subscribe",
881                "subscription": {"type": "userNonFundingLedgerUpdates", "user": user}
882            });
883            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
884            while let Some(Ok(Message::Text(text))) = ws.next().await {
885                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
886                if env.channel != "userNonFundingLedgerUpdates" { continue; }
887                let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else { continue; };
888                for entry in ledger.updates {
889                    if entry.delta.kind == "deposit" {
890                        if let Some(amount_usd) = entry.delta.usdc.as_deref().and_then(parse_decimal) {
891                            yield Deposit { asset: "USDC".to_string(), amount_usd, timestamp_ms: entry.time };
892                        }
893                    }
894                }
895            }
896        })
897    }
898
899    fn subscribe_withdrawals(&self) -> BoxStream<Withdrawal> {
900        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
901        let user = format!("{:#x}", addr);
902        Box::pin(async_stream::stream! {
903            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
904            let sub = serde_json::json!({
905                "method": "subscribe",
906                "subscription": {"type": "userNonFundingLedgerUpdates", "user": user}
907            });
908            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
909            while let Some(Ok(Message::Text(text))) = ws.next().await {
910                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
911                if env.channel != "userNonFundingLedgerUpdates" { continue; }
912                let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else { continue; };
913                for entry in ledger.updates {
914                    if entry.delta.kind == "withdraw" {
915                        if let Some(amount_usd) = entry.delta.usdc.as_deref().and_then(parse_decimal) {
916                            yield Withdrawal { asset: "USDC".to_string(), amount_usd, timestamp_ms: entry.time };
917                        }
918                    }
919                }
920            }
921        })
922    }
923}