use std::time::Duration;
use reqwest::StatusCode;
use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
use serde::Serialize;
use serde::de::DeserializeOwned;
use thiserror::Error;
use zero_operator_state::{Event as OperatorEvent, Snapshot as OperatorSnapshot};
use crate::models::{
ApproachingFeed, AutoToggleRequest, AutoToggleResponse, Brief, Evaluation, ExecuteRequest,
ExecuteResponse, Health, HyperliquidAccount, HyperliquidReconciliation, HyperliquidStatus,
ImmuneReport, LiveCanaryPolicy, LiveCertification, LiveCockpit, LiveControlResponse,
LiveEvidence, LiveExecutionReceipts, LivePreflight, MarketQuote, OperatorContext,
OperatorEventsAccepted, Positions, Pulse, Regime, RejectionsFeed, Risk, Root, RuntimeParity,
V2Status,
};
use crate::rate_budget::{self, RateBudget};
/// Timeout configured on the `reqwest` client in `HttpClient::new`; also the
/// value reported inside `HttpError::Timeout` by `map_transport`.
const TIMEOUT: Duration = Duration::from_secs(8);
/// Pause between the first attempt and the single retry performed by
/// `get_json` and `post_json`.
const RETRY_DELAY: Duration = Duration::from_millis(500);
/// Errors returned by `HttpClient` requests.
#[derive(Debug, Error)]
pub enum HttpError {
/// Transport-level failure that is not a timeout (see `map_transport`);
/// also returned by `HttpClient::new` when the `reqwest` client cannot be built.
#[error("engine unreachable — {0}")]
Unreachable(String),
/// The transport reported a timeout; carries the duration shown in the message.
#[error("timeout after {0:?}")]
Timeout(Duration),
/// The server answered 401 or 403.
#[error("auth rejected (401/403)")]
Unauthorized,
/// The server answered 404; `path` is the path component of the requested URL.
#[error("not found: {path}")]
NotFound { path: String },
/// Any other non-success status, with the response body (truncated by the caller).
#[error("http {status}: {body}")]
Status { status: StatusCode, body: String },
/// A response body could not be decoded as JSON. The POST helpers also use
/// this variant when the request body fails to serialize.
#[error("decode: {0}")]
Decode(String),
/// The base URL or a joined request URL failed to parse.
#[error("url: {0}")]
Url(#[from] url::ParseError),
/// Either the local CLI budget refused the request or the server answered 429.
/// The display text only includes `retry_after`; `origin` tells the two apart.
#[error("rate: exhausted — retry in {}", format_retry_after(*.retry_after))]
RateBudgetExhausted {
retry_after: Duration,
origin: RateLimitSource,
},
}
/// Identifies which side produced a rate-limit rejection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateLimitSource {
    /// The local CLI-side budget refused the request.
    CliBudget,
    /// The server answered with HTTP 429.
    Engine429,
}

impl std::fmt::Display for RateLimitSource {
    /// Writes the fixed kebab-case label for the variant.
    ///
    /// The label is emitted with `write_str`, so width/padding flags in the
    /// format spec are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Engine429 => "engine-429",
            Self::CliBudget => "cli-budget",
        };
        f.write_str(label)
    }
}
/// HTTP client bound to one base URL, with optional bearer token, local rate
/// budget, mode header, and operator audit headers.
///
/// `Debug` is implemented by hand so the bearer token never appears in logs
/// or panic messages; every other field is printed as a derive would.
#[derive(Clone)]
pub struct HttpClient {
    /// Base URL every request path is joined onto.
    base_url: url::Url,
    /// Bearer token sent in the `Authorization` header when present.
    token: Option<String>,
    /// Underlying `reqwest` client.
    inner: reqwest::Client,
    /// Optional local budget consulted before each request.
    rate_budget: Option<RateBudget>,
    /// Optional mode sent as the `x-zero-mode` header.
    mode: Option<Mode>,
    /// Optional operator identity sent as `x-zero-operator-*` headers.
    operator: Option<OperatorRequestContext>,
}

impl std::fmt::Debug for HttpClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show only whether a token is set, never its value.
        let token = self.token.as_ref().map(|_| "<redacted>");
        f.debug_struct("HttpClient")
            .field("base_url", &self.base_url)
            .field("token", &token)
            .field("inner", &self.inner)
            .field("rate_budget", &self.rate_budget)
            .field("mode", &self.mode)
            .field("operator", &self.operator)
            .finish()
    }
}
/// Operator identity attached to requests as `x-zero-operator-*` headers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperatorRequestContext {
    /// Value sent as `x-zero-operator-id`.
    pub operator_id: String,
    /// Value sent as `x-zero-operator-handle`.
    pub handle: String,
    /// Value sent as `x-zero-operator-role`.
    pub role: String,
    /// Value sent as `x-zero-operator-scope`.
    pub scope: String,
}

impl OperatorRequestContext {
    /// Builds a context for a local operator.
    ///
    /// The handle doubles as the operator id; role is fixed to `"owner"` and
    /// scope to `"local-private"`.
    #[must_use]
    pub fn local(handle: impl Into<String>) -> Self {
        let handle: String = handle.into();
        let operator_id = handle.clone();
        Self {
            operator_id,
            handle,
            role: String::from("owner"),
            scope: String::from("local-private"),
        }
    }
}
/// Mode advertised to the server through the `x-zero-mode` header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Paper,
    Live,
}

impl Mode {
    /// Returns the lowercase string sent as the header value for this mode.
    #[must_use]
    pub const fn as_header_value(self) -> &'static str {
        let value: &'static str = match self {
            Mode::Live => "live",
            Mode::Paper => "paper",
        };
        value
    }
}
impl HttpClient {
/// Creates a client for `base_url` with an optional bearer `token`.
///
/// The underlying `reqwest` client is built with `TIMEOUT` and a
/// `zero-cli/<crate version>` user agent. Rate budget, mode and operator
/// context start unset.
///
/// # Errors
/// Returns `HttpError::Url` when `base_url` does not parse and
/// `HttpError::Unreachable` when the `reqwest` client cannot be built.
pub fn new(base_url: impl AsRef<str>, token: Option<String>) -> Result<Self, HttpError> {
    let parsed = url::Url::parse(base_url.as_ref())?;
    let builder = reqwest::Client::builder()
        .timeout(TIMEOUT)
        .user_agent(concat!("zero-cli/", env!("CARGO_PKG_VERSION")));
    let transport = match builder.build() {
        Ok(client) => client,
        Err(err) => return Err(HttpError::Unreachable(err.to_string())),
    };
    Ok(Self {
        base_url: parsed,
        token,
        inner: transport,
        rate_budget: None,
        mode: None,
        operator: None,
    })
}
#[must_use]
pub fn with_mode(mut self, mode: Mode) -> Self {
self.mode = Some(mode);
self
}
#[must_use]
pub fn with_operator_context(mut self, operator: OperatorRequestContext) -> Self {
self.operator = Some(operator);
self
}
/// Returns the mode configured via `with_mode`, or `None` when unset.
#[must_use]
pub const fn mode(&self) -> Option<Mode> {
self.mode
}
#[must_use]
pub fn with_rate_budget(mut self, budget: RateBudget) -> Self {
self.rate_budget = Some(budget);
self
}
/// Borrows the rate budget configured via `with_rate_budget`, if any.
#[must_use]
pub const fn rate_budget(&self) -> Option<&RateBudget> {
self.rate_budget.as_ref()
}
/// Borrows the base URL this client was constructed with.
#[must_use]
pub fn base_url(&self) -> &url::Url {
&self.base_url
}
/// Reports whether a bearer token was supplied, without exposing its value.
#[must_use]
pub fn has_token(&self) -> bool {
self.token.is_some()
}
/// Resolves `path` (leading slashes ignored) against the base URL.
///
/// `Url::join` treats the last path segment of a base without a trailing
/// slash as a file name and replaces it, so a base of `https://host/api`
/// joined with `health` would yield `https://host/health`. To keep the
/// prefix, the base path is given a trailing slash before joining. An empty
/// `path` still resolves to the base URL exactly as configured.
///
/// # Errors
/// Returns `HttpError::Url` when the join fails.
fn url_for(&self, path: &str) -> Result<url::Url, HttpError> {
    let relative = path.trim_start_matches('/');
    let base = &self.base_url;
    // Nothing to normalise: empty reference, opaque base, or base already a directory.
    if relative.is_empty() || base.cannot_be_a_base() || base.path().ends_with('/') {
        return Ok(base.join(relative)?);
    }
    let mut directory = base.clone();
    directory.set_path(&format!("{}/", base.path()));
    Ok(directory.join(relative)?)
}
/// Builds the headers sent with every request.
///
/// Contains, when configured: `Authorization: Bearer <token>`, `x-zero-mode`,
/// and the four `x-zero-operator-*` headers. A value that is not a legal
/// header value is skipped rather than failing the request.
fn auth_headers(&self) -> HeaderMap {
    let mut headers = HeaderMap::new();
    if let Some(token) = &self.token
        && let Ok(mut v) = HeaderValue::from_str(&format!("Bearer {token}"))
    {
        // Mark the credential sensitive so `Debug` output of the header map
        // does not print the bearer token.
        v.set_sensitive(true);
        headers.insert(AUTHORIZATION, v);
    }
    if let Some(mode) = self.mode {
        headers.insert(
            HeaderName::from_static("x-zero-mode"),
            HeaderValue::from_static(mode.as_header_value()),
        );
    }
    if let Some(operator) = &self.operator {
        insert_header_str(&mut headers, "x-zero-operator-id", &operator.operator_id);
        insert_header_str(&mut headers, "x-zero-operator-handle", &operator.handle);
        insert_header_str(&mut headers, "x-zero-operator-role", &operator.role);
        insert_header_str(&mut headers, "x-zero-operator-scope", &operator.scope);
    }
    headers
}
/// Charges the local rate budget for a request to `path`.
///
/// Does nothing when no budget is configured. The cost comes from
/// `rate_budget::cost_of(path)`.
///
/// # Errors
/// Returns `HttpError::RateBudgetExhausted` with origin `CliBudget` when the
/// budget refuses the charge.
fn check_rate_budget(&self, path: &str) -> Result<(), HttpError> {
    let Some(budget) = self.rate_budget.as_ref() else {
        return Ok(());
    };
    let cost = rate_budget::cost_of(path);
    match budget.try_consume(cost) {
        Ok(()) => Ok(()),
        Err(exhausted) => {
            tracing::debug!(
                path = %path,
                cost = cost,
                retry_after = ?exhausted.retry_after,
                "cli rate budget exhausted",
            );
            Err(HttpError::RateBudgetExhausted {
                retry_after: exhausted.retry_after,
                origin: RateLimitSource::CliBudget,
            })
        }
    }
}
/// Returns the cost of `path` to the local budget; does nothing when no
/// budget is configured.
fn refund_rate_budget(&self, path: &str) {
    let Some(budget) = self.rate_budget.as_ref() else {
        return;
    };
    budget.refund(rate_budget::cost_of(path));
}
/// Sends a GET to `path` and decodes the JSON response as `T`.
///
/// A failure classified by `is_retryable` is retried exactly once after
/// `RETRY_DELAY`. When the final error is a server-side 429, the local
/// budget charge for this request is refunded.
///
/// # Errors
/// Returns `HttpError::Url` when the URL cannot be built,
/// `HttpError::RateBudgetExhausted` when the local budget refuses the
/// request, and otherwise whatever the final attempt produced.
pub async fn get_json<T: DeserializeOwned>(&self, path: &str) -> Result<T, HttpError> {
    // Resolve the URL before charging the budget, so a request that can never
    // be sent does not consume budget that is never refunded.
    let url = self.url_for(path)?;
    self.check_rate_budget(path)?;
    let headers = self.auth_headers();
    match self.send_once::<T>(url.clone(), headers.clone()).await {
        Ok(t) => Ok(t),
        Err(e) if is_retryable(&e) => {
            tracing::debug!(%url, error = %e, "retrying after transient failure");
            tokio::time::sleep(RETRY_DELAY).await;
            self.send_once::<T>(url, headers)
                .await
                .map_err(|e2| self.maybe_refund_for_429(path, e2))
        }
        Err(e) => Err(self.maybe_refund_for_429(path, e)),
    }
}
/// Passes `err` through unchanged, refunding the local budget for `path`
/// first when the error is a rate-limit rejection that came from the server.
fn maybe_refund_for_429(&self, path: &str, err: HttpError) -> HttpError {
    if let HttpError::RateBudgetExhausted {
        origin: RateLimitSource::Engine429,
        ..
    } = &err
    {
        self.refund_rate_budget(path);
        tracing::debug!(
            path = %path,
            "engine returned 429; refunded local bucket",
        );
    }
    err
}
/// Performs a single GET and maps the response onto `Result<T, HttpError>`.
///
/// Mapping: 2xx decodes the body as JSON; 401/403 becomes `Unauthorized`;
/// 404 becomes `NotFound` with the URL path; 429 becomes
/// `RateBudgetExhausted` with origin `Engine429`, using the `Retry-After`
/// header or one second when it is absent or unparseable; anything else
/// becomes `Status` with the body truncated to 512 bytes.
async fn send_once<T: DeserializeOwned>(
    &self,
    url: url::Url,
    headers: HeaderMap,
) -> Result<T, HttpError> {
    let request = self.inner.get(url.clone()).headers(headers);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(map_transport(&err)),
    };
    let code = response.status();
    if code.is_success() {
        return match response.json::<T>().await {
            Ok(value) => Ok(value),
            Err(err) => Err(HttpError::Decode(err.to_string())),
        };
    }
    if code == StatusCode::UNAUTHORIZED || code == StatusCode::FORBIDDEN {
        return Err(HttpError::Unauthorized);
    }
    if code == StatusCode::NOT_FOUND {
        return Err(HttpError::NotFound {
            path: url.path().to_string(),
        });
    }
    if code == StatusCode::TOO_MANY_REQUESTS {
        let hint = response.headers().get(reqwest::header::RETRY_AFTER);
        let retry_after = hint
            .and_then(|raw| raw.to_str().ok())
            .and_then(parse_retry_after)
            .unwrap_or(Duration::from_secs(1));
        return Err(HttpError::RateBudgetExhausted {
            retry_after,
            origin: RateLimitSource::Engine429,
        });
    }
    let text = response.text().await.unwrap_or_default();
    Err(HttpError::Status {
        status: code,
        body: truncate(&text, 512),
    })
}
/// Serializes `body` as JSON, POSTs it to `path`, and decodes the JSON
/// response as `R`.
///
/// A failure classified by `is_retryable` is retried exactly once after
/// `RETRY_DELAY`, re-sending the same payload. When the final error is a
/// server-side 429, the local budget charge is refunded.
///
/// # Errors
/// Returns `HttpError::Url` when the URL cannot be built, `HttpError::Decode`
/// when `body` fails to serialize, `HttpError::RateBudgetExhausted` when the
/// local budget refuses the request, and otherwise whatever the final
/// attempt produced.
pub async fn post_json<B, R>(&self, path: &str, body: &B) -> Result<R, HttpError>
where
    B: Serialize + ?Sized,
    R: DeserializeOwned,
{
    // Do all fallible local work before charging the budget, so a request
    // that is never sent does not consume budget that is never refunded.
    let url = self.url_for(path)?;
    let payload = serde_json::to_vec(body).map_err(|e| HttpError::Decode(e.to_string()))?;
    self.check_rate_budget(path)?;
    let mut headers = self.auth_headers();
    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
    match self
        .post_once::<R>(url.clone(), headers.clone(), payload.clone())
        .await
    {
        Ok(v) => Ok(v),
        Err(e) if is_retryable(&e) => {
            tracing::debug!(%url, error = %e, "retrying POST after transient failure");
            tokio::time::sleep(RETRY_DELAY).await;
            self.post_once::<R>(url, headers, payload)
                .await
                .map_err(|e2| self.maybe_refund_for_429(path, e2))
        }
        Err(e) => Err(self.maybe_refund_for_429(path, e)),
    }
}
/// Performs a single POST of the pre-serialized `body` and maps the response
/// onto `Result<R, HttpError>`.
///
/// Mapping: 2xx decodes the body as JSON; 401/403 becomes `Unauthorized`;
/// 404 becomes `NotFound` with the URL path; 429 becomes
/// `RateBudgetExhausted` with origin `Engine429`, using the `Retry-After`
/// header or one second when it is absent or unparseable; anything else
/// becomes `Status` with the body truncated to 512 bytes.
async fn post_once<R: DeserializeOwned>(
    &self,
    url: url::Url,
    headers: HeaderMap,
    body: Vec<u8>,
) -> Result<R, HttpError> {
    let request = self.inner.post(url.clone()).headers(headers).body(body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(map_transport(&err)),
    };
    let code = response.status();
    if code.is_success() {
        return match response.json::<R>().await {
            Ok(value) => Ok(value),
            Err(err) => Err(HttpError::Decode(err.to_string())),
        };
    }
    if code == StatusCode::UNAUTHORIZED || code == StatusCode::FORBIDDEN {
        return Err(HttpError::Unauthorized);
    }
    if code == StatusCode::NOT_FOUND {
        return Err(HttpError::NotFound {
            path: url.path().to_string(),
        });
    }
    if code == StatusCode::TOO_MANY_REQUESTS {
        let hint = response.headers().get(reqwest::header::RETRY_AFTER);
        let retry_after = hint
            .and_then(|raw| raw.to_str().ok())
            .and_then(parse_retry_after)
            .unwrap_or(Duration::from_secs(1));
        return Err(HttpError::RateBudgetExhausted {
            retry_after,
            origin: RateLimitSource::Engine429,
        });
    }
    let text = response.text().await.unwrap_or_default();
    Err(HttpError::Status {
        status: code,
        body: truncate(&text, 512),
    })
}
/// Serializes `body` as JSON and POSTs it to `path` exactly once, decoding
/// the JSON response as `R`.
///
/// Unlike `post_json`, no retry is attempted on any failure. When
/// `idempotency_key` is given and is a legal header value it is sent as
/// `x-idempotency-key`; an illegal value is skipped. A server-side 429
/// refunds the local budget charge.
///
/// # Errors
/// Returns `HttpError::Url` when the URL cannot be built, `HttpError::Decode`
/// when `body` fails to serialize, `HttpError::RateBudgetExhausted` when the
/// local budget refuses the request, and otherwise the error from the single
/// attempt.
pub async fn post_json_no_retry<B, R>(
    &self,
    path: &str,
    body: &B,
    idempotency_key: Option<&str>,
) -> Result<R, HttpError>
where
    B: Serialize + ?Sized,
    R: DeserializeOwned,
{
    // Do all fallible local work before charging the budget, so a request
    // that is never sent does not consume budget that is never refunded.
    let url = self.url_for(path)?;
    let payload = serde_json::to_vec(body).map_err(|e| HttpError::Decode(e.to_string()))?;
    self.check_rate_budget(path)?;
    let mut headers = self.auth_headers();
    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
    if let Some(key) = idempotency_key
        && let Ok(v) = HeaderValue::from_str(key)
    {
        headers.insert(HeaderName::from_static("x-idempotency-key"), v);
    }
    self.post_once::<R>(url, headers, payload)
        .await
        .map_err(|e| self.maybe_refund_for_429(path, e))
}
/// GETs `/` and decodes the JSON response as `Root`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn root(&self) -> Result<Root, HttpError> {
    self.get_json::<Root>("/").await
}
/// GETs `/health` and decodes the JSON response as `Health`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn health(&self) -> Result<Health, HttpError> {
    self.get_json::<Health>("/health").await
}
/// GETs `/hl/status`, adding a percent-encoded `symbol` query parameter when
/// one is given, and decodes the response as `HyperliquidStatus`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn hyperliquid_status(
    &self,
    symbol: Option<&str>,
) -> Result<HyperliquidStatus, HttpError> {
    let query = symbol
        .map(|s| format!("?symbol={}", urlencoding(s)))
        .unwrap_or_default();
    let path = format!("/hl/status{query}");
    self.get_json(&path).await
}
/// GETs `/hl/account` and decodes the JSON response as `HyperliquidAccount`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn hyperliquid_account(&self) -> Result<HyperliquidAccount, HttpError> {
    self.get_json::<HyperliquidAccount>("/hl/account").await
}
/// GETs `/hl/reconcile` and decodes the JSON response as
/// `HyperliquidReconciliation`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn hyperliquid_reconciliation(&self) -> Result<HyperliquidReconciliation, HttpError> {
    self.get_json::<HyperliquidReconciliation>("/hl/reconcile")
        .await
}
/// GETs `/market/quote` with `symbol` as a percent-encoded query parameter
/// and decodes the response as `MarketQuote`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn market_quote(&self, symbol: &str) -> Result<MarketQuote, HttpError> {
    let encoded = urlencoding(symbol);
    let path = format!("/market/quote?symbol={encoded}");
    self.get_json::<MarketQuote>(&path).await
}
/// GETs `/live/preflight` and decodes the JSON response as `LivePreflight`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn live_preflight(&self) -> Result<LivePreflight, HttpError> {
    self.get_json::<LivePreflight>("/live/preflight").await
}
/// GETs `/live/certification` and decodes the JSON response as
/// `LiveCertification`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn live_certification(&self) -> Result<LiveCertification, HttpError> {
    self.get_json::<LiveCertification>("/live/certification")
        .await
}
/// GETs `/live/cockpit` and decodes the JSON response as `LiveCockpit`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn live_cockpit(&self) -> Result<LiveCockpit, HttpError> {
    self.get_json::<LiveCockpit>("/live/cockpit").await
}
/// GETs `/live/evidence` and decodes the JSON response as `LiveEvidence`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn live_evidence(&self) -> Result<LiveEvidence, HttpError> {
    self.get_json::<LiveEvidence>("/live/evidence").await
}
/// GETs `/live/canary-policy` and decodes the JSON response as
/// `LiveCanaryPolicy`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn live_canary_policy(&self) -> Result<LiveCanaryPolicy, HttpError> {
    self.get_json::<LiveCanaryPolicy>("/live/canary-policy")
        .await
}
/// GETs `/runtime/parity` and decodes the JSON response as `RuntimeParity`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn runtime_parity(&self) -> Result<RuntimeParity, HttpError> {
    self.get_json::<RuntimeParity>("/runtime/parity").await
}
/// GETs `/live/receipts` and decodes the JSON response as
/// `LiveExecutionReceipts`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn live_receipts(&self) -> Result<LiveExecutionReceipts, HttpError> {
    self.get_json::<LiveExecutionReceipts>("/live/receipts")
        .await
}
/// GETs `/operator/context` and decodes the JSON response as `OperatorContext`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn operator_context(&self) -> Result<OperatorContext, HttpError> {
    self.get_json::<OperatorContext>("/operator/context").await
}
/// GETs `/immune` and decodes the JSON response as `ImmuneReport`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn immune(&self) -> Result<ImmuneReport, HttpError> {
    self.get_json::<ImmuneReport>("/immune").await
}
/// POSTs an empty JSON object to `/live/heartbeat`, without retry and without
/// an idempotency key, and decodes the response as `LiveControlResponse`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
pub async fn post_live_heartbeat(&self) -> Result<LiveControlResponse, HttpError> {
    let empty_body = serde_json::json!({});
    self.post_json_no_retry("/live/heartbeat", &empty_body, None)
        .await
}
/// POSTs an empty JSON object to `/live/pause`, without retry and without an
/// idempotency key, and decodes the response as `LiveControlResponse`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
pub async fn post_live_pause(&self) -> Result<LiveControlResponse, HttpError> {
    let empty_body = serde_json::json!({});
    self.post_json_no_retry("/live/pause", &empty_body, None)
        .await
}
/// POSTs an empty JSON object to `/live/resume`, without retry and without an
/// idempotency key, and decodes the response as `LiveControlResponse`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
pub async fn post_live_resume(&self) -> Result<LiveControlResponse, HttpError> {
    let empty_body = serde_json::json!({});
    self.post_json_no_retry("/live/resume", &empty_body, None)
        .await
}
/// POSTs an empty JSON object to `/live/kill`, without retry and without an
/// idempotency key, and decodes the response as `LiveControlResponse`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
pub async fn post_live_kill(&self) -> Result<LiveControlResponse, HttpError> {
    let empty_body = serde_json::json!({});
    self.post_json_no_retry("/live/kill", &empty_body, None)
        .await
}
/// POSTs an empty JSON object to `/live/flatten`, without retry and without
/// an idempotency key, and decodes the response as `LiveControlResponse`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
pub async fn post_live_flatten(&self) -> Result<LiveControlResponse, HttpError> {
    let empty_body = serde_json::json!({});
    self.post_json_no_retry("/live/flatten", &empty_body, None)
        .await
}
/// GETs `/v2/status` and decodes the JSON response as `V2Status`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn v2_status(&self) -> Result<V2Status, HttpError> {
    self.get_json::<V2Status>("/v2/status").await
}
/// GETs `/positions` and decodes the JSON response as `Positions`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn positions(&self) -> Result<Positions, HttpError> {
    self.get_json::<Positions>("/positions").await
}
/// GETs `/risk` and decodes the JSON response as `Risk`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn risk(&self) -> Result<Risk, HttpError> {
    self.get_json::<Risk>("/risk").await
}
/// GETs `/regime`, adding a percent-encoded `coin` query parameter when one
/// is given, and decodes the response as `Regime`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn regime(&self, coin: Option<&str>) -> Result<Regime, HttpError> {
    let query = coin
        .map(|c| format!("?coin={}", urlencoding(c)))
        .unwrap_or_default();
    let path = format!("/regime{query}");
    self.get_json(&path).await
}
/// GETs `/brief` and decodes the JSON response as `Brief`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn brief(&self) -> Result<Brief, HttpError> {
    self.get_json::<Brief>("/brief").await
}
/// GETs `/evaluate/<coin>` with `coin` percent-encoded into the path and
/// decodes the response as `Evaluation`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn evaluate(&self, coin: &str) -> Result<Evaluation, HttpError> {
    let segment = urlencoding(coin);
    let path = format!("/evaluate/{segment}");
    self.get_json::<Evaluation>(&path).await
}
/// GETs `/pulse?limit=<n>` and decodes the response as `Pulse`.
///
/// `limit` is clamped into `1..=100` before being sent.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn pulse(&self, limit: u32) -> Result<Pulse, HttpError> {
    let path = format!("/pulse?limit={}", limit.clamp(1, 100));
    self.get_json::<Pulse>(&path).await
}
/// GETs `/approaching` and decodes the JSON response as `ApproachingFeed`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn approaching(&self) -> Result<ApproachingFeed, HttpError> {
    self.get_json::<ApproachingFeed>("/approaching").await
}
/// GETs `/operator/state` and decodes the JSON response as `OperatorSnapshot`.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn operator_state(&self) -> Result<OperatorSnapshot, HttpError> {
    self.get_json::<OperatorSnapshot>("/operator/state").await
}
/// POSTs `event` as JSON to `/operator/events` through `post_json` (which
/// retries once on a transient failure) and decodes the response as
/// `OperatorEventsAccepted`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json`.
pub async fn post_operator_event(
    &self,
    event: &OperatorEvent,
) -> Result<OperatorEventsAccepted, HttpError> {
    self.post_json::<OperatorEvent, OperatorEventsAccepted>("/operator/events", event)
        .await
}
/// POSTs an `ExecuteRequest` for `coin`/`side`/`size` to `/execute`, without
/// retry, and decodes the response as `ExecuteResponse`.
///
/// A fresh idempotency key is minted per call and sent both inside the
/// request body and as the `x-idempotency-key` header.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
#[allow(clippy::similar_names)]
pub async fn post_execute(
    &self,
    coin: &str,
    side: crate::models::ExecuteSide,
    size: f64,
) -> Result<ExecuteResponse, HttpError> {
    let key = mint_idempotency_key();
    let request = ExecuteRequest {
        coin: coin.to_string(),
        side,
        size,
        idempotency_key: key.clone(),
    };
    let header_key = Some(key.as_str());
    self.post_json_no_retry::<ExecuteRequest, ExecuteResponse>("/execute", &request, header_key)
        .await
}
/// POSTs an `AutoToggleRequest` carrying `enabled` to `/auto/toggle`, without
/// retry and without an idempotency key, and decodes the response as
/// `AutoToggleResponse`.
///
/// # Errors
/// Propagates any `HttpError` from `post_json_no_retry`.
pub async fn post_auto_toggle(&self, enabled: bool) -> Result<AutoToggleResponse, HttpError> {
    let request = AutoToggleRequest { enabled };
    self.post_json_no_retry("/auto/toggle", &request, None)
        .await
}
/// GETs `/rejections?limit=<n>`, adding a percent-encoded `coin` parameter
/// when one is given, and decodes the response as `RejectionsFeed`.
///
/// `limit` is clamped into `1..=500` before being sent.
///
/// # Errors
/// Propagates any `HttpError` from `get_json`.
pub async fn rejections(
    &self,
    limit: u32,
    coin: Option<&str>,
) -> Result<RejectionsFeed, HttpError> {
    let mut path = format!("/rejections?limit={}", limit.clamp(1, 500));
    if let Some(c) = coin {
        path.push_str("&coin=");
        path.push_str(&urlencoding(c));
    }
    self.get_json(&path).await
}
}
/// Percent-encodes `s` byte by byte.
///
/// ASCII letters, digits and `-`, `.`, `_`, `~` pass through unchanged; every
/// other byte (including each byte of a multi-byte UTF-8 character) becomes
/// `%XX` with uppercase hex digits.
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
/// Converts a `reqwest` transport error into an `HttpError`: timeouts become
/// `Timeout(TIMEOUT)`, everything else becomes `Unreachable` with the error text.
fn map_transport(e: &reqwest::Error) -> HttpError {
    if !e.is_timeout() {
        return HttpError::Unreachable(e.to_string());
    }
    HttpError::Timeout(TIMEOUT)
}
/// Decides whether a failed attempt is worth the single retry.
///
/// Retryable: timeouts, unreachable transport, and HTTP 502/503/504. Every
/// other error, including rate-limit rejections, is not.
fn is_retryable(e: &HttpError) -> bool {
    if let HttpError::Status { status, .. } = e {
        let transient = [
            StatusCode::BAD_GATEWAY,
            StatusCode::SERVICE_UNAVAILABLE,
            StatusCode::GATEWAY_TIMEOUT,
        ];
        return transient.contains(status);
    }
    matches!(e, HttpError::Timeout(_) | HttpError::Unreachable(_))
}
/// Renders a retry delay for display: whole seconds as `"<n>s"`, or `">1h"`
/// once the delay exceeds 3600 seconds. Sub-second parts are dropped.
#[must_use]
pub(crate) fn format_retry_after(d: Duration) -> String {
    match d.as_secs() {
        secs if secs > 3600 => ">1h".to_string(),
        secs => format!("{secs}s"),
    }
}
/// Parses a `Retry-After` header value into a wait duration.
///
/// Surrounding whitespace is ignored. A non-negative integer is taken as a
/// number of seconds; otherwise the value is parsed as an RFC 2822 date and
/// the result is the time remaining until that instant, with a date in the
/// past yielding `Duration::ZERO`. Returns `None` when neither form parses.
#[must_use]
pub(crate) fn parse_retry_after(value: &str) -> Option<Duration> {
    let raw = value.trim();
    match raw.parse::<u64>() {
        Ok(secs) => Some(Duration::from_secs(secs)),
        Err(_) => {
            let deadline = chrono::DateTime::parse_from_rfc2822(raw)
                .ok()?
                .with_timezone(&chrono::Utc);
            let remaining = deadline - chrono::Utc::now();
            // `to_std` fails for a negative span, i.e. a deadline already passed.
            Some(remaining.to_std().unwrap_or(Duration::ZERO))
        }
    }
}
/// Returns `s` unchanged when it is at most `max` bytes long; otherwise
/// returns a prefix of at most `max` bytes followed by `…`.
///
/// The cut is moved back to the nearest UTF-8 character boundary so that a
/// multi-byte character straddling `max` cannot cause a slicing panic.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    let mut end = max;
    // Index 0 is always a boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
/// Inserts `name: value` into `headers`, silently skipping the header when
/// `value` is not a legal header value.
fn insert_header_str(headers: &mut HeaderMap, name: &'static str, value: &str) {
    let Ok(parsed) = HeaderValue::from_str(value) else {
        return;
    };
    headers.insert(HeaderName::from_static(name), parsed);
}
/// Generates a fresh random (version 4) UUID and returns its string form,
/// for use as an idempotency key.
#[must_use]
pub(crate) fn mint_idempotency_key() -> String {
uuid::Uuid::new_v4().to_string()
}
/// Returns the fixed pause (`RETRY_DELAY`) used before the single retry.
#[must_use]
pub const fn retry_delay() -> Duration {
RETRY_DELAY
}
/// Returns the fixed request timeout (`TIMEOUT`) configured on the client.
#[must_use]
pub const fn timeout() -> Duration {
TIMEOUT
}
// Unit tests for the pure helpers and header construction; none of them
// performs network I/O.
#[cfg(test)]
mod tests {
use super::*;
// Integer `Retry-After` values are seconds; surrounding whitespace is ignored.
#[test]
fn retry_after_parses_plain_seconds() {
assert_eq!(parse_retry_after("30"), Some(Duration::from_secs(30)));
assert_eq!(parse_retry_after(" 30 "), Some(Duration::from_secs(30)));
assert_eq!(parse_retry_after("0"), Some(Duration::from_secs(0)));
}
// A date roughly one year ahead yields a delay between one day and 366 days.
#[test]
fn retry_after_parses_http_date_in_the_future() {
let one_year_ahead = chrono::Utc::now() + chrono::Duration::days(365);
let formatted = one_year_ahead
.format("%a, %d %b %Y %H:%M:%S GMT")
.to_string();
let d = parse_retry_after(&formatted).expect("parseable");
assert!(d > Duration::from_secs(86_400));
assert!(d < Duration::from_secs(366 * 86_400));
}
// A date already in the past is reported as a zero delay, not as `None`.
#[test]
fn retry_after_clamps_past_date_to_zero() {
let past = chrono::Utc::now() - chrono::Duration::days(3);
let formatted = past.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
assert_eq!(parse_retry_after(&formatted), Some(Duration::ZERO));
}
// Values that are neither an integer nor a date produce `None`.
#[test]
fn retry_after_unparseable_returns_none() {
assert_eq!(parse_retry_after("not-a-date"), None);
assert_eq!(parse_retry_after(""), None);
}
// Pins the `Display` labels of `RateLimitSource`.
#[test]
fn rate_limit_source_display_is_stable() {
assert_eq!(format!("{}", RateLimitSource::CliBudget), "cli-budget");
assert_eq!(format!("{}", RateLimitSource::Engine429), "engine-429");
}
// The message shows only the delay (not the origin) and collapses very
// long delays to ">1h".
#[test]
fn rate_budget_exhausted_display_is_terse_and_seconds() {
let e = HttpError::RateBudgetExhausted {
retry_after: Duration::from_secs(3),
origin: RateLimitSource::CliBudget,
};
assert_eq!(format!("{e}"), "rate: exhausted — retry in 3s");
let e429 = HttpError::RateBudgetExhausted {
retry_after: Duration::from_secs(45),
origin: RateLimitSource::Engine429,
};
assert_eq!(format!("{e429}"), "rate: exhausted — retry in 45s");
let perma = HttpError::RateBudgetExhausted {
retry_after: Duration::MAX,
origin: RateLimitSource::CliBudget,
};
assert_eq!(format!("{perma}"), "rate: exhausted — retry in >1h");
}
// Pins the header strings for each `Mode`.
#[test]
fn mode_header_value_is_stable() {
assert_eq!(Mode::Paper.as_header_value(), "paper");
assert_eq!(Mode::Live.as_header_value(), "live");
}
// `x-zero-mode` is present only when a mode has been configured.
#[test]
fn with_mode_attaches_header_on_auth_headers() {
let client = HttpClient::new("https://example.test", None)
.expect("client")
.with_mode(Mode::Paper);
assert_eq!(client.mode(), Some(Mode::Paper));
let headers = client.auth_headers();
let got = headers
.get("x-zero-mode")
.expect("x-zero-mode header attached");
assert_eq!(got.to_str().unwrap(), "paper");
let unset = HttpClient::new("https://example.test", None).expect("client");
assert!(unset.mode().is_none());
assert!(unset.auth_headers().get("x-zero-mode").is_none());
}
// Each field of the operator context maps onto its own `x-zero-operator-*` header.
#[test]
fn operator_context_attaches_audit_headers() {
let client = HttpClient::new("https://example.test", None)
.expect("client")
.with_operator_context(OperatorRequestContext {
operator_id: "team-alpha:alice".to_string(),
handle: "alice".to_string(),
role: "trader".to_string(),
scope: "team-private".to_string(),
});
let headers = client.auth_headers();
assert_eq!(
headers
.get("x-zero-operator-id")
.and_then(|v| v.to_str().ok()),
Some("team-alpha:alice"),
);
assert_eq!(
headers
.get("x-zero-operator-handle")
.and_then(|v| v.to_str().ok()),
Some("alice"),
);
assert_eq!(
headers
.get("x-zero-operator-role")
.and_then(|v| v.to_str().ok()),
Some("trader"),
);
assert_eq!(
headers
.get("x-zero-operator-scope")
.and_then(|v| v.to_str().ok()),
Some("team-private"),
);
}
// Two successive keys differ and have the 36-character hyphenated UUID shape.
#[test]
fn mint_idempotency_key_is_unique_per_call() {
let a = mint_idempotency_key();
let b = mint_idempotency_key();
assert_ne!(a, b, "successive calls must mint distinct keys");
assert_eq!(a.len(), 36, "UUID v4 stringifies to 36 chars");
assert_eq!(a.matches('-').count(), 4, "four hyphens in v4 form");
}
// A rate-limit rejection must never trigger the automatic retry.
#[test]
fn is_retryable_never_retries_rate_budget_exhausted() {
let err = HttpError::RateBudgetExhausted {
retry_after: Duration::from_secs(2),
origin: RateLimitSource::Engine429,
};
assert!(!is_retryable(&err));
}
}