use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::{Request, State};
use axum::http::StatusCode;
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Json};
use axum::routing::{get, post};
use parking_lot::Mutex;
use serde_json::json;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
/// Test-controlled switches that change how the mock handlers respond.
///
/// Every field starts "off" (`false`, `0` or `None`) through the derived
/// `Default`; tests flip them through `MockEngine::with_overrides`.
// The struct is a flat bag of independent flags, hence the clippy allowance.
#[allow(clippy::struct_excessive_bools)] #[derive(Debug, Default, Clone)]
pub struct Overrides {
/// `/health` reports `"status": "degraded"` instead of `"ok"`.
pub degrade_health: bool,
/// `/health` answers `503 Service Unavailable` with body `overloaded`.
pub health_503: bool,
/// The failure-injection middleware answers `401` for every covered route.
pub force_unauthorized: bool,
/// The failure-injection middleware answers `404` for every covered route.
pub force_not_found: bool,
/// The failure-injection middleware answers `500` for every covered route.
pub force_server_error: bool,
/// `/evaluate/:coin` returns a payload with an empty `layers` array.
pub force_empty_evaluation: bool,
/// `/regime` returns an empty JSON object.
pub force_empty_regime: bool,
/// `/regime` returns `{"error": "coin not found"}` (checked before `force_empty_regime`).
pub force_regime_error_envelope: bool,
/// `/approaching` answers `404` with `{"detail": "Not Found"}`.
pub force_approaching_not_found: bool,
/// `/risk` returns its alternate fixture payload instead of the default one.
pub force_stale_risk_equity: bool,
/// Number of upcoming covered requests to fail with `503`; decremented per injected failure.
pub transient_fail_count: u32,
/// Number of upcoming covered requests to fail with `429`; decremented per injected failure.
pub rate_limit_count: u32,
/// `Retry-After` value sent with injected `429`s; `"1"` is used when `None`.
pub rate_limit_retry_after: Option<String>,
/// Version string reported by `/`; `"1.2.3-mock"` is used when `None`.
pub version: Option<String>,
/// Not read by any handler visible in this part of the file; presumably
/// consumed by the `/ws` handler — TODO confirm.
pub ws_drop_once: bool,
/// `label` reported by `/operator/state`; `"steady"` is used when `None`.
pub operator_label: Option<String>,
/// `version` reported by `/operator/state`.
pub operator_version: u64,
/// When `Some`, `/auto/toggle` uses this state instead of the requested `enabled` flag.
pub auto_toggle_echo_state: Option<bool>,
/// Read by the `/auto/toggle` handler alongside `auto_toggle_echo_state`;
/// how it is surfaced is outside the visible code — TODO confirm.
pub auto_toggle_reason: Option<String>,
/// `/execute` reports `simulated: true` regardless of the `x-zero-mode` header.
pub force_simulated: bool,
/// `/execute` and `/auto/toggle` answer `503` with body `retry me`.
pub post_transient_fail: bool,
/// `/execute` and `/auto/toggle` answer `500` with body `boom`.
pub post_server_error: bool,
}
/// Shared state handed to every handler.
///
/// Each field is an `Arc<Mutex<..>>`, so clones of `AppState` refer to the
/// same underlying data.
#[derive(Debug, Clone)]
pub struct AppState {
/// Behaviour switches consulted by the handlers and the failure-injection middleware.
pub overrides: Arc<Mutex<Overrides>>,
/// Events recorded by the `/operator/events` handler, in arrival order.
pub received_events: Arc<Mutex<Vec<serde_json::Value>>>,
/// Requests recorded by the `/execute` handler.
pub received_executes: Arc<Mutex<Vec<CapturedPost>>>,
/// Requests recorded by the `/auto/toggle` handler.
pub received_auto_toggles: Arc<Mutex<Vec<CapturedPost>>>,
/// Paths of the `/live/*` control endpoints that were hit, in arrival order.
pub received_live_controls: Arc<Mutex<Vec<String>>>,
}
#[derive(Debug, Clone)]
pub struct CapturedPost {
pub headers: std::collections::BTreeMap<String, String>,
pub body: serde_json::Value,
}
impl AppState {
fn new() -> Self {
Self {
overrides: Arc::new(Mutex::new(Overrides::default())),
received_events: Arc::new(Mutex::new(Vec::new())),
received_executes: Arc::new(Mutex::new(Vec::new())),
received_auto_toggles: Arc::new(Mutex::new(Vec::new())),
received_live_controls: Arc::new(Mutex::new(Vec::new())),
}
}
}
/// Handle to a running mock server.
///
/// Created by `MockEngine::spawn`; the server is stopped by `shutdown` or when
/// the handle is dropped.
#[derive(Debug)]
pub struct MockEngine {
// Local address the listener was bound to.
addr: SocketAddr,
// State shared with the request handlers.
state: AppState,
// Sender that triggers graceful shutdown; `None` once it has been used.
shutdown: Option<oneshot::Sender<()>>,
// Task running the server; `None` once `shutdown` has awaited it.
handle: Option<JoinHandle<()>>,
}
impl MockEngine {
    /// Starts the mock server on an OS-assigned loopback port.
    ///
    /// # Errors
    ///
    /// Returns an error when the listener cannot be bound or its local
    /// address cannot be read.
    pub async fn spawn() -> anyhow::Result<Self> {
        let state = AppState::new();
        let app = router(state.clone()).with_state(state.clone());
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;

        let (stop_tx, stop_rx) = oneshot::channel::<()>();
        let server_task = tokio::spawn(async move {
            // Resolves when the sender fires or is dropped; either way stop serving.
            let stop_signal = async move {
                let _ = stop_rx.await;
            };
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(stop_signal)
                .await;
        });

        // Brief pause before handing the engine to the caller.
        tokio::time::sleep(Duration::from_millis(10)).await;

        Ok(Self {
            addr,
            state,
            shutdown: Some(stop_tx),
            handle: Some(server_task),
        })
    }

    /// Socket address the server is listening on.
    #[must_use]
    pub fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Base HTTP URL of the server, without a trailing slash.
    #[must_use]
    pub fn base_url(&self) -> String {
        let addr = self.addr;
        format!("http://{addr}")
    }

    /// URL of the WebSocket endpoint (`/ws`).
    #[must_use]
    pub fn ws_url(&self) -> String {
        let addr = self.addr;
        format!("ws://{addr}/ws")
    }

    /// Runs `mutate` against the shared overrides while holding their lock.
    pub fn with_overrides(&self, mutate: impl FnOnce(&mut Overrides)) {
        let mut guard = self.state.overrides.lock();
        mutate(&mut *guard);
    }

    /// Signals graceful shutdown and waits for the server task to finish.
    pub async fn shutdown(mut self) {
        if let Some(signal) = self.shutdown.take() {
            let _ = signal.send(());
        }
        if let Some(task) = self.handle.take() {
            let _ = task.await;
        }
    }
}
impl Drop for MockEngine {
    /// Stops the server when the handle goes out of scope.
    ///
    /// Both steps are no-ops after `shutdown`, which already took the sender
    /// and the task handle.
    fn drop(&mut self) {
        if let Some(signal) = self.shutdown.take() {
            let _ = signal.send(());
        }
        // Drop cannot await the task, so abort it outright.
        if let Some(task) = self.handle.as_ref() {
            task.abort();
        }
    }
}
/// Builds the route table for the mock.
///
/// The API routes are wrapped in the `inject_failures` middleware; `/`,
/// `/health` and `/ws` are registered outside that layer and are therefore
/// never subject to injected failures.
fn router(shared: AppState) -> Router<AppState> {
    let failure_layer = middleware::from_fn_with_state(shared, inject_failures);

    // Routes covered by failure injection.
    let guarded = Router::new()
        .route("/v2/status", get(v2_status))
        .route("/positions", get(positions))
        .route("/risk", get(risk))
        .route("/regime", get(regime))
        .route("/brief", get(brief))
        .route("/evaluate/:coin", get(evaluate))
        .route("/pulse", get(pulse))
        .route("/approaching", get(approaching))
        .route("/rejections", get(rejections))
        .route("/hl/status", get(hl_status))
        .route("/hl/account", get(hl_account))
        .route("/hl/reconcile", get(hl_reconcile))
        .route("/immune", get(immune))
        .route("/live/cockpit", get(live_cockpit))
        .route("/live/certification", get(live_certification))
        .route("/live/evidence", get(live_evidence))
        .route("/live/canary-policy", get(live_canary_policy))
        .route("/runtime/parity", get(runtime_parity))
        .route("/live/receipts", get(live_receipts))
        .route("/live/preflight", get(live_preflight))
        .route("/market/quote", get(market_quote))
        .route("/operator/state", get(operator_state))
        .route("/operator/events", post(operator_events))
        .route("/execute", post(execute))
        .route("/auto/toggle", post(auto_toggle))
        .route("/live/heartbeat", post(live_heartbeat))
        .route("/live/pause", post(live_pause))
        .route("/live/resume", post(live_resume))
        .route("/live/kill", post(live_kill))
        .route("/live/flatten", post(live_flatten))
        .layer(failure_layer);

    // Routes that bypass failure injection.
    Router::new()
        .route("/", get(root))
        .route("/health", get(health))
        .route("/ws", get(ws_handler))
        .merge(guarded)
}
async fn inject_failures(
State(s): State<AppState>,
req: Request,
next: Next,
) -> axum::response::Response {
let action = {
let mut o = s.overrides.lock();
if o.force_unauthorized {
InjectAction::Unauthorized
} else if o.force_not_found {
InjectAction::NotFound
} else if o.rate_limit_count > 0 {
o.rate_limit_count -= 1;
InjectAction::RateLimited(o.rate_limit_retry_after.clone())
} else if o.transient_fail_count > 0 {
o.transient_fail_count -= 1;
InjectAction::Transient
} else if o.force_server_error {
InjectAction::ServerError
} else {
InjectAction::Pass
}
};
match action {
InjectAction::Unauthorized => (StatusCode::UNAUTHORIZED, "missing token").into_response(),
InjectAction::NotFound => (StatusCode::NOT_FOUND, "unknown endpoint").into_response(),
InjectAction::RateLimited(header) => {
let retry_after = header.unwrap_or_else(|| "1".to_string());
let mut resp = (StatusCode::TOO_MANY_REQUESTS, "slow down").into_response();
if let Ok(v) = retry_after.parse() {
resp.headers_mut()
.insert(axum::http::header::RETRY_AFTER, v);
}
resp
}
InjectAction::Transient => (StatusCode::SERVICE_UNAVAILABLE, "retry me").into_response(),
InjectAction::ServerError => {
(StatusCode::INTERNAL_SERVER_ERROR, "unexpected").into_response()
}
InjectAction::Pass => next.run(req).await,
}
}
/// Outcome chosen by the failure-injection middleware for one request.
#[derive(Debug, Clone)]
enum InjectAction {
/// Forward the request to the wrapped handler.
Pass,
/// Answer `401` with body `missing token`.
Unauthorized,
/// Answer `404` with body `unknown endpoint`.
NotFound,
/// Answer `429`; carries the configured `Retry-After` value, if any.
RateLimited(Option<String>),
/// Answer `503` with body `retry me`.
Transient,
/// Answer `500` with body `unexpected`.
ServerError,
}
/// Handler for `GET /`: reports the service name, version and status.
///
/// The version comes from `Overrides::version`, falling back to `1.2.3-mock`.
async fn root(State(s): State<AppState>) -> impl IntoResponse {
    let configured = s.overrides.lock().version.clone();
    let version = match configured {
        Some(v) => v,
        None => "1.2.3-mock".to_string(),
    };
    let body = json!({
        "name": "ZERO OS",
        "version": version,
        "status": "running",
        "ts": chrono_utc_now_iso(),
    });
    Json(body)
}
async fn health(State(s): State<AppState>) -> Response {
let o = s.overrides.lock().clone();
if o.health_503 {
return (StatusCode::SERVICE_UNAVAILABLE, "overloaded").into_response();
}
let status = if o.degrade_health { "degraded" } else { "ok" };
Json(json!({
"status": status,
"components": {
"controller": {"status": "healthy", "last_seen": chrono_utc_now_iso(), "age_s": 1.1},
"market_data": {"status": "healthy", "last_seen": chrono_utc_now_iso(), "age_s": 0.4},
},
"dependencies": {"hyperliquid": "healthy", "llm": "healthy"},
"circuit_breakers": {},
"risk": {
"account_value": 10_000.0,
"drawdown_pct": 0.8,
"halted": false,
},
"ws_connections": 0,
}))
.into_response()
}
/// Handler for `GET /v2/status`: returns a fixed status summary with a fresh `ts`.
async fn v2_status() -> Json<serde_json::Value> {
    let body = json!({
        "confidence": {"score": 72, "level": "high"},
        "market": {
            "regime": "TREND_LONG confirmed across majors.",
            "health": 0.954,
            "signal": "stable",
            "prediction": "stable",
            "fear_greed": 54,
            "coins_tradeable": 30
        },
        "positions": {"open": 2, "unrealized_pnl": 34.12, "equity": 10_034.12},
        "today": {"trades": 24, "wins": 15, "pnl": -3.95, "streak": -3, "sizing_mult": 0.7},
        "approaching": [],
        "blind_spots": [],
        "alert": null,
        "recovery": {
            "status": "recovered",
            "source": "journal",
            "durable": true,
            "journal_path": "/data/decisions.jsonl",
            "decisions_recovered": 24,
            "fills_recovered": 17,
            "rejections_recovered": 7,
            "positions_recovered": 2,
            "last_decision_at": "2026-05-01T00:00:00Z",
            "current_decisions": 24,
            "current_fills": 17,
            "current_rejections": 7,
            "current_positions": 2
        },
        "ts": chrono_utc_now_iso(),
    });
    Json(body)
}
/// Handler for `GET /positions`: returns a fixed snapshot with one BTC long
/// and one ETH short, plus account totals.
async fn positions() -> Json<serde_json::Value> {
    let btc_long = json!({
        "symbol": "BTC",
        "side": "long",
        "size": 0.42,
        "entry": 64_120.5,
        "mark": 64_480.0,
        "unrealized_pnl": 151.13,
        "unrealized_r": 0.82,
        "stop": 63_800.0,
        "target": 65_400.0,
        "lens_id": "alpha_v3",
        "age_s": 1_824.0
    });
    let eth_short = json!({
        "symbol": "ETH",
        "side": "short",
        "size": 1.2,
        "entry": 3_120.0,
        "mark": 3_098.0,
        "unrealized_pnl": 26.4,
        "unrealized_r": 0.31,
        "stop": 3_160.0,
        "target": 3_010.0,
        "lens_id": "beta_v1",
        "age_s": 421.0
    });
    Json(json!({
        "positions": [btc_long, eth_short],
        "account_value": 10_034.12,
        "total_unrealized_pnl": 177.53
    }))
}
/// Handler for `GET /risk`.
///
/// Returns the default risk fixture, or the alternate fixture when
/// `Overrides::force_stale_risk_equity` is set.
async fn risk(State(s): State<AppState>) -> Json<serde_json::Value> {
    let use_stale_fixture = s.overrides.lock().force_stale_risk_equity;
    let body = if use_stale_fixture {
        json!({
            "account_value": 638.488_706, "updated_at": chrono_utc_now_iso(),
            "daily_pnl_usd": -3.312,
            "daily_loss_usd": 4.1261,
            "per_runner": {},
            "global_halt": false,
            "daily_loss_since": chrono_utc_now_iso(),
            "halted": false,
            "halt_reason": null,
            "halt_until": null,
            "stop_failure_halt": false,
            "open_count": 0,
            "drawdown_pct": 0.22, "peak_equity": 577.338_628, "peak_equity_30d": 577.34,
            "last_drawdown_alert_pct": 20,
            "capital_floor_hit": false
        })
    } else {
        json!({
            "account_value": 10_034.12,
            "updated_at": chrono_utc_now_iso(),
            "daily_pnl_usd": 34.12,
            "daily_loss_usd": 4.1261,
            "per_runner": {},
            "global_halt": false,
            "daily_loss_since": chrono_utc_now_iso(),
            "halted": false,
            "halt_reason": null,
            "halt_until": null,
            "stop_failure_halt": false,
            "open_count": 2,
            "drawdown_pct": 0.8,
            "peak_equity": 10_100.0,
            "peak_equity_30d": 10_100.0,
            "last_drawdown_alert_pct": 20,
            "capital_floor_hit": false
        })
    };
    Json(body)
}
async fn regime(State(s): State<AppState>) -> Json<serde_json::Value> {
let o = s.overrides.lock().clone();
if o.force_regime_error_envelope {
return Json(json!({"error": "coin not found"}));
}
if o.force_empty_regime {
return Json(json!({}));
}
Json(json!({
"regime": "TREND_LONG",
"confidence": 0.81,
"trending_long": 7,
"trending_short": 2,
"choppy": 3
}))
}
/// Handler for `GET /brief`: returns a fixed briefing with a fresh `timestamp`.
async fn brief() -> Json<serde_json::Value> {
    let body = json!({
        "timestamp": chrono_utc_now_iso(),
        "fear_greed": 54,
        "open_positions": 2,
        "positions": [
            {
                "symbol": "BTC",
                "side": "long",
                "size": 0.42,
                "entry": 64_120.5,
                "mark": 64_480.0,
                "unrealized_pnl": 151.13,
                "unrealized_r": 0.82
            }
        ],
        "recent_signals": [
            {"coin": "BTC", "kind": "signal", "message": "edge_floor cleared"}
        ],
        "approaching": [
            {"coin": "AVAX", "direction": "long", "distance_to_gate": 0.04}
        ],
        "last_cycle": {
            "regime": "TREND_LONG",
            "signals_evaluated": 30,
            "actions_taken": 2
        }
    });
    Json(body)
}
/// Handler for `GET /evaluate/:coin`.
///
/// Echoes the requested coin in a canned evaluation. When
/// `Overrides::force_empty_evaluation` is set, the payload carries only the
/// coin, an empty `layers` array, `data_fresh` and a timestamp.
async fn evaluate(
    State(s): State<AppState>,
    axum::extract::Path(coin): axum::extract::Path<String>,
) -> Json<serde_json::Value> {
    let empty_layers = s.overrides.lock().force_empty_evaluation;
    let body = if empty_layers {
        json!({
            "coin": coin,
            "layers": [],
            "data_fresh": true,
            "timestamp": chrono_utc_now_iso()
        })
    } else {
        json!({
            "coin": coin,
            "price": 85.48,
            "consensus": 10,
            "conviction": 0.64,
            "direction": "NONE",
            "regime": "random_quiet",
            "layers": [
                {"layer": "layer_0", "passed": true, "value": "random_quiet", "detail": "regime=random_quiet"},
                {"layer": "layer_1", "passed": true, "value": {"agree": 0, "oppose": 0}, "detail": "technical neutral"},
                {"layer": "layer_2", "passed": false, "value": 1.25e-05, "detail": "funding_rate below threshold"}
            ],
            "data_fresh": true,
            "timestamp": chrono_utc_now_iso()
        })
    };
    Json(body)
}
/// Handler for `GET /pulse`: returns two fixed events (one signal, one
/// rejection) with fresh timestamps.
async fn pulse() -> Json<serde_json::Value> {
    let body = json!({
        "events": [
            {"kind": "signal", "coin": "BTC", "message": "edge_floor cleared", "ts": chrono_utc_now_iso(), "severity": "info"},
            {"kind": "rejection", "coin": "SOL", "message": "stage2 HOLD on volume", "ts": chrono_utc_now_iso(), "severity": "info"}
        ],
        "count": 2,
        "timestamp": chrono_utc_now_iso()
    });
    Json(body)
}
/// Handler for `GET /approaching`.
///
/// Answers `404` with `{"detail": "Not Found"}` when
/// `Overrides::force_approaching_not_found` is set; otherwise returns two
/// fixed entries.
async fn approaching(State(s): State<AppState>) -> axum::response::Response {
    let missing = s.overrides.lock().force_approaching_not_found;
    if missing {
        let detail = Json(json!({"detail": "Not Found"}));
        return (StatusCode::NOT_FOUND, detail).into_response();
    }
    let body = json!({
        "approaching": [
            {"coin": "AVAX", "direction": "long", "distance_to_gate": 0.04, "gate": "edge_floor", "ts": chrono_utc_now_iso()},
            {"coin": "LINK", "direction": "short", "distance_to_gate": 0.07, "gate": "stage2", "ts": chrono_utc_now_iso()}
        ]
    });
    Json(body).into_response()
}
/// Handler for `GET /rejections`: returns a single fixed SOL rejection.
async fn rejections() -> Json<serde_json::Value> {
    let body = json!({
        "rejections": [
            {"coin": "SOL", "direction": "long", "stage": "stage2", "reason": "volume below threshold", "ts": chrono_utc_now_iso()}
        ]
    });
    Json(body)
}
/// Handler for `GET /hl/status`.
///
/// The `mids` object depends on the optional `symbol` query parameter
/// (matched case-sensitively): BTC and ETH have fixed prices, any other symbol
/// maps to `100.0`, and both BTC and ETH are returned when it is absent.
async fn hl_status(
    axum::extract::Query(query): axum::extract::Query<BTreeMap<String, String>>,
) -> Json<serde_json::Value> {
    let requested = query.get("symbol").map(String::as_str);
    let mids = match requested {
        None => json!({"BTC": 40500.0, "ETH": 2850.0}),
        Some("BTC") => json!({"BTC": 40500.0}),
        Some("ETH") => json!({"ETH": 2850.0}),
        Some(other) => json!({other: 100.0}),
    };
    let body = json!({
        "enabled": true,
        "exchange": "hyperliquid",
        "endpoint": "https://api.hyperliquid.xyz/info",
        "coins": 2,
        "mids": mids,
        "secrets_required": false
    });
    Json(body)
}
/// Handler for `GET /hl/account`: returns a fixed account snapshot with one
/// BTC position and one open order.
async fn hl_account() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.hl_account.v1",
        "exchange": "hyperliquid",
        "user": "0x0000...0000",
        "as_of": chrono_utc_now_iso(),
        "account_value": 10_000.0,
        "margin_used": 25.0,
        "withdrawable": 9_975.0,
        "positions": [
            {
                "symbol": "BTC",
                "side": "long",
                "quantity": 0.01,
                "entry_price": 50_000.0,
                "position_value": 500.0,
                "unrealized_pnl": 10.0,
                "margin_used": 25.0
            }
        ],
        "open_orders": [{"coin": "BTC", "oid": 123}],
        "counts": {"positions": 1, "open_orders": 1}
    });
    Json(body)
}
/// Handler for `GET /hl/reconcile`: returns a fixed "ok" reconciliation packet
/// with no positions and no drifts.
async fn hl_reconcile() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.reconciliation.v1",
        "exchange": "hyperliquid",
        "status": "ok",
        "risk_increasing_allowed": true,
        "reason": "local runtime and Hyperliquid account state are reconciled",
        "as_of": chrono_utc_now_iso(),
        "stale_after_s": 10,
        "local": {"positions": [], "open_positions": 0},
        "exchange_state": {"positions": [], "open_positions": 0},
        "drifts": []
    });
    Json(body)
}
/// Handler for `GET /live/certification`: returns a fixed passing dry-run
/// certification with two sample drills.
async fn live_certification() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.live_certification.v1",
        "mode": "dry_run",
        "passed": true,
        "live_start_certified": true,
        "summary": {
            "total": 10,
            "passed": 10,
            "failed": 0,
            "exchange": "fake",
            "secrets_required": false,
            "orders_placed_live": 0
        },
        "drills": [
            {
                "name": "heartbeat_arms_dead_man",
                "status": "pass",
                "note": "exchange dead-man heartbeat must be accepted before risk can increase",
                "evidence": {"heartbeat_ok": true}
            },
            {
                "name": "exchange_submit_outage_fails_closed_without_retry",
                "status": "pass",
                "note": "exchange submit failures must become auditable refused records and must not retry",
                "evidence": {"exchange_attempts": 1}
            }
        ],
        "evidence_requirements": ["live_preflight packet", "hl_reconcile packet"]
    });
    Json(body)
}
/// Handler for `GET /live/evidence`: returns a fixed, unsigned evidence bundle
/// in paper mode listing nine hash-only artifacts.
async fn live_evidence() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.live_evidence.v1",
        "generated_at": chrono_utc_now_iso(),
        "mode": "paper",
        "live_mode": "refused",
        "ready": false,
        "risk_increasing_allowed": false,
        "operator_context": mock_operator_context(),
        "summary": {
            "artifacts": 9,
            "preflight_ready": false,
            "controls_ready": true,
            "certification_passed": true,
            "live_start_certified": true,
            "live_receipts_total": 0,
            "live_receipts_accepted": 0,
            "reconciliation_status": "ok",
            "immune_risk_increasing_allowed": false,
            "live_records_total": 0,
            "live_records_accepted": 0,
            "deployment_heartbeat_status": "paper_only"
        },
        "artifacts": [
            {"name": "live_preflight", "schema_version": "zero.live_preflight.v1", "status": "refused", "hash": "sha256:1111111111111111111111111111111111111111111111111111111111111111", "included": "hash_only"},
            {"name": "live_cockpit", "schema_version": "zero.live_cockpit.v1", "status": "refused", "hash": "sha256:2222222222222222222222222222222222222222222222222222222222222222", "included": "hash_only"},
            {"name": "live_execution_receipts", "schema_version": "zero.live_execution_receipts.v1", "status": "empty", "hash": "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "included": "hash_only"},
            {"name": "hl_reconcile", "schema_version": "zero.reconciliation.v1", "status": "ok", "hash": "sha256:3333333333333333333333333333333333333333333333333333333333333333", "included": "hash_only"},
            {"name": "immune", "schema_version": "zero.immune.v1", "status": "blocked", "hash": "sha256:4444444444444444444444444444444444444444444444444444444444444444", "included": "hash_only"},
            {"name": "live_certification", "schema_version": "zero.live_certification.v1", "status": "pass", "hash": "sha256:5555555555555555555555555555555555555555555555555555555555555555", "included": "hash_only"},
            {"name": "audit_export", "schema_version": "zero.audit.v1", "status": "captured", "hash": "sha256:6666666666666666666666666666666666666666666666666666666666666666", "included": "hash_only"},
            {"name": "deployment_claim", "schema_version": "zero.deployment.claim.v1", "status": "captured", "hash": "sha256:7777777777777777777777777777777777777777777777777777777777777777", "included": "hash_only"},
            {"name": "deployment_heartbeat", "schema_version": "zero.deployment.heartbeat.v1", "status": "paper_only", "hash": "sha256:8888888888888888888888888888888888888888888888888888888888888888", "included": "hash_only"}
        ],
        "canary_rule": {
            "tiny_capital_only": true,
            "operator_owned_custody": true,
            "requires_external_exchange_records": true,
            "risk_reducing_actions_required": ["/pause-entries", "/flatten-all", "/kill"],
            "default_public_runtime_places_live_orders": false
        },
        "privacy": {
            "contains_exchange_credentials": false,
            "contains_wallet_material": false,
            "contains_raw_decisions": false,
            "contains_trace_tokens": false,
            "contains_idempotency_tokens": false,
            "contains_private_notes": false
        },
        "evidence_hash": "sha256:9999999999999999999999999999999999999999999999999999999999999999",
        "signature": {
            "status": "unsigned_local",
            "algorithm": null,
            "signature": null,
            "signer": "mock-runtime",
            "signed_evidence_hash": "sha256:9999999999999999999999999999999999999999999999999999999999999999",
            "key_material_included": false
        }
    });
    Json(body)
}
/// Handler for `GET /live/canary-policy`: returns a fixed refusal-mode canary
/// policy with its three phases and a recommendation.
async fn live_canary_policy() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.live_canary_policy.v1",
        "policy_version": "zero.live_canary_policy.public.v1",
        "generated_at": chrono_utc_now_iso(),
        "mode": "refusal",
        "summary": {
            "ready_for_canary": false,
            "policy_armed": false,
            "live_order_attempted": true,
            "live_order_accepted": false,
            "receipts_accepted": 0,
            "exchange_evidence_attached": true,
            "publishable_canary_evidence": false,
            "refusal_evidence_qualified": true,
            "qualified": true,
            "next_step": "keep_public_claim_at_refusal_proof"
        },
        "policy": {
            "default_state": "disarmed",
            "arm_requires": [
                "ready live preflight",
                "risk-increasing cockpit allowance",
                "passing dry-run live certification",
                "operator-owned custody",
                "exact live-risk confirmation phrase"
            ],
            "disarm_after": [
                "canary attempt completed",
                "pause captured",
                "flatten captured",
                "kill captured",
                "evidence exported",
                "operator report written"
            ],
            "launch_window_seconds": 300,
            "tiny_capital_only": true,
            "requires_exchange_evidence_for_accepted_receipts": true,
            "required_evidence": ["live_preflight", "live_cockpit", "live_certification", "live_receipts", "exchange_evidence"]
        },
        "phases": [
            {
                "name": "readiness",
                "status": "blocked",
                "detail": "live gates are not ready for risk-increasing canary mode",
                "preflight_ready": false,
                "controls_ready": true,
                "cockpit_risk_increasing_allowed": false,
                "certification_passed": true
            },
            {
                "name": "policy_arm",
                "status": "disarmed",
                "detail": "policy remains disarmed outside ready canary mode",
                "mode": "refusal",
                "requires_explicit_confirmation": true
            },
            {
                "name": "qualification",
                "status": "pass",
                "detail": "refusal-mode bundle qualifies as fail-closed public proof, not live trading proof",
                "publishable_canary_evidence": false,
                "refusal_evidence_qualified": true,
                "exchange_evidence_attached": true
            }
        ],
        "recommendation": {
            "action": "keep_public_claim_at_refusal_proof",
            "risk_direction": "none",
            "reason": "fail-closed evidence is valid but does not prove live execution"
        },
        "operator_context": mock_operator_context(),
        "request": {
            "mode": "refusal",
            "source": "mock"
        },
        "privacy": {
            "contains_exchange_credentials": false,
            "contains_wallet_material": false,
            "contains_raw_exchange_order_ids": false,
            "contains_raw_client_order_ids": false,
            "contains_idempotency_tokens": false,
            "contains_confirmation_phrase": false,
            "contains_private_notes": false
        }
    });
    Json(body)
}
/// Handler for `GET /runtime/parity`: returns a fixed production-parity report
/// for a four-cycle, paper-only scenario.
async fn runtime_parity() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.runtime.production_parity.v1",
        "available": true,
        "ok": true,
        "mode": "production-parity",
        "source": "bundled-paper-scenario",
        "generated_at": chrono_utc_now_iso(),
        "cycles_requested": 4,
        "cycles_run": 4,
        "paper_only": true,
        "places_live_orders": false,
        "paper": {
            "decisions": 4,
            "fills": 2,
            "rejections": 2,
            "open_positions": 1
        },
        "live_shadow": {
            "mode": "disabled-fail-closed",
            "accepted": 0,
            "refused": 4,
            "adapter_orders_placed": 0,
            "records": []
        },
        "feedback": {
            "schema_version": "zero.runtime.feedback.v1",
            "cycles": 4,
            "sample_size": 4,
            "fills": 2,
            "rejections": 2,
            "rejection_rate": 0.5,
            "by_rejection_reason": {"order notional exceeds limit": 2},
            "by_rejection_symbol": {"ETH": 1, "SOL": 1},
            "items": []
        },
        "certification": {
            "schema_version": "zero.live_certification.v1",
            "mode": "dry_run",
            "passed": true,
            "live_start_certified": true,
            "summary": {
                "total": 10,
                "passed": 10,
                "failed": 0,
                "orders_placed_live": 0
            },
            "drills": [],
            "evidence_requirements": ["operator-owned live canary evidence for live claims"]
        },
        "checks": {
            "paper_boundary": true,
            "phase_order": true,
            "live_shadow_fail_closed": true,
            "live_adapter_no_orders": true,
            "operator_owned_canary_required": true
        },
        "claim_boundary": {
            "production_ooda_parity": true,
            "live_trading_claimed": false,
            "operator_owned_canary_required_for_live_claim": true,
            "protected_live_code_evolution_allowed": false,
            "remote_push_allowed": false
        }
    });
    Json(body)
}
/// Handler for `GET /live/receipts`: returns a fixed, empty receipts packet in
/// paper mode.
async fn live_receipts() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.live_execution_receipts.v1",
        "generated_at": chrono_utc_now_iso(),
        "mode": "paper",
        "operator_context": mock_operator_context(),
        "summary": {
            "total": 0,
            "accepted": 0,
            "refused": 0,
            "exchange_error": 0,
            "status": "empty"
        },
        "receipts": [],
        "privacy": {
            "contains_exchange_credentials": false,
            "contains_wallet_material": false,
            "contains_raw_venue_ack_payload": false,
            "contains_trace_tokens": false,
            "contains_idempotency_tokens": false,
            "contains_private_notes": false
        },
        "receipts_hash": "sha256:cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"
    });
    Json(body)
}
/// Handler for `GET /immune`: returns a fixed breaker report with one closed
/// and two open, risk-blocking breakers.
async fn immune() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.immune.v1",
        "generated_at": chrono_utc_now_iso(),
        "mode": "paper",
        "risk_increasing_allowed": false,
        "summary": {"total": 3, "open": 2, "closed": 1, "warning": 0, "risk_blocking": 2},
        "breakers": [
            {
                "name": "stale_market_data",
                "status": "closed",
                "blocks_risk": false,
                "severity": "info",
                "reason": "market data fresh",
                "evidence": {"age_s": 0.1}
            },
            {
                "name": "dead_man",
                "status": "open",
                "blocks_risk": true,
                "severity": "critical",
                "reason": "live executor not configured",
                "evidence": {"configured": false}
            },
            {
                "name": "reconciliation",
                "status": "open",
                "blocks_risk": true,
                "severity": "critical",
                "reason": "account reconciliation unavailable",
                "evidence": {"status": "missing"}
            }
        ]
    });
    Json(body)
}
/// Handler for `GET /live/cockpit`: returns a fixed cockpit view in paper mode
/// with live mode refused because the mock has no live executor.
async fn live_cockpit() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.live_cockpit.v1",
        "generated_at": chrono_utc_now_iso(),
        "mode": "paper",
        "live_mode": "refused",
        "ready": false,
        "controls_ready": true,
        "risk_increasing_allowed": false,
        "next_action": "fix preflight check live_executor: mock has no live executor",
        "operator_context": mock_operator_context(),
        "access_policy": {
            "identity_required_for_live_controls": true,
            "default_scope": "local-private",
            "header_overrides": [
                "X-Zero-Operator-Id",
                "X-Zero-Operator-Handle",
                "X-Zero-Operator-Role",
                "X-Zero-Operator-Scope"
            ]
        },
        "preflight": {
            "schema_version": "zero.live_preflight.v1",
            "ready": false,
            "live_mode": "refused",
            "controls_ready": true,
            "summary": {"total": 9, "passed": 8, "failed": 1},
            "failed_checks": [
                {"name": "live_executor", "status": "fail", "note": "mock has no live executor"}
            ]
        },
        "immune": {
            "schema_version": "zero.immune.v1",
            "risk_increasing_allowed": false,
            "summary": {"total": 3, "open": 2, "closed": 1, "warning": 0, "risk_blocking": 2},
            "open_breakers": [
                {
                    "name": "dead_man",
                    "status": "open",
                    "blocks_risk": true,
                    "severity": "critical",
                    "reason": "live executor not configured",
                    "evidence": {"configured": false}
                },
                {
                    "name": "reconciliation",
                    "status": "open",
                    "blocks_risk": true,
                    "severity": "critical",
                    "reason": "account reconciliation unavailable",
                    "evidence": {"status": "missing"}
                }
            ]
        },
        "reconciliation": {
            "schema_version": "zero.reconciliation.v1",
            "status": "ok",
            "risk_increasing_allowed": true,
            "reason": "local runtime and Hyperliquid account state are reconciled",
            "drifts": 0
        },
        "certification": {
            "schema_version": "zero.live_certification.v1",
            "mode": "dry_run",
            "passed": true,
            "live_start_certified": true,
            "summary": {"total": 10, "passed": 10, "failed": 0, "orders_placed_live": 0},
            "failed_drills": []
        },
        "heartbeat": {
            "configured": false,
            "expired": true,
            "last_heartbeat_at": null,
            "timeout_s": null
        },
        "live_records": {
            "total": 0,
            "accepted": 0,
            "refused": 0,
            "exchange_error": 0,
            "recent": []
        },
        "operator_actions": {
            "risk_reducing": ["/pause-entries", "/kill", "/flatten-all"],
            "risk_increasing": ["/resume-entries"],
            "read_only": ["/live-cockpit", "/live-certify", "/immune", "/hl-reconcile"],
            "recent": []
        }
    });
    Json(body)
}
/// Handler for `GET /live/preflight`: returns a fixed preflight report whose
/// only failing check is `live_executor`.
async fn live_preflight() -> Json<serde_json::Value> {
    let body = json!({
        "schema_version": "zero.live_preflight.v1",
        "generated_at": chrono_utc_now_iso(),
        "exchange": "hyperliquid",
        "mode": "paper",
        "ready": false,
        "live_mode": "refused",
        "controls_ready": true,
        "checks": [
            {"name": "live_executor", "status": "fail", "note": "mock has no live executor"},
            {"name": "wallet_address", "status": "ok", "note": "0x0000...0000"},
            {"name": "api_private_key", "status": "ok", "note": "0x0000...0000"},
            {"name": "account_read", "status": "ok", "note": "clearinghouseState read ok"},
            {"name": "reconciliation", "status": "ok", "note": "local runtime and Hyperliquid account state are reconciled", "status_code": "ok"},
            {"name": "dry_run_order", "status": "ok", "note": "buy 0.001 BTC validates locally"},
            {"name": "journal", "status": "ok", "note": "append-only decision journal configured"},
            {"name": "risk_limits", "status": "ok", "note": "max_notional_usd=1000 max_position_usd=5000"},
            {"name": "emergency_controls", "status": "ok", "note": "kill switch armed"}
        ]
    });
    Json(body)
}
/// Handler for `GET /market/quote`.
///
/// Upper-cases the `symbol` query parameter (defaulting to `BTC` when absent)
/// and returns a static paper price: fixed values for BTC and ETH, `100.0`
/// for anything else.
async fn market_quote(
    axum::extract::Query(query): axum::extract::Query<BTreeMap<String, String>>,
) -> Json<serde_json::Value> {
    let symbol = match query.get("symbol") {
        Some(raw) => raw.to_uppercase(),
        None => "BTC".to_string(),
    };
    let price = if symbol == "BTC" {
        40500.0
    } else if symbol == "ETH" {
        2850.0
    } else {
        100.0
    };
    let body = json!({
        "symbol": symbol,
        "price": price,
        "source": "paper:static",
        "as_of": chrono_utc_now_iso(),
        "mode": "paper",
        "live": false
    });
    Json(body)
}
/// Handler for `GET /operator/state`.
///
/// Reports the configured operator label (default `steady`) and version, a
/// friction level derived from the label, and a zeroed activity vector.
async fn operator_state(State(s): State<AppState>) -> Json<serde_json::Value> {
    // Copy what is needed out of the overrides so the lock is held briefly.
    let (configured_label, version) = {
        let o = s.overrides.lock();
        (o.operator_label.clone(), o.operator_version)
    };
    let label = configured_label.unwrap_or_else(|| "steady".to_string());

    // Friction level: l1 for elevated/fatigued, l2 for tilt, l0 otherwise.
    let friction = if matches!(label.as_str(), "elevated" | "fatigued") {
        "l1"
    } else if label == "tilt" {
        "l2"
    } else {
        "l0"
    };

    let body = json!({
        "label": label,
        "friction": friction,
        "vector": {
            "velocity": {"last_1h": 0, "last_4h": 0, "last_24h": 0, "baseline_1h": null},
            "deviation": {
                "overrides_last_10": 0, "verdicts_last_10": 0,
                "overrides_last_50": 0, "verdicts_last_50": 0,
            },
            "session": {"active_duration_ms": 0, "longest_focus_ms": 0, "since_last_break_ms": 0},
            "loss_reaction": {
                "median_last_10_ms": 0, "fastest_session_ms": 0, "baseline_ms": null,
            },
            "re_entry": {"within_15m": 0, "within_30m": 0, "within_2h": 0},
            "sleep_proxy": {"hours_since_rest_ended": null},
            "on_break": false,
        },
        "as_of": chrono_utc_now_iso(),
        "version": version,
    });
    Json(body)
}
async fn operator_events(
State(s): State<AppState>,
body: String,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let parsed: serde_json::Value = serde_json::from_str(&body)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{{\"error\":\"{e}\"}}")))?;
let events: Vec<serde_json::Value> = match &parsed {
serde_json::Value::Object(map) if map.contains_key("events") => {
map["events"].as_array().cloned().unwrap_or_default()
}
serde_json::Value::Array(arr) => arr.clone(),
serde_json::Value::Object(_) => vec![parsed.clone()],
_ => {
return Err((
StatusCode::BAD_REQUEST,
"{\"error\":\"body must be an object or array\"}".to_string(),
));
}
};
{
let mut log = s.received_events.lock();
for ev in &events {
log.push(ev.clone());
}
}
let Json(snapshot) = operator_state(State(s)).await;
Ok(Json(json!({
"accepted": events.len(),
"snapshot": snapshot,
})))
}
impl MockEngine {
    /// Snapshot of every event recorded by `/operator/events` so far.
    #[must_use]
    pub fn received_operator_events(&self) -> Vec<serde_json::Value> {
        let log = self.state.received_events.lock();
        log.to_vec()
    }

    /// Snapshot of every request recorded by `/execute` so far.
    #[must_use]
    pub fn received_executes(&self) -> Vec<CapturedPost> {
        let log = self.state.received_executes.lock();
        log.to_vec()
    }

    /// Snapshot of every request recorded by `/auto/toggle` so far.
    #[must_use]
    pub fn received_auto_toggles(&self) -> Vec<CapturedPost> {
        let log = self.state.received_auto_toggles.lock();
        log.to_vec()
    }

    /// Snapshot of the `/live/*` control paths hit so far, in arrival order.
    #[must_use]
    pub fn received_live_controls(&self) -> Vec<String> {
        let log = self.state.received_live_controls.lock();
        log.to_vec()
    }
}
/// Appends `path` to the shared log of live-control endpoints that were hit.
fn capture_live_control(s: &AppState, path: &str) {
    let mut log = s.received_live_controls.lock();
    log.push(path.to_owned());
}
/// Handler for `POST /live/heartbeat`: records the call and returns a fixed
/// acknowledgement.
async fn live_heartbeat(State(s): State<AppState>) -> Json<serde_json::Value> {
    capture_live_control(&s, "/live/heartbeat");
    let body = json!({
        "ok": true,
        "action": "heartbeat",
        "risk_direction": "neutral",
        "operator_context": mock_operator_context(),
        "as_of": chrono_utc_now_iso(),
        "dead_man_timeout_s": 30,
        "exchange_dead_man": {"ok": true}
    });
    Json(body)
}
/// Handler for `POST /live/pause`: records the call and reports state `paused`.
async fn live_pause(State(s): State<AppState>) -> Json<serde_json::Value> {
    capture_live_control(&s, "/live/pause");
    let body = json!({
        "ok": true,
        "state": "paused",
        "action": "pause_entries",
        "risk_direction": "reduces",
        "operator_context": mock_operator_context(),
        "as_of": chrono_utc_now_iso()
    });
    Json(body)
}
/// Handler for `POST /live/resume`: records the call and reports state `running`.
async fn live_resume(State(s): State<AppState>) -> Json<serde_json::Value> {
    capture_live_control(&s, "/live/resume");
    let body = json!({
        "ok": true,
        "state": "running",
        "action": "resume_entries",
        "risk_direction": "increases",
        "operator_context": mock_operator_context(),
        "as_of": chrono_utc_now_iso()
    });
    Json(body)
}
/// Handler for `POST /live/kill`: records the call and reports state `killed`
/// with two cancelled orders.
async fn live_kill(State(s): State<AppState>) -> Json<serde_json::Value> {
    capture_live_control(&s, "/live/kill");
    let body = json!({
        "ok": true,
        "state": "killed",
        "action": "kill",
        "risk_direction": "reduces",
        "operator_context": mock_operator_context(),
        "as_of": chrono_utc_now_iso(),
        "exchange_cancel": {"ok": true, "cancelled": 2}
    });
    Json(body)
}
/// Handler for `POST /live/flatten`: records the call and reports one
/// submitted BTC sell order.
async fn live_flatten(State(s): State<AppState>) -> Json<serde_json::Value> {
    capture_live_control(&s, "/live/flatten");
    let body = json!({
        "ok": true,
        "action": "flatten_all",
        "risk_direction": "reduces",
        "operator_context": mock_operator_context(),
        "orders": [
            {"accepted": true, "coin": "BTC", "side": "sell", "size": 0.42, "reason": "submitted"}
        ]
    });
    Json(body)
}
#[allow(clippy::similar_names)]
async fn execute(
State(s): State<AppState>,
headers: axum::http::HeaderMap,
body: String,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
{
let o = s.overrides.lock();
if o.post_transient_fail {
return Err((StatusCode::SERVICE_UNAVAILABLE, "retry me".into()));
}
if o.post_server_error {
return Err((StatusCode::INTERNAL_SERVER_ERROR, "boom".into()));
}
}
let parsed: serde_json::Value = serde_json::from_str(&body)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{{\"error\":\"{e}\"}}")))?;
let captured = CapturedPost {
headers: capture_relevant_headers(&headers),
body: parsed.clone(),
};
s.received_executes.lock().push(captured);
let mode_header = headers
.get("x-zero-mode")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let force_sim = s.overrides.lock().force_simulated;
let simulated = force_sim || mode_header.eq_ignore_ascii_case("paper");
let coin = parsed.get("coin").cloned().unwrap_or(json!("BTC"));
let side = parsed.get("side").cloned().unwrap_or(json!("buy"));
let size = parsed.get("size").cloned().unwrap_or(json!(0.0));
let key = parsed
.get("idempotency_key")
.and_then(|v| v.as_str())
.unwrap_or_default();
let fill_id = if simulated {
format!("paper-{key}")
} else {
format!("live-{key}")
};
Ok(Json(json!({
"accepted": true,
"simulated": simulated,
"fill_id": fill_id,
"coin": coin,
"side": side,
"size": size,
"request_hash": "sha256:abababababababababababababababababababababababababababababababab",
"receipt_hash": "sha256:bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc",
})))
}
async fn auto_toggle(
State(s): State<AppState>,
headers: axum::http::HeaderMap,
body: String,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
{
let o = s.overrides.lock();
if o.post_transient_fail {
return Err((StatusCode::SERVICE_UNAVAILABLE, "retry me".into()));
}
if o.post_server_error {
return Err((StatusCode::INTERNAL_SERVER_ERROR, "boom".into()));
}
}
let parsed: serde_json::Value = serde_json::from_str(&body)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{{\"error\":\"{e}\"}}")))?;
let captured = CapturedPost {
headers: capture_relevant_headers(&headers),
body: parsed.clone(),
};
s.received_auto_toggles.lock().push(captured);
let requested = parsed
.get("enabled")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
let (echo, reason) = {
let o = s.overrides.lock();
(o.auto_toggle_echo_state, o.auto_toggle_reason.clone())
};
let actual = echo.unwrap_or(requested);
let state_str = if actual { "on" } else { "off" };
let mode_header = headers
.get("x-zero-mode")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let force_sim = s.overrides.lock().force_simulated;
let simulated = force_sim || mode_header.eq_ignore_ascii_case("paper");
let mut resp = serde_json::Map::new();
resp.insert("state".into(), json!(state_str));
resp.insert("simulated".into(), json!(simulated));
if let Some(r) = reason {
resp.insert("reason".into(), json!(r));
}
Ok(Json(serde_json::Value::Object(resp)))
}
fn capture_relevant_headers(
headers: &axum::http::HeaderMap,
) -> std::collections::BTreeMap<String, String> {
const RELEVANT: &[&str] = &[
"x-zero-mode",
"x-idempotency-key",
"content-type",
"authorization",
];
let mut out = std::collections::BTreeMap::new();
for name in RELEVANT {
if let Some(v) = headers.get(*name)
&& let Ok(s) = v.to_str()
{
out.insert((*name).to_string(), s.to_string());
}
}
out
}
/// Accepts a WebSocket upgrade request and runs [`handle_ws`] on the upgraded socket,
/// giving it the shared application state.
async fn ws_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
    ws.on_upgrade(move |socket| async move { handle_ws(socket, state).await })
}
/// Drives a single mock WebSocket session.
///
/// If the `ws_drop_once` override is set, it is cleared and the socket is closed
/// immediately. Otherwise a fixed burst of frames is sent (heartbeat, `v2_status`,
/// `positions_update`, `risk_update`, `regime_update`), followed by a heartbeat every
/// 250 ms until a send fails, the peer sends a close frame, a receive error occurs,
/// or the stream ends.
async fn handle_ws(mut socket: WebSocket, s: AppState) {
    // Read and reset the one-shot flag inside its own scope so the lock guard is
    // released before any `.await`.
    let drop_connection = {
        let mut overrides = s.overrides.lock();
        std::mem::take(&mut overrides.ws_drop_once)
    };
    if drop_connection {
        // The outcome of the close is deliberately ignored.
        let _ = socket.close().await;
        return;
    }

    let initial_frames = [
        json!({"event": "heartbeat", "ts": now_iso(), "data": {}}),
        json!({
            "event": "v2_status",
            "ts": now_iso(),
            "data": {
                "confidence": {"score": 72, "level": "high"},
                "market": {
                    "regime": "TREND_LONG",
                    "health": 0.954,
                    "fear_greed": 54,
                    "coins_tradeable": 30
                },
                "positions": {"open": 2, "unrealized_pnl": 34.12, "equity": 10_034.12},
                "today": {"trades": 24, "wins": 15, "pnl": -3.95},
                "approaching": [],
                "blind_spots": []
            }
        }),
        json!({
            "event": "positions_update",
            "ts": now_iso(),
            "data": {
                "positions": [
                    {"symbol": "BTC", "side": "long", "size": 0.42, "entry": 64_120.5,
                     "mark": 64_480.0, "unrealized_pnl": 151.13, "unrealized_r": 0.82}
                ],
                "account_value": 10_034.12,
                "total_unrealized_pnl": 151.13
            }
        }),
        json!({
            "event": "risk_update",
            "ts": now_iso(),
            "data": {
                "account_value": 10_034.12,
                "drawdown_pct": 0.8,
                "halted": false,
                "global_halt": false,
                "stop_failure_halt": false,
                "daily_pnl_usd": 34.12,
                "daily_loss_usd": 20.0,
                "peak_equity": 10_100.0,
                "open_count": 2
            }
        }),
        json!({
            "event": "regime_update",
            "ts": now_iso(),
            "data": {"regime": "TREND_LONG", "confidence": 0.81}
        }),
    ];
    for frame in initial_frames {
        let sent = socket.send(Message::Text(frame.to_string())).await;
        if sent.is_err() {
            return;
        }
    }

    let mut heartbeat = tokio::time::interval(Duration::from_millis(250));
    heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            _ = heartbeat.tick() => {
                let frame = json!({"event": "heartbeat", "ts": now_iso(), "data": {}});
                let sent = socket.send(Message::Text(frame.to_string())).await;
                if sent.is_err() {
                    return;
                }
            }
            incoming = socket.recv() => {
                // End the session on stream end, receive error, or a close frame;
                // every other inbound message is ignored.
                if matches!(incoming, None | Some(Err(_) | Ok(Message::Close(_)))) {
                    return;
                }
            }
        }
    }
}
/// Produces a placeholder ISO-8601-style timestamp for mock WebSocket frames.
///
/// Date, hour and minute are fixed at `2026-01-01T00:00`; only the seconds field
/// varies, taken from the wall clock's Unix time modulo 60. A clock that reads
/// earlier than the Unix epoch yields `00`.
fn now_iso() -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let second_of_minute = match since_epoch {
        Ok(elapsed) => elapsed.as_secs() % 60,
        Err(_) => 0,
    };
    format!("2026-01-01T00:00:{second_of_minute:02}Z")
}
/// Shorthand for axum's response type; used as the return type of `ws_handler`.
type Response = axum::response::Response;
/// Produces the placeholder ISO-8601-style timestamp used for `as_of` fields in the
/// mock live-control replies.
///
/// Date, hour and minute are fixed at `1970-01-01T00:00`; only the seconds field
/// varies, taken from the wall clock's Unix time modulo 60.
fn chrono_utc_now_iso() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    // A clock that reads earlier than the Unix epoch falls back to 0 instead of failing.
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| d.as_secs());
    format!("1970-01-01T00:00:{:02}Z", secs % 60)
}
/// Builds the canned operator-context object embedded in the mock live-control replies.
fn mock_operator_context() -> serde_json::Value {
    // The mock reports the same identifier as both the operator id and its handle.
    const MOCK_OPERATOR: &str = "mock-operator";
    json!({
        "schema_version": "zero.operator_context.v1",
        "operator_id": MOCK_OPERATOR,
        "handle": MOCK_OPERATOR,
        "role": "owner",
        "scope": "local-private",
        "source": "mock"
    })
}