Skip to main content

guilder_client_hyperliquid/
client.rs

1use crate::rate_limiter::{RestRateLimiter, AddressRateLimiter};
2use alloy_primitives::Address;
3use futures_util::{stream, StreamExt};
4use guilder_abstraction::{
5    self, AssetContext, BoxStream, Deposit, Fill, FundingPayment, L2Update, Liquidation, OpenOrder,
6    OrderPlacement, OrderSide, OrderStatus, OrderType, OrderUpdate, Position, PredictedFunding,
7    Side, TimeInForce, UserFill, Withdrawal,
8};
9use reqwest::Client;
10use rust_decimal::Decimal;
11use serde::Deserialize;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16const HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
17const HYPERLIQUID_EXCHANGE_URL: &str = "https://api.hyperliquid.xyz/exchange";
18
19async fn parse_response<T: for<'de> serde::Deserialize<'de>>(
20    resp: reqwest::Response,
21) -> Result<T, String> {
22    let text = resp.text().await.map_err(|e| e.to_string())?;
23    serde_json::from_str(&text).map_err(|e| format!("{e}: {text}"))
24}
25
26pub struct HyperliquidClient {
27    client: Client,
28    user_address: Option<Address>,
29    private_key: Option<String>,
30    rest_limiter: Arc<RestRateLimiter>,
31    address_limiter: Arc<AddressRateLimiter>,
32    ws_mux: crate::ws::WsMux,
33}
34
35impl Default for HyperliquidClient {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl HyperliquidClient {
42    pub fn new() -> Self {
43        HyperliquidClient {
44            client: Client::new(),
45            user_address: None,
46            private_key: None,
47            rest_limiter: Arc::new(RestRateLimiter::new()),
48            address_limiter: Arc::new(AddressRateLimiter::new()),
49            ws_mux: crate::ws::WsMux::new(),
50        }
51    }
52
53    pub fn with_auth(user_address: Address, private_key: String) -> Self {
54        HyperliquidClient {
55            client: Client::new(),
56            user_address: Some(user_address),
57            private_key: Some(private_key),
58            rest_limiter: Arc::new(RestRateLimiter::new()),
59            address_limiter: Arc::new(AddressRateLimiter::new()),
60            ws_mux: crate::ws::WsMux::new(),
61        }
62    }
63
64    /// Configure rate limit budgets (rest_weight/min, address_requests).
65    /// Defaults: 1200 rest weight/min, 10000 address requests.
66    pub fn with_budgets(mut self, rest_weight: u32, addr_budget: u64) -> Self {
67        self.rest_limiter = Arc::new(RestRateLimiter::new_with_budget(rest_weight));
68        self.address_limiter = Arc::new(AddressRateLimiter::new_with_budget(addr_budget));
69        self
70    }
71
72    /// POST to the info endpoint, consuming `weight` from the REST rate-limit budget.
73    async fn info_post(&self, body: Value, weight: u32, call: &str) -> Result<reqwest::Response, String> {
74        self.rest_limiter.acquire_blocking(weight, call).await;
75        self.client
76            .post(HYPERLIQUID_INFO_URL)
77            .json(&body)
78            .send()
79            .await
80            .map_err(|e| e.to_string())
81    }
82
83    /// POST to the exchange endpoint, consuming `weight` from the REST rate-limit budget.
84    /// Weight = 1 + floor(batch_length / 40).
85    async fn exchange_post(&self, body: Value, weight: u32, call: &str) -> Result<reqwest::Response, String> {
86        self.rest_limiter.acquire_blocking(weight, call).await;
87        self.client
88            .post(HYPERLIQUID_EXCHANGE_URL)
89            .json(&body)
90            .send()
91            .await
92            .map_err(|e| e.to_string())
93    }
94
95    fn require_user_address(&self) -> Result<String, String> {
96        self.user_address
97            .map(|a| format!("{:#x}", a))
98            .ok_or_else(|| "user address required: use HyperliquidClient::with_auth".to_string())
99    }
100
101    fn require_private_key(&self) -> Result<&str, String> {
102        self.private_key
103            .as_deref()
104            .ok_or_else(|| "private key required: use HyperliquidClient::with_auth".to_string())
105    }
106
107    async fn get_asset_index(&self, symbol: &str) -> Result<usize, String> {
108        // `meta` is an "all other info" request → weight 20
109        let resp = self
110            .info_post(serde_json::json!({"type": "meta"}), 20, "get_asset_index")
111            .await?;
112        let meta: MetaResponse = parse_response(resp).await?;
113        meta.universe
114            .iter()
115            .position(|a| a.name == symbol)
116            .ok_or_else(|| format!("symbol {} not found", symbol))
117    }
118
119    async fn submit_signed_action(
120        &self,
121        action: Value,
122        vault_address: Option<&str>,
123    ) -> Result<Value, String> {
124        let private_key = self.require_private_key()?;
125        let nonce = std::time::SystemTime::now()
126            .duration_since(std::time::UNIX_EPOCH)
127            .unwrap()
128            .as_millis() as u64;
129
130        let (r, s, v) = sign_action(private_key, &action, vault_address, nonce)?;
131
132        let payload = serde_json::json!({
133            "action": action,
134            "nonce": nonce,
135            "signature": {"r": r, "s": s, "v": v},
136            "vaultAddress": null
137        });
138
139        // Single unbatched action → exchange weight 1
140        let resp = self.exchange_post(payload, 1, "submit_signed_action").await?;
141
142        let body: Value = parse_response(resp).await?;
143        if body["status"].as_str() == Some("err") {
144            return Err(body["response"]
145                .as_str()
146                .unwrap_or("unknown error")
147                .to_string());
148        }
149        Ok(body)
150    }
151}
152
153// --- REST deserialization types ---
154
155#[derive(Deserialize)]
156struct MetaResponse {
157    universe: Vec<AssetInfo>,
158}
159
160#[derive(Deserialize)]
161struct AssetInfo {
162    name: String,
163}
164
165type MetaAndAssetCtxsResponse = (MetaResponse, Vec<RestAssetCtx>);
166
167#[derive(Deserialize)]
168#[serde(rename_all = "camelCase")]
169#[allow(dead_code)]
170struct RestAssetCtx {
171    open_interest: String,
172    funding: String,
173    mark_px: String,
174    day_ntl_vlm: String,
175    mid_px: Option<String>,
176    oracle_px: Option<String>,
177    premium: Option<String>,
178    prev_day_px: Option<String>,
179}
180
181#[derive(Deserialize)]
182#[serde(rename_all = "camelCase")]
183struct ClearinghouseStateResponse {
184    margin_summary: MarginSummary,
185    asset_positions: Vec<AssetPosition>,
186}
187
188#[derive(Deserialize)]
189#[serde(rename_all = "camelCase")]
190struct MarginSummary {
191    account_value: String,
192}
193
194#[derive(Deserialize)]
195struct AssetPosition {
196    position: PositionDetail,
197}
198
199#[derive(Deserialize)]
200#[serde(rename_all = "camelCase")]
201struct PositionDetail {
202    coin: String,
203    /// positive = long, negative = short
204    szi: String,
205    entry_px: Option<String>,
206}
207
208#[derive(Deserialize)]
209#[serde(rename_all = "camelCase")]
210struct RestOpenOrder {
211    coin: String,
212    side: String,
213    limit_px: String,
214    sz: String,
215    oid: i64,
216    orig_sz: String,
217}
218
219// predictedFundings response: Vec<(coin, Vec<(venue, entry_or_null)>)>
220// The API returns null for venues that don't list the coin.
221type PredictedFundingsResponse = Vec<(String, Vec<(String, Option<PredictedFundingEntry>)>)>;
222
223#[derive(Deserialize)]
224#[serde(rename_all = "camelCase")]
225struct PredictedFundingEntry {
226    funding_rate: String,
227    next_funding_time: i64,
228}
229
230// --- WebSocket envelope and data shapes ---
231
232#[derive(Deserialize)]
233struct WsEnvelope {
234    channel: String,
235    #[serde(default)]
236    data: Value,
237}
238
239#[derive(Deserialize)]
240struct WsBook {
241    coin: String,
242    levels: Vec<Vec<WsLevel>>,
243    time: i64,
244}
245
246#[derive(Deserialize)]
247struct WsLevel {
248    px: String,
249    sz: String,
250}
251
252#[derive(Deserialize)]
253#[serde(rename_all = "camelCase")]
254struct WsAssetCtx {
255    coin: String,
256    ctx: WsPerpsCtx,
257}
258
259#[derive(Deserialize)]
260#[serde(rename_all = "camelCase")]
261struct WsPerpsCtx {
262    open_interest: String,
263    funding: String,
264    mark_px: String,
265    day_ntl_vlm: String,
266    mid_px: Option<String>,
267    oracle_px: Option<String>,
268    premium: Option<String>,
269    prev_day_px: Option<String>,
270}
271
272#[derive(Deserialize)]
273struct WsUserEvent {
274    liquidation: Option<WsLiquidation>,
275    fills: Option<Vec<WsUserFill>>,
276    funding: Option<WsFunding>,
277    spot_state: Option<WsSpotState>,
278}
279
280#[derive(Deserialize)]
281struct WsSpotState {
282    balances: Option<Vec<WsSpotBalance>>,
283}
284
285#[derive(Deserialize)]
286struct WsSpotBalance {
287    coin: String,
288    total: String,
289    hold: String,
290}
291
292#[derive(Deserialize)]
293struct WsLiquidation {
294    liquidated_user: String,
295    liquidated_ntl_pos: String,
296    liquidated_account_value: String,
297}
298
299#[derive(Deserialize)]
300struct WsUserFill {
301    coin: String,
302    px: String,
303    sz: String,
304    side: String,
305    time: i64,
306    oid: i64,
307    fee: String,
308    /// Client order ID assigned at placement, if any.
309    #[serde(default)]
310    cloid: Option<String>,
311}
312
313#[derive(Deserialize)]
314struct WsFunding {
315    time: i64,
316    coin: String,
317    usdc: String,
318}
319
320#[derive(Deserialize)]
321struct WsTrade {
322    coin: String,
323    side: String,
324    px: String,
325    sz: String,
326    time: i64,
327    tid: i64,
328}
329
330#[derive(Deserialize)]
331struct WsOrderUpdate {
332    order: WsOrderInfo,
333    status: String,
334    #[serde(rename = "statusTimestamp")]
335    status_timestamp: i64,
336}
337
338#[derive(Deserialize)]
339#[serde(rename_all = "camelCase")]
340struct WsOrderInfo {
341    coin: String,
342    side: String,
343    limit_px: String,
344    sz: String,
345    oid: i64,
346    orig_sz: String,
347    /// Client order ID assigned at placement, if any.
348    #[serde(default)]
349    cloid: Option<String>,
350}
351
352// --- WebSocket ledger update shapes (deposits / withdrawals) ---
353
354#[derive(Deserialize)]
355struct WsLedgerUpdates {
356    updates: Vec<WsLedgerEntry>,
357}
358
359#[derive(Deserialize)]
360struct WsLedgerEntry {
361    time: i64,
362    delta: WsLedgerDelta,
363}
364
365#[derive(Deserialize)]
366struct WsLedgerDelta {
367    #[serde(rename = "type")]
368    kind: String,
369    usdc: Option<String>,
370}
371
372// --- Helpers ---
373
374fn parse_decimal(s: &str) -> Option<Decimal> {
375    Decimal::from_str(s).ok()
376}
377
378fn keccak256(data: &[u8]) -> [u8; 32] {
379    use sha3::{Digest, Keccak256};
380    Keccak256::digest(data).into()
381}
382
383/// EIP-712 domain separator for Hyperliquid mainnet (Arbitrum, chainId=42161).
384fn hyperliquid_domain_separator() -> [u8; 32] {
385    let type_hash = keccak256(
386        b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)",
387    );
388    let name_hash = keccak256(b"Exchange");
389    let version_hash = keccak256(b"1");
390    let mut chain_id = [0u8; 32];
391    chain_id[28..32].copy_from_slice(&42161u32.to_be_bytes());
392    let verifying_contract = [0u8; 32];
393
394    let mut data = [0u8; 160];
395    data[..32].copy_from_slice(&type_hash);
396    data[32..64].copy_from_slice(&name_hash);
397    data[64..96].copy_from_slice(&version_hash);
398    data[96..128].copy_from_slice(&chain_id);
399    data[128..160].copy_from_slice(&verifying_contract);
400    keccak256(&data)
401}
402
403/// Signs a Hyperliquid exchange action using EIP-712.
404/// Returns (r, s, v) where r and s are "0x"-prefixed hex strings and v is 27 or 28.
405fn sign_action(
406    private_key: &str,
407    action: &Value,
408    vault_address: Option<&str>,
409    nonce: u64,
410) -> Result<(String, String, u8), String> {
411    use k256::ecdsa::SigningKey;
412
413    // Step 1: msgpack-encode the action, append nonce + vault flag
414    let msgpack_bytes = rmp_serde::to_vec(action).map_err(|e| e.to_string())?;
415    let mut data = msgpack_bytes;
416    data.extend_from_slice(&nonce.to_be_bytes());
417    match vault_address {
418        None => data.push(0u8),
419        Some(addr) => {
420            data.push(1u8);
421            let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
422                .map_err(|e| format!("invalid vault address: {}", e))?;
423            data.extend_from_slice(&addr_bytes);
424        }
425    }
426    let connection_id = keccak256(&data);
427
428    // Step 2: hash the Agent struct
429    let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
430    let source_hash = keccak256(b"a"); // "a" = mainnet
431    let mut struct_data = [0u8; 96];
432    struct_data[..32].copy_from_slice(&agent_type_hash);
433    struct_data[32..64].copy_from_slice(&source_hash);
434    struct_data[64..96].copy_from_slice(&connection_id);
435    let struct_hash = keccak256(&struct_data);
436
437    // Step 3: EIP-712 final hash
438    let domain_sep = hyperliquid_domain_separator();
439    let mut final_data = Vec::with_capacity(66);
440    final_data.extend_from_slice(b"\x19\x01");
441    final_data.extend_from_slice(&domain_sep);
442    final_data.extend_from_slice(&struct_hash);
443    let final_hash = keccak256(&final_data);
444
445    // Step 4: sign with secp256k1
446    let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
447        .map_err(|e| format!("invalid private key: {}", e))?;
448    let signing_key =
449        SigningKey::from_bytes(key_bytes.as_slice().into()).map_err(|e| e.to_string())?;
450    let (sig, recovery_id) = signing_key
451        .sign_prehash_recoverable(&final_hash)
452        .map_err(|e| e.to_string())?;
453
454    let sig_bytes = sig.to_bytes();
455    let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
456    let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
457    let v = 27u8 + recovery_id.to_byte();
458
459    Ok((r, s, v))
460}
461
462// --- Trait implementations ---
463
464#[allow(async_fn_in_trait)]
465impl guilder_abstraction::TestServer for HyperliquidClient {
466    /// Sends a lightweight allMids request; returns true if the server responds 200 OK.
467    async fn ping(&self) -> Result<bool, String> {
468        // allMids → weight 2
469        self.info_post(serde_json::json!({"type": "allMids"}), 2, "ping")
470            .await
471            .map(|r| r.status().is_success())
472    }
473
474    /// Hyperliquid has no dedicated server-time endpoint; returns local UTC ms.
475    async fn get_server_time(&self) -> Result<i64, String> {
476        Ok(std::time::SystemTime::now()
477            .duration_since(std::time::UNIX_EPOCH)
478            .map(|d| d.as_millis() as i64)
479            .unwrap_or(0))
480    }
481}
482
483#[allow(async_fn_in_trait)]
484impl guilder_abstraction::GetMarketData for HyperliquidClient {
485    /// Returns all perpetual asset names from Hyperliquid's meta endpoint.
486    async fn get_symbol(&self) -> Result<Vec<String>, String> {
487        // meta → weight 20
488        let resp = self
489            .info_post(serde_json::json!({"type": "meta"}), 20, "get_symbol")
490            .await?;
491        parse_response::<MetaResponse>(resp)
492            .await
493            .map(|r| r.universe.into_iter().map(|a| a.name).collect())
494    }
495
496    /// Returns the current open interest for `symbol` from metaAndAssetCtxs.
497    async fn get_open_interest(&self, symbol: String) -> Result<Decimal, String> {
498        // metaAndAssetCtxs → weight 20
499        let resp = self
500            .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20, "get_open_interest")
501            .await?;
502        let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
503            .await?
504            .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
505        meta.universe
506            .iter()
507            .position(|a| a.name == symbol)
508            .and_then(|i| ctxs.get(i))
509            .and_then(|ctx| parse_decimal(&ctx.open_interest))
510            .ok_or_else(|| format!("symbol {} not found", symbol))
511    }
512
513    /// Returns a full AssetContext snapshot for `symbol` from metaAndAssetCtxs.
514    async fn get_asset_context(&self, symbol: String) -> Result<AssetContext, String> {
515        // metaAndAssetCtxs → weight 20
516        let resp = self
517            .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20, "get_asset_context")
518            .await?;
519        let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
520            .await?
521            .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
522        let idx = meta
523            .universe
524            .iter()
525            .position(|a| a.name == symbol)
526            .ok_or_else(|| format!("symbol {} not found", symbol))?;
527        let ctx = ctxs
528            .get(idx)
529            .ok_or_else(|| format!("symbol {} not found", symbol))?;
530        Ok(AssetContext {
531            symbol,
532            open_interest: parse_decimal(&ctx.open_interest).ok_or("invalid open_interest")?,
533            funding_rate: parse_decimal(&ctx.funding).ok_or("invalid funding")?,
534            mark_price: parse_decimal(&ctx.mark_px).ok_or("invalid mark_px")?,
535            day_volume: parse_decimal(&ctx.day_ntl_vlm).ok_or("invalid day_ntl_vlm")?,
536            mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
537            oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
538            premium: ctx.premium.as_deref().and_then(parse_decimal),
539            prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
540        })
541    }
542
543    /// Fetches metaAndAssetCtxs once and returns all asset contexts in universe order.
544    /// Prefer this over repeated `get_asset_context` calls to avoid rate-limiting.
545    async fn get_all_asset_contexts(&self) -> Result<Vec<AssetContext>, String> {
546        // metaAndAssetCtxs → weight 20
547        let resp = self
548            .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20, "get_all_asset_contexts")
549            .await?;
550        let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
551            .await?
552            .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
553        let mut result = Vec::with_capacity(meta.universe.len());
554        for (asset, ctx) in meta.universe.iter().zip(ctxs.iter()) {
555            let Some(open_interest) = parse_decimal(&ctx.open_interest) else {
556                continue;
557            };
558            let Some(funding_rate) = parse_decimal(&ctx.funding) else {
559                continue;
560            };
561            let Some(mark_price) = parse_decimal(&ctx.mark_px) else {
562                continue;
563            };
564            let Some(day_volume) = parse_decimal(&ctx.day_ntl_vlm) else {
565                continue;
566            };
567            result.push(AssetContext {
568                symbol: asset.name.clone(),
569                open_interest,
570                funding_rate,
571                mark_price,
572                day_volume,
573                mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
574                oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
575                premium: ctx.premium.as_deref().and_then(parse_decimal),
576                prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
577            });
578        }
579        Ok(result)
580    }
581
582    /// Returns a full L2 orderbook snapshot for `symbol` from the l2Book REST endpoint.
583    /// Levels are returned as individual `L2Update` items; all share the same `sequence` (timestamp).
584    async fn get_l2_orderbook(&self, symbol: String) -> Result<Vec<L2Update>, String> {
585        // l2Book → weight 2
586        let resp = self
587            .info_post(serde_json::json!({"type": "l2Book", "coin": symbol}), 2, "get_l2_orderbook")
588            .await?;
589        let book: Option<WsBook> = parse_response(resp).await?;
590        let book = match book {
591            Some(b) => b,
592            None => return Ok(vec![]),
593        };
594        let mut levels = Vec::new();
595        for level in book.levels.first().into_iter().flatten() {
596            if let (Some(price), Some(volume)) =
597                (parse_decimal(&level.px), parse_decimal(&level.sz))
598            {
599                levels.push(L2Update {
600                    symbol: book.coin.clone(),
601                    price,
602                    volume,
603                    side: Side::Ask,
604                    sequence: book.time,
605                });
606            }
607        }
608        for level in book.levels.get(1).into_iter().flatten() {
609            if let (Some(price), Some(volume)) =
610                (parse_decimal(&level.px), parse_decimal(&level.sz))
611            {
612                levels.push(L2Update {
613                    symbol: book.coin.clone(),
614                    price,
615                    volume,
616                    side: Side::Bid,
617                    sequence: book.time,
618                });
619            }
620        }
621        Ok(levels)
622    }
623
624    /// Returns the mid-price of `symbol` (e.g. "BTC") from allMids.
625    async fn get_price(&self, symbol: String) -> Result<Decimal, String> {
626        // allMids → weight 2
627        let resp = self
628            .info_post(serde_json::json!({"type": "allMids"}), 2, "get_price")
629            .await?;
630        parse_response::<HashMap<String, String>>(resp)
631            .await?
632            .get(&symbol)
633            .and_then(|s| parse_decimal(s))
634            .ok_or_else(|| format!("symbol {} not found", symbol))
635    }
636
637    /// Returns predicted funding rates for all symbols across all venues.
638    /// Null venue entries (unsupported coins) are silently skipped.
639    async fn get_predicted_fundings(&self) -> Result<Vec<PredictedFunding>, String> {
640        // predictedFundings → weight 20
641        let resp = self
642            .info_post(serde_json::json!({"type": "predictedFundings"}), 20, "get_predicted_fundings")
643            .await?;
644        let data: PredictedFundingsResponse = parse_response(resp).await?;
645        let mut result = Vec::new();
646        for (symbol, venues) in data {
647            for (venue, entry) in venues {
648                let Some(entry) = entry else { continue };
649                if let Some(funding_rate) = parse_decimal(&entry.funding_rate) {
650                    result.push(PredictedFunding {
651                        symbol: symbol.clone(),
652                        venue,
653                        funding_rate,
654                        next_funding_time_ms: entry.next_funding_time,
655                    });
656                }
657            }
658        }
659        Ok(result)
660    }
661}
662
663#[allow(async_fn_in_trait)]
664impl guilder_abstraction::ManageOrder for HyperliquidClient {
665    /// Places an order on Hyperliquid. Requires `with_auth`. Returns an `OrderPlacement` with
666    /// the exchange-assigned order ID. Market orders are submitted as aggressive limit orders (IOC).
667    ///
668    /// If `cloid` is provided, Hyperliquid attaches it to the order lifecycle — fills and order
669    /// updates will carry the same cloid back, enabling end-to-end intent tracing without a
670    /// separate order_id mapping.
671    async fn place_order(
672        &self,
673        symbol: String,
674        side: OrderSide,
675        price: Decimal,
676        volume: Decimal,
677        order_type: OrderType,
678        time_in_force: TimeInForce,
679        cloid: Option<String>,
680    ) -> Result<OrderPlacement, String> {
681        let asset_idx = self.get_asset_index(&symbol).await?;
682        let is_buy = matches!(side, OrderSide::Buy);
683
684        let tif_str = match time_in_force {
685            TimeInForce::Gtc => "Gtc",
686            TimeInForce::Ioc => "Ioc",
687            TimeInForce::Fok => "Fok",
688        };
689        // Market orders are IOC limit orders at a wide price
690        let order_type_val = match order_type {
691            OrderType::Limit => serde_json::json!({"limit": {"tif": tif_str}}),
692            OrderType::Market => serde_json::json!({"limit": {"tif": "Ioc"}}),
693        };
694
695        let mut order_json = serde_json::json!({
696            "a": asset_idx,
697            "b": is_buy,
698            "p": price.to_string(),
699            "s": volume.to_string(),
700            "r": false,
701            "t": order_type_val
702        });
703        if let Some(ref c) = cloid {
704            order_json["c"] = serde_json::json!(c);
705        }
706
707        let action = serde_json::json!({
708            "type": "order",
709            "orders": [order_json],
710            "grouping": "na"
711        });
712
713        let resp = self.submit_signed_action(action, None).await?;
714        let oid = resp["response"]["data"]["statuses"][0]["resting"]["oid"]
715            .as_i64()
716            .or_else(|| resp["response"]["data"]["statuses"][0]["filled"]["oid"].as_i64())
717            .ok_or_else(|| format!("unexpected response: {}", resp))?;
718
719        let timestamp_ms = std::time::SystemTime::now()
720            .duration_since(std::time::UNIX_EPOCH)
721            .unwrap()
722            .as_millis() as i64;
723
724        Ok(OrderPlacement {
725            order_id: oid,
726            symbol,
727            side,
728            price,
729            quantity: volume,
730            timestamp_ms,
731            cloid,
732        })
733    }
734
735    /// Modifies price and size of an existing order by its order ID. Requires `with_auth`.
736    /// Fetches the order's current coin and side before submitting the modify action.
737    async fn change_order_by_cloid(
738        &self,
739        cloid: i64,
740        price: Decimal,
741        volume: Decimal,
742    ) -> Result<i64, String> {
743        let user = self.require_user_address()?;
744
745        // openOrders → weight 20; get_asset_index → meta weight 20
746        let resp = self
747            .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20, "change_order_by_cloid")
748            .await?;
749        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
750        let order = orders
751            .iter()
752            .find(|o| o.oid == cloid)
753            .ok_or_else(|| format!("order {} not found", cloid))?;
754
755        let asset_idx = self.get_asset_index(&order.coin).await?;
756        let is_buy = order.side == "B";
757
758        let action = serde_json::json!({
759            "type": "batchModify",
760            "modifies": [{
761                "oid": cloid,
762                "order": {
763                    "a": asset_idx,
764                    "b": is_buy,
765                    "p": price.to_string(),
766                    "s": volume.to_string(),
767                    "r": false,
768                    "t": {"limit": {"tif": "Gtc"}}
769                }
770            }]
771        });
772
773        self.submit_signed_action(action, None).await?;
774        Ok(cloid)
775    }
776
777    /// Cancels a single order by its order ID. Requires `with_auth`.
778    /// Fetches open orders to resolve the coin/asset before cancelling.
779    async fn cancel_order(&self, cloid: i64) -> Result<i64, String> {
780        let user = self.require_user_address()?;
781
782        // openOrders → weight 20
783        let resp = self
784            .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20, "cancel_order")
785            .await?;
786        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
787        let order = orders
788            .iter()
789            .find(|o| o.oid == cloid)
790            .ok_or_else(|| format!("order {} not found", cloid))?;
791
792        let asset_idx = self.get_asset_index(&order.coin).await?;
793        let action = serde_json::json!({
794            "type": "cancel",
795            "cancels": [{"a": asset_idx, "o": cloid}]
796        });
797
798        self.submit_signed_action(action, None).await?;
799        Ok(cloid)
800    }
801
802    /// Cancels all open orders. Requires `with_auth`.
803    /// Fetches all open orders and submits a batch cancel in a single signed request.
804    async fn cancel_all_order(&self) -> Result<bool, String> {
805        let user = self.require_user_address()?;
806
807        // openOrders → weight 20
808        let resp = self
809            .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20, "cancel_all_order")
810            .await?;
811        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
812        if orders.is_empty() {
813            return Ok(true);
814        }
815
816        // meta → weight 20
817        let meta_resp = self
818            .info_post(serde_json::json!({"type": "meta"}), 20, "cancel_all_order")
819            .await?;
820        let meta: MetaResponse = parse_response(meta_resp).await?;
821
822        let cancels: Vec<Value> = orders
823            .iter()
824            .filter_map(|o| {
825                let asset_idx = meta.universe.iter().position(|a| a.name == o.coin)?;
826                Some(serde_json::json!({"a": asset_idx, "o": o.oid}))
827            })
828            .collect();
829
830        let action = serde_json::json!({"type": "cancel", "cancels": cancels});
831        self.submit_signed_action(action, None).await?;
832        Ok(true)
833    }
834}
835
836#[allow(async_fn_in_trait)]
837impl guilder_abstraction::SubscribeMarketData for HyperliquidClient {
838    fn subscribe_l2_update(&self, symbol: String) -> BoxStream<Result<L2Update, String>> {
839        let sub = serde_json::json!({
840            "method": "subscribe",
841            "subscription": {"type": "l2Book", "coin": symbol.clone()}
842        });
843        let key = crate::ws::SubKey {
844            channel: "l2Book".to_string(),
845            routing_key: symbol,
846        };
847        let stream = self.ws_mux.subscribe(key, sub);
848        Box::pin(async_stream::stream! {
849            for await msg in stream {
850                let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
851                    continue;
852                };
853                if env.channel != "l2Book" {
854                    continue;
855                }
856                let Ok(book) = serde_json::from_value::<WsBook>(env.data) else {
857                    continue;
858                };
859                for level in book.levels.first().into_iter().flatten() {
860                    if let (Some(price), Some(volume)) =
861                        (parse_decimal(&level.px), parse_decimal(&level.sz))
862                    {
863                        yield Ok(L2Update {
864                            symbol: book.coin.clone(),
865                            price,
866                            volume,
867                            side: Side::Ask,
868                            sequence: book.time,
869                        });
870                    }
871                }
872                for level in book.levels.get(1).into_iter().flatten() {
873                    if let (Some(price), Some(volume)) =
874                        (parse_decimal(&level.px), parse_decimal(&level.sz))
875                    {
876                        yield Ok(L2Update {
877                            symbol: book.coin.clone(),
878                            price,
879                            volume,
880                            side: Side::Bid,
881                            sequence: book.time,
882                        });
883                    }
884                }
885            }
886        })
887    }
888
889    fn subscribe_asset_context(&self, symbol: String) -> BoxStream<Result<AssetContext, String>> {
890        let sub = serde_json::json!({
891            "method": "subscribe",
892            "subscription": {"type": "activeAssetCtx", "coin": symbol.clone()}
893        });
894        let key = crate::ws::SubKey {
895            channel: "activeAssetCtx".to_string(),
896            routing_key: symbol,
897        };
898        let stream = self.ws_mux.subscribe(key, sub);
899        Box::pin(async_stream::stream! {
900            for await msg in stream {
901                let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
902                    continue;
903                };
904                if env.channel != "activeAssetCtx" {
905                    continue;
906                }
907                let Ok(update) = serde_json::from_value::<WsAssetCtx>(env.data) else {
908                    continue;
909                };
910                let ctx = &update.ctx;
911                let (Some(open_interest), Some(funding_rate), Some(mark_price), Some(day_volume)) = (
912                    parse_decimal(&ctx.open_interest),
913                    parse_decimal(&ctx.funding),
914                    parse_decimal(&ctx.mark_px),
915                    parse_decimal(&ctx.day_ntl_vlm),
916                ) else {
917                    continue;
918                };
919                yield Ok(AssetContext {
920                    symbol: update.coin,
921                    open_interest,
922                    funding_rate,
923                    mark_price,
924                    day_volume,
925                    mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
926                    oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
927                    premium: ctx.premium.as_deref().and_then(parse_decimal),
928                    prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
929                });
930            }
931        })
932    }
933
934    fn subscribe_liquidation(&self, user: String) -> BoxStream<Result<Liquidation, String>> {
935        let sub = serde_json::json!({
936            "method": "subscribe",
937            "subscription": {"type": "userEvents", "user": user.clone()}
938        });
939        let key = crate::ws::SubKey {
940            channel: "userEvents".to_string(),
941            routing_key: user,
942        };
943        let raw_stream = self.ws_mux.subscribe(key, sub);
944        Box::pin(raw_stream.filter_map(|text| async move {
945            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
946                return None;
947            };
948            if env.channel != "userEvents" {
949                return None;
950            }
951            let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
952                return None;
953            };
954            let Some(liq) = event.liquidation else {
955                return None;
956            };
957            let (Some(notional_position), Some(account_value)) = (
958                parse_decimal(&liq.liquidated_ntl_pos),
959                parse_decimal(&liq.liquidated_account_value),
960            ) else {
961                return None;
962            };
963            let item = Liquidation {
964                symbol: String::new(),
965                side: OrderSide::Sell,
966                liquidated_user: liq.liquidated_user,
967                notional_position,
968                account_value,
969            };
970            Some(stream::iter(vec![Ok(item)].into_iter()))
971        }).flatten())
972    }
973
974    fn subscribe_fill(&self, symbol: String) -> BoxStream<Result<Fill, String>> {
975        let sub = serde_json::json!({
976            "method": "subscribe",
977            "subscription": {"type": "trades", "coin": symbol.clone()}
978        });
979        let key = crate::ws::SubKey {
980            channel: "trades".to_string(),
981            routing_key: symbol,
982        };
983        let stream = self.ws_mux.subscribe(key, sub);
984        Box::pin(async_stream::stream! {
985            for await msg in stream {
986                let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
987                    continue;
988                };
989                if env.channel != "trades" {
990                    continue;
991                }
992                let Ok(trades) = serde_json::from_value::<Vec<WsTrade>>(env.data) else {
993                    continue;
994                };
995                for trade in trades {
996                    let side = if trade.side == "B" {
997                        OrderSide::Buy
998                    } else {
999                        OrderSide::Sell
1000                    };
1001                    let price = parse_decimal(&trade.px);
1002                    let volume = parse_decimal(&trade.sz);
1003                    if let (Some(price), Some(volume)) = (price, volume) {
1004                        yield Ok(Fill {
1005                            symbol: trade.coin,
1006                            price,
1007                            volume,
1008                            side,
1009                            timestamp_ms: trade.time,
1010                            trade_id: trade.tid,
1011                        });
1012                    }
1013                }
1014            }
1015        })
1016    }
1017}
1018
1019#[allow(async_fn_in_trait)]
1020impl guilder_abstraction::GetAccountSnapshot for HyperliquidClient {
1021    /// Returns open positions from `clearinghouseState`. Requires `with_auth`.
1022    /// Zero-size positions are filtered out. Positive `szi` = long, negative = short.
1023    async fn get_positions(&self) -> Result<Vec<Position>, String> {
1024        let user = self.require_user_address()?;
1025        // clearinghouseState → weight 2
1026        let resp = self
1027            .info_post(
1028                serde_json::json!({"type": "clearinghouseState", "user": user}),
1029                2,
1030                "get_positions",
1031            )
1032            .await?;
1033        let state: ClearinghouseStateResponse = parse_response(resp).await?;
1034
1035        Ok(state
1036            .asset_positions
1037            .into_iter()
1038            .filter_map(|ap| {
1039                let p = ap.position;
1040                let size = parse_decimal(&p.szi)?;
1041                if size.is_zero() {
1042                    return None;
1043                }
1044                let entry_price = p
1045                    .entry_px
1046                    .as_deref()
1047                    .and_then(parse_decimal)
1048                    .unwrap_or_default();
1049                let side = if size > Decimal::ZERO {
1050                    OrderSide::Buy
1051                } else {
1052                    OrderSide::Sell
1053                };
1054                Some(Position {
1055                    symbol: p.coin,
1056                    side,
1057                    size: size.abs(),
1058                    entry_price,
1059                })
1060            })
1061            .collect())
1062    }
1063
1064    /// Returns resting orders from Hyperliquid's `openOrders` endpoint. Requires `with_auth`.
1065    /// `filled_quantity` is derived as `origSz - sz` (original size minus remaining size).
1066    async fn get_open_orders(&self) -> Result<Vec<OpenOrder>, String> {
1067        let user = self.require_user_address()?;
1068        // openOrders → weight 20
1069        let resp = self
1070            .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20, "get_open_orders")
1071            .await?;
1072        let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
1073
1074        Ok(orders
1075            .into_iter()
1076            .filter_map(|o| {
1077                let price = parse_decimal(&o.limit_px)?;
1078                let quantity = parse_decimal(&o.orig_sz)?;
1079                let remaining = parse_decimal(&o.sz)?;
1080                let filled_quantity = quantity - remaining;
1081                let side = if o.side == "B" {
1082                    OrderSide::Buy
1083                } else {
1084                    OrderSide::Sell
1085                };
1086                Some(OpenOrder {
1087                    order_id: o.oid,
1088                    symbol: o.coin,
1089                    side,
1090                    price,
1091                    quantity,
1092                    filled_quantity,
1093                })
1094            })
1095            .collect())
1096    }
1097
1098    /// Returns total account value (collateral) from `clearinghouseState`. Requires `with_auth`.
1099    async fn get_collateral(&self) -> Result<Decimal, String> {
1100        let user = self.require_user_address()?;
1101        // clearinghouseState → weight 2
1102        let resp = self
1103            .info_post(
1104                serde_json::json!({"type": "clearinghouseState", "user": user}),
1105                2,
1106                "get_collateral",
1107            )
1108            .await?;
1109        let state: ClearinghouseStateResponse = parse_response(resp).await?;
1110        parse_decimal(&state.margin_summary.account_value)
1111            .ok_or_else(|| "invalid account value".to_string())
1112    }
1113
1114    /// Returns all spot wallet balances from `spotState`. Requires `with_auth`.
1115    async fn get_spot_balance(&self) -> Result<Vec<guilder_abstraction::Balance>, String> {
1116        let user = self.require_user_address()?;
1117        // spotClearinghouseState → weight 15
1118        let resp = self
1119            .info_post(
1120                serde_json::json!({"type": "spotClearinghouseState", "user": user}),
1121                15,
1122                "get_spot_balance",
1123            )
1124            .await?;
1125
1126        #[derive(Deserialize)]
1127        struct SpotStateResponse {
1128            balances: Vec<SpotBalance>,
1129        }
1130
1131        #[derive(Deserialize)]
1132        struct SpotBalance {
1133            coin: String,
1134            total: String,
1135            hold: String,
1136            #[serde(default)]
1137            token: Option<i32>,
1138            #[serde(default)]
1139            entryNtl: Option<String>,
1140        }
1141
1142        let state: SpotStateResponse = parse_response(resp).await?;
1143
1144        state.balances
1145            .into_iter()
1146            .map(|balance| {
1147                let total = parse_decimal(&balance.total)
1148                    .ok_or_else(|| "invalid total balance".to_string())?;
1149                let locked = parse_decimal(&balance.hold)
1150                    .ok_or_else(|| "invalid hold balance".to_string())?;
1151                let available = total - locked;
1152
1153                Ok(guilder_abstraction::Balance {
1154                    coin: balance.coin,
1155                    total,
1156                    available,
1157                    locked,
1158                })
1159            })
1160            .collect()
1161    }
1162
1163    /// Returns clearing house collateral balance for an asset. Currently returns the total collateral.
1164    /// Requires `with_auth`.
1165    async fn get_collateral_balance(&self, asset: String) -> Result<guilder_abstraction::Balance, String> {
1166        if asset.to_uppercase() != "USDC" {
1167            return Err(format!("only USDC collateral is supported, got {}", asset));
1168        }
1169
1170        let total = self.get_collateral().await?;
1171        Ok(guilder_abstraction::Balance {
1172            coin: "USDC".to_string(),
1173            total,
1174            available: total,
1175            locked: Decimal::ZERO,
1176        })
1177    }
1178}
1179
1180#[allow(async_fn_in_trait)]
1181impl guilder_abstraction::SubscribeUserEvents for HyperliquidClient {
1182    fn subscribe_user_fills(&self) -> BoxStream<Result<UserFill, String>> {
1183        let Some(addr) = self.user_address else {
1184            return Box::pin(stream::empty());
1185        };
1186        let addr_str = format!("{:#x}", addr);
1187        let sub = serde_json::json!({
1188            "method": "subscribe",
1189            "subscription": {"type": "userEvents", "user": addr_str.clone()}
1190        });
1191        let key = crate::ws::SubKey {
1192            channel: "userEvents".to_string(),
1193            routing_key: addr_str,
1194        };
1195        let raw_stream = self.ws_mux.subscribe(key, sub);
1196        Box::pin(raw_stream.filter_map(|text| async move {
1197            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1198                return None;
1199            };
1200            if env.channel != "userEvents" {
1201                return None;
1202            }
1203            let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1204                return None;
1205            };
1206            let items: Vec<_> = event
1207                .fills
1208                .unwrap_or_default()
1209                .into_iter()
1210                .filter_map(|fill| {
1211                    let side = if fill.side == "B" {
1212                        OrderSide::Buy
1213                    } else {
1214                        OrderSide::Sell
1215                    };
1216                    let price = parse_decimal(&fill.px)?;
1217                    let quantity = parse_decimal(&fill.sz)?;
1218                    let fee_usd = parse_decimal(&fill.fee)?;
1219                    Some(UserFill {
1220                        order_id: fill.oid,
1221                        symbol: fill.coin,
1222                        side,
1223                        price,
1224                        quantity,
1225                        fee_usd,
1226                        timestamp_ms: fill.time,
1227                        cloid: fill.cloid,
1228                    })
1229                })
1230                .collect();
1231            if items.is_empty() {
1232                None
1233            } else {
1234                Some(stream::iter(items.into_iter().map(Ok)))
1235            }
1236        }).flatten())
1237    }
1238
1239    fn subscribe_order_updates(&self) -> BoxStream<Result<OrderUpdate, String>> {
1240        let Some(addr) = self.user_address else {
1241            return Box::pin(stream::empty());
1242        };
1243        let addr_str = format!("{:#x}", addr);
1244        let sub = serde_json::json!({
1245            "method": "subscribe",
1246            "subscription": {"type": "orderUpdates", "user": addr_str.clone()}
1247        });
1248        let key = crate::ws::SubKey {
1249            channel: "orderUpdates".to_string(),
1250            routing_key: addr_str,
1251        };
1252        let raw_stream = self.ws_mux.subscribe(key, sub);
1253        Box::pin(raw_stream.filter_map(|text| async move {
1254            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1255                return None;
1256            };
1257            if env.channel != "orderUpdates" {
1258                return None;
1259            }
1260            let Ok(updates) = serde_json::from_value::<Vec<WsOrderUpdate>>(env.data) else {
1261                return None;
1262            };
1263            let items: Vec<_> = updates
1264                .into_iter()
1265                .map(|upd| {
1266                    let status = match upd.status.as_str() {
1267                        "open" => OrderStatus::Placed,
1268                        "filled" => OrderStatus::Filled,
1269                        "canceled" | "cancelled" => OrderStatus::Cancelled,
1270                        _ => OrderStatus::PartiallyFilled,
1271                    };
1272                    let side = if upd.order.side == "B" {
1273                        OrderSide::Buy
1274                    } else {
1275                        OrderSide::Sell
1276                    };
1277                    OrderUpdate {
1278                        order_id: upd.order.oid,
1279                        symbol: upd.order.coin,
1280                        status,
1281                        side: Some(side),
1282                        price: parse_decimal(&upd.order.limit_px),
1283                        quantity: parse_decimal(&upd.order.orig_sz),
1284                        remaining_quantity: parse_decimal(&upd.order.sz),
1285                        timestamp_ms: upd.status_timestamp,
1286                        cloid: upd.order.cloid,
1287                    }
1288                })
1289                .collect();
1290            if items.is_empty() {
1291                None
1292            } else {
1293                Some(stream::iter(items.into_iter().map(Ok)))
1294            }
1295        }).flatten())
1296    }
1297
1298    fn subscribe_funding_payments(&self) -> BoxStream<Result<FundingPayment, String>> {
1299        let Some(addr) = self.user_address else {
1300            return Box::pin(stream::empty());
1301        };
1302        let addr_str = format!("{:#x}", addr);
1303        let sub = serde_json::json!({
1304            "method": "subscribe",
1305            "subscription": {"type": "userEvents", "user": addr_str.clone()}
1306        });
1307        let key = crate::ws::SubKey {
1308            channel: "userEvents".to_string(),
1309            routing_key: addr_str,
1310        };
1311        let raw_stream = self.ws_mux.subscribe(key, sub);
1312        Box::pin(raw_stream.filter_map(|text| async move {
1313            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1314                return None;
1315            };
1316            if env.channel != "userEvents" {
1317                return None;
1318            }
1319            let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1320                return None;
1321            };
1322            let Some(funding) = event.funding else {
1323                return None;
1324            };
1325            let Some(amount_usd) = parse_decimal(&funding.usdc) else {
1326                return None;
1327            };
1328            let item = FundingPayment {
1329                symbol: funding.coin,
1330                amount_usd,
1331                timestamp_ms: funding.time,
1332            };
1333            Some(stream::iter(vec![Ok(item)].into_iter()))
1334        }).flatten())
1335    }
1336
1337    fn subscribe_deposits(&self) -> BoxStream<Result<Deposit, String>> {
1338        let Some(addr) = self.user_address else {
1339            return Box::pin(stream::empty());
1340        };
1341        let addr_str = format!("{:#x}", addr);
1342        let sub = serde_json::json!({
1343            "method": "subscribe",
1344            "subscription": {"type": "userNonFundingLedgerUpdates", "user": addr_str.clone()}
1345        });
1346        let key = crate::ws::SubKey {
1347            channel: "userNonFundingLedgerUpdates".to_string(),
1348            routing_key: addr_str,
1349        };
1350        let raw_stream = self.ws_mux.subscribe(key, sub);
1351        Box::pin(raw_stream.filter_map(|text| async move {
1352            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1353                return None;
1354            };
1355            if env.channel != "userNonFundingLedgerUpdates" {
1356                return None;
1357            }
1358            let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else {
1359                return None;
1360            };
1361            let items: Vec<_> = ledger
1362                .updates
1363                .into_iter()
1364                .filter_map(|e| {
1365                    if e.delta.kind != "deposit" {
1366                        return None;
1367                    }
1368                    let amount_usd = e.delta.usdc.as_deref().and_then(parse_decimal)?;
1369                    Some(Deposit {
1370                        asset: "USDC".to_string(),
1371                        amount_usd,
1372                        timestamp_ms: e.time,
1373                    })
1374                })
1375                .collect();
1376            if items.is_empty() {
1377                None
1378            } else {
1379                Some(stream::iter(items.into_iter().map(Ok)))
1380            }
1381        }).flatten())
1382    }
1383
1384    fn subscribe_withdrawals(&self) -> BoxStream<Result<Withdrawal, String>> {
1385        let Some(addr) = self.user_address else {
1386            return Box::pin(stream::empty());
1387        };
1388        let addr_str = format!("{:#x}", addr);
1389        let sub = serde_json::json!({
1390            "method": "subscribe",
1391            "subscription": {"type": "userNonFundingLedgerUpdates", "user": addr_str.clone()}
1392        });
1393        let key = crate::ws::SubKey {
1394            channel: "userNonFundingLedgerUpdates".to_string(),
1395            routing_key: addr_str,
1396        };
1397        let raw_stream = self.ws_mux.subscribe(key, sub);
1398        Box::pin(raw_stream.filter_map(|text| async move {
1399            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1400                return None;
1401            };
1402            if env.channel != "userNonFundingLedgerUpdates" {
1403                return None;
1404            }
1405            let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else {
1406                return None;
1407            };
1408            let items: Vec<_> = ledger
1409                .updates
1410                .into_iter()
1411                .filter_map(|e| {
1412                    if e.delta.kind != "withdraw" {
1413                        return None;
1414                    }
1415                    let amount_usd = e.delta.usdc.as_deref().and_then(parse_decimal)?;
1416                    Some(Withdrawal {
1417                        asset: "USDC".to_string(),
1418                        amount_usd,
1419                        timestamp_ms: e.time,
1420                    })
1421                })
1422                .collect();
1423            if items.is_empty() {
1424                None
1425            } else {
1426                Some(stream::iter(items.into_iter().map(Ok)))
1427            }
1428        }).flatten())
1429    }
1430
1431    /// Subscribe to spot wallet balance updates for the registered user address.
1432    /// Requires authentication (address must be set). Returns error if address not registered.
1433    fn subscribe_spot_balance(&self) -> BoxStream<Result<Vec<guilder_abstraction::Balance>, String>> {
1434        let Some(addr) = self.user_address else {
1435            return Box::pin(stream::iter(vec![Err("user address not registered".to_string())]));
1436        };
1437        let addr_str = format!("{:#x}", addr);
1438        self.subscribe_spot_balance_with_address(addr_str)
1439    }
1440
1441    /// Subscribe to spot wallet balance updates for a specific address.
1442    fn subscribe_spot_balance_with_address(&self, address: String) -> BoxStream<Result<Vec<guilder_abstraction::Balance>, String>> {
1443        let addr_str = address;
1444        let sub = serde_json::json!({
1445            "method": "subscribe",
1446            "subscription": {"type": "userEvents", "user": addr_str.clone()}
1447        });
1448        let key = crate::ws::SubKey {
1449            channel: "userEvents".to_string(),
1450            routing_key: addr_str,
1451        };
1452        let raw_stream = self.ws_mux.subscribe(key, sub);
1453        Box::pin(raw_stream.filter_map(|text| async move {
1454            let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1455                return None;
1456            };
1457            if env.channel != "userEvents" {
1458                return None;
1459            }
1460            let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1461                return None;
1462            };
1463
1464            // Extract spot balances from the event
1465            let spot_state = event.spot_state?;
1466            let balances = spot_state.balances?;
1467
1468            let items: Vec<_> = balances
1469                .into_iter()
1470                .filter_map(|b| {
1471                    let total = parse_decimal(&b.total)?;
1472                    let locked = parse_decimal(&b.hold)?;
1473                    let available = total - locked;
1474                    Some(guilder_abstraction::Balance {
1475                        coin: b.coin,
1476                        total,
1477                        available,
1478                        locked,
1479                    })
1480                })
1481                .collect();
1482
1483            if items.is_empty() {
1484                None
1485            } else {
1486                Some(Ok(items))
1487            }
1488        }))
1489    }
1490}