1use crate::rate_limiter::RestRateLimiter;
2use alloy_primitives::Address;
3use futures_util::{stream, StreamExt};
4use guilder_abstraction::{
5 self, AssetContext, BoxStream, Deposit, Fill, FundingPayment, L2Update, Liquidation, OpenOrder,
6 OrderPlacement, OrderSide, OrderStatus, OrderType, OrderUpdate, Position, PredictedFunding,
7 Side, TimeInForce, UserFill, Withdrawal,
8};
9use reqwest::Client;
10use rust_decimal::Decimal;
11use serde::Deserialize;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
/// URL that `info_post` sends its JSON request bodies to.
const HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
/// URL that `exchange_post` sends signed action payloads to.
const HYPERLIQUID_EXCHANGE_URL: &str = "https://api.hyperliquid.xyz/exchange";
18
19async fn parse_response<T: for<'de> serde::Deserialize<'de>>(
20 resp: reqwest::Response,
21) -> Result<T, String> {
22 let text = resp.text().await.map_err(|e| e.to_string())?;
23 serde_json::from_str(&text).map_err(|e| format!("{e}: {text}"))
24}
25
/// Client for the Hyperliquid REST and WebSocket APIs.
///
/// Built with `new()` it carries no credentials; `with_auth` stores an
/// address and a private key for account queries and signed actions.
pub struct HyperliquidClient {
    /// HTTP client used for every REST request.
    client: Client,
    /// Account address; `None` for an unauthenticated client.
    user_address: Option<Address>,
    /// Hex-encoded signing key passed to `sign_action`; `None` for an unauthenticated client.
    private_key: Option<String>,
    /// Rate limiter consulted (by request weight) before each REST call.
    rest_limiter: Arc<RestRateLimiter>,
    /// WebSocket multiplexer through which all subscriptions are opened.
    ws_mux: crate::ws::WsMux,
}
33
34impl Default for HyperliquidClient {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl HyperliquidClient {
41 pub fn new() -> Self {
42 HyperliquidClient {
43 client: Client::new(),
44 user_address: None,
45 private_key: None,
46 rest_limiter: Arc::new(RestRateLimiter::new()),
47 ws_mux: crate::ws::WsMux::new(),
48 }
49 }
50
51 pub fn with_auth(user_address: Address, private_key: String) -> Self {
52 HyperliquidClient {
53 client: Client::new(),
54 user_address: Some(user_address),
55 private_key: Some(private_key),
56 rest_limiter: Arc::new(RestRateLimiter::new()),
57 ws_mux: crate::ws::WsMux::new(),
58 }
59 }
60
61 async fn info_post(&self, body: Value, weight: u32) -> Result<reqwest::Response, String> {
63 self.rest_limiter.acquire_blocking(weight).await;
64 self.client
65 .post(HYPERLIQUID_INFO_URL)
66 .json(&body)
67 .send()
68 .await
69 .map_err(|e| e.to_string())
70 }
71
72 async fn exchange_post(&self, body: Value, weight: u32) -> Result<reqwest::Response, String> {
75 self.rest_limiter.acquire_blocking(weight).await;
76 self.client
77 .post(HYPERLIQUID_EXCHANGE_URL)
78 .json(&body)
79 .send()
80 .await
81 .map_err(|e| e.to_string())
82 }
83
84 fn require_user_address(&self) -> Result<String, String> {
85 self.user_address
86 .map(|a| format!("{:#x}", a))
87 .ok_or_else(|| "user address required: use HyperliquidClient::with_auth".to_string())
88 }
89
90 fn require_private_key(&self) -> Result<&str, String> {
91 self.private_key
92 .as_deref()
93 .ok_or_else(|| "private key required: use HyperliquidClient::with_auth".to_string())
94 }
95
96 async fn get_asset_index(&self, symbol: &str) -> Result<usize, String> {
97 let resp = self
99 .info_post(serde_json::json!({"type": "meta"}), 20)
100 .await?;
101 let meta: MetaResponse = parse_response(resp).await?;
102 meta.universe
103 .iter()
104 .position(|a| a.name == symbol)
105 .ok_or_else(|| format!("symbol {} not found", symbol))
106 }
107
108 async fn submit_signed_action(
109 &self,
110 action: Value,
111 vault_address: Option<&str>,
112 ) -> Result<Value, String> {
113 let private_key = self.require_private_key()?;
114 let nonce = std::time::SystemTime::now()
115 .duration_since(std::time::UNIX_EPOCH)
116 .unwrap()
117 .as_millis() as u64;
118
119 let (r, s, v) = sign_action(private_key, &action, vault_address, nonce)?;
120
121 let payload = serde_json::json!({
122 "action": action,
123 "nonce": nonce,
124 "signature": {"r": r, "s": s, "v": v},
125 "vaultAddress": null
126 });
127
128 let resp = self.exchange_post(payload, 1).await?;
130
131 let body: Value = parse_response(resp).await?;
132 if body["status"].as_str() == Some("err") {
133 return Err(body["response"]
134 .as_str()
135 .unwrap_or("unknown error")
136 .to_string());
137 }
138 Ok(body)
139 }
140}
141
/// Response body of the `meta` info request (also the first element of
/// `MetaAndAssetCtxsResponse`).
#[derive(Deserialize)]
struct MetaResponse {
    /// Listed assets; an asset's position in this vector is used as its
    /// numeric asset index when building orders and cancels.
    universe: Vec<AssetInfo>,
}
148
/// One entry of `MetaResponse::universe`.
#[derive(Deserialize)]
struct AssetInfo {
    /// Asset name; compared against caller-supplied symbols.
    name: String,
}
153
/// Response body of the `metaAndAssetCtxs` info request: the meta block and a
/// vector of contexts that callers index by the asset's universe position.
type MetaAndAssetCtxsResponse = (MetaResponse, Vec<RestAssetCtx>);
155
/// Per-asset context from the `metaAndAssetCtxs` response. All numeric values
/// arrive as strings and are converted with `parse_decimal` by callers.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
// NOTE(review): every field appears to be read in this file; confirm whether
// this allow is still needed.
#[allow(dead_code)]
struct RestAssetCtx {
    /// Mapped to `AssetContext::open_interest`.
    open_interest: String,
    /// Mapped to `AssetContext::funding_rate`.
    funding: String,
    /// Mapped to `AssetContext::mark_price`.
    mark_px: String,
    /// Mapped to `AssetContext::day_volume`.
    day_ntl_vlm: String,
    /// Mapped to `AssetContext::mid_price`; optional on the wire.
    mid_px: Option<String>,
    /// Mapped to `AssetContext::oracle_price`; optional on the wire.
    oracle_px: Option<String>,
    /// Mapped to `AssetContext::premium`; optional on the wire.
    premium: Option<String>,
    /// Mapped to `AssetContext::prev_day_price`; optional on the wire.
    prev_day_px: Option<String>,
}
169
/// Response body of the `clearinghouseState` info request, limited to the
/// fields this client reads.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClearinghouseStateResponse {
    /// Source of the value returned by `get_collateral`.
    margin_summary: MarginSummary,
    /// Source of the positions returned by `get_positions`.
    asset_positions: Vec<AssetPosition>,
}
176
/// Margin summary section of `ClearinghouseStateResponse`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct MarginSummary {
    /// Decimal string; returned as-is (after parsing) by `get_collateral`.
    account_value: String,
}
182
/// Wrapper element of `ClearinghouseStateResponse::asset_positions`.
#[derive(Deserialize)]
struct AssetPosition {
    /// The position details themselves.
    position: PositionDetail,
}
187
/// Details of a single position as read by `get_positions`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PositionDetail {
    /// Asset name; becomes `Position::symbol`.
    coin: String,
    /// Signed size as a decimal string: positive is reported as a buy-side
    /// position, negative as sell-side, and zero entries are skipped.
    szi: String,
    /// Entry price as a decimal string; a missing or unparsable value is
    /// reported as the default decimal (zero).
    entry_px: Option<String>,
}
196
/// One element of the `openOrders` info response.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RestOpenOrder {
    /// Asset name of the order.
    coin: String,
    /// `"B"` is treated as buy; any other value is treated as sell.
    side: String,
    /// Limit price as a decimal string.
    limit_px: String,
    /// Remaining size as a decimal string (subtracted from `orig_sz` to get
    /// the filled quantity).
    sz: String,
    /// Exchange order id.
    oid: i64,
    /// Original order size as a decimal string.
    orig_sz: String,
}
207
/// Response body of the `predictedFundings` info request: a list of
/// `(symbol, [(venue, optional entry)])` pairs.
type PredictedFundingsResponse = Vec<(String, Vec<(String, Option<PredictedFundingEntry>)>)>;
211
/// Predicted funding for one symbol on one venue.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PredictedFundingEntry {
    /// Funding rate as a decimal string.
    funding_rate: String,
    /// Copied into `PredictedFunding::next_funding_time_ms`.
    next_funding_time: i64,
}
218
/// Outer frame of every WebSocket message handled in this file.
#[derive(Deserialize)]
struct WsEnvelope {
    /// Channel name; each subscription ignores messages for other channels.
    channel: String,
    /// Channel-specific payload, decoded in a second step. Falls back to the
    /// default `Value` when the field is absent.
    #[serde(default)]
    data: Value,
}
227
/// Order book snapshot; used for both the REST `l2Book` response and the
/// `l2Book` WebSocket channel payload.
#[derive(Deserialize)]
struct WsBook {
    /// Asset name the book belongs to.
    coin: String,
    /// Price levels grouped by book side; consumers read index 0 and index 1.
    levels: Vec<Vec<WsLevel>>,
    /// Copied into `L2Update::sequence` for every emitted level.
    time: i64,
}
234
/// One price level inside `WsBook::levels`.
#[derive(Deserialize)]
struct WsLevel {
    /// Price as a decimal string.
    px: String,
    /// Size at this price as a decimal string.
    sz: String,
}
240
/// Payload of the `activeAssetCtx` WebSocket channel.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsAssetCtx {
    /// Asset name; becomes `AssetContext::symbol`.
    coin: String,
    /// The context values for that asset.
    ctx: WsPerpsCtx,
}
247
/// Context values carried by `WsAssetCtx`. The field set mirrors
/// `RestAssetCtx`; all numbers arrive as strings.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsPerpsCtx {
    /// Mapped to `AssetContext::open_interest`.
    open_interest: String,
    /// Mapped to `AssetContext::funding_rate`.
    funding: String,
    /// Mapped to `AssetContext::mark_price`.
    mark_px: String,
    /// Mapped to `AssetContext::day_volume`.
    day_ntl_vlm: String,
    /// Mapped to `AssetContext::mid_price`; optional on the wire.
    mid_px: Option<String>,
    /// Mapped to `AssetContext::oracle_price`; optional on the wire.
    oracle_px: Option<String>,
    /// Mapped to `AssetContext::premium`; optional on the wire.
    premium: Option<String>,
    /// Mapped to `AssetContext::prev_day_price`; optional on the wire.
    prev_day_px: Option<String>,
}
260
/// Payload of the `userEvents` WebSocket channel. Each field is optional;
/// every subscription reads only the one it is interested in.
#[derive(Deserialize)]
struct WsUserEvent {
    /// Read by `subscribe_liquidation`.
    liquidation: Option<WsLiquidation>,
    /// Read by `subscribe_user_fills`.
    fills: Option<Vec<WsUserFill>>,
    /// Read by `subscribe_funding_payments`.
    funding: Option<WsFunding>,
}
267
/// Liquidation section of a `WsUserEvent`. No rename attribute is applied, so
/// the wire keys are the snake_case field names as written.
#[derive(Deserialize)]
struct WsLiquidation {
    /// Copied into `Liquidation::liquidated_user`.
    liquidated_user: String,
    /// Decimal string; becomes `Liquidation::notional_position`.
    liquidated_ntl_pos: String,
    /// Decimal string; becomes `Liquidation::account_value`.
    liquidated_account_value: String,
}
274
/// One fill inside `WsUserEvent::fills`.
#[derive(Deserialize)]
struct WsUserFill {
    /// Asset name; becomes `UserFill::symbol`.
    coin: String,
    /// Fill price as a decimal string.
    px: String,
    /// Fill size as a decimal string.
    sz: String,
    /// `"B"` is treated as buy; any other value is treated as sell.
    side: String,
    /// Copied into `UserFill::timestamp_ms`.
    time: i64,
    /// Order id the fill belongs to.
    oid: i64,
    /// Fee as a decimal string; becomes `UserFill::fee_usd`.
    fee: String,
}
285
/// Funding section of a `WsUserEvent`.
#[derive(Deserialize)]
struct WsFunding {
    /// Copied into `FundingPayment::timestamp_ms`.
    time: i64,
    /// Asset name; becomes `FundingPayment::symbol`.
    coin: String,
    /// Decimal string; becomes `FundingPayment::amount_usd`.
    usdc: String,
}
292
/// One element of the `trades` WebSocket channel payload.
#[derive(Deserialize)]
struct WsTrade {
    /// Asset name; becomes `Fill::symbol`.
    coin: String,
    /// `"B"` is treated as buy; any other value is treated as sell.
    side: String,
    /// Trade price as a decimal string.
    px: String,
    /// Trade size as a decimal string.
    sz: String,
    /// Copied into `Fill::timestamp_ms`.
    time: i64,
    /// Copied into `Fill::trade_id`.
    tid: i64,
}
302
/// One element of the `orderUpdates` WebSocket channel payload.
#[derive(Deserialize)]
struct WsOrderUpdate {
    /// The order the status refers to.
    order: WsOrderInfo,
    /// Status string; `"open"`, `"filled"`, `"canceled"` and `"cancelled"`
    /// are recognized by `subscribe_order_updates`.
    status: String,
    /// Copied into `OrderUpdate::timestamp_ms`.
    #[serde(rename = "statusTimestamp")]
    status_timestamp: i64,
}
310
/// Order description embedded in a `WsOrderUpdate`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsOrderInfo {
    /// Asset name; becomes `OrderUpdate::symbol`.
    coin: String,
    /// `"B"` is treated as buy; any other value is treated as sell.
    side: String,
    /// Decimal string; becomes `OrderUpdate::price`.
    limit_px: String,
    /// Decimal string; becomes `OrderUpdate::remaining_quantity`.
    sz: String,
    /// Exchange order id.
    oid: i64,
    /// Decimal string; becomes `OrderUpdate::quantity`.
    orig_sz: String,
}
321
/// Payload of the `userNonFundingLedgerUpdates` WebSocket channel as decoded
/// by the deposit and withdrawal subscriptions.
// NOTE(review): the wire key is assumed to be `updates`; confirm against the
// live channel payload.
#[derive(Deserialize)]
struct WsLedgerUpdates {
    /// Ledger entries contained in the message.
    updates: Vec<WsLedgerEntry>,
}
328
/// One entry of `WsLedgerUpdates::updates`.
#[derive(Deserialize)]
struct WsLedgerEntry {
    /// Copied into the `timestamp_ms` of the emitted deposit or withdrawal.
    time: i64,
    /// What changed in this entry.
    delta: WsLedgerDelta,
}
334
/// Change described by a `WsLedgerEntry`.
#[derive(Deserialize)]
struct WsLedgerDelta {
    /// Wire key `type`; `"deposit"` and `"withdraw"` are the values matched
    /// by the subscriptions in this file.
    #[serde(rename = "type")]
    kind: String,
    /// Amount as a decimal string; entries without it are skipped.
    usdc: Option<String>,
}
341
342fn parse_decimal(s: &str) -> Option<Decimal> {
345 Decimal::from_str(s).ok()
346}
347
348fn keccak256(data: &[u8]) -> [u8; 32] {
349 use sha3::{Digest, Keccak256};
350 Keccak256::digest(data).into()
351}
352
353fn hyperliquid_domain_separator() -> [u8; 32] {
355 let type_hash = keccak256(
356 b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)",
357 );
358 let name_hash = keccak256(b"Exchange");
359 let version_hash = keccak256(b"1");
360 let mut chain_id = [0u8; 32];
361 chain_id[28..32].copy_from_slice(&42161u32.to_be_bytes());
362 let verifying_contract = [0u8; 32];
363
364 let mut data = [0u8; 160];
365 data[..32].copy_from_slice(&type_hash);
366 data[32..64].copy_from_slice(&name_hash);
367 data[64..96].copy_from_slice(&version_hash);
368 data[96..128].copy_from_slice(&chain_id);
369 data[128..160].copy_from_slice(&verifying_contract);
370 keccak256(&data)
371}
372
373fn sign_action(
376 private_key: &str,
377 action: &Value,
378 vault_address: Option<&str>,
379 nonce: u64,
380) -> Result<(String, String, u8), String> {
381 use k256::ecdsa::SigningKey;
382
383 let msgpack_bytes = rmp_serde::to_vec(action).map_err(|e| e.to_string())?;
385 let mut data = msgpack_bytes;
386 data.extend_from_slice(&nonce.to_be_bytes());
387 match vault_address {
388 None => data.push(0u8),
389 Some(addr) => {
390 data.push(1u8);
391 let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
392 .map_err(|e| format!("invalid vault address: {}", e))?;
393 data.extend_from_slice(&addr_bytes);
394 }
395 }
396 let connection_id = keccak256(&data);
397
398 let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
400 let source_hash = keccak256(b"a"); let mut struct_data = [0u8; 96];
402 struct_data[..32].copy_from_slice(&agent_type_hash);
403 struct_data[32..64].copy_from_slice(&source_hash);
404 struct_data[64..96].copy_from_slice(&connection_id);
405 let struct_hash = keccak256(&struct_data);
406
407 let domain_sep = hyperliquid_domain_separator();
409 let mut final_data = Vec::with_capacity(66);
410 final_data.extend_from_slice(b"\x19\x01");
411 final_data.extend_from_slice(&domain_sep);
412 final_data.extend_from_slice(&struct_hash);
413 let final_hash = keccak256(&final_data);
414
415 let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
417 .map_err(|e| format!("invalid private key: {}", e))?;
418 let signing_key =
419 SigningKey::from_bytes(key_bytes.as_slice().into()).map_err(|e| e.to_string())?;
420 let (sig, recovery_id) = signing_key
421 .sign_prehash_recoverable(&final_hash)
422 .map_err(|e| e.to_string())?;
423
424 let sig_bytes = sig.to_bytes();
425 let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
426 let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
427 let v = 27u8 + recovery_id.to_byte();
428
429 Ok((r, s, v))
430}
431
432#[allow(async_fn_in_trait)]
435impl guilder_abstraction::TestServer for HyperliquidClient {
436 async fn ping(&self) -> Result<bool, String> {
438 self.info_post(serde_json::json!({"type": "allMids"}), 2)
440 .await
441 .map(|r| r.status().is_success())
442 }
443
444 async fn get_server_time(&self) -> Result<i64, String> {
446 Ok(std::time::SystemTime::now()
447 .duration_since(std::time::UNIX_EPOCH)
448 .map(|d| d.as_millis() as i64)
449 .unwrap_or(0))
450 }
451}
452
453#[allow(async_fn_in_trait)]
454impl guilder_abstraction::GetMarketData for HyperliquidClient {
455 async fn get_symbol(&self) -> Result<Vec<String>, String> {
457 let resp = self
459 .info_post(serde_json::json!({"type": "meta"}), 20)
460 .await?;
461 parse_response::<MetaResponse>(resp)
462 .await
463 .map(|r| r.universe.into_iter().map(|a| a.name).collect())
464 }
465
466 async fn get_open_interest(&self, symbol: String) -> Result<Decimal, String> {
468 let resp = self
470 .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20)
471 .await?;
472 let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
473 .await?
474 .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
475 meta.universe
476 .iter()
477 .position(|a| a.name == symbol)
478 .and_then(|i| ctxs.get(i))
479 .and_then(|ctx| parse_decimal(&ctx.open_interest))
480 .ok_or_else(|| format!("symbol {} not found", symbol))
481 }
482
483 async fn get_asset_context(&self, symbol: String) -> Result<AssetContext, String> {
485 let resp = self
487 .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20)
488 .await?;
489 let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
490 .await?
491 .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
492 let idx = meta
493 .universe
494 .iter()
495 .position(|a| a.name == symbol)
496 .ok_or_else(|| format!("symbol {} not found", symbol))?;
497 let ctx = ctxs
498 .get(idx)
499 .ok_or_else(|| format!("symbol {} not found", symbol))?;
500 Ok(AssetContext {
501 symbol,
502 open_interest: parse_decimal(&ctx.open_interest).ok_or("invalid open_interest")?,
503 funding_rate: parse_decimal(&ctx.funding).ok_or("invalid funding")?,
504 mark_price: parse_decimal(&ctx.mark_px).ok_or("invalid mark_px")?,
505 day_volume: parse_decimal(&ctx.day_ntl_vlm).ok_or("invalid day_ntl_vlm")?,
506 mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
507 oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
508 premium: ctx.premium.as_deref().and_then(parse_decimal),
509 prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
510 })
511 }
512
513 async fn get_all_asset_contexts(&self) -> Result<Vec<AssetContext>, String> {
516 let resp = self
518 .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20)
519 .await?;
520 let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
521 .await?
522 .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
523 let mut result = Vec::with_capacity(meta.universe.len());
524 for (asset, ctx) in meta.universe.iter().zip(ctxs.iter()) {
525 let Some(open_interest) = parse_decimal(&ctx.open_interest) else {
526 continue;
527 };
528 let Some(funding_rate) = parse_decimal(&ctx.funding) else {
529 continue;
530 };
531 let Some(mark_price) = parse_decimal(&ctx.mark_px) else {
532 continue;
533 };
534 let Some(day_volume) = parse_decimal(&ctx.day_ntl_vlm) else {
535 continue;
536 };
537 result.push(AssetContext {
538 symbol: asset.name.clone(),
539 open_interest,
540 funding_rate,
541 mark_price,
542 day_volume,
543 mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
544 oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
545 premium: ctx.premium.as_deref().and_then(parse_decimal),
546 prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
547 });
548 }
549 Ok(result)
550 }
551
552 async fn get_l2_orderbook(&self, symbol: String) -> Result<Vec<L2Update>, String> {
555 let resp = self
557 .info_post(serde_json::json!({"type": "l2Book", "coin": symbol}), 2)
558 .await?;
559 let book: Option<WsBook> = parse_response(resp).await?;
560 let book = match book {
561 Some(b) => b,
562 None => return Ok(vec![]),
563 };
564 let mut levels = Vec::new();
565 for level in book.levels.first().into_iter().flatten() {
566 if let (Some(price), Some(volume)) =
567 (parse_decimal(&level.px), parse_decimal(&level.sz))
568 {
569 levels.push(L2Update {
570 symbol: book.coin.clone(),
571 price,
572 volume,
573 side: Side::Ask,
574 sequence: book.time,
575 });
576 }
577 }
578 for level in book.levels.get(1).into_iter().flatten() {
579 if let (Some(price), Some(volume)) =
580 (parse_decimal(&level.px), parse_decimal(&level.sz))
581 {
582 levels.push(L2Update {
583 symbol: book.coin.clone(),
584 price,
585 volume,
586 side: Side::Bid,
587 sequence: book.time,
588 });
589 }
590 }
591 Ok(levels)
592 }
593
594 async fn get_price(&self, symbol: String) -> Result<Decimal, String> {
596 let resp = self
598 .info_post(serde_json::json!({"type": "allMids"}), 2)
599 .await?;
600 parse_response::<HashMap<String, String>>(resp)
601 .await?
602 .get(&symbol)
603 .and_then(|s| parse_decimal(s))
604 .ok_or_else(|| format!("symbol {} not found", symbol))
605 }
606
607 async fn get_predicted_fundings(&self) -> Result<Vec<PredictedFunding>, String> {
610 let resp = self
612 .info_post(serde_json::json!({"type": "predictedFundings"}), 20)
613 .await?;
614 let data: PredictedFundingsResponse = parse_response(resp).await?;
615 let mut result = Vec::new();
616 for (symbol, venues) in data {
617 for (venue, entry) in venues {
618 let Some(entry) = entry else { continue };
619 if let Some(funding_rate) = parse_decimal(&entry.funding_rate) {
620 result.push(PredictedFunding {
621 symbol: symbol.clone(),
622 venue,
623 funding_rate,
624 next_funding_time_ms: entry.next_funding_time,
625 });
626 }
627 }
628 }
629 Ok(result)
630 }
631}
632
633#[allow(async_fn_in_trait)]
634impl guilder_abstraction::ManageOrder for HyperliquidClient {
635 async fn place_order(
638 &self,
639 symbol: String,
640 side: OrderSide,
641 price: Decimal,
642 volume: Decimal,
643 order_type: OrderType,
644 time_in_force: TimeInForce,
645 ) -> Result<OrderPlacement, String> {
646 let asset_idx = self.get_asset_index(&symbol).await?;
647 let is_buy = matches!(side, OrderSide::Buy);
648
649 let tif_str = match time_in_force {
650 TimeInForce::Gtc => "Gtc",
651 TimeInForce::Ioc => "Ioc",
652 TimeInForce::Fok => "Fok",
653 };
654 let order_type_val = match order_type {
656 OrderType::Limit => serde_json::json!({"limit": {"tif": tif_str}}),
657 OrderType::Market => serde_json::json!({"limit": {"tif": "Ioc"}}),
658 };
659
660 let action = serde_json::json!({
661 "type": "order",
662 "orders": [{
663 "a": asset_idx,
664 "b": is_buy,
665 "p": price.to_string(),
666 "s": volume.to_string(),
667 "r": false,
668 "t": order_type_val
669 }],
670 "grouping": "na"
671 });
672
673 let resp = self.submit_signed_action(action, None).await?;
674 let oid = resp["response"]["data"]["statuses"][0]["resting"]["oid"]
675 .as_i64()
676 .or_else(|| resp["response"]["data"]["statuses"][0]["filled"]["oid"].as_i64())
677 .ok_or_else(|| format!("unexpected response: {}", resp))?;
678
679 let timestamp_ms = std::time::SystemTime::now()
680 .duration_since(std::time::UNIX_EPOCH)
681 .unwrap()
682 .as_millis() as i64;
683
684 Ok(OrderPlacement {
685 order_id: oid,
686 symbol,
687 side,
688 price,
689 quantity: volume,
690 timestamp_ms,
691 })
692 }
693
694 async fn change_order_by_cloid(
697 &self,
698 cloid: i64,
699 price: Decimal,
700 volume: Decimal,
701 ) -> Result<i64, String> {
702 let user = self.require_user_address()?;
703
704 let resp = self
706 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
707 .await?;
708 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
709 let order = orders
710 .iter()
711 .find(|o| o.oid == cloid)
712 .ok_or_else(|| format!("order {} not found", cloid))?;
713
714 let asset_idx = self.get_asset_index(&order.coin).await?;
715 let is_buy = order.side == "B";
716
717 let action = serde_json::json!({
718 "type": "batchModify",
719 "modifies": [{
720 "oid": cloid,
721 "order": {
722 "a": asset_idx,
723 "b": is_buy,
724 "p": price.to_string(),
725 "s": volume.to_string(),
726 "r": false,
727 "t": {"limit": {"tif": "Gtc"}}
728 }
729 }]
730 });
731
732 self.submit_signed_action(action, None).await?;
733 Ok(cloid)
734 }
735
736 async fn cancel_order(&self, cloid: i64) -> Result<i64, String> {
739 let user = self.require_user_address()?;
740
741 let resp = self
743 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
744 .await?;
745 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
746 let order = orders
747 .iter()
748 .find(|o| o.oid == cloid)
749 .ok_or_else(|| format!("order {} not found", cloid))?;
750
751 let asset_idx = self.get_asset_index(&order.coin).await?;
752 let action = serde_json::json!({
753 "type": "cancel",
754 "cancels": [{"a": asset_idx, "o": cloid}]
755 });
756
757 self.submit_signed_action(action, None).await?;
758 Ok(cloid)
759 }
760
761 async fn cancel_all_order(&self) -> Result<bool, String> {
764 let user = self.require_user_address()?;
765
766 let resp = self
768 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
769 .await?;
770 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
771 if orders.is_empty() {
772 return Ok(true);
773 }
774
775 let meta_resp = self
777 .info_post(serde_json::json!({"type": "meta"}), 20)
778 .await?;
779 let meta: MetaResponse = parse_response(meta_resp).await?;
780
781 let cancels: Vec<Value> = orders
782 .iter()
783 .filter_map(|o| {
784 let asset_idx = meta.universe.iter().position(|a| a.name == o.coin)?;
785 Some(serde_json::json!({"a": asset_idx, "o": o.oid}))
786 })
787 .collect();
788
789 let action = serde_json::json!({"type": "cancel", "cancels": cancels});
790 self.submit_signed_action(action, None).await?;
791 Ok(true)
792 }
793}
794
795#[allow(async_fn_in_trait)]
796impl guilder_abstraction::SubscribeMarketData for HyperliquidClient {
797 fn subscribe_l2_update(&self, symbol: String) -> BoxStream<Result<L2Update, String>> {
798 let sub = serde_json::json!({
799 "method": "subscribe",
800 "subscription": {"type": "l2Book", "coin": symbol.clone()}
801 });
802 let key = crate::ws::SubKey {
803 channel: "l2Book".to_string(),
804 routing_key: symbol,
805 };
806 let stream = self.ws_mux.subscribe(key, sub);
807 Box::pin(async_stream::stream! {
808 for await msg in stream {
809 let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
810 continue;
811 };
812 if env.channel != "l2Book" {
813 continue;
814 }
815 let Ok(book) = serde_json::from_value::<WsBook>(env.data) else {
816 continue;
817 };
818 for level in book.levels.first().into_iter().flatten() {
819 if let (Some(price), Some(volume)) =
820 (parse_decimal(&level.px), parse_decimal(&level.sz))
821 {
822 yield Ok(L2Update {
823 symbol: book.coin.clone(),
824 price,
825 volume,
826 side: Side::Ask,
827 sequence: book.time,
828 });
829 }
830 }
831 for level in book.levels.get(1).into_iter().flatten() {
832 if let (Some(price), Some(volume)) =
833 (parse_decimal(&level.px), parse_decimal(&level.sz))
834 {
835 yield Ok(L2Update {
836 symbol: book.coin.clone(),
837 price,
838 volume,
839 side: Side::Bid,
840 sequence: book.time,
841 });
842 }
843 }
844 }
845 })
846 }
847
848 fn subscribe_asset_context(&self, symbol: String) -> BoxStream<Result<AssetContext, String>> {
849 let sub = serde_json::json!({
850 "method": "subscribe",
851 "subscription": {"type": "activeAssetCtx", "coin": symbol.clone()}
852 });
853 let key = crate::ws::SubKey {
854 channel: "activeAssetCtx".to_string(),
855 routing_key: symbol,
856 };
857 let stream = self.ws_mux.subscribe(key, sub);
858 Box::pin(async_stream::stream! {
859 for await msg in stream {
860 let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
861 continue;
862 };
863 if env.channel != "activeAssetCtx" {
864 continue;
865 }
866 let Ok(update) = serde_json::from_value::<WsAssetCtx>(env.data) else {
867 continue;
868 };
869 let ctx = &update.ctx;
870 let (Some(open_interest), Some(funding_rate), Some(mark_price), Some(day_volume)) = (
871 parse_decimal(&ctx.open_interest),
872 parse_decimal(&ctx.funding),
873 parse_decimal(&ctx.mark_px),
874 parse_decimal(&ctx.day_ntl_vlm),
875 ) else {
876 continue;
877 };
878 yield Ok(AssetContext {
879 symbol: update.coin,
880 open_interest,
881 funding_rate,
882 mark_price,
883 day_volume,
884 mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
885 oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
886 premium: ctx.premium.as_deref().and_then(parse_decimal),
887 prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
888 });
889 }
890 })
891 }
892
893 fn subscribe_liquidation(&self, user: String) -> BoxStream<Result<Liquidation, String>> {
894 let sub = serde_json::json!({
895 "method": "subscribe",
896 "subscription": {"type": "userEvents", "user": user.clone()}
897 });
898 let key = crate::ws::SubKey {
899 channel: "userEvents".to_string(),
900 routing_key: user,
901 };
902 let raw_stream = self.ws_mux.subscribe(key, sub);
903 Box::pin(raw_stream.filter_map(|text| async move {
904 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
905 return None;
906 };
907 if env.channel != "userEvents" {
908 return None;
909 }
910 let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
911 return None;
912 };
913 let Some(liq) = event.liquidation else {
914 return None;
915 };
916 let (Some(notional_position), Some(account_value)) = (
917 parse_decimal(&liq.liquidated_ntl_pos),
918 parse_decimal(&liq.liquidated_account_value),
919 ) else {
920 return None;
921 };
922 let item = Liquidation {
923 symbol: String::new(),
924 side: OrderSide::Sell,
925 liquidated_user: liq.liquidated_user,
926 notional_position,
927 account_value,
928 };
929 Some(stream::iter(vec![Ok(item)].into_iter()))
930 }).flatten())
931 }
932
933 fn subscribe_fill(&self, symbol: String) -> BoxStream<Result<Fill, String>> {
934 let sub = serde_json::json!({
935 "method": "subscribe",
936 "subscription": {"type": "trades", "coin": symbol.clone()}
937 });
938 let key = crate::ws::SubKey {
939 channel: "trades".to_string(),
940 routing_key: symbol,
941 };
942 let stream = self.ws_mux.subscribe(key, sub);
943 Box::pin(async_stream::stream! {
944 for await msg in stream {
945 let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
946 continue;
947 };
948 if env.channel != "trades" {
949 continue;
950 }
951 let Ok(trades) = serde_json::from_value::<Vec<WsTrade>>(env.data) else {
952 continue;
953 };
954 for trade in trades {
955 let side = if trade.side == "B" {
956 OrderSide::Buy
957 } else {
958 OrderSide::Sell
959 };
960 let price = parse_decimal(&trade.px);
961 let volume = parse_decimal(&trade.sz);
962 if let (Some(price), Some(volume)) = (price, volume) {
963 yield Ok(Fill {
964 symbol: trade.coin,
965 price,
966 volume,
967 side,
968 timestamp_ms: trade.time,
969 trade_id: trade.tid,
970 });
971 }
972 }
973 }
974 })
975 }
976}
977
978#[allow(async_fn_in_trait)]
979impl guilder_abstraction::GetAccountSnapshot for HyperliquidClient {
980 async fn get_positions(&self) -> Result<Vec<Position>, String> {
983 let user = self.require_user_address()?;
984 let resp = self
986 .info_post(
987 serde_json::json!({"type": "clearinghouseState", "user": user}),
988 2,
989 )
990 .await?;
991 let state: ClearinghouseStateResponse = parse_response(resp).await?;
992
993 Ok(state
994 .asset_positions
995 .into_iter()
996 .filter_map(|ap| {
997 let p = ap.position;
998 let size = parse_decimal(&p.szi)?;
999 if size.is_zero() {
1000 return None;
1001 }
1002 let entry_price = p
1003 .entry_px
1004 .as_deref()
1005 .and_then(parse_decimal)
1006 .unwrap_or_default();
1007 let side = if size > Decimal::ZERO {
1008 OrderSide::Buy
1009 } else {
1010 OrderSide::Sell
1011 };
1012 Some(Position {
1013 symbol: p.coin,
1014 side,
1015 size: size.abs(),
1016 entry_price,
1017 })
1018 })
1019 .collect())
1020 }
1021
1022 async fn get_open_orders(&self) -> Result<Vec<OpenOrder>, String> {
1025 let user = self.require_user_address()?;
1026 let resp = self
1028 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
1029 .await?;
1030 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
1031
1032 Ok(orders
1033 .into_iter()
1034 .filter_map(|o| {
1035 let price = parse_decimal(&o.limit_px)?;
1036 let quantity = parse_decimal(&o.orig_sz)?;
1037 let remaining = parse_decimal(&o.sz)?;
1038 let filled_quantity = quantity - remaining;
1039 let side = if o.side == "B" {
1040 OrderSide::Buy
1041 } else {
1042 OrderSide::Sell
1043 };
1044 Some(OpenOrder {
1045 order_id: o.oid,
1046 symbol: o.coin,
1047 side,
1048 price,
1049 quantity,
1050 filled_quantity,
1051 })
1052 })
1053 .collect())
1054 }
1055
1056 async fn get_collateral(&self) -> Result<Decimal, String> {
1058 let user = self.require_user_address()?;
1059 let resp = self
1061 .info_post(
1062 serde_json::json!({"type": "clearinghouseState", "user": user}),
1063 2,
1064 )
1065 .await?;
1066 let state: ClearinghouseStateResponse = parse_response(resp).await?;
1067 parse_decimal(&state.margin_summary.account_value)
1068 .ok_or_else(|| "invalid account value".to_string())
1069 }
1070}
1071
1072#[allow(async_fn_in_trait)]
1073impl guilder_abstraction::SubscribeUserEvents for HyperliquidClient {
1074 fn subscribe_user_fills(&self) -> BoxStream<Result<UserFill, String>> {
1075 let Some(addr) = self.user_address else {
1076 return Box::pin(stream::empty());
1077 };
1078 let addr_str = format!("{:#x}", addr);
1079 let sub = serde_json::json!({
1080 "method": "subscribe",
1081 "subscription": {"type": "userEvents", "user": addr_str.clone()}
1082 });
1083 let key = crate::ws::SubKey {
1084 channel: "userEvents".to_string(),
1085 routing_key: addr_str,
1086 };
1087 let raw_stream = self.ws_mux.subscribe(key, sub);
1088 Box::pin(raw_stream.filter_map(|text| async move {
1089 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1090 return None;
1091 };
1092 if env.channel != "userEvents" {
1093 return None;
1094 }
1095 let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1096 return None;
1097 };
1098 let items: Vec<_> = event
1099 .fills
1100 .unwrap_or_default()
1101 .into_iter()
1102 .filter_map(|fill| {
1103 let side = if fill.side == "B" {
1104 OrderSide::Buy
1105 } else {
1106 OrderSide::Sell
1107 };
1108 let price = parse_decimal(&fill.px)?;
1109 let quantity = parse_decimal(&fill.sz)?;
1110 let fee_usd = parse_decimal(&fill.fee)?;
1111 Some(UserFill {
1112 order_id: fill.oid,
1113 symbol: fill.coin,
1114 side,
1115 price,
1116 quantity,
1117 fee_usd,
1118 timestamp_ms: fill.time,
1119 })
1120 })
1121 .collect();
1122 if items.is_empty() {
1123 None
1124 } else {
1125 Some(stream::iter(items.into_iter().map(Ok)))
1126 }
1127 }).flatten())
1128 }
1129
1130 fn subscribe_order_updates(&self) -> BoxStream<Result<OrderUpdate, String>> {
1131 let Some(addr) = self.user_address else {
1132 return Box::pin(stream::empty());
1133 };
1134 let addr_str = format!("{:#x}", addr);
1135 let sub = serde_json::json!({
1136 "method": "subscribe",
1137 "subscription": {"type": "orderUpdates", "user": addr_str.clone()}
1138 });
1139 let key = crate::ws::SubKey {
1140 channel: "orderUpdates".to_string(),
1141 routing_key: addr_str,
1142 };
1143 let raw_stream = self.ws_mux.subscribe(key, sub);
1144 Box::pin(raw_stream.filter_map(|text| async move {
1145 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1146 return None;
1147 };
1148 if env.channel != "orderUpdates" {
1149 return None;
1150 }
1151 let Ok(updates) = serde_json::from_value::<Vec<WsOrderUpdate>>(env.data) else {
1152 return None;
1153 };
1154 let items: Vec<_> = updates
1155 .into_iter()
1156 .map(|upd| {
1157 let status = match upd.status.as_str() {
1158 "open" => OrderStatus::Placed,
1159 "filled" => OrderStatus::Filled,
1160 "canceled" | "cancelled" => OrderStatus::Cancelled,
1161 _ => OrderStatus::PartiallyFilled,
1162 };
1163 let side = if upd.order.side == "B" {
1164 OrderSide::Buy
1165 } else {
1166 OrderSide::Sell
1167 };
1168 OrderUpdate {
1169 order_id: upd.order.oid,
1170 symbol: upd.order.coin,
1171 status,
1172 side: Some(side),
1173 price: parse_decimal(&upd.order.limit_px),
1174 quantity: parse_decimal(&upd.order.orig_sz),
1175 remaining_quantity: parse_decimal(&upd.order.sz),
1176 timestamp_ms: upd.status_timestamp,
1177 }
1178 })
1179 .collect();
1180 if items.is_empty() {
1181 None
1182 } else {
1183 Some(stream::iter(items.into_iter().map(Ok)))
1184 }
1185 }).flatten())
1186 }
1187
1188 fn subscribe_funding_payments(&self) -> BoxStream<Result<FundingPayment, String>> {
1189 let Some(addr) = self.user_address else {
1190 return Box::pin(stream::empty());
1191 };
1192 let addr_str = format!("{:#x}", addr);
1193 let sub = serde_json::json!({
1194 "method": "subscribe",
1195 "subscription": {"type": "userEvents", "user": addr_str.clone()}
1196 });
1197 let key = crate::ws::SubKey {
1198 channel: "userEvents".to_string(),
1199 routing_key: addr_str,
1200 };
1201 let raw_stream = self.ws_mux.subscribe(key, sub);
1202 Box::pin(raw_stream.filter_map(|text| async move {
1203 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1204 return None;
1205 };
1206 if env.channel != "userEvents" {
1207 return None;
1208 }
1209 let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1210 return None;
1211 };
1212 let Some(funding) = event.funding else {
1213 return None;
1214 };
1215 let Some(amount_usd) = parse_decimal(&funding.usdc) else {
1216 return None;
1217 };
1218 let item = FundingPayment {
1219 symbol: funding.coin,
1220 amount_usd,
1221 timestamp_ms: funding.time,
1222 };
1223 Some(stream::iter(vec![Ok(item)].into_iter()))
1224 }).flatten())
1225 }
1226
1227 fn subscribe_deposits(&self) -> BoxStream<Result<Deposit, String>> {
1228 let Some(addr) = self.user_address else {
1229 return Box::pin(stream::empty());
1230 };
1231 let addr_str = format!("{:#x}", addr);
1232 let sub = serde_json::json!({
1233 "method": "subscribe",
1234 "subscription": {"type": "userNonFundingLedgerUpdates", "user": addr_str.clone()}
1235 });
1236 let key = crate::ws::SubKey {
1237 channel: "userNonFundingLedgerUpdates".to_string(),
1238 routing_key: addr_str,
1239 };
1240 let raw_stream = self.ws_mux.subscribe(key, sub);
1241 Box::pin(raw_stream.filter_map(|text| async move {
1242 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1243 return None;
1244 };
1245 if env.channel != "userNonFundingLedgerUpdates" {
1246 return None;
1247 }
1248 let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else {
1249 return None;
1250 };
1251 let items: Vec<_> = ledger
1252 .updates
1253 .into_iter()
1254 .filter_map(|e| {
1255 if e.delta.kind != "deposit" {
1256 return None;
1257 }
1258 let amount_usd = e.delta.usdc.as_deref().and_then(parse_decimal)?;
1259 Some(Deposit {
1260 asset: "USDC".to_string(),
1261 amount_usd,
1262 timestamp_ms: e.time,
1263 })
1264 })
1265 .collect();
1266 if items.is_empty() {
1267 None
1268 } else {
1269 Some(stream::iter(items.into_iter().map(Ok)))
1270 }
1271 }).flatten())
1272 }
1273
1274 fn subscribe_withdrawals(&self) -> BoxStream<Result<Withdrawal, String>> {
1275 let Some(addr) = self.user_address else {
1276 return Box::pin(stream::empty());
1277 };
1278 let addr_str = format!("{:#x}", addr);
1279 let sub = serde_json::json!({
1280 "method": "subscribe",
1281 "subscription": {"type": "userNonFundingLedgerUpdates", "user": addr_str.clone()}
1282 });
1283 let key = crate::ws::SubKey {
1284 channel: "userNonFundingLedgerUpdates".to_string(),
1285 routing_key: addr_str,
1286 };
1287 let raw_stream = self.ws_mux.subscribe(key, sub);
1288 Box::pin(raw_stream.filter_map(|text| async move {
1289 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
1290 return None;
1291 };
1292 if env.channel != "userNonFundingLedgerUpdates" {
1293 return None;
1294 }
1295 let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else {
1296 return None;
1297 };
1298 let items: Vec<_> = ledger
1299 .updates
1300 .into_iter()
1301 .filter_map(|e| {
1302 if e.delta.kind != "withdraw" {
1303 return None;
1304 }
1305 let amount_usd = e.delta.usdc.as_deref().and_then(parse_decimal)?;
1306 Some(Withdrawal {
1307 asset: "USDC".to_string(),
1308 amount_usd,
1309 timestamp_ms: e.time,
1310 })
1311 })
1312 .collect();
1313 if items.is_empty() {
1314 None
1315 } else {
1316 Some(stream::iter(items.into_iter().map(Ok)))
1317 }
1318 }).flatten())
1319 }
1320}