Skip to main content

guilder_client_hyperliquid/
client.rs

1use alloy_primitives::Address;
2use guilder_abstraction::{self, L2Update, Fill, AssetContext, Liquidation, BoxStream, Side, OrderSide, OrderStatus, OrderType, TimeInForce, OrderPlacement, Position, OpenOrder, UserFill, OrderUpdate, FundingPayment, Deposit, Withdrawal};
3use futures_util::{stream, SinkExt, StreamExt};
4use reqwest::Client;
5use rust_decimal::Decimal;
6use serde::Deserialize;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::str::FromStr;
10use tokio_tungstenite::{connect_async, tungstenite::Message};
11
12const HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
13const HYPERLIQUID_EXCHANGE_URL: &str = "https://api.hyperliquid.xyz/exchange";
14const HYPERLIQUID_WS_URL: &str = "wss://api.hyperliquid.xyz/ws";
15
16async fn parse_response<T: for<'de> serde::Deserialize<'de>>(resp: reqwest::Response) -> Result<T, String> {
17    let text = resp.text().await.map_err(|e| e.to_string())?;
18    serde_json::from_str(&text).map_err(|e| format!("{e}: {text}"))
19}
20
21pub struct HyperliquidClient {
22    client: Client,
23    user_address: Option<Address>,
24    private_key: Option<String>,
25}
26
27impl HyperliquidClient {
28    pub fn new() -> Self {
29        HyperliquidClient { client: Client::new(), user_address: None, private_key: None }
30    }
31
32    pub fn with_auth(user_address: Address, private_key: String) -> Self {
33        HyperliquidClient { client: Client::new(), user_address: Some(user_address), private_key: Some(private_key) }
34    }
35
36    fn require_user_address(&self) -> Result<String, String> {
37        self.user_address
38            .map(|a| format!("{:#x}", a))
39            .ok_or_else(|| "user address required: use HyperliquidClient::with_auth".to_string())
40    }
41
42    fn require_private_key(&self) -> Result<&str, String> {
43        self.private_key.as_deref().ok_or_else(|| "private key required: use HyperliquidClient::with_auth".to_string())
44    }
45
46    async fn get_asset_index(&self, symbol: &str) -> Result<usize, String> {
47        let resp = self.client
48            .post(HYPERLIQUID_INFO_URL)
49            .json(&serde_json::json!({"type": "meta"}))
50            .send()
51            .await
52            .map_err(|e| e.to_string())?;
53        let meta: MetaResponse = parse_response(resp).await?;
54        meta.universe.iter()
55            .position(|a| a.name == symbol)
56            .ok_or_else(|| format!("symbol {} not found", symbol))
57    }
58
59    async fn submit_signed_action(&self, action: Value, vault_address: Option<&str>) -> Result<Value, String> {
60        let private_key = self.require_private_key()?;
61        let nonce = std::time::SystemTime::now()
62            .duration_since(std::time::UNIX_EPOCH)
63            .unwrap()
64            .as_millis() as u64;
65
66        let (r, s, v) = sign_action(private_key, &action, vault_address, nonce)?;
67
68        let payload = serde_json::json!({
69            "action": action,
70            "nonce": nonce,
71            "signature": {"r": r, "s": s, "v": v},
72            "vaultAddress": null
73        });
74
75        let resp = self.client
76            .post(HYPERLIQUID_EXCHANGE_URL)
77            .json(&payload)
78            .send()
79            .await
80            .map_err(|e| e.to_string())?;
81
82        let body: Value = parse_response(resp).await?;
83        if body["status"].as_str() == Some("err") {
84            return Err(body["response"].as_str().unwrap_or("unknown error").to_string());
85        }
86        Ok(body)
87    }
88}
89
90// --- REST deserialization types ---
91
92#[derive(Deserialize)]
93struct MetaResponse {
94    universe: Vec<AssetInfo>,
95}
96
97#[derive(Deserialize)]
98struct AssetInfo {
99    name: String,
100}
101
102type MetaAndAssetCtxsResponse = (MetaResponse, Vec<RestAssetCtx>);
103
104#[derive(Deserialize)]
105#[serde(rename_all = "camelCase")]
106#[allow(dead_code)]
107struct RestAssetCtx {
108    open_interest: String,
109    funding: String,
110    mark_px: String,
111    day_ntl_vlm: String,
112}
113
114#[derive(Deserialize)]
115#[serde(rename_all = "camelCase")]
116struct ClearinghouseStateResponse {
117    margin_summary: MarginSummary,
118    asset_positions: Vec<AssetPosition>,
119}
120
121#[derive(Deserialize)]
122#[serde(rename_all = "camelCase")]
123struct MarginSummary {
124    account_value: String,
125}
126
127#[derive(Deserialize)]
128struct AssetPosition {
129    position: PositionDetail,
130}
131
132#[derive(Deserialize)]
133#[serde(rename_all = "camelCase")]
134struct PositionDetail {
135    coin: String,
136    /// positive = long, negative = short
137    szi: String,
138    entry_px: Option<String>,
139}
140
141#[derive(Deserialize)]
142#[serde(rename_all = "camelCase")]
143struct RestOpenOrder {
144    coin: String,
145    side: String,
146    limit_px: String,
147    sz: String,
148    oid: i64,
149    orig_sz: String,
150}
151
152// --- WebSocket envelope and data shapes ---
153
154#[derive(Deserialize)]
155struct WsEnvelope {
156    channel: String,
157    data: Value,
158}
159
160#[derive(Deserialize)]
161struct WsBook {
162    coin: String,
163    levels: Vec<Vec<WsLevel>>,
164    time: i64,
165}
166
167#[derive(Deserialize)]
168struct WsLevel {
169    px: String,
170    sz: String,
171}
172
173#[derive(Deserialize)]
174#[serde(rename_all = "camelCase")]
175struct WsAssetCtx {
176    coin: String,
177    ctx: WsPerpsCtx,
178}
179
180#[derive(Deserialize)]
181#[serde(rename_all = "camelCase")]
182struct WsPerpsCtx {
183    open_interest: String,
184    funding: String,
185    mark_px: String,
186    day_ntl_vlm: String,
187}
188
189#[derive(Deserialize)]
190struct WsUserEvent {
191    liquidation: Option<WsLiquidation>,
192    fills: Option<Vec<WsUserFill>>,
193    funding: Option<WsFunding>,
194}
195
196#[derive(Deserialize)]
197struct WsLiquidation {
198    liquidated_user: String,
199    liquidated_ntl_pos: String,
200    liquidated_account_value: String,
201}
202
203#[derive(Deserialize)]
204struct WsUserFill {
205    coin: String,
206    px: String,
207    sz: String,
208    side: String,
209    time: i64,
210    oid: i64,
211    fee: String,
212}
213
214#[derive(Deserialize)]
215struct WsFunding {
216    time: i64,
217    coin: String,
218    usdc: String,
219}
220
221#[derive(Deserialize)]
222struct WsTrade {
223    coin: String,
224    side: String,
225    px: String,
226    sz: String,
227    time: i64,
228    tid: i64,
229}
230
231#[derive(Deserialize)]
232struct WsOrderUpdate {
233    order: WsOrderInfo,
234    status: String,
235    #[serde(rename = "statusTimestamp")]
236    status_timestamp: i64,
237}
238
239#[derive(Deserialize)]
240#[serde(rename_all = "camelCase")]
241struct WsOrderInfo {
242    coin: String,
243    side: String,
244    limit_px: String,
245    sz: String,
246    oid: i64,
247    orig_sz: String,
248}
249
250// --- WebSocket ledger update shapes (deposits / withdrawals) ---
251
252#[derive(Deserialize)]
253struct WsLedgerUpdates {
254    updates: Vec<WsLedgerEntry>,
255}
256
257#[derive(Deserialize)]
258struct WsLedgerEntry {
259    time: i64,
260    delta: WsLedgerDelta,
261}
262
263#[derive(Deserialize)]
264struct WsLedgerDelta {
265    #[serde(rename = "type")]
266    kind: String,
267    usdc: Option<String>,
268}
269
270// --- Helpers ---
271
272fn parse_decimal(s: &str) -> Option<Decimal> {
273    Decimal::from_str(s).ok()
274}
275
276fn keccak256(data: &[u8]) -> [u8; 32] {
277    use sha3::{Digest, Keccak256};
278    Keccak256::digest(data).into()
279}
280
281/// EIP-712 domain separator for Hyperliquid mainnet (Arbitrum, chainId=42161).
282fn hyperliquid_domain_separator() -> [u8; 32] {
283    let type_hash = keccak256(b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)");
284    let name_hash = keccak256(b"Exchange");
285    let version_hash = keccak256(b"1");
286    let mut chain_id = [0u8; 32];
287    chain_id[28..32].copy_from_slice(&42161u32.to_be_bytes());
288    let verifying_contract = [0u8; 32];
289
290    let mut data = [0u8; 160];
291    data[..32].copy_from_slice(&type_hash);
292    data[32..64].copy_from_slice(&name_hash);
293    data[64..96].copy_from_slice(&version_hash);
294    data[96..128].copy_from_slice(&chain_id);
295    data[128..160].copy_from_slice(&verifying_contract);
296    keccak256(&data)
297}
298
299/// Signs a Hyperliquid exchange action using EIP-712.
300/// Returns (r, s, v) where r and s are "0x"-prefixed hex strings and v is 27 or 28.
301fn sign_action(private_key: &str, action: &Value, vault_address: Option<&str>, nonce: u64) -> Result<(String, String, u8), String> {
302    use k256::ecdsa::SigningKey;
303
304    // Step 1: msgpack-encode the action, append nonce + vault flag
305    let msgpack_bytes = rmp_serde::to_vec(action).map_err(|e| e.to_string())?;
306    let mut data = msgpack_bytes;
307    data.extend_from_slice(&nonce.to_be_bytes());
308    match vault_address {
309        None => data.push(0u8),
310        Some(addr) => {
311            data.push(1u8);
312            let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
313                .map_err(|e| format!("invalid vault address: {}", e))?;
314            data.extend_from_slice(&addr_bytes);
315        }
316    }
317    let connection_id = keccak256(&data);
318
319    // Step 2: hash the Agent struct
320    let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
321    let source_hash = keccak256(b"a"); // "a" = mainnet
322    let mut struct_data = [0u8; 96];
323    struct_data[..32].copy_from_slice(&agent_type_hash);
324    struct_data[32..64].copy_from_slice(&source_hash);
325    struct_data[64..96].copy_from_slice(&connection_id);
326    let struct_hash = keccak256(&struct_data);
327
328    // Step 3: EIP-712 final hash
329    let domain_sep = hyperliquid_domain_separator();
330    let mut final_data = Vec::with_capacity(66);
331    final_data.extend_from_slice(b"\x19\x01");
332    final_data.extend_from_slice(&domain_sep);
333    final_data.extend_from_slice(&struct_hash);
334    let final_hash = keccak256(&final_data);
335
336    // Step 4: sign with secp256k1
337    let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
338        .map_err(|e| format!("invalid private key: {}", e))?;
339    let signing_key = SigningKey::from_bytes(key_bytes.as_slice().into())
340        .map_err(|e| e.to_string())?;
341    let (sig, recovery_id) = signing_key.sign_prehash_recoverable(&final_hash)
342        .map_err(|e| e.to_string())?;
343
344    let sig_bytes = sig.to_bytes();
345    let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
346    let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
347    let v = 27u8 + recovery_id.to_byte();
348
349    Ok((r, s, v))
350}
351
352// --- Trait implementations ---
353
354#[allow(async_fn_in_trait)]
355impl guilder_abstraction::TestServer for HyperliquidClient {
356    /// Sends a lightweight allMids request; returns true if the server responds 200 OK.
357    async fn ping(&self) -> Result<bool, String> {
358        self.client
359            .post(HYPERLIQUID_INFO_URL)
360            .json(&serde_json::json!({"type": "allMids"}))
361            .send()
362            .await
363            .map(|r| r.status().is_success())
364            .map_err(|e| e.to_string())
365    }
366
367    /// Hyperliquid has no dedicated server-time endpoint; returns local UTC ms.
368    async fn get_server_time(&self) -> Result<i64, String> {
369        Ok(std::time::SystemTime::now()
370            .duration_since(std::time::UNIX_EPOCH)
371            .map(|d| d.as_millis() as i64)
372            .unwrap_or(0))
373    }
374}
375
376#[allow(async_fn_in_trait)]
377impl guilder_abstraction::GetMarketData for HyperliquidClient {
378    /// Returns all perpetual asset names from Hyperliquid's meta endpoint.
379    async fn get_symbol(&self) -> Result<Vec<String>, String> {
380        let resp = self.client
381            .post(HYPERLIQUID_INFO_URL)
382            .json(&serde_json::json!({"type": "meta"}))
383            .send()
384            .await
385            .map_err(|e| e.to_string())?;
386        parse_response::<MetaResponse>(resp).await
387            .map(|r| r.universe.into_iter().map(|a| a.name).collect())
388    }
389
390    /// Returns the current open interest for `symbol` from metaAndAssetCtxs.
391    async fn get_open_interest(&self, symbol: String) -> Result<Decimal, String> {
392        let resp = self.client
393            .post(HYPERLIQUID_INFO_URL)
394            .json(&serde_json::json!({"type": "metaAndAssetCtxs"}))
395            .send()
396            .await
397            .map_err(|e| e.to_string())?;
398        let (meta, ctxs) = parse_response::<MetaAndAssetCtxsResponse>(resp).await?;
399        meta.universe.iter()
400            .position(|a| a.name == symbol)
401            .and_then(|i| ctxs.get(i))
402            .and_then(|ctx| parse_decimal(&ctx.open_interest))
403            .ok_or_else(|| format!("symbol {} not found", symbol))
404    }
405
406    /// Returns a full AssetContext snapshot for `symbol` from metaAndAssetCtxs.
407    async fn get_asset_context(&self, symbol: String) -> Result<AssetContext, String> {
408        let resp = self.client
409            .post(HYPERLIQUID_INFO_URL)
410            .json(&serde_json::json!({"type": "metaAndAssetCtxs"}))
411            .send()
412            .await
413            .map_err(|e| e.to_string())?;
414        let (meta, ctxs) = parse_response::<MetaAndAssetCtxsResponse>(resp).await?;
415        let idx = meta.universe.iter()
416            .position(|a| a.name == symbol)
417            .ok_or_else(|| format!("symbol {} not found", symbol))?;
418        let ctx = ctxs.get(idx).ok_or_else(|| format!("symbol {} not found", symbol))?;
419        Ok(AssetContext {
420            symbol,
421            open_interest: parse_decimal(&ctx.open_interest).ok_or("invalid open_interest")?,
422            funding_rate: parse_decimal(&ctx.funding).ok_or("invalid funding")?,
423            mark_price: parse_decimal(&ctx.mark_px).ok_or("invalid mark_px")?,
424            day_volume: parse_decimal(&ctx.day_ntl_vlm).ok_or("invalid day_ntl_vlm")?,
425        })
426    }
427
428    /// Returns a full L2 orderbook snapshot for `symbol` from the l2Book REST endpoint.
429    /// Levels are returned as individual `L2Update` items; all share the same `sequence` (timestamp).
430    async fn get_l2_orderbook(&self, symbol: String) -> Result<Vec<L2Update>, String> {
431        let resp = self.client
432            .post(HYPERLIQUID_INFO_URL)
433            .json(&serde_json::json!({"type": "l2Book", "coin": symbol}))
434            .send()
435            .await
436            .map_err(|e| e.to_string())?;
437        let book: WsBook = parse_response(resp).await?;
438        let mut levels = Vec::new();
439        for level in book.levels.first().into_iter().flatten() {
440            if let (Some(price), Some(volume)) = (parse_decimal(&level.px), parse_decimal(&level.sz)) {
441                levels.push(L2Update { symbol: book.coin.clone(), price, volume, side: Side::Ask, sequence: book.time });
442            }
443        }
444        for level in book.levels.get(1).into_iter().flatten() {
445            if let (Some(price), Some(volume)) = (parse_decimal(&level.px), parse_decimal(&level.sz)) {
446                levels.push(L2Update { symbol: book.coin.clone(), price, volume, side: Side::Bid, sequence: book.time });
447            }
448        }
449        Ok(levels)
450    }
451
452    /// Returns the mid-price of `symbol` (e.g. "BTC") from allMids.
453    async fn get_price(&self, symbol: String) -> Result<Decimal, String> {
454        let resp = self.client
455            .post(HYPERLIQUID_INFO_URL)
456            .json(&serde_json::json!({"type": "allMids"}))
457            .send()
458            .await
459            .map_err(|e| e.to_string())?;
460        parse_response::<HashMap<String, String>>(resp).await?
461            .get(&symbol)
462            .and_then(|s| parse_decimal(s))
463            .ok_or_else(|| format!("symbol {} not found", symbol))
464    }
465}
466
467#[allow(async_fn_in_trait)]
468impl guilder_abstraction::ManageOrder for HyperliquidClient {
469    /// Places an order on Hyperliquid. Requires `with_auth`. Returns an `OrderPlacement` with
470    /// the exchange-assigned order ID. Market orders are submitted as aggressive limit orders (IOC).
471    async fn place_order(&self, symbol: String, side: OrderSide, price: Decimal, volume: Decimal, order_type: OrderType, time_in_force: TimeInForce) -> Result<OrderPlacement, String> {
472        let asset_idx = self.get_asset_index(&symbol).await?;
473        let is_buy = matches!(side, OrderSide::Buy);
474
475        let tif_str = match time_in_force {
476            TimeInForce::Gtc => "Gtc",
477            TimeInForce::Ioc => "Ioc",
478            TimeInForce::Fok => "Fok",
479        };
480        // Market orders are IOC limit orders at a wide price
481        let order_type_val = match order_type {
482            OrderType::Limit => serde_json::json!({"limit": {"tif": tif_str}}),
483            OrderType::Market => serde_json::json!({"limit": {"tif": "Ioc"}}),
484        };
485
486        let action = serde_json::json!({
487            "type": "order",
488            "orders": [{
489                "a": asset_idx,
490                "b": is_buy,
491                "p": price.to_string(),
492                "s": volume.to_string(),
493                "r": false,
494                "t": order_type_val
495            }],
496            "grouping": "na"
497        });
498
499        let resp = self.submit_signed_action(action, None).await?;
500        let oid = resp["response"]["data"]["statuses"][0]["resting"]["oid"]
501            .as_i64()
502            .or_else(|| resp["response"]["data"]["statuses"][0]["filled"]["oid"].as_i64())
503            .ok_or_else(|| format!("unexpected response: {}", resp))?;
504
505        let timestamp_ms = std::time::SystemTime::now()
506            .duration_since(std::time::UNIX_EPOCH)
507            .unwrap()
508            .as_millis() as i64;
509
510        Ok(OrderPlacement { order_id: oid, symbol, side, price, quantity: volume, timestamp_ms })
511    }
512
513    /// Modifies price and size of an existing order by its order ID. Requires `with_auth`.
514    /// Fetches the order's current coin and side before submitting the modify action.
515    async fn change_order_by_cloid(&self, cloid: i64, price: Decimal, volume: Decimal) -> Result<i64, String> {
516        let user = self.require_user_address()?;
517
518        let resp = self.client
519            .post(HYPERLIQUID_INFO_URL)
520            .json(&serde_json::json!({"type": "openOrders", "user": user}))
521            .send()
522            .await
523            .map_err(|e| e.to_string())?;
524        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
525        let order = orders.iter()
526            .find(|o| o.oid == cloid)
527            .ok_or_else(|| format!("order {} not found", cloid))?;
528
529        let asset_idx = self.get_asset_index(&order.coin).await?;
530        let is_buy = order.side == "B";
531
532        let action = serde_json::json!({
533            "type": "batchModify",
534            "modifies": [{
535                "oid": cloid,
536                "order": {
537                    "a": asset_idx,
538                    "b": is_buy,
539                    "p": price.to_string(),
540                    "s": volume.to_string(),
541                    "r": false,
542                    "t": {"limit": {"tif": "Gtc"}}
543                }
544            }]
545        });
546
547        self.submit_signed_action(action, None).await?;
548        Ok(cloid)
549    }
550
551    /// Cancels a single order by its order ID. Requires `with_auth`.
552    /// Fetches open orders to resolve the coin/asset before cancelling.
553    async fn cancel_order(&self, cloid: i64) -> Result<i64, String> {
554        let user = self.require_user_address()?;
555
556        let resp = self.client
557            .post(HYPERLIQUID_INFO_URL)
558            .json(&serde_json::json!({"type": "openOrders", "user": user}))
559            .send()
560            .await
561            .map_err(|e| e.to_string())?;
562        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
563        let order = orders.iter()
564            .find(|o| o.oid == cloid)
565            .ok_or_else(|| format!("order {} not found", cloid))?;
566
567        let asset_idx = self.get_asset_index(&order.coin).await?;
568        let action = serde_json::json!({
569            "type": "cancel",
570            "cancels": [{"a": asset_idx, "o": cloid}]
571        });
572
573        self.submit_signed_action(action, None).await?;
574        Ok(cloid)
575    }
576
577    /// Cancels all open orders. Requires `with_auth`.
578    /// Fetches all open orders and submits a batch cancel in a single signed request.
579    async fn cancel_all_order(&self) -> Result<bool, String> {
580        let user = self.require_user_address()?;
581
582        let resp = self.client
583            .post(HYPERLIQUID_INFO_URL)
584            .json(&serde_json::json!({"type": "openOrders", "user": user}))
585            .send()
586            .await
587            .map_err(|e| e.to_string())?;
588        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
589        if orders.is_empty() {
590            return Ok(true);
591        }
592
593        let meta_resp = self.client
594            .post(HYPERLIQUID_INFO_URL)
595            .json(&serde_json::json!({"type": "meta"}))
596            .send()
597            .await
598            .map_err(|e| e.to_string())?;
599        let meta: MetaResponse = parse_response(meta_resp).await?;
600
601        let cancels: Vec<Value> = orders.iter()
602            .filter_map(|o| {
603                let asset_idx = meta.universe.iter().position(|a| a.name == o.coin)?;
604                Some(serde_json::json!({"a": asset_idx, "o": o.oid}))
605            })
606            .collect();
607
608        let action = serde_json::json!({"type": "cancel", "cancels": cancels});
609        self.submit_signed_action(action, None).await?;
610        Ok(true)
611    }
612}
613
614#[allow(async_fn_in_trait)]
615impl guilder_abstraction::SubscribeMarketData for HyperliquidClient {
616    /// Streams L2 orderbook updates for `symbol`. Each message from Hyperliquid is a
617    /// full-depth snapshot; every level is emitted as an individual `L2Update` event.
618    /// All levels in the same snapshot share the same `sequence` value.
619    fn subscribe_l2_update(&self, symbol: String) -> BoxStream<L2Update> {
620        Box::pin(async_stream::stream! {
621            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
622            let sub = serde_json::json!({
623                "method": "subscribe",
624                "subscription": {"type": "l2Book", "coin": symbol}
625            });
626            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
627
628            while let Some(Ok(Message::Text(text))) = ws.next().await {
629                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
630                if env.channel != "l2Book" { continue; }
631                let Ok(book) = serde_json::from_value::<WsBook>(env.data) else { continue; };
632
633                for level in book.levels.first().into_iter().flatten() {
634                    if let (Some(price), Some(volume)) = (parse_decimal(&level.px), parse_decimal(&level.sz)) {
635                        yield L2Update { symbol: book.coin.clone(), price, volume, side: Side::Ask, sequence: book.time };
636                    }
637                }
638                for level in book.levels.get(1).into_iter().flatten() {
639                    if let (Some(price), Some(volume)) = (parse_decimal(&level.px), parse_decimal(&level.sz)) {
640                        yield L2Update { symbol: book.coin.clone(), price, volume, side: Side::Bid, sequence: book.time };
641                    }
642                }
643            }
644        })
645    }
646
647    /// Streams asset context updates for `symbol` via Hyperliquid's `activeAssetCtx` subscription.
648    fn subscribe_asset_context(&self, symbol: String) -> BoxStream<AssetContext> {
649        Box::pin(async_stream::stream! {
650            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
651            let sub = serde_json::json!({
652                "method": "subscribe",
653                "subscription": {"type": "activeAssetCtx", "coin": symbol}
654            });
655            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
656
657            while let Some(Ok(Message::Text(text))) = ws.next().await {
658                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
659                if env.channel != "activeAssetCtx" { continue; }
660                let Ok(update) = serde_json::from_value::<WsAssetCtx>(env.data) else { continue; };
661                let ctx = &update.ctx;
662                if let (Some(open_interest), Some(funding_rate), Some(mark_price), Some(day_volume)) = (
663                    parse_decimal(&ctx.open_interest),
664                    parse_decimal(&ctx.funding),
665                    parse_decimal(&ctx.mark_px),
666                    parse_decimal(&ctx.day_ntl_vlm),
667                ) {
668                    yield AssetContext { symbol: update.coin, open_interest, funding_rate, mark_price, day_volume };
669                }
670            }
671        })
672    }
673
674    /// Streams liquidation events for a user address via Hyperliquid's `userEvents` subscription.
675    /// Hyperliquid's liquidation event is account-level; `symbol` is empty and `side` is `Sell`.
676    fn subscribe_liquidation(&self, user: String) -> BoxStream<Liquidation> {
677        Box::pin(async_stream::stream! {
678            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
679            let sub = serde_json::json!({
680                "method": "subscribe",
681                "subscription": {"type": "userEvents", "user": user}
682            });
683            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
684
685            while let Some(Ok(Message::Text(text))) = ws.next().await {
686                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
687                if env.channel != "userEvents" { continue; }
688                let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else { continue; };
689                let Some(liq) = event.liquidation else { continue; };
690                if let (Some(notional_position), Some(account_value)) = (
691                    parse_decimal(&liq.liquidated_ntl_pos),
692                    parse_decimal(&liq.liquidated_account_value),
693                ) {
694                    yield Liquidation {
695                        symbol: String::new(),
696                        side: OrderSide::Sell,
697                        liquidated_user: liq.liquidated_user,
698                        notional_position,
699                        account_value,
700                    };
701                }
702            }
703        })
704    }
705
706    /// Streams public trade fills for `symbol`. "B" (buyer aggressor) → Buy, otherwise → Sell.
707    fn subscribe_fill(&self, symbol: String) -> BoxStream<Fill> {
708        Box::pin(async_stream::stream! {
709            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
710            let sub = serde_json::json!({
711                "method": "subscribe",
712                "subscription": {"type": "trades", "coin": symbol}
713            });
714            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
715
716            while let Some(Ok(Message::Text(text))) = ws.next().await {
717                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
718                if env.channel != "trades" { continue; }
719                let Ok(trades) = serde_json::from_value::<Vec<WsTrade>>(env.data) else { continue; };
720
721                for trade in trades {
722                    let side = if trade.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
723                    if let (Some(price), Some(volume)) = (parse_decimal(&trade.px), parse_decimal(&trade.sz)) {
724                        yield Fill { symbol: trade.coin, price, volume, side, timestamp_ms: trade.time, trade_id: trade.tid };
725                    }
726                }
727            }
728        })
729    }
730}
731
732#[allow(async_fn_in_trait)]
733impl guilder_abstraction::GetAccountSnapshot for HyperliquidClient {
734    /// Returns open positions from `clearinghouseState`. Requires `with_auth`.
735    /// Zero-size positions are filtered out. Positive `szi` = long, negative = short.
736    async fn get_positions(&self) -> Result<Vec<Position>, String> {
737        let user = self.require_user_address()?;
738        let resp = self.client
739            .post(HYPERLIQUID_INFO_URL)
740            .json(&serde_json::json!({"type": "clearinghouseState", "user": user}))
741            .send()
742            .await
743            .map_err(|e| e.to_string())?;
744        let state: ClearinghouseStateResponse = parse_response(resp).await?;
745
746        Ok(state.asset_positions.into_iter()
747            .filter_map(|ap| {
748                let p = ap.position;
749                let size = parse_decimal(&p.szi)?;
750                if size.is_zero() { return None; }
751                let entry_price = p.entry_px.as_deref().and_then(parse_decimal).unwrap_or_default();
752                let side = if size > Decimal::ZERO { OrderSide::Buy } else { OrderSide::Sell };
753                Some(Position { symbol: p.coin, side, size: size.abs(), entry_price })
754            })
755            .collect())
756    }
757
758    /// Returns resting orders from Hyperliquid's `openOrders` endpoint. Requires `with_auth`.
759    /// `filled_quantity` is derived as `origSz - sz` (original size minus remaining size).
760    async fn get_open_orders(&self) -> Result<Vec<OpenOrder>, String> {
761        let user = self.require_user_address()?;
762        let resp = self.client
763            .post(HYPERLIQUID_INFO_URL)
764            .json(&serde_json::json!({"type": "openOrders", "user": user}))
765            .send()
766            .await
767            .map_err(|e| e.to_string())?;
768        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
769
770        Ok(orders.into_iter()
771            .filter_map(|o| {
772                let price = parse_decimal(&o.limit_px)?;
773                let quantity = parse_decimal(&o.orig_sz)?;
774                let remaining = parse_decimal(&o.sz)?;
775                let filled_quantity = quantity - remaining;
776                let side = if o.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
777                Some(OpenOrder { order_id: o.oid, symbol: o.coin, side, price, quantity, filled_quantity })
778            })
779            .collect())
780    }
781
782    /// Returns total account value (collateral) from `clearinghouseState`. Requires `with_auth`.
783    async fn get_collateral(&self) -> Result<Decimal, String> {
784        let user = self.require_user_address()?;
785        let resp = self.client
786            .post(HYPERLIQUID_INFO_URL)
787            .json(&serde_json::json!({"type": "clearinghouseState", "user": user}))
788            .send()
789            .await
790            .map_err(|e| e.to_string())?;
791        let state: ClearinghouseStateResponse = parse_response(resp).await?;
792        parse_decimal(&state.margin_summary.account_value)
793            .ok_or_else(|| "invalid account value".to_string())
794    }
795}
796
797#[allow(async_fn_in_trait)]
798impl guilder_abstraction::SubscribeUserEvents for HyperliquidClient {
799    /// Streams the user's own order executions via the `userEvents` WS subscription.
800    /// Requires `with_auth` (streams are empty if no user address is set).
801    fn subscribe_user_fills(&self) -> BoxStream<UserFill> {
802        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
803        let user = format!("{:#x}", addr);
804        Box::pin(async_stream::stream! {
805            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
806            let sub = serde_json::json!({
807                "method": "subscribe",
808                "subscription": {"type": "userEvents", "user": user}
809            });
810            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
811
812            while let Some(Ok(Message::Text(text))) = ws.next().await {
813                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
814                if env.channel != "userEvents" { continue; }
815                let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else { continue; };
816                for fill in event.fills.unwrap_or_default() {
817                    let side = if fill.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
818                    if let (Some(price), Some(quantity), Some(fee_usd)) = (
819                        parse_decimal(&fill.px),
820                        parse_decimal(&fill.sz),
821                        parse_decimal(&fill.fee),
822                    ) {
823                        yield UserFill { order_id: fill.oid, symbol: fill.coin, side, price, quantity, fee_usd, timestamp_ms: fill.time };
824                    }
825                }
826            }
827        })
828    }
829
830    /// Streams order lifecycle events via the `orderUpdates` WS subscription.
831    /// Requires `with_auth`.
832    fn subscribe_order_updates(&self) -> BoxStream<OrderUpdate> {
833        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
834        let user = format!("{:#x}", addr);
835        Box::pin(async_stream::stream! {
836            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
837            let sub = serde_json::json!({
838                "method": "subscribe",
839                "subscription": {"type": "orderUpdates", "user": user}
840            });
841            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
842
843            while let Some(Ok(Message::Text(text))) = ws.next().await {
844                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
845                if env.channel != "orderUpdates" { continue; }
846                let Ok(updates) = serde_json::from_value::<Vec<WsOrderUpdate>>(env.data) else { continue; };
847                for upd in updates {
848                    let status = match upd.status.as_str() {
849                        "open" => OrderStatus::Placed,
850                        "filled" => OrderStatus::Filled,
851                        "canceled" | "cancelled" => OrderStatus::Cancelled,
852                        _ => OrderStatus::PartiallyFilled,
853                    };
854                    let side = if upd.order.side == "B" { OrderSide::Buy } else { OrderSide::Sell };
855                    yield OrderUpdate {
856                        order_id: upd.order.oid,
857                        symbol: upd.order.coin,
858                        status,
859                        side: Some(side),
860                        price: parse_decimal(&upd.order.limit_px),
861                        quantity: parse_decimal(&upd.order.orig_sz),
862                        remaining_quantity: parse_decimal(&upd.order.sz),
863                        timestamp_ms: upd.status_timestamp,
864                    };
865                }
866            }
867        })
868    }
869
870    /// Streams funding payments applied to positions via the `userEvents` WS subscription.
871    /// Requires `with_auth`.
872    fn subscribe_funding_payments(&self) -> BoxStream<FundingPayment> {
873        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
874        let user = format!("{:#x}", addr);
875        Box::pin(async_stream::stream! {
876            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
877            let sub = serde_json::json!({
878                "method": "subscribe",
879                "subscription": {"type": "userEvents", "user": user}
880            });
881            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
882
883            while let Some(Ok(Message::Text(text))) = ws.next().await {
884                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
885                if env.channel != "userEvents" { continue; }
886                let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else { continue; };
887                let Some(funding) = event.funding else { continue; };
888                if let Some(amount_usd) = parse_decimal(&funding.usdc) {
889                    yield FundingPayment { symbol: funding.coin, amount_usd, timestamp_ms: funding.time };
890                }
891            }
892        })
893    }
894
895    fn subscribe_deposits(&self) -> BoxStream<Deposit> {
896        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
897        let user = format!("{:#x}", addr);
898        Box::pin(async_stream::stream! {
899            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
900            let sub = serde_json::json!({
901                "method": "subscribe",
902                "subscription": {"type": "userNonFundingLedgerUpdates", "user": user}
903            });
904            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
905            while let Some(Ok(Message::Text(text))) = ws.next().await {
906                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
907                if env.channel != "userNonFundingLedgerUpdates" { continue; }
908                let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else { continue; };
909                for entry in ledger.updates {
910                    if entry.delta.kind == "deposit" {
911                        if let Some(amount_usd) = entry.delta.usdc.as_deref().and_then(parse_decimal) {
912                            yield Deposit { asset: "USDC".to_string(), amount_usd, timestamp_ms: entry.time };
913                        }
914                    }
915                }
916            }
917        })
918    }
919
920    fn subscribe_withdrawals(&self) -> BoxStream<Withdrawal> {
921        let Some(addr) = self.user_address else { return Box::pin(stream::empty()); };
922        let user = format!("{:#x}", addr);
923        Box::pin(async_stream::stream! {
924            let Ok((mut ws, _)) = connect_async(HYPERLIQUID_WS_URL).await else { return; };
925            let sub = serde_json::json!({
926                "method": "subscribe",
927                "subscription": {"type": "userNonFundingLedgerUpdates", "user": user}
928            });
929            if ws.send(Message::Text(sub.to_string().into())).await.is_err() { return; }
930            while let Some(Ok(Message::Text(text))) = ws.next().await {
931                let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else { continue; };
932                if env.channel != "userNonFundingLedgerUpdates" { continue; }
933                let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else { continue; };
934                for entry in ledger.updates {
935                    if entry.delta.kind == "withdraw" {
936                        if let Some(amount_usd) = entry.delta.usdc.as_deref().and_then(parse_decimal) {
937                            yield Withdrawal { asset: "USDC".to_string(), amount_usd, timestamp_ms: entry.time };
938                        }
939                    }
940                }
941            }
942        })
943    }
944}