1use crate::rate_limiter::RestRateLimiter;
2use alloy_primitives::Address;
3use futures_util::{stream, SinkExt, StreamExt};
4use guilder_abstraction::{
5 self, AssetContext, BoxStream, Deposit, Fill, FundingPayment, L2Update, Liquidation, OpenOrder,
6 OrderPlacement, OrderSide, OrderStatus, OrderType, OrderUpdate, Position, PredictedFunding,
7 Side, TimeInForce, UserFill, Withdrawal,
8};
9use reqwest::Client;
10use rust_decimal::Decimal;
11use serde::Deserialize;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16use tokio_tungstenite::{connect_async, tungstenite::Message};
17
18const HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
19const HYPERLIQUID_EXCHANGE_URL: &str = "https://api.hyperliquid.xyz/exchange";
20const HYPERLIQUID_WS_URL: &str = "wss://api.hyperliquid.xyz/ws";
21
22async fn parse_response<T: for<'de> serde::Deserialize<'de>>(
23 resp: reqwest::Response,
24) -> Result<T, String> {
25 let text = resp.text().await.map_err(|e| e.to_string())?;
26 serde_json::from_str(&text).map_err(|e| format!("{e}: {text}"))
27}
28
29pub struct HyperliquidClient {
30 client: Client,
31 user_address: Option<Address>,
32 private_key: Option<String>,
33 rest_limiter: Arc<RestRateLimiter>,
34}
35
36impl Default for HyperliquidClient {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl HyperliquidClient {
43 pub fn new() -> Self {
44 HyperliquidClient {
45 client: Client::new(),
46 user_address: None,
47 private_key: None,
48 rest_limiter: Arc::new(RestRateLimiter::new()),
49 }
50 }
51
52 pub fn with_auth(user_address: Address, private_key: String) -> Self {
53 HyperliquidClient {
54 client: Client::new(),
55 user_address: Some(user_address),
56 private_key: Some(private_key),
57 rest_limiter: Arc::new(RestRateLimiter::new()),
58 }
59 }
60
61 async fn info_post(&self, body: Value, weight: u32) -> Result<reqwest::Response, String> {
63 self.rest_limiter.acquire_blocking(weight).await;
64 self.client
65 .post(HYPERLIQUID_INFO_URL)
66 .json(&body)
67 .send()
68 .await
69 .map_err(|e| e.to_string())
70 }
71
72 async fn exchange_post(&self, body: Value, weight: u32) -> Result<reqwest::Response, String> {
75 self.rest_limiter.acquire_blocking(weight).await;
76 self.client
77 .post(HYPERLIQUID_EXCHANGE_URL)
78 .json(&body)
79 .send()
80 .await
81 .map_err(|e| e.to_string())
82 }
83
84 fn require_user_address(&self) -> Result<String, String> {
85 self.user_address
86 .map(|a| format!("{:#x}", a))
87 .ok_or_else(|| "user address required: use HyperliquidClient::with_auth".to_string())
88 }
89
90 fn require_private_key(&self) -> Result<&str, String> {
91 self.private_key
92 .as_deref()
93 .ok_or_else(|| "private key required: use HyperliquidClient::with_auth".to_string())
94 }
95
96 async fn get_asset_index(&self, symbol: &str) -> Result<usize, String> {
97 let resp = self
99 .info_post(serde_json::json!({"type": "meta"}), 20)
100 .await?;
101 let meta: MetaResponse = parse_response(resp).await?;
102 meta.universe
103 .iter()
104 .position(|a| a.name == symbol)
105 .ok_or_else(|| format!("symbol {} not found", symbol))
106 }
107
108 async fn submit_signed_action(
109 &self,
110 action: Value,
111 vault_address: Option<&str>,
112 ) -> Result<Value, String> {
113 let private_key = self.require_private_key()?;
114 let nonce = std::time::SystemTime::now()
115 .duration_since(std::time::UNIX_EPOCH)
116 .unwrap()
117 .as_millis() as u64;
118
119 let (r, s, v) = sign_action(private_key, &action, vault_address, nonce)?;
120
121 let payload = serde_json::json!({
122 "action": action,
123 "nonce": nonce,
124 "signature": {"r": r, "s": s, "v": v},
125 "vaultAddress": null
126 });
127
128 let resp = self.exchange_post(payload, 1).await?;
130
131 let body: Value = parse_response(resp).await?;
132 if body["status"].as_str() == Some("err") {
133 return Err(body["response"]
134 .as_str()
135 .unwrap_or("unknown error")
136 .to_string());
137 }
138 Ok(body)
139 }
140}
141
142#[derive(Deserialize)]
145struct MetaResponse {
146 universe: Vec<AssetInfo>,
147}
148
149#[derive(Deserialize)]
150struct AssetInfo {
151 name: String,
152}
153
154type MetaAndAssetCtxsResponse = (MetaResponse, Vec<RestAssetCtx>);
155
156#[derive(Deserialize)]
157#[serde(rename_all = "camelCase")]
158#[allow(dead_code)]
159struct RestAssetCtx {
160 open_interest: String,
161 funding: String,
162 mark_px: String,
163 day_ntl_vlm: String,
164 mid_px: Option<String>,
165 oracle_px: Option<String>,
166 premium: Option<String>,
167 prev_day_px: Option<String>,
168}
169
170#[derive(Deserialize)]
171#[serde(rename_all = "camelCase")]
172struct ClearinghouseStateResponse {
173 margin_summary: MarginSummary,
174 asset_positions: Vec<AssetPosition>,
175}
176
177#[derive(Deserialize)]
178#[serde(rename_all = "camelCase")]
179struct MarginSummary {
180 account_value: String,
181}
182
183#[derive(Deserialize)]
184struct AssetPosition {
185 position: PositionDetail,
186}
187
188#[derive(Deserialize)]
189#[serde(rename_all = "camelCase")]
190struct PositionDetail {
191 coin: String,
192 szi: String,
194 entry_px: Option<String>,
195}
196
197#[derive(Deserialize)]
198#[serde(rename_all = "camelCase")]
199struct RestOpenOrder {
200 coin: String,
201 side: String,
202 limit_px: String,
203 sz: String,
204 oid: i64,
205 orig_sz: String,
206}
207
208type PredictedFundingsResponse = Vec<(String, Vec<(String, Option<PredictedFundingEntry>)>)>;
211
212#[derive(Deserialize)]
213#[serde(rename_all = "camelCase")]
214struct PredictedFundingEntry {
215 funding_rate: String,
216 next_funding_time: i64,
217}
218
219#[derive(Deserialize)]
222struct WsEnvelope {
223 channel: String,
224 #[serde(default)]
225 data: Value,
226}
227
228#[derive(Deserialize)]
229struct WsBook {
230 coin: String,
231 levels: Vec<Vec<WsLevel>>,
232 time: i64,
233}
234
235#[derive(Deserialize)]
236struct WsLevel {
237 px: String,
238 sz: String,
239}
240
241#[derive(Deserialize)]
242#[serde(rename_all = "camelCase")]
243struct WsAssetCtx {
244 coin: String,
245 ctx: WsPerpsCtx,
246}
247
248#[derive(Deserialize)]
249#[serde(rename_all = "camelCase")]
250struct WsPerpsCtx {
251 open_interest: String,
252 funding: String,
253 mark_px: String,
254 day_ntl_vlm: String,
255 mid_px: Option<String>,
256 oracle_px: Option<String>,
257 premium: Option<String>,
258 prev_day_px: Option<String>,
259}
260
261#[derive(Deserialize)]
262struct WsUserEvent {
263 liquidation: Option<WsLiquidation>,
264 fills: Option<Vec<WsUserFill>>,
265 funding: Option<WsFunding>,
266}
267
268#[derive(Deserialize)]
269struct WsLiquidation {
270 liquidated_user: String,
271 liquidated_ntl_pos: String,
272 liquidated_account_value: String,
273}
274
275#[derive(Deserialize)]
276struct WsUserFill {
277 coin: String,
278 px: String,
279 sz: String,
280 side: String,
281 time: i64,
282 oid: i64,
283 fee: String,
284}
285
286#[derive(Deserialize)]
287struct WsFunding {
288 time: i64,
289 coin: String,
290 usdc: String,
291}
292
293#[derive(Deserialize)]
294struct WsTrade {
295 coin: String,
296 side: String,
297 px: String,
298 sz: String,
299 time: i64,
300 tid: i64,
301}
302
303#[derive(Deserialize)]
304struct WsOrderUpdate {
305 order: WsOrderInfo,
306 status: String,
307 #[serde(rename = "statusTimestamp")]
308 status_timestamp: i64,
309}
310
311#[derive(Deserialize)]
312#[serde(rename_all = "camelCase")]
313struct WsOrderInfo {
314 coin: String,
315 side: String,
316 limit_px: String,
317 sz: String,
318 oid: i64,
319 orig_sz: String,
320}
321
322#[derive(Deserialize)]
325struct WsLedgerUpdates {
326 updates: Vec<WsLedgerEntry>,
327}
328
329#[derive(Deserialize)]
330struct WsLedgerEntry {
331 time: i64,
332 delta: WsLedgerDelta,
333}
334
335#[derive(Deserialize)]
336struct WsLedgerDelta {
337 #[serde(rename = "type")]
338 kind: String,
339 usdc: Option<String>,
340}
341
342fn parse_decimal(s: &str) -> Option<Decimal> {
345 Decimal::from_str(s).ok()
346}
347
348fn keccak256(data: &[u8]) -> [u8; 32] {
349 use sha3::{Digest, Keccak256};
350 Keccak256::digest(data).into()
351}
352
353fn hyperliquid_domain_separator() -> [u8; 32] {
355 let type_hash = keccak256(
356 b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)",
357 );
358 let name_hash = keccak256(b"Exchange");
359 let version_hash = keccak256(b"1");
360 let mut chain_id = [0u8; 32];
361 chain_id[28..32].copy_from_slice(&42161u32.to_be_bytes());
362 let verifying_contract = [0u8; 32];
363
364 let mut data = [0u8; 160];
365 data[..32].copy_from_slice(&type_hash);
366 data[32..64].copy_from_slice(&name_hash);
367 data[64..96].copy_from_slice(&version_hash);
368 data[96..128].copy_from_slice(&chain_id);
369 data[128..160].copy_from_slice(&verifying_contract);
370 keccak256(&data)
371}
372
373fn sign_action(
376 private_key: &str,
377 action: &Value,
378 vault_address: Option<&str>,
379 nonce: u64,
380) -> Result<(String, String, u8), String> {
381 use k256::ecdsa::SigningKey;
382
383 let msgpack_bytes = rmp_serde::to_vec(action).map_err(|e| e.to_string())?;
385 let mut data = msgpack_bytes;
386 data.extend_from_slice(&nonce.to_be_bytes());
387 match vault_address {
388 None => data.push(0u8),
389 Some(addr) => {
390 data.push(1u8);
391 let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
392 .map_err(|e| format!("invalid vault address: {}", e))?;
393 data.extend_from_slice(&addr_bytes);
394 }
395 }
396 let connection_id = keccak256(&data);
397
398 let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
400 let source_hash = keccak256(b"a"); let mut struct_data = [0u8; 96];
402 struct_data[..32].copy_from_slice(&agent_type_hash);
403 struct_data[32..64].copy_from_slice(&source_hash);
404 struct_data[64..96].copy_from_slice(&connection_id);
405 let struct_hash = keccak256(&struct_data);
406
407 let domain_sep = hyperliquid_domain_separator();
409 let mut final_data = Vec::with_capacity(66);
410 final_data.extend_from_slice(b"\x19\x01");
411 final_data.extend_from_slice(&domain_sep);
412 final_data.extend_from_slice(&struct_hash);
413 let final_hash = keccak256(&final_data);
414
415 let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
417 .map_err(|e| format!("invalid private key: {}", e))?;
418 let signing_key =
419 SigningKey::from_bytes(key_bytes.as_slice().into()).map_err(|e| e.to_string())?;
420 let (sig, recovery_id) = signing_key
421 .sign_prehash_recoverable(&final_hash)
422 .map_err(|e| e.to_string())?;
423
424 let sig_bytes = sig.to_bytes();
425 let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
426 let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
427 let v = 27u8 + recovery_id.to_byte();
428
429 Ok((r, s, v))
430}
431
432fn ws_subscribe<T, F>(subscription: Value, mut parse: F) -> BoxStream<Result<T, String>>
441where
442 T: Send + 'static,
443 F: FnMut(WsEnvelope) -> Vec<T> + Send + 'static,
444{
445 Box::pin(async_stream::stream! {
446 const MAX_RECONNECT_ATTEMPTS: u32 = 10;
447 const PONG_TIMEOUT_SECS: u64 = 30;
448 let mut backoff_secs: u64 = 1;
449 let mut reconnect_attempts: u32 = 0;
450 loop {
451 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
452 yield Err(format!("ws max reconnect attempts ({MAX_RECONNECT_ATTEMPTS}) reached — giving up"));
453 break;
454 }
455 let ws = match connect_async(HYPERLIQUID_WS_URL).await {
456 Ok((ws, _)) => ws,
457 Err(e) => {
458 reconnect_attempts += 1;
459 yield Err(format!("ws connect failed: {e} — reconnecting in {backoff_secs}s ({reconnect_attempts}/{MAX_RECONNECT_ATTEMPTS})"));
460 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
461 backoff_secs = (backoff_secs * 2).min(60);
462 continue;
463 }
464 };
465 let (mut sink, mut stream) = ws.split();
466 if let Err(e) = sink.send(Message::Text(subscription.to_string().into())).await {
467 reconnect_attempts += 1;
468 yield Err(format!("ws subscribe failed: {e} — reconnecting in {backoff_secs}s ({reconnect_attempts}/{MAX_RECONNECT_ATTEMPTS})"));
469 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
470 backoff_secs = (backoff_secs * 2).min(60);
471 continue;
472 }
473 backoff_secs = 1;
475 reconnect_attempts = 0;
476 let mut ping_interval = tokio::time::interval_at(
477 tokio::time::Instant::now() + std::time::Duration::from_secs(50),
478 std::time::Duration::from_secs(50),
479 );
480 let mut pong_deadline: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
481 let should_reconnect;
482 loop {
483 tokio::select! {
484 _ = ping_interval.tick() => {
485 if let Err(e) = sink.send(Message::Text(r#"{"method":"ping"}"#.to_string().into())).await {
486 yield Err(format!("ws ping failed: {e} — reconnecting in {backoff_secs}s"));
487 should_reconnect = true;
488 break;
489 }
490 pong_deadline = Some(Box::pin(tokio::time::sleep(
491 std::time::Duration::from_secs(PONG_TIMEOUT_SECS),
492 )));
493 }
494 _ = async { pong_deadline.as_mut().unwrap().await }, if pong_deadline.is_some() => {
495 yield Err(format!("ws pong timeout ({PONG_TIMEOUT_SECS}s) — reconnecting in {backoff_secs}s"));
496 should_reconnect = true;
497 break;
498 }
499 msg = stream.next() => {
500 match msg {
501 None => {
502 yield Err(format!("ws stream ended — reconnecting in {backoff_secs}s"));
503 should_reconnect = true;
504 break;
505 }
506 Some(Err(e)) => {
507 yield Err(format!("ws error: {e} — reconnecting in {backoff_secs}s"));
508 should_reconnect = true;
509 break;
510 }
511 Some(Ok(Message::Ping(data))) => { let _ = sink.send(Message::Pong(data)).await; }
512 Some(Ok(Message::Close(_))) => {
513 yield Err(format!("websocket closed — reconnecting in {backoff_secs}s"));
514 should_reconnect = true;
515 break;
516 }
517 Some(Ok(Message::Text(text))) => {
518 let Ok(env) = serde_json::from_str::<WsEnvelope>(&text) else {
519 yield Err(format!("unexpected ws message: {text}"));
520 continue;
521 };
522 match env.channel.as_str() {
523 "pong" => { pong_deadline = None; }
524 "subscriptionResponse" => {}
525 _ => {
526 for item in parse(env) {
527 yield Ok(item);
528 }
529 }
530 }
531 }
532 Some(Ok(_)) => {}
533 }
534 }
535 }
536 }
537 if should_reconnect {
538 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
539 backoff_secs = (backoff_secs * 2).min(60);
540 }
541 }
542 })
543}
544
545#[allow(async_fn_in_trait)]
548impl guilder_abstraction::TestServer for HyperliquidClient {
549 async fn ping(&self) -> Result<bool, String> {
551 self.info_post(serde_json::json!({"type": "allMids"}), 2)
553 .await
554 .map(|r| r.status().is_success())
555 }
556
557 async fn get_server_time(&self) -> Result<i64, String> {
559 Ok(std::time::SystemTime::now()
560 .duration_since(std::time::UNIX_EPOCH)
561 .map(|d| d.as_millis() as i64)
562 .unwrap_or(0))
563 }
564}
565
566#[allow(async_fn_in_trait)]
567impl guilder_abstraction::GetMarketData for HyperliquidClient {
568 async fn get_symbol(&self) -> Result<Vec<String>, String> {
570 let resp = self
572 .info_post(serde_json::json!({"type": "meta"}), 20)
573 .await?;
574 parse_response::<MetaResponse>(resp)
575 .await
576 .map(|r| r.universe.into_iter().map(|a| a.name).collect())
577 }
578
579 async fn get_open_interest(&self, symbol: String) -> Result<Decimal, String> {
581 let resp = self
583 .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20)
584 .await?;
585 let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
586 .await?
587 .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
588 meta.universe
589 .iter()
590 .position(|a| a.name == symbol)
591 .and_then(|i| ctxs.get(i))
592 .and_then(|ctx| parse_decimal(&ctx.open_interest))
593 .ok_or_else(|| format!("symbol {} not found", symbol))
594 }
595
596 async fn get_asset_context(&self, symbol: String) -> Result<AssetContext, String> {
598 let resp = self
600 .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20)
601 .await?;
602 let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
603 .await?
604 .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
605 let idx = meta
606 .universe
607 .iter()
608 .position(|a| a.name == symbol)
609 .ok_or_else(|| format!("symbol {} not found", symbol))?;
610 let ctx = ctxs
611 .get(idx)
612 .ok_or_else(|| format!("symbol {} not found", symbol))?;
613 Ok(AssetContext {
614 symbol,
615 open_interest: parse_decimal(&ctx.open_interest).ok_or("invalid open_interest")?,
616 funding_rate: parse_decimal(&ctx.funding).ok_or("invalid funding")?,
617 mark_price: parse_decimal(&ctx.mark_px).ok_or("invalid mark_px")?,
618 day_volume: parse_decimal(&ctx.day_ntl_vlm).ok_or("invalid day_ntl_vlm")?,
619 mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
620 oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
621 premium: ctx.premium.as_deref().and_then(parse_decimal),
622 prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
623 })
624 }
625
626 async fn get_all_asset_contexts(&self) -> Result<Vec<AssetContext>, String> {
629 let resp = self
631 .info_post(serde_json::json!({"type": "metaAndAssetCtxs"}), 20)
632 .await?;
633 let (meta, ctxs) = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp)
634 .await?
635 .ok_or_else(|| "metaAndAssetCtxs returned null".to_string())?;
636 let mut result = Vec::with_capacity(meta.universe.len());
637 for (asset, ctx) in meta.universe.iter().zip(ctxs.iter()) {
638 let Some(open_interest) = parse_decimal(&ctx.open_interest) else {
639 continue;
640 };
641 let Some(funding_rate) = parse_decimal(&ctx.funding) else {
642 continue;
643 };
644 let Some(mark_price) = parse_decimal(&ctx.mark_px) else {
645 continue;
646 };
647 let Some(day_volume) = parse_decimal(&ctx.day_ntl_vlm) else {
648 continue;
649 };
650 result.push(AssetContext {
651 symbol: asset.name.clone(),
652 open_interest,
653 funding_rate,
654 mark_price,
655 day_volume,
656 mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
657 oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
658 premium: ctx.premium.as_deref().and_then(parse_decimal),
659 prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
660 });
661 }
662 Ok(result)
663 }
664
665 async fn get_l2_orderbook(&self, symbol: String) -> Result<Vec<L2Update>, String> {
668 let resp = self
670 .info_post(serde_json::json!({"type": "l2Book", "coin": symbol}), 2)
671 .await?;
672 let book: Option<WsBook> = parse_response(resp).await?;
673 let book = match book {
674 Some(b) => b,
675 None => return Ok(vec![]),
676 };
677 let mut levels = Vec::new();
678 for level in book.levels.first().into_iter().flatten() {
679 if let (Some(price), Some(volume)) =
680 (parse_decimal(&level.px), parse_decimal(&level.sz))
681 {
682 levels.push(L2Update {
683 symbol: book.coin.clone(),
684 price,
685 volume,
686 side: Side::Ask,
687 sequence: book.time,
688 });
689 }
690 }
691 for level in book.levels.get(1).into_iter().flatten() {
692 if let (Some(price), Some(volume)) =
693 (parse_decimal(&level.px), parse_decimal(&level.sz))
694 {
695 levels.push(L2Update {
696 symbol: book.coin.clone(),
697 price,
698 volume,
699 side: Side::Bid,
700 sequence: book.time,
701 });
702 }
703 }
704 Ok(levels)
705 }
706
707 async fn get_price(&self, symbol: String) -> Result<Decimal, String> {
709 let resp = self
711 .info_post(serde_json::json!({"type": "allMids"}), 2)
712 .await?;
713 parse_response::<HashMap<String, String>>(resp)
714 .await?
715 .get(&symbol)
716 .and_then(|s| parse_decimal(s))
717 .ok_or_else(|| format!("symbol {} not found", symbol))
718 }
719
720 async fn get_predicted_fundings(&self) -> Result<Vec<PredictedFunding>, String> {
723 let resp = self
725 .info_post(serde_json::json!({"type": "predictedFundings"}), 20)
726 .await?;
727 let data: PredictedFundingsResponse = parse_response(resp).await?;
728 let mut result = Vec::new();
729 for (symbol, venues) in data {
730 for (venue, entry) in venues {
731 let Some(entry) = entry else { continue };
732 if let Some(funding_rate) = parse_decimal(&entry.funding_rate) {
733 result.push(PredictedFunding {
734 symbol: symbol.clone(),
735 venue,
736 funding_rate,
737 next_funding_time_ms: entry.next_funding_time,
738 });
739 }
740 }
741 }
742 Ok(result)
743 }
744}
745
746#[allow(async_fn_in_trait)]
747impl guilder_abstraction::ManageOrder for HyperliquidClient {
748 async fn place_order(
751 &self,
752 symbol: String,
753 side: OrderSide,
754 price: Decimal,
755 volume: Decimal,
756 order_type: OrderType,
757 time_in_force: TimeInForce,
758 ) -> Result<OrderPlacement, String> {
759 let asset_idx = self.get_asset_index(&symbol).await?;
760 let is_buy = matches!(side, OrderSide::Buy);
761
762 let tif_str = match time_in_force {
763 TimeInForce::Gtc => "Gtc",
764 TimeInForce::Ioc => "Ioc",
765 TimeInForce::Fok => "Fok",
766 };
767 let order_type_val = match order_type {
769 OrderType::Limit => serde_json::json!({"limit": {"tif": tif_str}}),
770 OrderType::Market => serde_json::json!({"limit": {"tif": "Ioc"}}),
771 };
772
773 let action = serde_json::json!({
774 "type": "order",
775 "orders": [{
776 "a": asset_idx,
777 "b": is_buy,
778 "p": price.to_string(),
779 "s": volume.to_string(),
780 "r": false,
781 "t": order_type_val
782 }],
783 "grouping": "na"
784 });
785
786 let resp = self.submit_signed_action(action, None).await?;
787 let oid = resp["response"]["data"]["statuses"][0]["resting"]["oid"]
788 .as_i64()
789 .or_else(|| resp["response"]["data"]["statuses"][0]["filled"]["oid"].as_i64())
790 .ok_or_else(|| format!("unexpected response: {}", resp))?;
791
792 let timestamp_ms = std::time::SystemTime::now()
793 .duration_since(std::time::UNIX_EPOCH)
794 .unwrap()
795 .as_millis() as i64;
796
797 Ok(OrderPlacement {
798 order_id: oid,
799 symbol,
800 side,
801 price,
802 quantity: volume,
803 timestamp_ms,
804 })
805 }
806
807 async fn change_order_by_cloid(
810 &self,
811 cloid: i64,
812 price: Decimal,
813 volume: Decimal,
814 ) -> Result<i64, String> {
815 let user = self.require_user_address()?;
816
817 let resp = self
819 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
820 .await?;
821 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
822 let order = orders
823 .iter()
824 .find(|o| o.oid == cloid)
825 .ok_or_else(|| format!("order {} not found", cloid))?;
826
827 let asset_idx = self.get_asset_index(&order.coin).await?;
828 let is_buy = order.side == "B";
829
830 let action = serde_json::json!({
831 "type": "batchModify",
832 "modifies": [{
833 "oid": cloid,
834 "order": {
835 "a": asset_idx,
836 "b": is_buy,
837 "p": price.to_string(),
838 "s": volume.to_string(),
839 "r": false,
840 "t": {"limit": {"tif": "Gtc"}}
841 }
842 }]
843 });
844
845 self.submit_signed_action(action, None).await?;
846 Ok(cloid)
847 }
848
849 async fn cancel_order(&self, cloid: i64) -> Result<i64, String> {
852 let user = self.require_user_address()?;
853
854 let resp = self
856 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
857 .await?;
858 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
859 let order = orders
860 .iter()
861 .find(|o| o.oid == cloid)
862 .ok_or_else(|| format!("order {} not found", cloid))?;
863
864 let asset_idx = self.get_asset_index(&order.coin).await?;
865 let action = serde_json::json!({
866 "type": "cancel",
867 "cancels": [{"a": asset_idx, "o": cloid}]
868 });
869
870 self.submit_signed_action(action, None).await?;
871 Ok(cloid)
872 }
873
874 async fn cancel_all_order(&self) -> Result<bool, String> {
877 let user = self.require_user_address()?;
878
879 let resp = self
881 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
882 .await?;
883 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
884 if orders.is_empty() {
885 return Ok(true);
886 }
887
888 let meta_resp = self
890 .info_post(serde_json::json!({"type": "meta"}), 20)
891 .await?;
892 let meta: MetaResponse = parse_response(meta_resp).await?;
893
894 let cancels: Vec<Value> = orders
895 .iter()
896 .filter_map(|o| {
897 let asset_idx = meta.universe.iter().position(|a| a.name == o.coin)?;
898 Some(serde_json::json!({"a": asset_idx, "o": o.oid}))
899 })
900 .collect();
901
902 let action = serde_json::json!({"type": "cancel", "cancels": cancels});
903 self.submit_signed_action(action, None).await?;
904 Ok(true)
905 }
906}
907
908#[allow(async_fn_in_trait)]
909impl guilder_abstraction::SubscribeMarketData for HyperliquidClient {
910 fn subscribe_l2_update(&self, symbol: String) -> BoxStream<Result<L2Update, String>> {
911 let sub = serde_json::json!({
912 "method": "subscribe",
913 "subscription": {"type": "l2Book", "coin": symbol}
914 });
915 ws_subscribe(sub, |env| {
916 if env.channel != "l2Book" {
917 return vec![];
918 }
919 let Ok(book) = serde_json::from_value::<WsBook>(env.data) else {
920 return vec![];
921 };
922 let mut items = Vec::new();
923 for level in book.levels.first().into_iter().flatten() {
924 if let (Some(price), Some(volume)) =
925 (parse_decimal(&level.px), parse_decimal(&level.sz))
926 {
927 items.push(L2Update {
928 symbol: book.coin.clone(),
929 price,
930 volume,
931 side: Side::Ask,
932 sequence: book.time,
933 });
934 }
935 }
936 for level in book.levels.get(1).into_iter().flatten() {
937 if let (Some(price), Some(volume)) =
938 (parse_decimal(&level.px), parse_decimal(&level.sz))
939 {
940 items.push(L2Update {
941 symbol: book.coin.clone(),
942 price,
943 volume,
944 side: Side::Bid,
945 sequence: book.time,
946 });
947 }
948 }
949 items
950 })
951 }
952
953 fn subscribe_asset_context(&self, symbol: String) -> BoxStream<Result<AssetContext, String>> {
954 let sub = serde_json::json!({
955 "method": "subscribe",
956 "subscription": {"type": "activeAssetCtx", "coin": symbol}
957 });
958 ws_subscribe(sub, |env| {
959 if env.channel != "activeAssetCtx" {
960 return vec![];
961 }
962 let Ok(update) = serde_json::from_value::<WsAssetCtx>(env.data) else {
963 return vec![];
964 };
965 let ctx = &update.ctx;
966 let (Some(open_interest), Some(funding_rate), Some(mark_price), Some(day_volume)) = (
967 parse_decimal(&ctx.open_interest),
968 parse_decimal(&ctx.funding),
969 parse_decimal(&ctx.mark_px),
970 parse_decimal(&ctx.day_ntl_vlm),
971 ) else {
972 return vec![];
973 };
974 vec![AssetContext {
975 symbol: update.coin,
976 open_interest,
977 funding_rate,
978 mark_price,
979 day_volume,
980 mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
981 oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
982 premium: ctx.premium.as_deref().and_then(parse_decimal),
983 prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
984 }]
985 })
986 }
987
988 fn subscribe_liquidation(&self, user: String) -> BoxStream<Result<Liquidation, String>> {
989 let sub = serde_json::json!({
990 "method": "subscribe",
991 "subscription": {"type": "userEvents", "user": user}
992 });
993 ws_subscribe(sub, |env| {
994 if env.channel != "userEvents" {
995 return vec![];
996 }
997 let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
998 return vec![];
999 };
1000 let Some(liq) = event.liquidation else {
1001 return vec![];
1002 };
1003 let (Some(notional_position), Some(account_value)) = (
1004 parse_decimal(&liq.liquidated_ntl_pos),
1005 parse_decimal(&liq.liquidated_account_value),
1006 ) else {
1007 return vec![];
1008 };
1009 vec![Liquidation {
1010 symbol: String::new(),
1011 side: OrderSide::Sell,
1012 liquidated_user: liq.liquidated_user,
1013 notional_position,
1014 account_value,
1015 }]
1016 })
1017 }
1018
1019 fn subscribe_fill(&self, symbol: String) -> BoxStream<Result<Fill, String>> {
1020 let sub = serde_json::json!({
1021 "method": "subscribe",
1022 "subscription": {"type": "trades", "coin": symbol}
1023 });
1024 ws_subscribe(sub, |env| {
1025 if env.channel != "trades" {
1026 return vec![];
1027 }
1028 let Ok(trades) = serde_json::from_value::<Vec<WsTrade>>(env.data) else {
1029 return vec![];
1030 };
1031 trades
1032 .into_iter()
1033 .filter_map(|trade| {
1034 let side = if trade.side == "B" {
1035 OrderSide::Buy
1036 } else {
1037 OrderSide::Sell
1038 };
1039 let price = parse_decimal(&trade.px)?;
1040 let volume = parse_decimal(&trade.sz)?;
1041 Some(Fill {
1042 symbol: trade.coin,
1043 price,
1044 volume,
1045 side,
1046 timestamp_ms: trade.time,
1047 trade_id: trade.tid,
1048 })
1049 })
1050 .collect()
1051 })
1052 }
1053}
1054
1055#[allow(async_fn_in_trait)]
1056impl guilder_abstraction::GetAccountSnapshot for HyperliquidClient {
1057 async fn get_positions(&self) -> Result<Vec<Position>, String> {
1060 let user = self.require_user_address()?;
1061 let resp = self
1063 .info_post(
1064 serde_json::json!({"type": "clearinghouseState", "user": user}),
1065 2,
1066 )
1067 .await?;
1068 let state: ClearinghouseStateResponse = parse_response(resp).await?;
1069
1070 Ok(state
1071 .asset_positions
1072 .into_iter()
1073 .filter_map(|ap| {
1074 let p = ap.position;
1075 let size = parse_decimal(&p.szi)?;
1076 if size.is_zero() {
1077 return None;
1078 }
1079 let entry_price = p
1080 .entry_px
1081 .as_deref()
1082 .and_then(parse_decimal)
1083 .unwrap_or_default();
1084 let side = if size > Decimal::ZERO {
1085 OrderSide::Buy
1086 } else {
1087 OrderSide::Sell
1088 };
1089 Some(Position {
1090 symbol: p.coin,
1091 side,
1092 size: size.abs(),
1093 entry_price,
1094 })
1095 })
1096 .collect())
1097 }
1098
1099 async fn get_open_orders(&self) -> Result<Vec<OpenOrder>, String> {
1102 let user = self.require_user_address()?;
1103 let resp = self
1105 .info_post(serde_json::json!({"type": "openOrders", "user": user}), 20)
1106 .await?;
1107 let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
1108
1109 Ok(orders
1110 .into_iter()
1111 .filter_map(|o| {
1112 let price = parse_decimal(&o.limit_px)?;
1113 let quantity = parse_decimal(&o.orig_sz)?;
1114 let remaining = parse_decimal(&o.sz)?;
1115 let filled_quantity = quantity - remaining;
1116 let side = if o.side == "B" {
1117 OrderSide::Buy
1118 } else {
1119 OrderSide::Sell
1120 };
1121 Some(OpenOrder {
1122 order_id: o.oid,
1123 symbol: o.coin,
1124 side,
1125 price,
1126 quantity,
1127 filled_quantity,
1128 })
1129 })
1130 .collect())
1131 }
1132
1133 async fn get_collateral(&self) -> Result<Decimal, String> {
1135 let user = self.require_user_address()?;
1136 let resp = self
1138 .info_post(
1139 serde_json::json!({"type": "clearinghouseState", "user": user}),
1140 2,
1141 )
1142 .await?;
1143 let state: ClearinghouseStateResponse = parse_response(resp).await?;
1144 parse_decimal(&state.margin_summary.account_value)
1145 .ok_or_else(|| "invalid account value".to_string())
1146 }
1147}
1148
1149#[allow(async_fn_in_trait)]
1150impl guilder_abstraction::SubscribeUserEvents for HyperliquidClient {
1151 fn subscribe_user_fills(&self) -> BoxStream<Result<UserFill, String>> {
1152 let Some(addr) = self.user_address else {
1153 return Box::pin(stream::empty());
1154 };
1155 let sub = serde_json::json!({
1156 "method": "subscribe",
1157 "subscription": {"type": "userEvents", "user": format!("{:#x}", addr)}
1158 });
1159 ws_subscribe(sub, |env| {
1160 if env.channel != "userEvents" {
1161 return vec![];
1162 }
1163 let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1164 return vec![];
1165 };
1166 event
1167 .fills
1168 .unwrap_or_default()
1169 .into_iter()
1170 .filter_map(|fill| {
1171 let side = if fill.side == "B" {
1172 OrderSide::Buy
1173 } else {
1174 OrderSide::Sell
1175 };
1176 let price = parse_decimal(&fill.px)?;
1177 let quantity = parse_decimal(&fill.sz)?;
1178 let fee_usd = parse_decimal(&fill.fee)?;
1179 Some(UserFill {
1180 order_id: fill.oid,
1181 symbol: fill.coin,
1182 side,
1183 price,
1184 quantity,
1185 fee_usd,
1186 timestamp_ms: fill.time,
1187 })
1188 })
1189 .collect()
1190 })
1191 }
1192
1193 fn subscribe_order_updates(&self) -> BoxStream<Result<OrderUpdate, String>> {
1194 let Some(addr) = self.user_address else {
1195 return Box::pin(stream::empty());
1196 };
1197 let sub = serde_json::json!({
1198 "method": "subscribe",
1199 "subscription": {"type": "orderUpdates", "user": format!("{:#x}", addr)}
1200 });
1201 ws_subscribe(sub, |env| {
1202 if env.channel != "orderUpdates" {
1203 return vec![];
1204 }
1205 let Ok(updates) = serde_json::from_value::<Vec<WsOrderUpdate>>(env.data) else {
1206 return vec![];
1207 };
1208 updates
1209 .into_iter()
1210 .map(|upd| {
1211 let status = match upd.status.as_str() {
1212 "open" => OrderStatus::Placed,
1213 "filled" => OrderStatus::Filled,
1214 "canceled" | "cancelled" => OrderStatus::Cancelled,
1215 _ => OrderStatus::PartiallyFilled,
1216 };
1217 let side = if upd.order.side == "B" {
1218 OrderSide::Buy
1219 } else {
1220 OrderSide::Sell
1221 };
1222 OrderUpdate {
1223 order_id: upd.order.oid,
1224 symbol: upd.order.coin,
1225 status,
1226 side: Some(side),
1227 price: parse_decimal(&upd.order.limit_px),
1228 quantity: parse_decimal(&upd.order.orig_sz),
1229 remaining_quantity: parse_decimal(&upd.order.sz),
1230 timestamp_ms: upd.status_timestamp,
1231 }
1232 })
1233 .collect()
1234 })
1235 }
1236
1237 fn subscribe_funding_payments(&self) -> BoxStream<Result<FundingPayment, String>> {
1238 let Some(addr) = self.user_address else {
1239 return Box::pin(stream::empty());
1240 };
1241 let sub = serde_json::json!({
1242 "method": "subscribe",
1243 "subscription": {"type": "userEvents", "user": format!("{:#x}", addr)}
1244 });
1245 ws_subscribe(sub, |env| {
1246 if env.channel != "userEvents" {
1247 return vec![];
1248 }
1249 let Ok(event) = serde_json::from_value::<WsUserEvent>(env.data) else {
1250 return vec![];
1251 };
1252 let Some(funding) = event.funding else {
1253 return vec![];
1254 };
1255 let Some(amount_usd) = parse_decimal(&funding.usdc) else {
1256 return vec![];
1257 };
1258 vec![FundingPayment {
1259 symbol: funding.coin,
1260 amount_usd,
1261 timestamp_ms: funding.time,
1262 }]
1263 })
1264 }
1265
1266 fn subscribe_deposits(&self) -> BoxStream<Result<Deposit, String>> {
1267 let Some(addr) = self.user_address else {
1268 return Box::pin(stream::empty());
1269 };
1270 let sub = serde_json::json!({
1271 "method": "subscribe",
1272 "subscription": {"type": "userNonFundingLedgerUpdates", "user": format!("{:#x}", addr)}
1273 });
1274 ws_subscribe(sub, |env| {
1275 if env.channel != "userNonFundingLedgerUpdates" {
1276 return vec![];
1277 }
1278 let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else {
1279 return vec![];
1280 };
1281 ledger
1282 .updates
1283 .into_iter()
1284 .filter_map(|e| {
1285 if e.delta.kind != "deposit" {
1286 return None;
1287 }
1288 let amount_usd = e.delta.usdc.as_deref().and_then(parse_decimal)?;
1289 Some(Deposit {
1290 asset: "USDC".to_string(),
1291 amount_usd,
1292 timestamp_ms: e.time,
1293 })
1294 })
1295 .collect()
1296 })
1297 }
1298
1299 fn subscribe_withdrawals(&self) -> BoxStream<Result<Withdrawal, String>> {
1300 let Some(addr) = self.user_address else {
1301 return Box::pin(stream::empty());
1302 };
1303 let sub = serde_json::json!({
1304 "method": "subscribe",
1305 "subscription": {"type": "userNonFundingLedgerUpdates", "user": format!("{:#x}", addr)}
1306 });
1307 ws_subscribe(sub, |env| {
1308 if env.channel != "userNonFundingLedgerUpdates" {
1309 return vec![];
1310 }
1311 let Ok(ledger) = serde_json::from_value::<WsLedgerUpdates>(env.data) else {
1312 return vec![];
1313 };
1314 ledger
1315 .updates
1316 .into_iter()
1317 .filter_map(|e| {
1318 if e.delta.kind != "withdraw" {
1319 return None;
1320 }
1321 let amount_usd = e.delta.usdc.as_deref().and_then(parse_decimal)?;
1322 Some(Withdrawal {
1323 asset: "USDC".to_string(),
1324 amount_usd,
1325 timestamp_ms: e.time,
1326 })
1327 })
1328 .collect()
1329 })
1330 }
1331}