use crate::rate_limiter::{AddressRateLimiter, RestRateLimiter};
use futures_util::{stream, StreamExt};
use guilder_abstraction::{
self, AssetContext, BoxStream, Deposit, Fill, FundingPayment, L2Update, Liquidation, OpenOrder,
OrderPlacement, OrderSide, OrderStatus, OrderType, OrderUpdate, Position, PredictedFunding,
Side, TimeInForce, UserFill, Withdrawal,
};
use reqwest::Client;
use rust_decimal::Decimal;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
/// Endpoint that receives every `info` POST request issued by this client.
const HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
/// Endpoint that receives the signed action payloads issued by this client.
const HYPERLIQUID_EXCHANGE_URL: &str = "https://api.hyperliquid.xyz/exchange";
/// Reads the whole body of `resp` and deserializes it from JSON into `T`.
///
/// # Errors
/// Returns a human-readable message when the body cannot be read, when it is
/// empty, or when it is not valid JSON for `T`. Deserialization errors include
/// the HTTP status and the body; bodies longer than 512 bytes are cut down to
/// (at most) their first 256 bytes followed by the total length.
async fn parse_response<T: for<'de> serde::Deserialize<'de>>(
    resp: reqwest::Response,
) -> Result<T, String> {
    let status = resp.status();
    let text = resp
        .text()
        .await
        .map_err(|e| format!("failed to read response body (status {status}): {e}"))?;
    if text.is_empty() {
        return Err(format!(
            "empty response body from Hyperliquid (HTTP {status})"
        ));
    }
    serde_json::from_str(&text).map_err(|e| {
        let snippet = if text.len() > 512 {
            // Slicing a `str` at a byte offset inside a multi-byte character
            // panics, so back off to the nearest char boundary at or below 256.
            // Offset 0 is always a boundary, so the loop terminates.
            let mut end = 256;
            while !text.is_char_boundary(end) {
                end -= 1;
            }
            format!("{}... ({} bytes total)", &text[..end], text.len())
        } else {
            text.clone()
        };
        format!("deserialize error (HTTP {status}): {e}: {snippet}")
    })
}
/// Client for the Hyperliquid REST and WebSocket APIs.
///
/// Built either unauthenticated (`new`) or with credentials (`with_auth`);
/// methods that need credentials return an error when they are absent.
pub struct HyperliquidClient {
// HTTP client used for every REST request.
client: Client,
// Account address used in user-scoped `info` queries; `None` when unauthenticated.
user_address: Option<String>,
// Hex-encoded signing key used for exchange actions; `None` when unauthenticated.
private_key: Option<String>,
// Budget acquired before every REST call (weight is passed per call).
rest_limiter: Arc<RestRateLimiter>,
// Budget acquired before every signed exchange action.
address_limiter: Arc<AddressRateLimiter>,
// Source of the WebSocket subscription streams returned by the `subscribe_*` methods.
ws_mux: crate::ws::WsMux,
}
impl Default for HyperliquidClient {
fn default() -> Self {
Self::new()
}
}
impl HyperliquidClient {
pub fn new() -> Self {
HyperliquidClient {
client: Client::new(),
user_address: None,
private_key: None,
rest_limiter: Arc::new(RestRateLimiter::new()),
address_limiter: Arc::new(AddressRateLimiter::new()),
ws_mux: crate::ws::WsMux::new(),
}
}
pub fn with_auth(user_address: impl Into<String>, private_key: String) -> Self {
HyperliquidClient {
client: Client::new(),
user_address: Some(user_address.into()),
private_key: Some(private_key),
rest_limiter: Arc::new(RestRateLimiter::new()),
address_limiter: Arc::new(AddressRateLimiter::new()),
ws_mux: crate::ws::WsMux::new(),
}
}
/// Builder-style override that replaces both rate limiters with ones created
/// from the given budgets and returns the updated client.
pub fn with_budgets(mut self, rest_weight: u32, addr_budget: u64) -> Self {
    let rest = RestRateLimiter::new_with_budget(rest_weight);
    let address = AddressRateLimiter::new_with_budget(addr_budget);
    self.rest_limiter = Arc::new(rest);
    self.address_limiter = Arc::new(address);
    self
}
/// Posts `body` as JSON to the info endpoint after acquiring `weight` from the
/// REST rate limiter.
///
/// `call` only labels the rate-limit error message. The raw response is
/// returned; its status code is not inspected here.
///
/// # Errors
/// A `rate_limited: ...` message when the limiter refuses the weight, or the
/// stringified transport error when sending fails.
async fn info_post(
    &self,
    body: Value,
    weight: u32,
    call: &str,
) -> Result<reqwest::Response, String> {
    if let Err(e) = self.rest_limiter.acquire(weight).await {
        return Err(format!(
            "rate_limited: info_post ({call}) budget exhausted, retry_after_ms={}",
            e.retry_after.as_millis()
        ));
    }
    let request = self.client.post(HYPERLIQUID_INFO_URL).json(&body);
    match request.send().await {
        Ok(response) => Ok(response),
        Err(e) => Err(e.to_string()),
    }
}
/// Returns a copy of the configured user address, or an error telling the
/// caller to construct the client with `with_auth`.
fn require_user_address(&self) -> Result<String, String> {
    match &self.user_address {
        Some(address) => Ok(address.clone()),
        None => Err("user address required: use HyperliquidClient::with_auth".to_string()),
    }
}
/// Borrows the configured private key, or returns an error telling the caller
/// to construct the client with `with_auth`.
fn require_private_key(&self) -> Result<&str, String> {
    match self.private_key.as_deref() {
        Some(key) => Ok(key),
        None => Err("private key required: use HyperliquidClient::with_auth".to_string()),
    }
}
/// Looks up the position of `symbol` in the `meta` universe list.
///
/// Issues one `meta` info request (weight 20) on every call.
///
/// # Errors
/// Propagates request/parse failures, or `symbol ... not found` when no
/// universe entry has that exact name.
async fn get_asset_index(&self, symbol: &str) -> Result<usize, String> {
    let request = serde_json::json!({"type": "meta"});
    let resp = self.info_post(request, 20, "get_asset_index").await?;
    let meta: MetaResponse = parse_response(resp).await?;
    for (index, asset) in meta.universe.iter().enumerate() {
        if asset.name == symbol {
            return Ok(index);
        }
    }
    Err(format!("symbol {} not found", symbol))
}
/// Signs `action` and posts it to the exchange endpoint.
///
/// The nonce is the current wall-clock time in Unix milliseconds. One unit is
/// acquired from the REST limiter and one from the address limiter before the
/// request is sent.
///
/// `vault_address`, when given, is both mixed into the signed hash and sent as
/// the payload's `vaultAddress`, so the server verifies the same data that was
/// signed.
///
/// # Errors
/// Returns an error when no private key is configured, the system clock is
/// before the Unix epoch, signing fails, a rate limiter refuses, the request
/// fails, the HTTP status is not a success, or the response body reports
/// `"status": "err"` (its `response` string becomes the error).
async fn submit_signed_action(
    &self,
    action: Value,
    vault_address: Option<&str>,
) -> Result<Value, String> {
    let private_key = self.require_private_key()?;
    let nonce = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| format!("system clock is before the Unix epoch: {e}"))?
        .as_millis() as u64;
    let (r, s, v) = sign_action(private_key, &action, vault_address, nonce)?;
    let payload = serde_json::json!({
        "action": action,
        "nonce": nonce,
        "signature": {"r": r, "s": s, "v": v},
        // Must match the vault address that went into the signature;
        // serializes to `null` when no vault is used.
        "vaultAddress": vault_address,
        "expiresAfter": null
    });
    self.rest_limiter.acquire(1).await.map_err(|e| {
        format!(
            "rate_limited: rest_weight exhausted, retry_after_ms={}",
            e.retry_after.as_millis()
        )
    })?;
    self.address_limiter.acquire(1, false).await.map_err(|e| {
        format!(
            "rate_limited: address quota exhausted, retry_after_ms={}",
            e.retry_after.as_millis()
        )
    })?;
    let resp = self
        .client
        .post(HYPERLIQUID_EXCHANGE_URL)
        .json(&payload)
        .send()
        .await
        .map_err(|e| e.to_string())?;
    let status = resp.status();
    if !status.is_success() {
        let text = resp.text().await.map_err(|e| e.to_string())?;
        return Err(format!("HTTP {status}: {text}"));
    }
    let body: Value = parse_response(resp).await?;
    // The exchange reports application-level failures inside a 200 response.
    if body["status"].as_str() == Some("err") {
        return Err(body["response"]
            .as_str()
            .unwrap_or("unknown error")
            .to_string());
    }
    Ok(body)
}
}
/// Body of the `meta` info response; only the asset list is deserialized.
#[derive(Deserialize)]
struct MetaResponse {
// Listed assets; an asset's position in this list is used as its asset index.
universe: Vec<AssetInfo>,
}
/// One entry of `MetaResponse::universe`.
#[derive(Deserialize)]
struct AssetInfo {
// Symbol that callers match against when resolving an asset.
name: String,
}
/// Body of the `metaAndAssetCtxs` info response: the metadata followed by the
/// per-asset contexts, which callers pair with `universe` entries by index.
type MetaAndAssetCtxsResponse = (MetaResponse, Vec<RestAssetCtx>);
/// Per-asset context as returned over REST (camelCase keys).
///
/// All numeric values arrive as strings and are parsed with `parse_decimal`
/// by the callers.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)]
struct RestAssetCtx {
open_interest: String,
funding: String,
mark_px: String,
day_ntl_vlm: String,
// The remaining fields may be absent or null in the response.
mid_px: Option<String>,
oracle_px: Option<String>,
premium: Option<String>,
prev_day_px: Option<String>,
}
/// Body of the `clearinghouseState` info response (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClearinghouseStateResponse {
margin_summary: MarginSummary,
// One wrapper per position; `get_positions` turns these into `Position` values.
asset_positions: Vec<AssetPosition>,
}
/// `marginSummary` object of the clearinghouse state (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct MarginSummary {
// Kept as the raw string received from the API.
account_value: String,
}
/// Wrapper object around a single position in `assetPositions`.
#[derive(Deserialize)]
struct AssetPosition {
position: PositionDetail,
}
/// Details of one position (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PositionDetail {
coin: String,
// Signed size as a decimal string; `get_positions` maps positive to Buy and negative to Sell.
szi: String,
// Entry price as a decimal string; may be absent or null.
entry_px: Option<String>,
}
/// One element of the `openOrders` info response (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RestOpenOrder {
coin: String,
// Compared against "B" to detect a buy order.
side: String,
limit_px: String,
sz: String,
// Exchange order id; used to find the order to modify or cancel.
oid: i64,
orig_sz: String,
}
/// Body of the `predictedFundings` info response: a list of
/// `(symbol, [(venue, optional funding entry)])` pairs.
type PredictedFundingsResponse = Vec<(String, Vec<(String, Option<PredictedFundingEntry>)>)>;
/// Predicted funding for one symbol on one venue (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PredictedFundingEntry {
// Decimal string; entries that fail to parse are skipped by the caller.
funding_rate: String,
// Forwarded unchanged as `next_funding_time_ms`.
next_funding_time: i64,
}
/// Outer frame of every WebSocket message handled by this file.
#[derive(Deserialize)]
struct WsEnvelope {
// Channel name; each subscriber drops messages whose channel is not its own.
channel: String,
// Channel-specific payload, decoded in a second step; defaults to JSON null when missing.
#[serde(default)]
data: Value,
}
/// Order book snapshot; used for both the `l2Book` WebSocket channel and the
/// `l2Book` REST response.
#[derive(Deserialize)]
struct WsBook {
coin: String,
// Price levels grouped per side; consumers read the first two entries as the two book sides.
levels: Vec<Vec<WsLevel>>,
// Forwarded as the `sequence` of every emitted `L2Update`.
time: i64,
}
/// One price level of an order book side; both values are decimal strings.
#[derive(Deserialize)]
struct WsLevel {
px: String,
sz: String,
}
/// Payload of the `activeAssetCtx` WebSocket channel.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsAssetCtx {
coin: String,
ctx: WsPerpsCtx,
}
/// Asset context carried inside `WsAssetCtx` (camelCase keys).
///
/// Mirrors `RestAssetCtx`: all values are decimal strings.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsPerpsCtx {
// The first four values must parse for an update to be emitted.
open_interest: String,
funding: String,
mark_px: String,
day_ntl_vlm: String,
// Optional values; a missing or unparsable one becomes `None`.
mid_px: Option<String>,
oracle_px: Option<String>,
premium: Option<String>,
prev_day_px: Option<String>,
}
/// Payload of the `userEvents` WebSocket channel; every part is optional.
///
/// NOTE(review): there is no `rename_all`, so `spot_state` is only populated
/// from a JSON key spelled exactly `spot_state` — confirm against the feed.
#[derive(Deserialize)]
struct WsUserEvent {
// Read by `subscribe_liquidation`.
liquidation: Option<WsLiquidation>,
fills: Option<Vec<WsUserFill>>,
funding: Option<WsFunding>,
spot_state: Option<WsSpotState>,
}
/// Spot account state carried in a user event.
#[derive(Deserialize)]
struct WsSpotState {
balances: Option<Vec<WsSpotBalance>>,
}
/// One spot balance entry; amounts are kept as the raw strings received.
#[derive(Deserialize)]
struct WsSpotBalance {
coin: String,
total: String,
hold: String,
}
/// Liquidation part of a user event.
///
/// Field names are matched verbatim (snake_case) against the JSON keys.
#[derive(Deserialize)]
struct WsLiquidation {
liquidated_user: String,
// Decimal strings; both must parse for a `Liquidation` to be emitted.
liquidated_ntl_pos: String,
liquidated_account_value: String,
}
/// One fill belonging to the subscribed user.
#[derive(Deserialize)]
struct WsUserFill {
coin: String,
px: String,
sz: String,
side: String,
time: i64,
oid: i64,
fee: String,
// Client order id; `None` when the key is missing or null.
#[serde(default)]
cloid: Option<String>,
}
/// Funding part of a user event.
#[derive(Deserialize)]
struct WsFunding {
time: i64,
coin: String,
// Kept as the raw string received.
usdc: String,
}
/// One element of the `trades` WebSocket channel payload.
#[derive(Deserialize)]
struct WsTrade {
coin: String,
// "B" is mapped to Buy by `subscribe_fill`; any other value to Sell.
side: String,
px: String,
sz: String,
// Forwarded as `timestamp_ms`.
time: i64,
// Forwarded as `trade_id`.
tid: i64,
}
/// One order status change received over WebSocket.
#[derive(Deserialize)]
struct WsOrderUpdate {
order: WsOrderInfo,
status: String,
// Read from the JSON key `statusTimestamp`.
#[serde(rename = "statusTimestamp")]
status_timestamp: i64,
}
/// Order description carried inside `WsOrderUpdate` (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsOrderInfo {
coin: String,
side: String,
limit_px: String,
sz: String,
oid: i64,
orig_sz: String,
// Client order id; `None` when the key is missing or null.
#[serde(default)]
cloid: Option<String>,
}
/// Batch of ledger entries received over WebSocket.
#[derive(Deserialize)]
struct WsLedgerUpdates {
updates: Vec<WsLedgerEntry>,
}
/// One ledger entry: a timestamp plus the change it records.
#[derive(Deserialize)]
struct WsLedgerEntry {
time: i64,
delta: WsLedgerDelta,
}
/// The change recorded by a ledger entry.
#[derive(Deserialize)]
struct WsLedgerDelta {
// Read from the JSON key `type` (a reserved word in Rust, hence the rename).
#[serde(rename = "type")]
kind: String,
// Amount as a raw string; optional in the payload.
usdc: Option<String>,
}
/// Parses a decimal string, returning `None` instead of an error when the
/// text is not a valid `Decimal`.
fn parse_decimal(s: &str) -> Option<Decimal> {
    match Decimal::from_str(s) {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}
/// Computes the Keccak-256 digest of `data`.
fn keccak256(data: &[u8]) -> [u8; 32] {
    use sha3::{Digest, Keccak256};
    let mut hasher = Keccak256::new();
    hasher.update(data);
    hasher.finalize().into()
}
/// Builds the EIP-712 domain separator used when signing actions.
///
/// The domain is name `Exchange`, version `1`, chain id 1337 and an all-zero
/// verifying contract; the result is the Keccak-256 hash of the five 32-byte
/// words (type hash, name hash, version hash, chain id, contract).
fn hyperliquid_domain_separator() -> [u8; 32] {
    const CHAIN_ID: u32 = 1337;
    let mut encoded = Vec::with_capacity(160);
    encoded.extend_from_slice(&keccak256(
        b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)",
    ));
    encoded.extend_from_slice(&keccak256(b"Exchange"));
    encoded.extend_from_slice(&keccak256(b"1"));
    // Chain id as a big-endian 32-byte word: 28 zero bytes, then the u32.
    encoded.extend_from_slice(&[0u8; 28]);
    encoded.extend_from_slice(&CHAIN_ID.to_be_bytes());
    // Verifying contract: the zero address, left-padded to 32 bytes.
    encoded.extend_from_slice(&[0u8; 32]);
    keccak256(&encoded)
}
/// Encodes a JSON value as MessagePack, always choosing the smallest encoding
/// for integers, string lengths and container lengths.
///
/// Integers are tried as `u64` first, then `i64`, and only then as `f64`, so
/// unsigned values above `i64::MAX` are written as `uint 64` rather than as a
/// lossy float. Non-integer numbers are written as `float 64`.
///
/// Object entries are written in the iteration order of `serde_json::Map`.
/// NOTE(review): whether that is insertion order depends on serde_json's
/// `preserve_order` feature — confirm it matches what the signing scheme
/// expects.
///
/// # Panics
/// Panics if a number is representable as none of `u64`, `i64` or `f64`,
/// which does not happen with serde_json's default number representation.
fn value_to_msgpack(val: &Value) -> Vec<u8> {
    // Writes a non-negative integer (positive fixint, uint 8/16/32/64).
    fn push_uint(buf: &mut Vec<u8>, u: u64) {
        if u <= 0x7f {
            buf.push(u as u8);
        } else if u <= u64::from(u8::MAX) {
            buf.push(0xcc);
            buf.push(u as u8);
        } else if u <= u64::from(u16::MAX) {
            buf.push(0xcd);
            buf.extend_from_slice(&(u as u16).to_be_bytes());
        } else if u <= u64::from(u32::MAX) {
            buf.push(0xce);
            buf.extend_from_slice(&(u as u32).to_be_bytes());
        } else {
            buf.push(0xcf);
            buf.extend_from_slice(&u.to_be_bytes());
        }
    }

    // Writes a negative integer (negative fixint, int 8/16/32/64).
    fn push_negative_int(buf: &mut Vec<u8>, i: i64) {
        if i >= -32 {
            buf.push(0xe0 | (i as u8));
        } else if i >= i64::from(i8::MIN) {
            buf.push(0xd0);
            buf.push(i as i8 as u8);
        } else if i >= i64::from(i16::MIN) {
            buf.push(0xd1);
            buf.extend_from_slice(&(i as i16).to_be_bytes());
        } else if i >= i64::from(i32::MIN) {
            buf.push(0xd2);
            buf.extend_from_slice(&(i as i32).to_be_bytes());
        } else {
            buf.push(0xd3);
            buf.extend_from_slice(&i.to_be_bytes());
        }
    }

    // Writes a UTF-8 string (fixstr, str 8/16/32).
    fn push_str(buf: &mut Vec<u8>, s: &str) {
        let bytes = s.as_bytes();
        let len = bytes.len();
        if len <= 31 {
            buf.push(0xa0 | (len as u8));
        } else if len <= 255 {
            buf.push(0xd9);
            buf.push(len as u8);
        } else if len <= 65535 {
            buf.push(0xda);
            buf.extend_from_slice(&(len as u16).to_be_bytes());
        } else {
            buf.push(0xdb);
            buf.extend_from_slice(&(len as u32).to_be_bytes());
        }
        buf.extend_from_slice(bytes);
    }

    // Writes an array/map header: the fix form for up to 15 entries, then the
    // 16-bit and 32-bit forms.
    fn push_container_len(buf: &mut Vec<u8>, len: usize, fix_tag: u8, tag16: u8, tag32: u8) {
        if len <= 15 {
            buf.push(fix_tag | (len as u8));
        } else if len <= 65535 {
            buf.push(tag16);
            buf.extend_from_slice(&(len as u16).to_be_bytes());
        } else {
            buf.push(tag32);
            buf.extend_from_slice(&(len as u32).to_be_bytes());
        }
    }

    // Recursive encoder that appends to one shared buffer instead of
    // allocating a new vector per node.
    fn encode_into(val: &Value, buf: &mut Vec<u8>) {
        match val {
            Value::Null => buf.push(0xc0),
            Value::Bool(true) => buf.push(0xc3),
            Value::Bool(false) => buf.push(0xc2),
            Value::Number(n) => {
                if let Some(u) = n.as_u64() {
                    push_uint(buf, u);
                } else if let Some(i) = n.as_i64() {
                    // `as_u64` failed, so the value is negative here.
                    push_negative_int(buf, i);
                } else {
                    let f = n
                        .as_f64()
                        .expect("JSON number is not representable as u64, i64 or f64");
                    buf.push(0xcb);
                    buf.extend_from_slice(&f.to_be_bytes());
                }
            }
            Value::String(s) => push_str(buf, s),
            Value::Array(items) => {
                push_container_len(buf, items.len(), 0x90, 0xdc, 0xdd);
                for item in items {
                    encode_into(item, buf);
                }
            }
            Value::Object(map) => {
                push_container_len(buf, map.len(), 0x80, 0xde, 0xdf);
                for (key, value) in map {
                    push_str(buf, key);
                    encode_into(value, buf);
                }
            }
        }
    }

    let mut buf = Vec::new();
    encode_into(val, &mut buf);
    buf
}
/// Serializes an action to the MessagePack bytes that get signed.
///
/// Currently infallible: it wraps `value_to_msgpack` and always returns `Ok`.
fn action_to_canonical_msgpack(action: &Value) -> Result<Vec<u8>, String> {
    let encoded: Vec<u8> = value_to_msgpack(action);
    Ok(encoded)
}
/// Hand-encodes a single order as a MessagePack map with a fixed key order:
/// `a`, `b`, `p`, `s`, `r`, `t` and, only when `cloid` is given, `c`.
///
/// `t` is written as `{order_kind: {"tif": tif}}`; `tif` is interpreted as
/// UTF-8 (lossily) and encoded as a string.
fn build_order_msgpack(
    asset_idx: usize,
    is_buy: bool,
    price: &str,
    size: &str,
    reduce_only: bool,
    order_kind: &str,
    tif: &[u8],
    cloid: Option<&str>,
) -> Vec<u8> {
    let pack_str = |text: &str| -> Vec<u8> { value_to_msgpack(&Value::String(text.to_owned())) };
    let pack_bool = |flag: bool| -> u8 {
        if flag {
            0xc3
        } else {
            0xc2
        }
    };
    let entries: u8 = match cloid {
        Some(_) => 7,
        None => 6,
    };

    // fixmap header carrying the number of entries.
    let mut out = vec![0x80 | entries];
    out.extend(pack_str("a"));
    out.extend(value_to_msgpack(&Value::Number(serde_json::Number::from(
        asset_idx,
    ))));
    out.extend(pack_str("b"));
    out.push(pack_bool(is_buy));
    out.extend(pack_str("p"));
    out.extend(pack_str(price));
    out.extend(pack_str("s"));
    out.extend(pack_str(size));
    out.extend(pack_str("r"));
    out.push(pack_bool(reduce_only));
    out.extend(pack_str("t"));
    // Two nested single-entry maps: {order_kind: {"tif": tif}}.
    out.push(0x81);
    out.extend(pack_str(order_kind));
    out.push(0x81);
    out.extend(pack_str("tif"));
    let tif_text = String::from_utf8_lossy(tif);
    out.extend(pack_str(&*tif_text));
    if let Some(client_id) = cloid {
        out.extend(pack_str("c"));
        out.extend(pack_str(client_id));
    }
    out
}
/// Signs already-encoded action bytes and returns `(r, s, v)`.
///
/// The signed hash is built as follows:
/// 1. `connection_id = keccak256(msgpack || nonce (8 bytes, big-endian) || vault marker)`,
///    where the marker is `0x00` without a vault, or `0x01` followed by the
///    decoded vault address bytes.
/// 2. An EIP-712 `Agent(string source,bytes32 connectionId)` struct hash with
///    source `"a"`.
/// 3. `keccak256(0x19 0x01 || domain separator || struct hash)`.
///
/// `r` and `s` are `0x`-prefixed hex strings; `v` is `27 + recovery id`.
///
/// # Errors
/// Returns an error when the vault address or private key is not valid hex,
/// when the private key is not exactly 32 bytes, or when key construction or
/// signing fails.
fn sign_with_msgpack(
    msgpack: &[u8],
    private_key: &str,
    nonce: u64,
    vault_address: Option<&str>,
) -> Result<(String, String, u8), String> {
    use k256::ecdsa::SigningKey;
    let mut data = msgpack.to_vec();
    data.extend_from_slice(&nonce.to_be_bytes());
    match vault_address {
        None => data.push(0u8),
        Some(addr) => {
            data.push(1u8);
            let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
                .map_err(|e| format!("invalid vault address: {}", e))?;
            data.extend_from_slice(&addr_bytes);
        }
    }
    let connection_id = keccak256(&data);
    let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
    let source_hash = keccak256(b"a");
    let mut struct_data = [0u8; 96];
    struct_data[..32].copy_from_slice(&agent_type_hash);
    struct_data[32..64].copy_from_slice(&source_hash);
    struct_data[64..96].copy_from_slice(&connection_id);
    let struct_hash = keccak256(&struct_data);
    let domain_sep = hyperliquid_domain_separator();
    let mut final_data = Vec::with_capacity(66);
    final_data.extend_from_slice(b"\x19\x01");
    final_data.extend_from_slice(&domain_sep);
    final_data.extend_from_slice(&struct_hash);
    let final_hash = keccak256(&final_data);
    let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
        .map_err(|e| format!("invalid private key: {}", e))?;
    // Converting a slice of the wrong length into the fixed-size key array
    // panics, so reject it with an error first.
    if key_bytes.len() != 32 {
        return Err(format!(
            "invalid private key: expected 32 bytes, got {}",
            key_bytes.len()
        ));
    }
    let signing_key =
        SigningKey::from_bytes(key_bytes.as_slice().into()).map_err(|e| e.to_string())?;
    let (sig, recovery_id) = signing_key
        .sign_prehash_recoverable(&final_hash)
        .map_err(|e| e.to_string())?;
    let sig_bytes = sig.to_bytes();
    let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
    let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
    let v = 27u8 + recovery_id.to_byte();
    Ok((r, s, v))
}
fn sign_action(
private_key: &str,
action: &Value,
vault_address: Option<&str>,
nonce: u64,
) -> Result<(String, String, u8), String> {
use k256::ecdsa::SigningKey;
let msgpack_bytes = action_to_canonical_msgpack(action)?;
let mut data = msgpack_bytes;
data.extend_from_slice(&nonce.to_be_bytes());
match vault_address {
None => data.push(0u8),
Some(addr) => {
data.push(1u8);
let addr_bytes = hex::decode(addr.trim_start_matches("0x"))
.map_err(|e| format!("invalid vault address: {}", e))?;
data.extend_from_slice(&addr_bytes);
}
}
let connection_id = keccak256(&data);
let agent_type_hash = keccak256(b"Agent(string source,bytes32 connectionId)");
let source_hash = keccak256(b"a"); let mut struct_data = [0u8; 96];
struct_data[..32].copy_from_slice(&agent_type_hash);
struct_data[32..64].copy_from_slice(&source_hash);
struct_data[64..96].copy_from_slice(&connection_id);
let struct_hash = keccak256(&struct_data);
let domain_sep = hyperliquid_domain_separator();
let mut final_data = Vec::with_capacity(66);
final_data.extend_from_slice(b"\x19\x01");
final_data.extend_from_slice(&domain_sep);
final_data.extend_from_slice(&struct_hash);
let final_hash = keccak256(&final_data);
let key_bytes = hex::decode(private_key.trim_start_matches("0x"))
.map_err(|e| format!("invalid private key: {}", e))?;
let signing_key =
SigningKey::from_bytes(key_bytes.as_slice().into()).map_err(|e| e.to_string())?;
let (sig, recovery_id) = signing_key
.sign_prehash_recoverable(&final_hash)
.map_err(|e| e.to_string())?;
let sig_bytes = sig.to_bytes();
let r = format!("0x{}", hex::encode(&sig_bytes[..32]));
let s = format!("0x{}", hex::encode(&sig_bytes[32..64]));
let v = 27u8 + recovery_id.to_byte();
Ok((r, s, v))
}
#[allow(async_fn_in_trait)]
impl guilder_abstraction::TestServer for HyperliquidClient {
    /// Sends an `allMids` info request (weight 2) and reports whether the HTTP
    /// status was a success. Rate-limit and transport failures are errors.
    async fn ping(&self) -> Result<bool, String> {
        let request = serde_json::json!({"type": "allMids"});
        let resp = self.info_post(request, 2, "ping").await?;
        Ok(resp.status().is_success())
    }

    /// Returns the local wall-clock time in Unix milliseconds; no request is
    /// made. Yields 0 when the clock is before the Unix epoch.
    async fn get_server_time(&self) -> Result<i64, String> {
        let now = std::time::SystemTime::now();
        let millis = match now.duration_since(std::time::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_millis() as i64,
            Err(_) => 0,
        };
        Ok(millis)
    }
}
#[allow(async_fn_in_trait)]
impl guilder_abstraction::GetMarketData for HyperliquidClient {
/// Returns the names of all assets in the `meta` universe, in API order.
async fn get_symbol(&self) -> Result<Vec<String>, String> {
    let request = serde_json::json!({"type": "meta"});
    let resp = self.info_post(request, 20, "get_symbol").await?;
    let meta = parse_response::<MetaResponse>(resp).await?;
    let mut names = Vec::with_capacity(meta.universe.len());
    for asset in meta.universe {
        names.push(asset.name);
    }
    Ok(names)
}
/// Returns the open interest of `symbol` from `metaAndAssetCtxs`.
///
/// # Errors
/// Besides request/parse failures, returns `symbol ... not found` when the
/// symbol is not listed, has no matching context, or its open interest is not
/// a valid decimal.
async fn get_open_interest(&self, symbol: String) -> Result<Decimal, String> {
    let request = serde_json::json!({"type": "metaAndAssetCtxs"});
    let resp = self.info_post(request, 20, "get_open_interest").await?;
    let parsed = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp).await?;
    let Some((meta, ctxs)) = parsed else {
        return Err("metaAndAssetCtxs returned null".to_string());
    };
    let open_interest = match meta.universe.iter().position(|a| a.name == symbol) {
        Some(index) => ctxs
            .get(index)
            .and_then(|ctx| parse_decimal(&ctx.open_interest)),
        None => None,
    };
    match open_interest {
        Some(value) => Ok(value),
        None => Err(format!("symbol {} not found", symbol)),
    }
}
/// Returns the full asset context of `symbol` from `metaAndAssetCtxs`.
///
/// Open interest, funding, mark price and day volume are mandatory and yield
/// an `invalid ...` error when they fail to parse; the remaining prices are
/// `None` when missing or unparsable.
async fn get_asset_context(&self, symbol: String) -> Result<AssetContext, String> {
    let request = serde_json::json!({"type": "metaAndAssetCtxs"});
    let resp = self.info_post(request, 20, "get_asset_context").await?;
    let parsed = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp).await?;
    let Some((meta, ctxs)) = parsed else {
        return Err("metaAndAssetCtxs returned null".to_string());
    };
    let Some(idx) = meta.universe.iter().position(|a| a.name == symbol) else {
        return Err(format!("symbol {} not found", symbol));
    };
    let Some(ctx) = ctxs.get(idx) else {
        return Err(format!("symbol {} not found", symbol));
    };
    let open_interest =
        parse_decimal(&ctx.open_interest).ok_or_else(|| "invalid open_interest".to_string())?;
    let funding_rate = parse_decimal(&ctx.funding).ok_or_else(|| "invalid funding".to_string())?;
    let mark_price = parse_decimal(&ctx.mark_px).ok_or_else(|| "invalid mark_px".to_string())?;
    let day_volume =
        parse_decimal(&ctx.day_ntl_vlm).ok_or_else(|| "invalid day_ntl_vlm".to_string())?;
    let optional = |raw: &Option<String>| raw.as_deref().and_then(parse_decimal);
    Ok(AssetContext {
        symbol,
        open_interest,
        funding_rate,
        mark_price,
        day_volume,
        mid_price: optional(&ctx.mid_px),
        oracle_price: optional(&ctx.oracle_px),
        premium: optional(&ctx.premium),
        prev_day_price: optional(&ctx.prev_day_px),
    })
}
/// Returns the asset context of every listed asset from `metaAndAssetCtxs`.
///
/// Universe entries and contexts are paired by index. An asset is silently
/// left out when any of its mandatory values (open interest, funding, mark
/// price, day volume) does not parse.
async fn get_all_asset_contexts(&self) -> Result<Vec<AssetContext>, String> {
    let request = serde_json::json!({"type": "metaAndAssetCtxs"});
    let resp = self
        .info_post(request, 20, "get_all_asset_contexts")
        .await?;
    let parsed = parse_response::<Option<MetaAndAssetCtxsResponse>>(resp).await?;
    let Some((meta, ctxs)) = parsed else {
        return Err("metaAndAssetCtxs returned null".to_string());
    };
    let contexts: Vec<AssetContext> = meta
        .universe
        .iter()
        .zip(ctxs.iter())
        .filter_map(|(asset, ctx)| {
            // `?` drops the whole asset when a mandatory value is unparsable.
            let open_interest = parse_decimal(&ctx.open_interest)?;
            let funding_rate = parse_decimal(&ctx.funding)?;
            let mark_price = parse_decimal(&ctx.mark_px)?;
            let day_volume = parse_decimal(&ctx.day_ntl_vlm)?;
            Some(AssetContext {
                symbol: asset.name.clone(),
                open_interest,
                funding_rate,
                mark_price,
                day_volume,
                mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
                oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
                premium: ctx.premium.as_deref().and_then(parse_decimal),
                prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
            })
        })
        .collect();
    Ok(contexts)
}
/// Fetches the current L2 order book of `symbol` and flattens it into one
/// `L2Update` per price level.
///
/// Hyperliquid returns `levels` as `[bids, asks]`, so the first list is
/// labelled `Side::Bid` and the second `Side::Ask`. Levels whose price or size
/// does not parse are skipped. The book's `time` is used as the `sequence` of
/// every update. A `null` response yields an empty list.
async fn get_l2_orderbook(&self, symbol: String) -> Result<Vec<L2Update>, String> {
    let resp = self
        .info_post(
            serde_json::json!({"type": "l2Book", "coin": symbol}),
            2,
            "get_l2_orderbook",
        )
        .await?;
    let book: Option<WsBook> = parse_response(resp).await?;
    let book = match book {
        Some(b) => b,
        None => return Ok(vec![]),
    };
    let mut levels = Vec::new();
    // levels[0]: bids.
    for level in book.levels.first().into_iter().flatten() {
        if let (Some(price), Some(volume)) =
            (parse_decimal(&level.px), parse_decimal(&level.sz))
        {
            levels.push(L2Update {
                symbol: book.coin.clone(),
                price,
                volume,
                side: Side::Bid,
                sequence: book.time,
            });
        }
    }
    // levels[1]: asks.
    for level in book.levels.get(1).into_iter().flatten() {
        if let (Some(price), Some(volume)) =
            (parse_decimal(&level.px), parse_decimal(&level.sz))
        {
            levels.push(L2Update {
                symbol: book.coin.clone(),
                price,
                volume,
                side: Side::Ask,
                sequence: book.time,
            });
        }
    }
    Ok(levels)
}
/// Returns the mid price of `symbol` from the `allMids` info response.
///
/// # Errors
/// Besides request/parse failures, returns `symbol ... not found` when the
/// symbol is missing from the map or its value is not a valid decimal.
async fn get_price(&self, symbol: String) -> Result<Decimal, String> {
    let request = serde_json::json!({"type": "allMids"});
    let resp = self.info_post(request, 2, "get_price").await?;
    let mids = parse_response::<HashMap<String, String>>(resp).await?;
    match mids.get(&symbol).and_then(|raw| parse_decimal(raw)) {
        Some(price) => Ok(price),
        None => Err(format!("symbol {} not found", symbol)),
    }
}
/// Returns one `PredictedFunding` per (symbol, venue) pair from the
/// `predictedFundings` info response.
///
/// Pairs without an entry, or whose funding rate does not parse, are skipped.
async fn get_predicted_fundings(&self) -> Result<Vec<PredictedFunding>, String> {
    let request = serde_json::json!({"type": "predictedFundings"});
    let resp = self
        .info_post(request, 20, "get_predicted_fundings")
        .await?;
    let data: PredictedFundingsResponse = parse_response(resp).await?;
    let fundings: Vec<PredictedFunding> = data
        .into_iter()
        .flat_map(|(symbol, venues)| {
            venues.into_iter().filter_map(move |(venue, entry)| {
                let entry = entry?;
                let funding_rate = parse_decimal(&entry.funding_rate)?;
                Some(PredictedFunding {
                    symbol: symbol.clone(),
                    venue,
                    funding_rate,
                    next_funding_time_ms: entry.next_funding_time,
                })
            })
        })
        .collect();
    Ok(fundings)
}
}
#[allow(async_fn_in_trait)]
impl guilder_abstraction::ManageOrder for HyperliquidClient {
/// Places a single order and returns its placement details.
///
/// * Market orders are sent as limit orders with `Ioc` time-in-force; limit
///   orders use the requested `time_in_force`.
///   NOTE(review): confirm the venue accepts `Fok` as a tif value.
/// * `price` and `volume` are normalized (trailing zeros removed) before they
///   are signed and sent, because the venue's signing rules reject numbers
///   with trailing zeros.
/// * A given `cloid` is hashed with Keccak-256 and its first 16 bytes are sent
///   as the hex client order id.
/// * The action is encoded by hand, both as MessagePack (for signing) and as
///   JSON (for the request body), so that both use the same fixed key order.
/// * The order is never reduce-only.
///
/// The returned `timestamp_ms` is the local wall-clock time taken after the
/// response was processed. The returned `cloid` is the one echoed by the
/// exchange for resting orders, falling back to the caller's original value.
///
/// # Errors
/// Returns an error when no private key is configured (checked before any
/// request is made), the symbol is unknown, the system clock is before the
/// Unix epoch, signing fails, a rate limiter refuses, the request fails, the
/// HTTP status is not a success, or the exchange rejects the order.
async fn place_order(
    &self,
    symbol: String,
    side: OrderSide,
    price: Decimal,
    volume: Decimal,
    order_type: OrderType,
    time_in_force: TimeInForce,
    cloid: Option<String>,
) -> Result<OrderPlacement, String> {
    // Wall-clock time in Unix milliseconds, as an error instead of a panic
    // when the clock is set before the epoch.
    fn unix_millis() -> Result<u128, String> {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis())
            .map_err(|e| format!("system clock is before the Unix epoch: {e}"))
    }

    // Fail before spending rate-limit weight on a lookup that cannot be used.
    let private_key = self.require_private_key()?;
    let asset_idx = self.get_asset_index(&symbol).await?;
    let is_buy = matches!(side, OrderSide::Buy);
    let tif_str = match time_in_force {
        TimeInForce::Gtc => "Gtc",
        TimeInForce::Ioc => "Ioc",
        TimeInForce::Fok => "Fok",
    };
    let (order_kind, tif_bytes) = match order_type {
        OrderType::Limit => ("limit", tif_str.as_bytes()),
        OrderType::Market => ("limit", b"Ioc".as_slice()),
    };
    // Trailing zeros must be stripped from wire numbers.
    let price_str = price.normalize().to_string();
    let size_str = volume.normalize().to_string();
    let cloid_hex = cloid.as_ref().map(|c| {
        let hash = keccak256(c.as_bytes());
        format!("0x{}", hex::encode(&hash[..16]))
    });

    // MessagePack form of the action; this is what gets signed.
    let order_msgpack = build_order_msgpack(
        asset_idx,
        is_buy,
        &price_str,
        &size_str,
        false,
        order_kind,
        tif_bytes,
        cloid_hex.as_deref(),
    );
    let mut action_msgpack = Vec::new();
    // fixmap with 3 entries: type, orders, grouping.
    action_msgpack.push(0x83);
    action_msgpack.extend_from_slice(&value_to_msgpack(&Value::String("type".to_string())));
    action_msgpack.extend_from_slice(&value_to_msgpack(&Value::String("order".to_string())));
    action_msgpack.extend_from_slice(&value_to_msgpack(&Value::String("orders".to_string())));
    // fixarray with 1 element: the order.
    action_msgpack.push(0x91);
    action_msgpack.extend_from_slice(&order_msgpack);
    action_msgpack.extend_from_slice(&value_to_msgpack(&Value::String("grouping".to_string())));
    action_msgpack.extend_from_slice(&value_to_msgpack(&Value::String("na".to_string())));

    // JSON form of the same action, with the same key order.
    let order_type_json = match order_type {
        OrderType::Limit => format!(r#"{{"limit":{{"tif":"{tif_str}"}}}}"#),
        OrderType::Market => r#"{"limit":{"tif":"Ioc"}}"#.to_string(),
    };
    let cloid_json = if let Some(ref c) = cloid_hex {
        format!(r#","c":"{c}""#)
    } else {
        String::new()
    };
    let action_json_str = format!(
        r#"{{"type":"order","orders":[{{"a":{asset_idx},"b":{is_buy},"p":"{price}","s":"{size}","r":false,"t":{order_type_json}{cloid_json}}}],"grouping":"na"}}"#,
        price = price_str,
        size = size_str,
    );

    let nonce = unix_millis()? as u64;
    let (r, s, v) = sign_with_msgpack(&action_msgpack, private_key, nonce, None)?;
    let payload_str = format!(
        r#"{{"action":{},"nonce":{},"signature":{{"r":"{}","s":"{}","v":{}}},"vaultAddress":null,"expiresAfter":null}}"#,
        action_json_str, nonce, r, s, v
    );
    self.rest_limiter.acquire(1).await.map_err(|e| {
        format!(
            "rate_limited: rest_weight exhausted, retry_after_ms={}",
            e.retry_after.as_millis()
        )
    })?;
    self.address_limiter.acquire(1, false).await.map_err(|e| {
        format!(
            "rate_limited: address quota exhausted, retry_after_ms={}",
            e.retry_after.as_millis()
        )
    })?;
    let resp = self
        .client
        .post(HYPERLIQUID_EXCHANGE_URL)
        .header("Content-Type", "application/json")
        .body(payload_str)
        .send()
        .await
        .map_err(|e| e.to_string())?;
    let status = resp.status();
    if !status.is_success() {
        let text = resp.text().await.map_err(|e| e.to_string())?;
        return Err(format!("HTTP {status}: {text}"));
    }
    let body: Value = parse_response(resp).await?;
    if body["status"].as_str() == Some("err") {
        return Err(body["response"]
            .as_str()
            .unwrap_or("unknown error")
            .to_string());
    }

    // Exactly one order was sent, so only the first status is relevant.
    let statuses = &body["response"]["data"]["statuses"][0];
    let (oid, returned_cloid) = if let Some(resting) = statuses.get("resting") {
        let oid = resting["oid"]
            .as_i64()
            .ok_or_else(|| format!("resting status missing oid: {}", body))?;
        let returned_cloid = resting["cloid"].as_str().map(|s: &str| s.to_string());
        (oid, returned_cloid)
    } else if let Some(filled) = statuses.get("filled") {
        let oid = filled["oid"]
            .as_i64()
            .ok_or_else(|| format!("filled status missing oid: {}", body))?;
        (oid, None)
    } else if let Some(error) = statuses.get("error") {
        return Err(error
            .as_str()
            .unwrap_or("order rejected with unknown error")
            .to_string());
    } else {
        return Err(format!("unexpected order status: {}", body));
    };
    let timestamp_ms = unix_millis()? as i64;
    Ok(OrderPlacement {
        order_id: oid,
        symbol,
        side,
        price,
        quantity: volume,
        timestamp_ms,
        cloid: returned_cloid.or(cloid),
    })
}
/// Changes the price and size of an open order via a `batchModify` action.
///
/// Despite the parameter name, `cloid` is matched against the exchange order
/// id (`oid`) of the user's open orders. The modified order keeps its side,
/// is sent as a `Gtc` limit order and is never reduce-only.
///
/// `price` and `volume` are normalized (trailing zeros removed) before they
/// are signed and sent, because the venue's signing rules reject numbers with
/// trailing zeros.
///
/// Returns the id that was passed in.
///
/// # Errors
/// Returns an error when no user address is configured, the order is not
/// among the open orders, its symbol cannot be resolved, or submitting the
/// signed action fails.
async fn change_order_by_cloid(
    &self,
    cloid: i64,
    price: Decimal,
    volume: Decimal,
) -> Result<i64, String> {
    let user = self.require_user_address()?;
    let resp = self
        .info_post(
            serde_json::json!({"type": "openOrders", "user": user}),
            20,
            "change_order_by_cloid",
        )
        .await?;
    let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
    let order = orders
        .iter()
        .find(|o| o.oid == cloid)
        .ok_or_else(|| format!("order {} not found", cloid))?;
    let asset_idx = self.get_asset_index(&order.coin).await?;
    let is_buy = order.side == "B";
    let action = serde_json::json!({
        "type": "batchModify",
        "modifies": [{
            "oid": cloid,
            "order": {
                "a": asset_idx,
                "b": is_buy,
                "p": price.normalize().to_string(),
                "s": volume.normalize().to_string(),
                "r": false,
                "t": {"limit": {"tif": "Gtc"}}
            }
        }]
    });
    self.submit_signed_action(action, None).await?;
    Ok(cloid)
}
/// Cancels one open order.
///
/// Despite the parameter name, `cloid` is matched against the exchange order
/// id (`oid`) of the user's open orders; the same id is returned on success.
///
/// # Errors
/// Returns an error when no user address is configured, the order is not
/// among the open orders, its symbol cannot be resolved, or submitting the
/// signed action fails.
async fn cancel_order(&self, cloid: i64) -> Result<i64, String> {
    let user = self.require_user_address()?;
    let request = serde_json::json!({"type": "openOrders", "user": user});
    let resp = self.info_post(request, 20, "cancel_order").await?;
    let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
    let Some(target) = orders.iter().find(|o| o.oid == cloid) else {
        return Err(format!("order {} not found", cloid));
    };
    let asset_idx = self.get_asset_index(&target.coin).await?;
    let action = serde_json::json!({
        "type": "cancel",
        "cancels": [{"a": asset_idx, "o": cloid}]
    });
    self.submit_signed_action(action, None).await?;
    Ok(cloid)
}
/// Cancels every open order of the user with a single `cancel` action.
///
/// Returns `Ok(true)` right away when there are no open orders. Orders whose
/// coin is not present in the `meta` universe are left out of the cancel list.
async fn cancel_all_order(&self) -> Result<bool, String> {
    let user = self.require_user_address()?;
    let open_orders_request = serde_json::json!({"type": "openOrders", "user": user});
    let resp = self
        .info_post(open_orders_request, 20, "cancel_all_order")
        .await?;
    let orders: Vec<RestOpenOrder> = parse_response(resp).await?;
    if orders.is_empty() {
        return Ok(true);
    }
    // One metadata request resolves the asset index of every order.
    let meta_request = serde_json::json!({"type": "meta"});
    let meta_resp = self.info_post(meta_request, 20, "cancel_all_order").await?;
    let meta: MetaResponse = parse_response(meta_resp).await?;
    let mut cancels: Vec<Value> = Vec::new();
    for order in &orders {
        if let Some(asset_idx) = meta.universe.iter().position(|a| a.name == order.coin) {
            cancels.push(serde_json::json!({"a": asset_idx, "o": order.oid}));
        }
    }
    let action = serde_json::json!({"type": "cancel", "cancels": cancels});
    self.submit_signed_action(action, None).await?;
    Ok(true)
}
}
#[allow(async_fn_in_trait)]
impl guilder_abstraction::SubscribeMarketData for HyperliquidClient {
/// Subscribes to the `l2Book` channel of `symbol` and yields one `L2Update`
/// per price level of every snapshot received.
///
/// Hyperliquid sends `levels` as `[bids, asks]`, so the first list is labelled
/// `Side::Bid` and the second `Side::Ask`. Messages that do not parse or
/// belong to another channel, and levels whose price or size does not parse,
/// are skipped. The snapshot's `time` is used as the `sequence`.
fn subscribe_l2_update(&self, symbol: String) -> BoxStream<Result<L2Update, String>> {
    let sub = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "l2Book", "coin": symbol.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "l2Book".to_string(),
        routing_key: symbol,
    };
    let stream = self.ws_mux.subscribe(key, sub);
    Box::pin(async_stream::stream! {
        for await msg in stream {
            let Ok(env) = serde_json::from_str::<WsEnvelope>(&msg) else {
                continue;
            };
            if env.channel != "l2Book" {
                continue;
            }
            let Ok(book) = serde_json::from_value::<WsBook>(env.data) else {
                continue;
            };
            // levels[0]: bids.
            for level in book.levels.first().into_iter().flatten() {
                if let (Some(price), Some(volume)) =
                    (parse_decimal(&level.px), parse_decimal(&level.sz))
                {
                    yield Ok(L2Update {
                        symbol: book.coin.clone(),
                        price,
                        volume,
                        side: Side::Bid,
                        sequence: book.time,
                    });
                }
            }
            // levels[1]: asks.
            for level in book.levels.get(1).into_iter().flatten() {
                if let (Some(price), Some(volume)) =
                    (parse_decimal(&level.px), parse_decimal(&level.sz))
                {
                    yield Ok(L2Update {
                        symbol: book.coin.clone(),
                        price,
                        volume,
                        side: Side::Ask,
                        sequence: book.time,
                    });
                }
            }
        }
    })
}
/// Subscribes to the `activeAssetCtx` channel of `symbol` and yields an
/// `AssetContext` for every update received.
///
/// Messages that do not parse, belong to another channel, or lack a parsable
/// open interest, funding, mark price or day volume are skipped.
fn subscribe_asset_context(&self, symbol: String) -> BoxStream<Result<AssetContext, String>> {
    let request = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "activeAssetCtx", "coin": symbol.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "activeAssetCtx".to_string(),
        routing_key: symbol,
    };
    let updates = self.ws_mux.subscribe(key, request);
    Box::pin(async_stream::stream! {
        for await raw in updates {
            let envelope = match serde_json::from_str::<WsEnvelope>(&raw) {
                Ok(envelope) if envelope.channel == "activeAssetCtx" => envelope,
                _ => continue,
            };
            let WsAssetCtx { coin, ctx } = match serde_json::from_value::<WsAssetCtx>(envelope.data) {
                Ok(update) => update,
                Err(_) => continue,
            };
            // All four mandatory values must parse, otherwise drop the update.
            let mandatory = (
                parse_decimal(&ctx.open_interest),
                parse_decimal(&ctx.funding),
                parse_decimal(&ctx.mark_px),
                parse_decimal(&ctx.day_ntl_vlm),
            );
            let (Some(open_interest), Some(funding_rate), Some(mark_price), Some(day_volume)) = mandatory else {
                continue;
            };
            yield Ok(AssetContext {
                symbol: coin,
                open_interest,
                funding_rate,
                mark_price,
                day_volume,
                mid_price: ctx.mid_px.as_deref().and_then(parse_decimal),
                oracle_price: ctx.oracle_px.as_deref().and_then(parse_decimal),
                premium: ctx.premium.as_deref().and_then(parse_decimal),
                prev_day_price: ctx.prev_day_px.as_deref().and_then(parse_decimal),
            });
        }
    })
}
/// Streams liquidation events for `user` from the `userEvents` channel.
///
/// Only messages on `userEvents` that carry a `liquidation` payload with parsable
/// notional position and account value produce an item; everything else is skipped
/// silently. This body never yields `Err`.
///
/// NOTE(review): `symbol` is always emitted empty and `side` is always
/// [`OrderSide::Sell`]; nothing from the payload is read for them here. Presumably
/// the event does not expose them — confirm before relying on those fields.
fn subscribe_liquidation(&self, user: String) -> BoxStream<Result<Liquidation, String>> {
    let request = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "userEvents", "user": user.clone()}
    });
    let sub_key = crate::ws::SubKey {
        channel: "userEvents".to_string(),
        routing_key: user,
    };
    let messages = self.ws_mux.subscribe(sub_key, request);
    Box::pin(messages.filter_map(|raw| async move {
        let envelope = serde_json::from_str::<WsEnvelope>(&raw).ok()?;
        if envelope.channel != "userEvents" {
            return None;
        }
        let event = serde_json::from_value::<WsUserEvent>(envelope.data).ok()?;
        let liq = event.liquidation?;
        let notional_position = parse_decimal(&liq.liquidated_ntl_pos)?;
        let account_value = parse_decimal(&liq.liquidated_account_value)?;
        Some(Ok(Liquidation {
            symbol: String::new(),
            side: OrderSide::Sell,
            liquidated_user: liq.liquidated_user,
            notional_position,
            account_value,
        }))
    }))
}
/// Streams public trades for `symbol` from the `trades` channel as [`Fill`]s.
///
/// Each message carries a batch of trades; every trade whose price and size both
/// parse is yielded in batch order. A side of `"B"` maps to [`OrderSide::Buy`] and
/// any other value to [`OrderSide::Sell`]. Undecodable messages, messages for other
/// channels and unparsable trades are skipped. This body never yields `Err`.
fn subscribe_fill(&self, symbol: String) -> BoxStream<Result<Fill, String>> {
    let request = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "trades", "coin": symbol.clone()}
    });
    let feed = self.ws_mux.subscribe(
        crate::ws::SubKey {
            channel: "trades".to_string(),
            routing_key: symbol,
        },
        request,
    );
    Box::pin(async_stream::stream! {
        for await raw in feed {
            let decoded = match serde_json::from_str::<WsEnvelope>(&raw) {
                Ok(envelope) if envelope.channel == "trades" => {
                    serde_json::from_value::<Vec<WsTrade>>(envelope.data)
                }
                _ => continue,
            };
            let Ok(batch) = decoded else {
                continue;
            };
            for trade in batch {
                let (Some(price), Some(volume)) =
                    (parse_decimal(&trade.px), parse_decimal(&trade.sz))
                else {
                    continue;
                };
                let side = if trade.side == "B" {
                    OrderSide::Buy
                } else {
                    OrderSide::Sell
                };
                yield Ok(Fill {
                    symbol: trade.coin,
                    price,
                    volume,
                    side,
                    timestamp_ms: trade.time,
                    trade_id: trade.tid,
                });
            }
        }
    })
}
}
#[allow(async_fn_in_trait)]
impl guilder_abstraction::GetAccountSnapshot for HyperliquidClient {
/// Fetches the user's open positions from the `clearinghouseState` info request.
///
/// The signed size (`szi`) decides the side: positive is [`OrderSide::Buy`], negative
/// is [`OrderSide::Sell`]; the returned `size` is its absolute value. Entries whose
/// size is zero or does not parse are left out. A missing or unparsable entry price
/// falls back to the default `Decimal`.
///
/// # Errors
/// Propagates the errors of `require_user_address`, `info_post` and `parse_response`.
async fn get_positions(&self) -> Result<Vec<Position>, String> {
    let user = self.require_user_address()?;
    let request = serde_json::json!({"type": "clearinghouseState", "user": user});
    let resp = self.info_post(request, 2, "get_positions").await?;
    let state: ClearinghouseStateResponse = parse_response(resp).await?;

    let mut positions = Vec::new();
    for asset_position in state.asset_positions {
        let raw = asset_position.position;
        let Some(signed_size) = parse_decimal(&raw.szi) else {
            continue;
        };
        if signed_size.is_zero() {
            continue;
        }
        // Zero was excluded above, so "not negative" means strictly positive.
        let side = if signed_size < Decimal::ZERO {
            OrderSide::Sell
        } else {
            OrderSide::Buy
        };
        let entry_price = raw
            .entry_px
            .as_deref()
            .and_then(parse_decimal)
            .unwrap_or_default();
        positions.push(Position {
            symbol: raw.coin,
            side,
            size: signed_size.abs(),
            entry_price,
        });
    }
    Ok(positions)
}
/// Fetches the user's resting orders from the `openOrders` info request.
///
/// `filled_quantity` is derived as original size minus remaining size. A side of
/// `"B"` maps to [`OrderSide::Buy`], any other value to [`OrderSide::Sell`]. Orders
/// whose limit price, original size or remaining size do not parse are left out.
///
/// # Errors
/// Propagates the errors of `require_user_address`, `info_post` and `parse_response`.
async fn get_open_orders(&self) -> Result<Vec<OpenOrder>, String> {
    let user = self.require_user_address()?;
    let request = serde_json::json!({"type": "openOrders", "user": user});
    let resp = self.info_post(request, 20, "get_open_orders").await?;
    let raw_orders: Vec<RestOpenOrder> = parse_response(resp).await?;

    let mut open_orders = Vec::new();
    for order in raw_orders {
        let Some(price) = parse_decimal(&order.limit_px) else {
            continue;
        };
        let Some(quantity) = parse_decimal(&order.orig_sz) else {
            continue;
        };
        let Some(remaining) = parse_decimal(&order.sz) else {
            continue;
        };
        let side = if order.side == "B" {
            OrderSide::Buy
        } else {
            OrderSide::Sell
        };
        open_orders.push(OpenOrder {
            order_id: order.oid,
            symbol: order.coin,
            side,
            price,
            quantity,
            filled_quantity: quantity - remaining,
        });
    }
    Ok(open_orders)
}
/// Returns the account value reported in the margin summary of `clearinghouseState`.
///
/// # Errors
/// Propagates the errors of `require_user_address`, `info_post` and `parse_response`,
/// and returns `"invalid account value"` when the reported value does not parse.
async fn get_collateral(&self) -> Result<Decimal, String> {
    let user = self.require_user_address()?;
    let request = serde_json::json!({"type": "clearinghouseState", "user": user});
    let resp = self.info_post(request, 2, "get_collateral").await?;
    let state: ClearinghouseStateResponse = parse_response(resp).await?;
    match parse_decimal(&state.margin_summary.account_value) {
        Some(account_value) => Ok(account_value),
        None => Err("invalid account value".to_string()),
    }
}
/// Fetches the user's spot balances from the `spotClearinghouseState` info request.
///
/// For every coin, `locked` is the reported `hold` amount and `available` is
/// `total - hold`.
///
/// # Errors
/// Propagates the errors of `require_user_address`, `info_post` and `parse_response`.
/// Unlike the position/order getters, a single unparsable `total` or `hold` fails the
/// whole call with `"invalid total balance"` / `"invalid hold balance"`.
async fn get_spot_balance(&self) -> Result<Vec<guilder_abstraction::Balance>, String> {
    /// Response envelope: only the `balances` array is read.
    #[derive(Deserialize)]
    struct SpotStateResponse {
        balances: Vec<SpotBalance>,
    }
    // `token` and `entry_ntl` are deserialized but never read in this function,
    // hence the lint allowance.
    #[allow(dead_code)]
    #[derive(Deserialize)]
    struct SpotBalance {
        coin: String,
        total: String,
        hold: String,
        #[serde(default)]
        token: Option<i32>,
        #[serde(default, rename = "entryNtl")]
        entry_ntl: Option<String>,
    }

    let user = self.require_user_address()?;
    let request = serde_json::json!({"type": "spotClearinghouseState", "user": user});
    let resp = self.info_post(request, 15, "get_spot_balance").await?;
    let state: SpotStateResponse = parse_response(resp).await?;

    let mut balances = Vec::with_capacity(state.balances.len());
    for entry in state.balances {
        let Some(total) = parse_decimal(&entry.total) else {
            return Err("invalid total balance".to_string());
        };
        let Some(locked) = parse_decimal(&entry.hold) else {
            return Err("invalid hold balance".to_string());
        };
        balances.push(guilder_abstraction::Balance {
            coin: entry.coin,
            total,
            available: total - locked,
            locked,
        });
    }
    Ok(balances)
}
/// Returns the collateral balance for `asset`, which must be USDC (case-insensitive).
///
/// The total comes from `get_collateral`; `available` is set equal to that total and
/// `locked` is always zero.
///
/// # Errors
/// Returns an error naming the rejected asset when it is not USDC, and propagates
/// any error from `get_collateral`.
async fn get_collateral_balance(
    &self,
    asset: String,
) -> Result<guilder_abstraction::Balance, String> {
    let is_usdc = asset.to_uppercase() == "USDC";
    if !is_usdc {
        return Err(format!("only USDC collateral is supported, got {}", asset));
    }
    let account_value = self.get_collateral().await?;
    Ok(guilder_abstraction::Balance {
        coin: "USDC".to_string(),
        total: account_value,
        available: account_value,
        locked: Decimal::ZERO,
    })
}
/// Fetches the user's rate-limit counters from the `userRateLimit` info request.
///
/// Reads `cumVlm` (a decimal encoded as a string) and the integer fields
/// `nRequestsUsed`, `nRequestsCap` and `nRequestsSurplus` from the JSON response.
///
/// # Errors
/// Propagates the errors of `require_user_address`, `info_post` and `parse_response`,
/// and returns `"missing or invalid <field>"` for the first field that is absent or
/// has the wrong type.
async fn get_user_rate_limit(&self) -> Result<guilder_abstraction::UserRateLimit, String> {
    let user = self.require_user_address()?;
    let request = serde_json::json!({"type": "userRateLimit", "user": user});
    let resp = self.info_post(request, 20, "get_user_rate_limit").await?;
    let val = parse_response::<Value>(resp).await?;

    // Shared lookup for the three integer counters.
    let counter = |field: &str, message: &str| -> Result<i64, String> {
        match val[field].as_i64() {
            Some(count) => Ok(count),
            None => Err(message.to_string()),
        }
    };

    let Some(cumulative_volume) = val["cumVlm"].as_str().and_then(parse_decimal) else {
        return Err("missing or invalid cumVlm".to_string());
    };
    let requests_used = counter("nRequestsUsed", "missing or invalid nRequestsUsed")?;
    let requests_cap = counter("nRequestsCap", "missing or invalid nRequestsCap")?;
    let requests_surplus = counter("nRequestsSurplus", "missing or invalid nRequestsSurplus")?;

    Ok(guilder_abstraction::UserRateLimit {
        cumulative_volume,
        requests_used,
        requests_cap,
        requests_surplus,
    })
}
}
#[allow(async_fn_in_trait)]
impl guilder_abstraction::SubscribeUserEvents for HyperliquidClient {
/// Streams the configured user's own fills from the `userEvents` channel.
///
/// When `self.user_address` is `None`, the stream yields a single
/// `Err("user address not registered")` and then ends. This is the same contract as
/// `subscribe_spot_balance`, and makes the misconfiguration visible instead of
/// looking like a feed that simply has no fills.
///
/// Messages that fail to decode, belong to another channel, or carry no fills are
/// skipped. Individual fills whose price, size or fee do not parse are dropped. A
/// side of `"B"` maps to [`OrderSide::Buy`], any other value to [`OrderSide::Sell`].
fn subscribe_user_fills(&self) -> BoxStream<Result<UserFill, String>> {
    let Some(addr) = self.user_address.clone() else {
        return Box::pin(stream::iter(vec![Err(
            "user address not registered".to_string()
        )]));
    };
    let sub = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "userEvents", "user": addr.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "userEvents".to_string(),
        routing_key: addr,
    };
    let raw_stream = self.ws_mux.subscribe(key, sub);
    Box::pin(
        raw_stream
            .filter_map(|text| async move {
                let env = serde_json::from_str::<WsEnvelope>(&text).ok()?;
                if env.channel != "userEvents" {
                    return None;
                }
                let event = serde_json::from_value::<WsUserEvent>(env.data).ok()?;
                let items: Vec<_> = event
                    .fills
                    .unwrap_or_default()
                    .into_iter()
                    .filter_map(|fill| {
                        let side = if fill.side == "B" {
                            OrderSide::Buy
                        } else {
                            OrderSide::Sell
                        };
                        let price = parse_decimal(&fill.px)?;
                        let quantity = parse_decimal(&fill.sz)?;
                        let fee_usd = parse_decimal(&fill.fee)?;
                        Some(UserFill {
                            order_id: fill.oid,
                            symbol: fill.coin,
                            side,
                            price,
                            quantity,
                            fee_usd,
                            timestamp_ms: fill.time,
                            cloid: fill.cloid,
                        })
                    })
                    .collect();
                // One message may carry several fills; each becomes its own item.
                if items.is_empty() {
                    None
                } else {
                    Some(stream::iter(items.into_iter().map(Ok)))
                }
            })
            .flatten(),
    )
}
/// Streams status changes of the configured user's orders from `orderUpdates`.
///
/// When `self.user_address` is `None`, the stream yields a single
/// `Err("user address not registered")` and then ends (the same contract as
/// `subscribe_spot_balance`) instead of silently producing nothing.
///
/// Status mapping:
/// - `"open"` → [`OrderStatus::Placed`]
/// - `"filled"` → [`OrderStatus::Filled`]
/// - `"canceled"`, `"cancelled"`, `"scheduledCancel"` and any `<reason>Canceled`
///   status (e.g. `marginCanceled`, `selfTradeCanceled`) → [`OrderStatus::Cancelled`]
/// - anything else → [`OrderStatus::PartiallyFilled`] (pre-existing fallback)
///
/// Price and quantities are optional on the update and become `None` when they do
/// not parse. Undecodable messages and messages for other channels are skipped.
fn subscribe_order_updates(&self) -> BoxStream<Result<OrderUpdate, String>> {
    let Some(addr) = self.user_address.clone() else {
        return Box::pin(stream::iter(vec![Err(
            "user address not registered".to_string()
        )]));
    };
    let sub = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "orderUpdates", "user": addr.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "orderUpdates".to_string(),
        routing_key: addr,
    };
    let raw_stream = self.ws_mux.subscribe(key, sub);
    Box::pin(
        raw_stream
            .filter_map(|text| async move {
                let env = serde_json::from_str::<WsEnvelope>(&text).ok()?;
                if env.channel != "orderUpdates" {
                    return None;
                }
                let updates = serde_json::from_value::<Vec<WsOrderUpdate>>(env.data).ok()?;
                let items: Vec<_> = updates
                    .into_iter()
                    .map(|upd| {
                        let status = match upd.status.as_str() {
                            "open" => OrderStatus::Placed,
                            "filled" => OrderStatus::Filled,
                            "canceled" | "cancelled" | "scheduledCancel" => {
                                OrderStatus::Cancelled
                            }
                            // System-initiated cancels arrive as `<reason>Canceled`;
                            // they used to fall into the catch-all and were reported
                            // as partial fills.
                            other if other.ends_with("Canceled") => OrderStatus::Cancelled,
                            // NOTE(review): rejected/triggered statuses still land
                            // here; only the variants used in this function are known.
                            _ => OrderStatus::PartiallyFilled,
                        };
                        let side = if upd.order.side == "B" {
                            OrderSide::Buy
                        } else {
                            OrderSide::Sell
                        };
                        OrderUpdate {
                            order_id: upd.order.oid,
                            symbol: upd.order.coin,
                            status,
                            side: Some(side),
                            price: parse_decimal(&upd.order.limit_px),
                            quantity: parse_decimal(&upd.order.orig_sz),
                            remaining_quantity: parse_decimal(&upd.order.sz),
                            timestamp_ms: upd.status_timestamp,
                            cloid: upd.order.cloid,
                        }
                    })
                    .collect();
                if items.is_empty() {
                    None
                } else {
                    Some(stream::iter(items.into_iter().map(Ok)))
                }
            })
            .flatten(),
    )
}
/// Streams funding payments for the configured user from the `userEvents` channel.
///
/// When `self.user_address` is `None`, the stream yields a single
/// `Err("user address not registered")` and then ends (the same contract as
/// `subscribe_spot_balance`) instead of silently producing nothing.
///
/// Only messages carrying a `funding` payload with a parsable `usdc` amount produce
/// an item; undecodable messages and other event kinds are skipped.
fn subscribe_funding_payments(&self) -> BoxStream<Result<FundingPayment, String>> {
    let Some(addr) = self.user_address.clone() else {
        return Box::pin(stream::iter(vec![Err(
            "user address not registered".to_string()
        )]));
    };
    let sub = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "userEvents", "user": addr.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "userEvents".to_string(),
        routing_key: addr,
    };
    let raw_stream = self.ws_mux.subscribe(key, sub);
    // Each message yields at most one payment, so no inner stream/flatten is needed.
    Box::pin(raw_stream.filter_map(|text| async move {
        let env = serde_json::from_str::<WsEnvelope>(&text).ok()?;
        if env.channel != "userEvents" {
            return None;
        }
        let event = serde_json::from_value::<WsUserEvent>(env.data).ok()?;
        let funding = event.funding?;
        let amount_usd = parse_decimal(&funding.usdc)?;
        Some(Ok(FundingPayment {
            symbol: funding.coin,
            amount_usd,
            timestamp_ms: funding.time,
        }))
    }))
}
/// Streams deposits for the configured user from `userNonFundingLedgerUpdates`.
///
/// When `self.user_address` is `None`, the stream yields a single
/// `Err("user address not registered")` and then ends (the same contract as
/// `subscribe_spot_balance`) instead of silently producing nothing.
///
/// Only ledger entries whose delta kind is `"deposit"` and whose `usdc` amount
/// parses are emitted; the asset is always reported as `"USDC"`. Undecodable
/// messages and messages for other channels are skipped.
fn subscribe_deposits(&self) -> BoxStream<Result<Deposit, String>> {
    let Some(addr) = self.user_address.clone() else {
        return Box::pin(stream::iter(vec![Err(
            "user address not registered".to_string()
        )]));
    };
    let sub = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "userNonFundingLedgerUpdates", "user": addr.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "userNonFundingLedgerUpdates".to_string(),
        routing_key: addr,
    };
    let raw_stream = self.ws_mux.subscribe(key, sub);
    Box::pin(
        raw_stream
            .filter_map(|text| async move {
                let env = serde_json::from_str::<WsEnvelope>(&text).ok()?;
                if env.channel != "userNonFundingLedgerUpdates" {
                    return None;
                }
                let ledger = serde_json::from_value::<WsLedgerUpdates>(env.data).ok()?;
                let items: Vec<_> = ledger
                    .updates
                    .into_iter()
                    .filter_map(|entry| {
                        if entry.delta.kind != "deposit" {
                            return None;
                        }
                        let amount_usd =
                            entry.delta.usdc.as_deref().and_then(parse_decimal)?;
                        Some(Deposit {
                            asset: "USDC".to_string(),
                            amount_usd,
                            timestamp_ms: entry.time,
                        })
                    })
                    .collect();
                // One message may batch several ledger entries.
                if items.is_empty() {
                    None
                } else {
                    Some(stream::iter(items.into_iter().map(Ok)))
                }
            })
            .flatten(),
    )
}
/// Streams withdrawals for the configured user from `userNonFundingLedgerUpdates`.
///
/// When `self.user_address` is `None`, the stream yields a single
/// `Err("user address not registered")` and then ends (the same contract as
/// `subscribe_spot_balance`) instead of silently producing nothing.
///
/// Only ledger entries whose delta kind is `"withdraw"` and whose `usdc` amount
/// parses are emitted; the asset is always reported as `"USDC"`. Undecodable
/// messages and messages for other channels are skipped.
fn subscribe_withdrawals(&self) -> BoxStream<Result<Withdrawal, String>> {
    let Some(addr) = self.user_address.clone() else {
        return Box::pin(stream::iter(vec![Err(
            "user address not registered".to_string()
        )]));
    };
    let sub = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "userNonFundingLedgerUpdates", "user": addr.clone()}
    });
    let key = crate::ws::SubKey {
        channel: "userNonFundingLedgerUpdates".to_string(),
        routing_key: addr,
    };
    let raw_stream = self.ws_mux.subscribe(key, sub);
    Box::pin(
        raw_stream
            .filter_map(|text| async move {
                let env = serde_json::from_str::<WsEnvelope>(&text).ok()?;
                if env.channel != "userNonFundingLedgerUpdates" {
                    return None;
                }
                let ledger = serde_json::from_value::<WsLedgerUpdates>(env.data).ok()?;
                let items: Vec<_> = ledger
                    .updates
                    .into_iter()
                    .filter_map(|entry| {
                        if entry.delta.kind != "withdraw" {
                            return None;
                        }
                        let amount_usd =
                            entry.delta.usdc.as_deref().and_then(parse_decimal)?;
                        Some(Withdrawal {
                            asset: "USDC".to_string(),
                            amount_usd,
                            timestamp_ms: entry.time,
                        })
                    })
                    .collect();
                // One message may batch several ledger entries.
                if items.is_empty() {
                    None
                } else {
                    Some(stream::iter(items.into_iter().map(Ok)))
                }
            })
            .flatten(),
    )
}
/// Streams spot balance snapshots for the configured user.
///
/// Delegates to `subscribe_spot_balance_with_address` with a copy of
/// `self.user_address`. When no address is configured, the stream yields a single
/// `Err("user address not registered")` and then ends.
fn subscribe_spot_balance(
    &self,
) -> BoxStream<Result<Vec<guilder_abstraction::Balance>, String>> {
    if let Some(addr) = self.user_address.as_ref() {
        return self.subscribe_spot_balance_with_address(addr.clone());
    }
    Box::pin(stream::iter(vec![Err(
        "user address not registered".to_string()
    )]))
}
/// Streams spot balance snapshots for an explicit `address` from `userEvents`.
///
/// Each qualifying message yields one `Vec` holding every balance whose `total` and
/// `hold` parse, with `available = total - hold`. A message is skipped when it does
/// not decode, belongs to another channel, has no spot state or balance list, or
/// when none of its balances parse. This body never yields `Err`.
fn subscribe_spot_balance_with_address(
    &self,
    address: String,
) -> BoxStream<Result<Vec<guilder_abstraction::Balance>, String>> {
    let request = serde_json::json!({
        "method": "subscribe",
        "subscription": {"type": "userEvents", "user": address.clone()}
    });
    let sub_key = crate::ws::SubKey {
        channel: "userEvents".to_string(),
        routing_key: address,
    };
    let messages = self.ws_mux.subscribe(sub_key, request);
    Box::pin(messages.filter_map(|raw| async move {
        let envelope = serde_json::from_str::<WsEnvelope>(&raw).ok()?;
        if envelope.channel != "userEvents" {
            return None;
        }
        let event = serde_json::from_value::<WsUserEvent>(envelope.data).ok()?;
        let reported = event.spot_state?.balances?;

        let mut snapshot = Vec::new();
        for entry in reported {
            // Entries with an unparsable amount are dropped individually.
            let (Some(total), Some(locked)) =
                (parse_decimal(&entry.total), parse_decimal(&entry.hold))
            else {
                continue;
            };
            snapshot.push(guilder_abstraction::Balance {
                coin: entry.coin,
                total,
                available: total - locked,
                locked,
            });
        }
        if snapshot.is_empty() {
            return None;
        }
        Some(Ok(snapshot))
    }))
}
}