use std::sync::Arc;
use parking_lot::RwLock;
use zero_engine_client::{EngineState, ExecuteSide, HttpClient, LiveControlResponse};
use zero_operator_state::label::Label;
use crate::command::{
AutoAction, Command, ConfigAction, DISCLOSURE_OVERRIDE_CONFIRM, HeadlessAction, ModeTarget,
OverlayTarget, StateOverrideLabel, VerboseAction,
};
use crate::config::{ConfigSource, DoctorSeverity};
use crate::friction::FrictionDecision;
use crate::parse::parse_line;
use crate::risk::RiskDirection;
use crate::session::{ReplayKind, SessionSource};
use crate::supervisor::{
AutoRequest, AutoSource, SupervisorAction, SupervisorError, SupervisorReply, SupervisorSource,
};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputLine {
System(String),
Command(String),
Warn(String),
Alert(String),
}
impl OutputLine {
pub fn system(s: impl Into<String>) -> Self {
Self::System(s.into())
}
pub fn command(s: impl Into<String>) -> Self {
Self::Command(s.into())
}
pub fn warn(s: impl Into<String>) -> Self {
Self::Warn(s.into())
}
pub fn alert(s: impl Into<String>) -> Self {
Self::Alert(s.into())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayLine {
pub kind: ReplayKind,
pub at_ms: i64,
pub text: String,
}
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DispatchOutput {
pub lines: Vec<OutputLine>,
pub replay_lines: Vec<ReplayLine>,
pub mode_change: Option<ModeTarget>,
pub show_overlay: Option<OverlayTarget>,
pub quit: bool,
pub clear_log: bool,
pub risk: Option<RiskDirection>,
pub friction: Option<FrictionDecision>,
pub pending_command: Option<Command>,
pub verbose_toggle: Option<bool>,
pub wrap_off_toggle: Option<bool>,
pub coaching_reset: bool,
pub dismiss_overlay: bool,
}
impl DispatchOutput {
#[must_use]
pub fn with_line(mut self, l: OutputLine) -> Self {
self.lines.push(l);
self
}
}
pub trait StateSource: Send + Sync + 'static {
fn label(&self) -> Label;
}
#[derive(Debug, Clone, Copy)]
pub struct StaticLabel(pub Label);
impl StaticLabel {
#[must_use]
pub const fn steady() -> Self {
Self(Label::Steady)
}
#[must_use]
pub const fn tilt() -> Self {
Self(Label::Tilt)
}
}
impl Default for StaticLabel {
fn default() -> Self {
Self::steady()
}
}
impl StateSource for StaticLabel {
fn label(&self) -> Label {
self.0
}
}
#[derive(Clone)]
pub struct DispatchContext {
pub http: Option<HttpClient>,
pub engine: Arc<RwLock<EngineState>>,
pub state: Arc<dyn StateSource>,
pub sessions: Option<Arc<dyn SessionSource>>,
pub config: Option<Arc<dyn ConfigSource>>,
pub verbose: bool,
pub wrap_off: bool,
pub auto: Option<Arc<dyn AutoSource>>,
pub supervisor: Option<Arc<dyn SupervisorSource>>,
}
impl std::fmt::Debug for DispatchContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DispatchContext")
.field("http_connected", &self.http.is_some())
.field("label", &self.state.label())
.field("sessions_enabled", &self.sessions.is_some())
.field("config_enabled", &self.config.is_some())
.field("auto_enabled", &self.auto.is_some())
.field("supervisor_enabled", &self.supervisor.is_some())
.finish_non_exhaustive()
}
}
impl DispatchContext {
#[must_use]
pub fn new(http: Option<HttpClient>, engine: Arc<RwLock<EngineState>>) -> Self {
Self {
http,
engine,
state: Arc::new(StaticLabel::steady()),
sessions: None,
config: None,
verbose: false,
wrap_off: false,
auto: None,
supervisor: None,
}
}
#[must_use]
pub fn with_state(mut self, src: Arc<dyn StateSource>) -> Self {
self.state = src;
self
}
#[must_use]
pub fn with_sessions(mut self, src: Arc<dyn SessionSource>) -> Self {
self.sessions = Some(src);
self
}
#[must_use]
pub fn with_config(mut self, src: Arc<dyn ConfigSource>) -> Self {
self.config = Some(src);
self
}
#[must_use]
pub const fn with_verbose(mut self, on: bool) -> Self {
self.verbose = on;
self
}
#[must_use]
pub const fn with_wrap_off(mut self, on: bool) -> Self {
self.wrap_off = on;
self
}
#[must_use]
pub fn with_auto(mut self, src: Arc<dyn AutoSource>) -> Self {
self.auto = Some(src);
self
}
#[must_use]
pub fn with_supervisor(mut self, src: Arc<dyn SupervisorSource>) -> Self {
self.supervisor = Some(src);
self
}
}
pub async fn dispatch(ctx: &DispatchContext, input: &str) -> Result<Option<DispatchOutput>, Never> {
let parsed = parse_line(input);
let Some(cmd) = crate::command::resolve(&parsed) else {
return Ok(None);
};
let risk = cmd.risk();
let label = ctx.state.label();
let (risk_ctx, halt_reason, reread_phrase) = {
let eng = ctx.engine.read();
eng.risk
.as_ref()
.map(|stat| {
let r = &stat.value;
let rc = zero_operator_state::RiskContext::from_engine(
r.drawdown_pct,
r.last_drawdown_alert_pct,
r.is_halted(),
);
let halt_reason = halt_reason_label(r);
let reread = reread_phrase_from_risk(r.drawdown_pct, r.last_drawdown_alert_pct);
(rc, halt_reason, reread)
})
.unwrap_or_default()
};
let decision = crate::friction::decide_with_risk(
risk,
label,
risk_ctx,
halt_reason.as_deref(),
reread_phrase,
);
let mut out = if matches!(decision, FrictionDecision::Proceed) {
run(ctx, &cmd).await
} else {
friction_advisory(&cmd, label, &decision)
};
let carry_pending = !matches!(decision, FrictionDecision::Proceed) && !decision.is_refusal();
out.risk = Some(risk);
out.friction = Some(decision);
if carry_pending {
out.pending_command = Some(cmd);
}
Ok(Some(out))
}
fn halt_reason_label(risk: &zero_engine_client::models::Risk) -> Option<String> {
if risk.stop_failure_halt {
Some("stop_failure_halt".to_string())
} else if risk.global_halt {
Some("global_halt".to_string())
} else if risk.halted {
Some(
risk.halt_reason
.clone()
.unwrap_or_else(|| "halted".to_string()),
)
} else {
None
}
}
fn reread_phrase_from_risk(
drawdown_pct: Option<f64>,
last_drawdown_alert_pct: Option<f64>,
) -> Option<String> {
let dd = drawdown_pct?;
let alert = last_drawdown_alert_pct?;
let delta = (alert - dd).abs();
Some(format!(
"i acknowledge drawdown {dd:.2}% is within {delta:.2}pp of the {alert:.2}% hard alert"
))
}
pub async fn run_bypass_friction(ctx: &DispatchContext, cmd: Command) -> DispatchOutput {
let risk = cmd.risk();
let mut out = run(ctx, &cmd).await;
out.risk = Some(risk);
out.friction = Some(FrictionDecision::Proceed);
out
}
fn friction_advisory(cmd: &Command, label: Label, d: &FrictionDecision) -> DispatchOutput {
let mut out = DispatchOutput::default();
match d {
FrictionDecision::Proceed => {}
FrictionDecision::Pause { pause, level } => {
out.lines.push(OutputLine::warn(format!(
"{name}: friction {level:?} — state={label}, pause {pause}s",
name = cmd.name(),
pause = pause.as_secs(),
)));
}
FrictionDecision::TypedConfirm { pause, level } => {
let word = d.confirm_word().map_or_else(
|| crate::friction::TYPED_CONFIRM_WORD.to_string(),
std::borrow::Cow::into_owned,
);
out.lines.push(OutputLine::alert(format!(
"{name}: friction {level:?} — state={label}, {pause}s pause + type '{word}'",
name = cmd.name(),
pause = pause.as_secs(),
)));
}
FrictionDecision::WaitAndReread {
pause,
level,
phrase,
} => {
out.lines.push(OutputLine::alert(format!(
"{name}: friction {level:?} — state={label}, {pause}s pause + re-read: '{phrase}'",
name = cmd.name(),
pause = pause.as_secs(),
)));
}
FrictionDecision::HardStop { level, reason } => {
out.lines.push(OutputLine::alert(format!(
"{name}: friction {level:?} REFUSED — state={label}, reason={reason}. \
Only risk-reducing commands are accepted while the engine is halted.",
name = cmd.name(),
)));
}
}
out
}
async fn run(ctx: &DispatchContext, cmd: &Command) -> DispatchOutput {
match cmd {
Command::Help => help(),
Command::Quit => DispatchOutput {
quit: true,
lines: vec![OutputLine::system("exiting")],
..Default::default()
},
Command::Clear => DispatchOutput {
clear_log: true,
dismiss_overlay: true,
..Default::default()
},
Command::SwitchMode(m) => DispatchOutput {
mode_change: Some(*m),
..Default::default()
},
Command::Status => status(ctx).await,
Command::Brief => brief(ctx).await,
Command::Risk => risk_cmd(ctx).await,
Command::HyperliquidStatus { symbol } => hl_status_cmd(ctx, symbol.as_deref()).await,
Command::HyperliquidAccount => hl_account_cmd(ctx).await,
Command::HyperliquidReconcile => hl_reconcile_cmd(ctx).await,
Command::LiveCertify => live_certify_cmd(ctx).await,
Command::LiveCockpit => live_cockpit_cmd(ctx).await,
Command::LiveEvidence => live_evidence_cmd(ctx).await,
Command::LiveReceipts => live_receipts_cmd(ctx).await,
Command::LiveCanaryPolicy => live_canary_policy_cmd(ctx).await,
Command::RuntimeParity => runtime_parity_cmd(ctx).await,
Command::Immune => immune_cmd(ctx).await,
Command::Quote { symbol } => quote_cmd(ctx, symbol.as_deref()).await,
Command::Regime { coin } => regime_cmd(ctx, coin.as_deref()).await,
Command::Evaluate { coin, extras } => evaluate_cmd(ctx, coin.as_deref(), extras).await,
Command::Positions => positions_cmd(ctx).await,
Command::Pulse { limit } => pulse_cmd(ctx, *limit).await,
Command::Approaching => approaching_cmd(ctx).await,
Command::Rejections { coin, limit } => rejections_cmd(ctx, coin.as_deref(), *limit).await,
Command::Kill => kill_cmd(ctx).await,
Command::FlattenAll => flatten_cmd(ctx).await,
Command::PauseEntries => pause_cmd(ctx).await,
Command::ResumeEntries => resume_entries_cmd(ctx).await,
Command::Break { minutes } => break_stub(ctx, *minutes).await,
Command::Execute => execute_stub(),
Command::ExecuteOrder {
coin,
side,
size,
error,
} => {
execute_cmd(
ctx,
coin.as_deref(),
*side,
size.as_deref(),
error.as_deref(),
)
.await
}
Command::State => DispatchOutput {
show_overlay: Some(OverlayTarget::State),
..Default::default()
},
Command::Sessions { limit } => sessions_cmd(ctx, *limit),
Command::Resume { needle } => resume_cmd(ctx, needle.as_deref()),
Command::Fork => fork_cmd(ctx),
Command::Save { label } => save_cmd(ctx, label.as_deref()),
Command::Replay { needle } => replay_cmd(ctx, needle.as_deref()),
Command::Share { needle } => share_cmd(ctx, needle.as_deref()),
Command::Heat => heat_cmd(ctx).await,
Command::Config { action } => config_cmd(ctx, action),
Command::Verbose { action } => verbose_cmd(ctx, action),
Command::StateOverride { label } => state_override_cmd(*label),
Command::Continue => continue_cmd(),
Command::Close { coin } => close_cmd(coin.as_deref()),
Command::WrapOff => wrap_off_cmd(),
Command::CoachingReset => coaching_reset_cmd(),
Command::DisclosureOverride { confirmed } => disclosure_override_cmd(*confirmed),
Command::Rate { trade_id, rating } => rate_cmd(ctx, trade_id.as_deref(), *rating).await,
Command::ZeroPrefix { rest } => zero_prefix_hint(rest),
Command::Auto { action } => auto_cmd(ctx, action),
Command::Headless { action } => headless_cmd(ctx, action),
Command::Unknown(head) => DispatchOutput {
lines: vec![OutputLine::warn(format!(
"unknown command: /{head} (try /help)"
))],
..Default::default()
},
}
}
fn help() -> DispatchOutput {
let mut out = DispatchOutput::default();
out.lines.push(OutputLine::system("commands:"));
out.lines
.extend(HELP_LINES.iter().copied().map(OutputLine::system));
out.lines.push(OutputLine::system(
" /doctor — diagnose config + secrets (alias for /config doctor)",
));
out.lines.push(OutputLine::system(
" /config show — show resolved config values",
));
out.lines.push(OutputLine::system(
"mode switches are also on Ctrl+1..5. Ctrl+0 returns to Conversation.",
));
out
}
const HELP_LINES: &[&str] = &[
" /help — this list",
" /quit — exit",
" /clear — clear the log",
" /conv /decisions /heat-mode /pos-mode — switch modes",
" /status — engine status",
" /brief — morning briefing",
" /risk — guardrail summary",
" /hl-status [coin] — read-only Hyperliquid info status",
" /hl-account — read-only Hyperliquid account truth",
" /hl-reconcile — Hyperliquid account reconciliation",
" /live-certify — dry-run live execution certification",
" /live-cockpit — live readiness cockpit",
" /live-evidence — hash-only live evidence bundle",
" /live-receipts — public-safe execution receipts",
" /live-canary — live canary readiness and proof policy",
" /runtime-parity — production-parity OODA report",
" /immune — immune breaker state",
" /quote <coin> — active paper quote source",
" /heat — composite heat (risk + circuit state)",
" /regime [coin] — market regime",
" /evaluate <coin> — gate verdict (overlay)",
" /pos — open positions",
" /pulse [N] — recent engine events",
" /approaching — coins near a gate",
" /rejections [coin] [N] — recent gate rejections",
" /kill /flatten-all /pause-entries /break /close — risk-reducers (instant)",
" /resume-entries — resume new entries (friction-gated)",
" /close <coin> — close a single position",
" /execute <coin> <buy|sell> <size> — place order (gated)",
" /state — full operator-state overview (any key closes)",
" /state-override <L> — declare operator-state label (gated)",
" /continue — acknowledge coaching notice",
" /coaching reset — clear coaching notice buffer",
" /wrap-off — skip daily wrap (this session only)",
" /disclosure-override --i-know-what-i-am-doing — bypass progressive disclosure",
];
fn zero_prefix_hint(rest: &str) -> DispatchOutput {
let tail = rest.trim();
let hint = match tail {
"" => "you're already inside zero — try `/help` to list commands".to_owned(),
"doctor" | "doctor --fix" | "doctor --format json" => {
"you're already inside zero — try `/doctor` (or `/config doctor`)".to_owned()
}
"version" | "--version" | "-V" => {
"you're already inside zero — the version banner printed at startup; `/quit` returns to the shell".to_owned()
}
other => format!(
"you're already inside zero — `{other}` is a shell subcommand. `/quit` returns to the shell, or try `/help`"
),
};
DispatchOutput {
lines: vec![OutputLine::warn(hint)],
..Default::default()
}
}
fn require_http<'a>(ctx: &'a DispatchContext, out: &mut DispatchOutput) -> Option<&'a HttpClient> {
if let Some(c) = &ctx.http {
Some(c)
} else {
out.lines.push(OutputLine::alert(
"no engine client configured — run `zero init` or set ZERO_API_URL",
));
None
}
}
async fn status(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.v2_status().await {
Ok(s) => {
let regime = s.regime().unwrap_or("—");
let conf = match (s.engine_confidence(), s.confidence_level()) {
(Some(score), Some(level)) => format!("{score:.0} ({level})"),
(Some(score), None) => format!("{score:.0}"),
(None, Some(level)) => level.to_string(),
(None, None) => "—".into(),
};
let eq = s.equity().map_or("—".into(), |v| format!("${v:.2}"));
let open = s.open().map_or("—".into(), |n| n.to_string());
let upnl = s
.unrealized_pnl()
.map_or("—".into(), |v| format!("{v:+.2}"));
out.lines.push(OutputLine::command(format!(
"engine: regime={regime} confidence={conf} equity={eq} open={open} upnl={upnl}"
)));
let today = &s.today;
if today.trades.is_some() || today.pnl.is_some() {
let trades = today.trades.map_or("—".into(), |n| n.to_string());
let wins = today.wins.map_or("—".into(), |n| n.to_string());
let pnl = today.pnl.map_or("—".into(), |v| format!("{v:+.2}"));
let streak = today.streak.map_or("—".into(), |n| format!("{n:+}"));
let sizing = today.sizing_mult.map_or("—".into(), |v| format!("{v:.2}x"));
out.lines.push(OutputLine::system(format!(
" today: trades={trades} wins={wins} pnl={pnl} streak={streak} sizing={sizing}"
)));
}
let market = &s.market;
if market.fear_greed.is_some() || market.health.is_some() {
let fg = market.fear_greed.map_or("—".into(), |n| n.to_string());
let health = market
.health
.map_or("—".into(), |v| format!("{:.0}%", v * 100.0));
let coins = market.coins_tradeable.map_or("—".into(), |n| n.to_string());
out.lines.push(OutputLine::system(format!(
" market: fear_greed={fg} health={health} coins_tradeable={coins}"
)));
}
if let Some(recovery) = &s.recovery {
let status = recovery.status.as_deref().unwrap_or("unknown");
let source = recovery.source.as_deref().unwrap_or("unknown");
let durable = if recovery.durable {
"durable"
} else {
"ephemeral"
};
let decisions = recovery
.current_decisions
.or(recovery.decisions_recovered)
.map_or("—".into(), |n| n.to_string());
let fills = recovery
.current_fills
.or(recovery.fills_recovered)
.map_or("—".into(), |n| n.to_string());
let positions = recovery
.current_positions
.or(recovery.positions_recovered)
.map_or("—".into(), |n| n.to_string());
out.lines.push(OutputLine::system(format!(
" recovery: {status} source={source} journal={durable} decisions={decisions} fills={fills} positions={positions}"
)));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("status: {e}"))),
}
out
}
async fn brief(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.brief().await {
Ok(b) => {
if !b.has_content() {
out.lines
.push(OutputLine::system("(engine has no briefing right now)"));
return out;
}
let open = b.open_positions.unwrap_or(0);
let fg = b
.fear_greed
.map_or("—".into(), |v| format!("{v} ({})", fg_sentiment(v)));
out.lines.push(OutputLine::command(format!(
"brief: open={open} fear_greed={fg} signals={} approaching={}",
b.recent_signals.len(),
b.approaching.len(),
)));
for pos in b.positions.iter().take(8) {
let pnl = pos
.unrealized_pnl
.map_or_else(|| "—".into(), |v| format!("{v:+.2}"));
out.lines.push(OutputLine::system(format!(
" position {} {} size={:.4} entry={:.2} pnl={}",
pos.symbol, pos.side, pos.size, pos.entry, pnl
)));
}
for sig in b.recent_signals.iter().take(5) {
if let Some(summary) = brief_line_summary(sig) {
out.lines
.push(OutputLine::system(format!(" signal {summary}")));
}
}
for app in b.approaching.iter().take(5) {
if let Some(summary) = brief_line_summary(app) {
out.lines
.push(OutputLine::system(format!(" approaching {summary}")));
}
}
if let Some(cycle) = b.last_cycle.as_object()
&& !cycle.is_empty()
{
let parts: Vec<String> = cycle
.iter()
.take(5)
.map(|(k, v)| format!("{k}={}", compact_json_value(v)))
.collect();
out.lines.push(OutputLine::system(format!(
" last_cycle {}",
parts.join(" ")
)));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("brief: {e}"))),
}
out
}
async fn hl_status_cmd(ctx: &DispatchContext, symbol: Option<&str>) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.hyperliquid_status(symbol).await {
Ok(s) if !s.enabled => {
let reason = s
.reason
.as_deref()
.unwrap_or("Hyperliquid read-only adapter disabled");
out.lines
.push(OutputLine::warn(format!("hl: disabled — {reason}")));
}
Ok(s) => {
let coins = s.coins.map_or("—".into(), |n| n.to_string());
let secrets = s
.secrets_required
.map_or("—".into(), |required| required.to_string());
out.lines.push(OutputLine::command(format!(
"hl: enabled coins={coins} secrets_required={secrets}"
)));
for (symbol, mid) in s.mids.iter().take(8) {
out.lines
.push(OutputLine::system(format!(" {symbol}: mid={mid:.4}")));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("hl-status: {e}"))),
}
out
}
async fn hl_account_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.hyperliquid_account().await {
Ok(account) => {
let equity = account
.account_value
.map_or("—".into(), |value| format!("${value:.2}"));
let margin = account
.margin_used
.map_or("—".into(), |value| format!("${value:.2}"));
out.lines.push(OutputLine::command(format!(
"hl-account: user={} equity={equity} margin={margin} positions={} open_orders={}",
account.user,
account.positions.len(),
account.open_orders.len()
)));
for position in account.positions.iter().take(8) {
out.lines.push(OutputLine::system(format!(
" {} {} qty={:.6} entry={:.4} value=${:.2} upnl=${:.2}",
position.symbol,
position.side,
position.quantity.abs(),
position.entry_price,
position.position_value,
position.unrealized_pnl
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("hl-account: {e}"))),
}
out
}
async fn hl_reconcile_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.hyperliquid_reconciliation().await {
Ok(report) => {
out.lines.push(OutputLine::command(format!(
"hl-reconcile: status={} risk_increasing_allowed={} reason={}",
report.status, report.risk_increasing_allowed, report.reason
)));
for drift in report.drifts.iter().take(8) {
let symbol = drift.symbol.as_deref().unwrap_or("account");
out.lines.push(OutputLine::system(format!(
" {symbol}: {} {} — {}",
drift.severity, drift.code, drift.reason
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("hl-reconcile: {e}"))),
}
out
}
async fn live_certify_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.live_certification().await {
Ok(report) => {
let passed = report
.summary
.get("passed")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);
let total = report
.summary
.get("total")
.and_then(serde_json::Value::as_u64)
.unwrap_or(report.drills.len() as u64);
out.lines.push(OutputLine::command(format!(
"live-certify: passed={} live_start_certified={} drills={passed}/{total}",
report.passed, report.live_start_certified
)));
for drill in report
.drills
.iter()
.filter(|drill| drill.status != "pass")
.take(8)
{
out.lines.push(OutputLine::system(format!(
" {}: {} — {}",
drill.name, drill.status, drill.note
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("live-certify: {e}"))),
}
out
}
async fn live_cockpit_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.live_cockpit().await {
Ok(cockpit) => {
let preflight_total = json_u64(&cockpit.preflight.summary, "total");
let preflight_passed = json_u64(&cockpit.preflight.summary, "passed");
let preflight_failed = json_u64(&cockpit.preflight.summary, "failed");
let immune_open = json_u64(&cockpit.immune.summary, "open");
let immune_blocking = json_u64(&cockpit.immune.summary, "risk_blocking");
let cert_total = json_u64(&cockpit.certification.summary, "total");
let cert_passed = json_u64(&cockpit.certification.summary, "passed");
let timeout = cockpit
.heartbeat
.timeout_s
.map_or_else(|| "n/a".to_string(), |s| s.to_string());
out.lines.push(OutputLine::command(format!(
"live-cockpit: live_mode={} ready={} risk_allowed={} controls_ready={}",
cockpit.live_mode,
cockpit.ready,
cockpit.risk_increasing_allowed,
cockpit.controls_ready
)));
out.lines.push(OutputLine::system(format!(
" next: {}",
cockpit.next_action
)));
out.lines.push(OutputLine::system(format!(
" operator: handle={} id={} role={} scope={}",
cockpit.operator_context.handle,
cockpit.operator_context.operator_id,
cockpit.operator_context.role,
cockpit.operator_context.scope
)));
out.lines.push(OutputLine::system(format!(
" preflight: passed={preflight_passed}/{preflight_total} failed={preflight_failed}"
)));
out.lines.push(OutputLine::system(format!(
" immune: open={immune_open} risk_blocking={immune_blocking}"
)));
out.lines.push(OutputLine::system(format!(
" reconcile: status={} risk_allowed={} drifts={} - {}",
cockpit.reconciliation.status,
cockpit.reconciliation.risk_increasing_allowed,
cockpit.reconciliation.drifts,
cockpit.reconciliation.reason
)));
out.lines.push(OutputLine::system(format!(
" certification: passed={} live_start_certified={} drills={cert_passed}/{cert_total}",
cockpit.certification.passed, cockpit.certification.live_start_certified
)));
out.lines.push(OutputLine::system(format!(
" heartbeat: configured={} expired={} timeout_s={timeout}",
cockpit.heartbeat.configured, cockpit.heartbeat.expired
)));
out.lines.push(OutputLine::system(format!(
" live-records: total={} accepted={} refused={} exchange_error={}",
cockpit.live_records.total,
cockpit.live_records.accepted,
cockpit.live_records.refused,
cockpit.live_records.exchange_error
)));
for check in cockpit.preflight.failed_checks.iter().take(4) {
out.lines.push(OutputLine::system(format!(
" preflight:{} {} - {}",
check.name, check.status, check.note
)));
}
for breaker in cockpit.immune.open_breakers.iter().take(4) {
out.lines.push(OutputLine::system(format!(
" breaker:{} {} - {}",
breaker.name, breaker.status, breaker.reason
)));
}
out.lines.push(OutputLine::system(
" actions: reduce=/pause-entries /kill /flatten-all resume=/resume-entries",
));
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("live-cockpit: {e}"))),
}
out
}
async fn live_evidence_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.live_evidence().await {
Ok(evidence) => {
let signer = evidence
.signature
.get("signer")
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown");
let signature_status = evidence
.signature
.get("status")
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown");
let hash_short = evidence
.evidence_hash
.strip_prefix("sha256:")
.map_or(evidence.evidence_hash.as_str(), |hash| hash)
.chars()
.take(12)
.collect::<String>();
out.lines.push(OutputLine::command(format!(
"live-evidence: live_mode={} ready={} risk_allowed={} artifacts={} hash=sha256:{hash_short}...",
evidence.live_mode,
evidence.ready,
evidence.risk_increasing_allowed,
evidence.artifacts.len()
)));
out.lines.push(OutputLine::system(format!(
" signature: status={signature_status} signer={signer}"
)));
out.lines.push(OutputLine::system(format!(
" operator: handle={} id={} role={} scope={}",
evidence.operator_context.handle,
evidence.operator_context.operator_id,
evidence.operator_context.role,
evidence.operator_context.scope
)));
for artifact in evidence.artifacts.iter().take(8) {
out.lines.push(OutputLine::system(format!(
" {}: {} {} {}",
artifact.name, artifact.status, artifact.schema_version, artifact.hash
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("live-evidence: {e}"))),
}
out
}
async fn live_receipts_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.live_receipts().await {
Ok(receipts) => {
let total = json_u64(&receipts.summary, "total");
let accepted = json_u64(&receipts.summary, "accepted");
let refused = json_u64(&receipts.summary, "refused");
let exchange_error = json_u64(&receipts.summary, "exchange_error");
let status = receipts
.summary
.get("status")
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown");
let hash_short = short_sha(&receipts.receipts_hash);
out.lines.push(OutputLine::command(format!(
"live-receipts: status={status} total={total} accepted={accepted} refused={refused} exchange_error={exchange_error} hash=sha256:{hash_short}..."
)));
out.lines.push(OutputLine::system(format!(
" operator: handle={} id={} role={} scope={}",
receipts.operator_context.handle,
receipts.operator_context.operator_id,
receipts.operator_context.role,
receipts.operator_context.scope
)));
out.lines.push(OutputLine::system(format!(
" privacy: credentials={} wallet={} raw_ack={} trace_tokens={} idempotency_tokens={}",
json_bool(&receipts.privacy, "contains_exchange_credentials"),
json_bool(&receipts.privacy, "contains_wallet_material"),
json_bool(&receipts.privacy, "contains_raw_venue_ack_payload"),
json_bool(&receipts.privacy, "contains_trace_tokens"),
json_bool(&receipts.privacy, "contains_idempotency_tokens")
)));
for receipt in receipts.receipts.iter().take(8) {
out.lines.push(OutputLine::system(format!(
" receipt: accepted={} status={} reason={} hash=sha256:{}...",
receipt.accepted,
receipt.status,
receipt.reason,
short_sha(&receipt.receipt_hash)
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("live-receipts: {e}"))),
}
out
}
async fn live_canary_policy_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.live_canary_policy().await {
Ok(policy) => {
out.lines.push(OutputLine::command(format!(
"live-canary: ready={} armed={} qualified={} publishable={} accepted_live={}",
policy.summary.ready_for_canary,
policy.summary.policy_armed,
policy.summary.qualified,
policy.summary.publishable_canary_evidence,
policy.summary.live_order_accepted
)));
out.lines.push(OutputLine::system(format!(
" next: {} risk={} - {}",
policy.recommendation.action,
policy.recommendation.risk_direction,
policy.recommendation.reason
)));
out.lines.push(OutputLine::system(format!(
" evidence: attempted={} receipts_accepted={} exchange_attached={} refusal_qualified={}",
policy.summary.live_order_attempted,
policy.summary.receipts_accepted,
policy.summary.exchange_evidence_attached,
policy.summary.refusal_evidence_qualified
)));
out.lines.push(OutputLine::system(format!(
" operator: handle={} id={} role={} scope={}",
policy.operator_context.handle,
policy.operator_context.operator_id,
policy.operator_context.role,
policy.operator_context.scope
)));
for phase in policy.phases.iter().take(8) {
out.lines.push(OutputLine::system(format!(
" phase:{} {} - {}",
phase.name, phase.status, phase.detail
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("live-canary: {e}"))),
}
out
}
async fn runtime_parity_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.runtime_parity().await {
Ok(report) => {
let production_ooda = json_bool(&report.claim_boundary, "production_ooda_parity");
let live_claimed = json_bool(&report.claim_boundary, "live_trading_claimed");
let canary_required = json_bool(
&report.claim_boundary,
"operator_owned_canary_required_for_live_claim",
);
let protected_live_code = json_bool(
&report.claim_boundary,
"protected_live_code_evolution_allowed",
);
let top_reason = report
.feedback
.by_rejection_reason
.iter()
.max_by_key(|(_, count)| *count)
.map_or("none", |(reason, _)| reason.as_str());
out.lines.push(OutputLine::command(format!(
"runtime-parity: ok={} production_ooda={} paper_only={} live_trading_claimed={}",
report.ok, production_ooda, report.paper_only, live_claimed
)));
out.lines.push(OutputLine::system(format!(
" paper: cycles={}/{} decisions={} fills={} rejections={} open_positions={}",
report.cycles_run,
report.cycles_requested,
report.paper.decisions,
report.paper.fills,
report.paper.rejections,
report.paper.open_positions
)));
out.lines.push(OutputLine::system(format!(
" live-shadow: mode={} refused={} accepted={} adapter_orders={} places_live_orders={}",
report.live_shadow.mode,
report.live_shadow.refused,
report.live_shadow.accepted,
report.live_shadow.adapter_orders_placed,
report.places_live_orders
)));
out.lines.push(OutputLine::system(format!(
" feedback: rejection_rate={:.2}% sample={} top_rejection={}",
report.feedback.rejection_rate * 100.0,
report.feedback.sample_size,
top_reason
)));
out.lines.push(OutputLine::system(format!(
" boundary: operator_owned_canary_required={canary_required} protected_live_code_evolution={protected_live_code}"
)));
out.lines.push(OutputLine::system(format!(
" certification: passed={} live_start_certified={} mode={}",
report.certification.passed,
report.certification.live_start_certified,
report.certification.mode
)));
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("runtime-parity: {e}"))),
}
out
}
async fn immune_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.immune().await {
Ok(report) => {
let open = report
.summary
.get("open")
.and_then(serde_json::Value::as_u64)
.unwrap_or_else(|| report.breakers.iter().filter(|b| b.blocks_risk).count() as u64);
out.lines.push(OutputLine::command(format!(
"immune: risk_increasing_allowed={} open={} mode={}",
report.risk_increasing_allowed, open, report.mode
)));
for breaker in report
.breakers
.iter()
.filter(|breaker| breaker.blocks_risk)
.take(8)
{
out.lines.push(OutputLine::system(format!(
" {}: {} - {}",
breaker.name, breaker.status, breaker.reason
)));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("immune: {e}"))),
}
out
}
fn json_u64(map: &std::collections::BTreeMap<String, serde_json::Value>, key: &str) -> u64 {
map.get(key)
.and_then(serde_json::Value::as_u64)
.unwrap_or(0)
}
fn json_bool(map: &std::collections::BTreeMap<String, serde_json::Value>, key: &str) -> bool {
map.get(key)
.and_then(serde_json::Value::as_bool)
.unwrap_or(false)
}
fn short_sha(hash: &str) -> String {
hash.strip_prefix("sha256:")
.unwrap_or(hash)
.chars()
.take(12)
.collect()
}
async fn quote_cmd(ctx: &DispatchContext, symbol: Option<&str>) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(symbol) = symbol else {
out.lines.push(OutputLine::warn(
"/quote <coin> — name the coin to inspect (e.g. /quote BTC)",
));
return out;
};
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.market_quote(symbol).await {
Ok(q) => {
let live = if q.live { "live" } else { "fixture" };
out.lines.push(OutputLine::command(format!(
"quote {}: {:.4} source={} mode={live}",
q.symbol, q.price, q.source
)));
if let Some(as_of) = q.as_of {
out.lines
.push(OutputLine::system(format!(" as_of={as_of}")));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("quote: {e}"))),
}
out
}
fn fg_sentiment(v: i64) -> &'static str {
match v {
i64::MIN..=24 => "extreme fear",
25..=44 => "fear",
45..=55 => "neutral",
56..=74 => "greed",
_ => "extreme greed",
}
}
fn brief_line_summary(v: &serde_json::Value) -> Option<String> {
match v {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Object(map) if !map.is_empty() => {
let parts: Vec<String> = map
.iter()
.take(4)
.map(|(k, v)| format!("{k}={}", compact_json_value(v)))
.collect();
Some(parts.join(" "))
}
serde_json::Value::Array(items) if !items.is_empty() => {
Some(format!("[{} items]", items.len()))
}
serde_json::Value::Null => None,
other => Some(compact_json_value(other)),
}
}
fn compact_json_value(v: &serde_json::Value) -> String {
match v {
serde_json::Value::Null => "—".into(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Array(a) => format!("[{}]", a.len()),
serde_json::Value::Object(m) => format!("{{{}}}", m.len()),
}
}
async fn risk_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.risk().await {
Ok(r) => {
let halted = r.is_halted();
let peak_ref = r.peak_equity_30d.or(r.peak_equity);
let equity_above_peak = match (r.account_value, peak_ref) {
(Some(eq), Some(peak)) if peak > 0.0 => eq > peak * 1.01,
_ => false,
};
let dd = if equity_above_peak {
"—".to_string()
} else {
r.drawdown_pct.map_or("—".into(), |v| format!("{v:.2}%"))
};
let daily_loss = r
.daily_loss_pct()
.map(|v| format!("{v:.2}%"))
.or_else(|| r.daily_loss_usd.map(|v| format!("${v:.2}")))
.unwrap_or_else(|| "—".into());
let daily_pnl = r.daily_pnl_usd.map_or("—".into(), |v| format!("{v:+.2}"));
let eq = r.account_value.map_or("—".into(), |v| format!("${v:.2}"));
let peak = peak_ref.map_or("—".into(), |v| format!("${v:.2}"));
let open = r.open_count.map_or("—".into(), |n| n.to_string());
let state = if halted { "HALTED" } else { "OK" };
let line = format!(
"risk: {state} equity={eq} peak={peak} dd={dd} daily-pnl={daily_pnl} \
daily-loss={daily_loss} open={open}"
);
if halted {
out.lines.push(OutputLine::alert(line));
if let Some(reason) = &r.halt_reason {
out.lines
.push(OutputLine::alert(format!(" halt reason: {reason}")));
}
if let Some(until) = &r.halt_until {
out.lines
.push(OutputLine::alert(format!(" halt until: {until}")));
}
} else {
out.lines.push(OutputLine::command(line));
}
if equity_above_peak {
out.lines.push(OutputLine::warn(
" inconsistent: equity > peak — engine peak-equity tracker is stale \
(see bus/risk.json vs bus/portfolio.json); dd suppressed",
));
}
if r.capital_floor_hit {
out.lines
.push(OutputLine::alert(" capital floor hit".to_string()));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("risk: {e}"))),
}
out
}
async fn heat_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.risk().await {
Ok(r) => {
let dd = r.drawdown_pct.unwrap_or(0.0);
let daily = r.daily_loss_pct().unwrap_or(0.0);
let score_pct = dd.max(daily).clamp(0.0, 100.0);
let pinned = r.is_halted() || r.capital_floor_hit;
let heat_pct = if pinned { 100.0 } else { score_pct };
let halted = if r.is_halted() { "on" } else { "off" };
let floor = if r.capital_floor_hit { "on" } else { "off" };
let open = r.open_count.map_or("—".into(), |n| n.to_string());
let level = if pinned {
"CRITICAL"
} else if heat_pct >= 80.0 {
"HIGH"
} else if heat_pct >= 50.0 {
"WARM"
} else {
"COOL"
};
let line = format!(
"heat: {level} {heat_pct:.0}% dd={dd:.1}% daily-loss={daily:.1}% \
open={open} halted={halted} floor={floor}"
);
if pinned || heat_pct >= 80.0 {
out.lines.push(OutputLine::alert(line));
} else {
out.lines.push(OutputLine::command(line));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("heat: {e}"))),
}
out
}
async fn regime_cmd(ctx: &DispatchContext, coin: Option<&str>) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
let label = coin.unwrap_or("market");
match http.regime(coin).await {
Ok(r) => {
if let Some(err) = r.extra.get("error").and_then(|v| v.as_str()) {
out.lines
.push(OutputLine::alert(format!("regime[{label}]: {err}")));
return out;
}
if r.regime.is_none() && r.confidence.is_none() {
out.lines.push(OutputLine::alert(format!(
"regime[{label}]: engine has no regime reading (empty response)"
)));
return out;
}
let name = r.regime.as_deref().unwrap_or("—");
let conf = r.confidence.map_or("—".into(), |v| format!("{v:.2}"));
out.lines.push(OutputLine::command(format!(
"regime[{label}]: {name} confidence={conf}"
)));
}
Err(e) => out.lines.push(OutputLine::alert(format!("regime: {e}"))),
}
out
}
async fn evaluate_cmd(
ctx: &DispatchContext,
coin: Option<&str>,
extras: &[String],
) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(raw) = coin else {
out.lines.push(OutputLine::warn(
"/evaluate <coin> — name the coin to evaluate (e.g. /evaluate BTC)",
));
return out;
};
let coin = raw.trim();
if coin.is_empty() {
out.lines.push(OutputLine::warn(
"/evaluate <coin> — name the coin to evaluate (e.g. /evaluate BTC)",
));
return out;
}
if !extras.is_empty() {
out.lines.push(OutputLine::warn(format!(
"/evaluate takes only a coin — ignoring extra args: {}",
extras.join(" ")
)));
}
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.evaluate(coin).await {
Ok(mut eval) => {
if eval.coin.is_none() {
eval.coin = Some(coin.to_string());
}
if eval.layers.is_empty() && eval.direction.is_none() {
out.lines.push(OutputLine::alert(format!(
"evaluate {coin}: engine returned an empty verdict (no layers, no direction)"
)));
out.dismiss_overlay = true;
return out;
}
out.show_overlay = Some(OverlayTarget::Verdict(Box::new(eval)));
}
Err(e) => {
out.lines
.push(OutputLine::alert(format!("evaluate {coin}: {e}")));
out.dismiss_overlay = true;
}
}
out
}
async fn positions_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.positions().await {
Ok(p) => {
if p.items.is_empty() {
out.lines
.push(OutputLine::system("flat — no open positions"));
return out;
}
for pos in &p.items {
let pnl = pos
.unrealized_pnl
.map_or_else(|| "—".into(), |v| format!("{v:+.2}"));
out.lines.push(OutputLine::command(format!(
"{} {} size={:.4} entry={:.2} pnl={}",
pos.symbol, pos.side, pos.size, pos.entry, pnl
)));
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("positions: {e}"))),
}
out
}
async fn pulse_cmd(ctx: &DispatchContext, limit: Option<u32>) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
let n = limit.unwrap_or_else(Command::default_pulse_limit);
match http.pulse(n).await {
Ok(p) => {
if p.items.is_empty() {
out.lines.push(OutputLine::system(
"(pulse idle — engine has no recent events)",
));
return out;
}
for ev in &p.items {
let ts = trim_ts(ev.ts.as_deref());
let kind = ev.kind.as_deref().unwrap_or("event");
let coin = ev.coin.as_deref().unwrap_or("—");
let msg = ev.message.as_deref().unwrap_or("(no message)");
let line = format!("{ts} {kind:<10} {coin:<6} {msg}");
match ev.severity.as_deref() {
Some("warn" | "warning") => out.lines.push(OutputLine::warn(line)),
Some("alert" | "error" | "critical") => {
out.lines.push(OutputLine::alert(line));
}
_ => out.lines.push(OutputLine::command(line)),
}
}
}
Err(e) => out.lines.push(OutputLine::alert(format!("pulse: {e}"))),
}
out
}
async fn approaching_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.approaching().await {
Ok(feed) => {
if feed.items.is_empty() {
out.lines.push(OutputLine::system("(nothing approaching)"));
return out;
}
let mut items = feed.items.clone();
items.sort_by(|a, b| match (a.distance_to_gate, b.distance_to_gate) {
(Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(std::cmp::Ordering::Equal),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
});
for a in &items {
let dir = a.direction.as_deref().unwrap_or("—");
let gate = a.gate.as_deref().unwrap_or("—");
let dist = a
.distance_to_gate
.map_or_else(|| "—".into(), |d| format!("{d:+.3}"));
out.lines.push(OutputLine::command(format!(
"{coin:<6} {dir:<5} gate={gate:<10} Δ={dist}",
coin = a.coin,
)));
}
}
Err(zero_engine_client::HttpError::NotFound { .. }) => {
out.lines.push(OutputLine::alert(
"approaching: this engine build does not expose /approaching (endpoint missing)",
));
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("approaching: {e}"))),
}
out
}
async fn rejections_cmd(
ctx: &DispatchContext,
coin: Option<&str>,
limit: Option<u32>,
) -> DispatchOutput {
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
let n = limit.unwrap_or_else(Command::default_rejections_limit);
match http.rejections(n, coin).await {
Ok(feed) => {
if feed.items.is_empty() {
let scope = coin.map_or_else(
|| "(no rejections)".to_string(),
|c| format!("(no rejections for {c})"),
);
out.lines.push(OutputLine::system(scope));
return out;
}
for r in &feed.items {
let ts = trim_ts(r.ts.as_deref());
let coin = r.coin.as_deref().unwrap_or("—");
let dir = r.direction.as_deref().unwrap_or("—");
let stage = r.stage.as_deref().unwrap_or("—");
let reason = r.reason.as_deref().unwrap_or("(no reason)");
out.lines.push(OutputLine::command(format!(
"{ts} {coin:<6} {dir:<5} {stage:<8} {reason}"
)));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("rejections: {e}"))),
}
out
}
fn trim_ts(raw: Option<&str>) -> String {
let Some(s) = raw else {
return "— ".into();
};
if let Some(rest) = s.split_once('T').map(|(_, r)| r) {
let hms: String = rest.chars().take(8).collect();
if hms.len() == 8 {
return hms;
}
}
if s.len() <= 8 {
return format!("{s:<8}");
}
s.to_string()
}
async fn kill_cmd(ctx: &DispatchContext) -> DispatchOutput {
let mut lines = match &ctx.http {
Some(http) => match http.post_live_kill().await {
Ok(reply) => render_live_control("/kill", "live kill", &reply),
Err(e) => vec![OutputLine::alert(format!("/kill — engine refused: {e}"))],
},
None => vec![OutputLine::alert(
"/kill — engine client unavailable; live kill not posted.",
)],
};
let Some(sup) = ctx.supervisor.as_ref() else {
return DispatchOutput {
lines,
..Default::default()
};
};
match sup.tear_down_socket() {
Ok(true) => lines.push(OutputLine::alert(
"/kill — headless supervisor stopped and operator-local socket torn down.",
)),
Ok(false) => {}
Err(e) => lines.push(OutputLine::alert(format!(
"/kill — headless tear-down failed: {e}. Manual cleanup may be required."
))),
}
DispatchOutput {
lines,
..Default::default()
}
}
async fn flatten_cmd(ctx: &DispatchContext) -> DispatchOutput {
let Some(http) = &ctx.http else {
return single_alert("/flatten-all — engine client unavailable; live flatten not posted.");
};
match http.post_live_flatten().await {
Ok(reply) => DispatchOutput {
lines: render_live_control("/flatten-all", "live flatten", &reply),
..Default::default()
},
Err(e) => single_alert(format!("/flatten-all — engine refused: {e}")),
}
}
async fn pause_cmd(ctx: &DispatchContext) -> DispatchOutput {
let Some(http) = &ctx.http else {
return single_alert("/pause-entries — engine client unavailable; live pause not posted.");
};
match http.post_live_pause().await {
Ok(reply) => DispatchOutput {
lines: render_live_control("/pause-entries", "live entries pause", &reply),
..Default::default()
},
Err(e) => single_alert(format!("/pause-entries — engine refused: {e}")),
}
}
async fn resume_entries_cmd(ctx: &DispatchContext) -> DispatchOutput {
let Some(http) = &ctx.http else {
return single_alert(
"/resume-entries — engine client unavailable; live resume not posted.",
);
};
match http.post_live_resume().await {
Ok(reply) => DispatchOutput {
lines: render_live_control("/resume-entries", "live entries resume", &reply),
..Default::default()
},
Err(e) => single_alert(format!("/resume-entries — engine refused: {e}")),
}
}
fn render_live_control(
command: &str,
action: &str,
reply: &LiveControlResponse,
) -> Vec<OutputLine> {
let reason = reply.reason.as_deref().unwrap_or("no reason supplied");
if !reply.ok {
return vec![OutputLine::alert(format!("{command} — refused: {reason}"))];
}
let mut parts = vec![format!("{command} — {action} accepted")];
if let Some(state) = reply.state.as_deref() {
parts.push(format!("state={state}"));
}
if !reply.orders.is_empty() {
parts.push(format!("orders={}", reply.orders.len()));
}
if let Some(operator) = &reply.operator_context
&& !operator.handle.is_empty()
{
parts.push(format!("operator={}", operator.handle));
}
vec![OutputLine::alert(parts.join(" "))]
}
fn execute_stub() -> DispatchOutput {
DispatchOutput {
lines: vec![OutputLine::warn(
"/execute <coin> <buy|sell> <size> — example: /execute BTC buy 0.001",
)],
..Default::default()
}
}
async fn execute_cmd(
ctx: &DispatchContext,
coin: Option<&str>,
direction: Option<ExecuteSide>,
quantity: Option<&str>,
error: Option<&str>,
) -> DispatchOutput {
if let Some(error) = error {
return single_warn(format!(
"/execute <coin> <buy|sell> <size> — {error} (example: /execute BTC buy 0.001)"
));
}
let (Some(coin), Some(direction), Some(quantity)) = (
coin,
direction,
quantity.and_then(|value| value.parse::<f64>().ok()),
) else {
return single_warn("/execute <coin> <buy|sell> <size> — example: /execute BTC buy 0.001");
};
let mut out = DispatchOutput::default();
let Some(http) = require_http(ctx, &mut out) else {
return out;
};
match http.post_execute(coin, direction, quantity).await {
Ok(reply) => {
let rendered_coin = reply.coin.as_deref().unwrap_or(coin);
let rendered_direction = reply.side.unwrap_or(direction).as_wire();
let rendered_quantity = reply.size.unwrap_or(quantity);
let reason = reply.reason.as_deref().unwrap_or("no reason supplied");
let mode_suffix = if reply.simulated {
" (paper)"
} else {
" (live)"
};
let fill = reply.fill_id.as_deref().unwrap_or("none");
let receipt = reply
.extra
.get("receipt_hash")
.and_then(serde_json::Value::as_str)
.map(short_hash);
if reply.accepted {
let mut parts = vec![format!(
"/execute — accepted{mode_suffix} {rendered_coin} {rendered_direction} {rendered_quantity} fill={fill}"
)];
if let Some(receipt) = receipt {
parts.push(format!("receipt={receipt}"));
}
out.lines.push(OutputLine::alert(parts.join(" ")));
} else {
let mut parts = vec![format!(
"/execute — refused{mode_suffix} {rendered_coin} {rendered_direction} {rendered_quantity}: {reason}"
)];
if let Some(receipt) = receipt {
parts.push(format!("receipt={receipt}"));
}
out.lines.push(OutputLine::alert(parts.join(" ")));
}
}
Err(e) => out
.lines
.push(OutputLine::alert(format!("/execute — engine refused: {e}"))),
}
out
}
fn short_hash(hash: &str) -> String {
if let Some(rest) = hash.strip_prefix("sha256:")
&& rest.len() >= 12
{
return format!("sha256:{}...", &rest[..12]);
}
hash.to_string()
}
fn auto_cmd(ctx: &DispatchContext, action: &AutoAction) -> DispatchOutput {
let request = match action {
AutoAction::On => AutoRequest::On,
AutoAction::Off => AutoRequest::Off,
AutoAction::Status => AutoRequest::Status,
AutoAction::Missing => {
return single_system(
"/auto — usage: /auto on | off | status. `on` is risk-increasing and friction-gated.",
);
}
AutoAction::Unknown(tok) => {
return single_warn(format!(
"/auto — unknown action '{tok}'. usage: /auto on | off | status."
));
}
};
let Some(source) = ctx.auto.as_ref() else {
return single_alert(
"/auto — unavailable (no engine auto-mode adapter on this invocation).",
);
};
match source.act(request) {
Ok(reply) => {
let mode = reply.mode.as_str();
let line = match (action, reply.changed) {
(AutoAction::Status, _) => format!("/auto status — mode={mode}"),
(AutoAction::On | AutoAction::Off, true) => {
format!("/auto — mode={mode} (changed)")
}
(AutoAction::On | AutoAction::Off, false) => {
return single_warn(format!("/auto — mode already {mode}; no change."));
}
(AutoAction::Missing | AutoAction::Unknown(_), _) => {
unreachable!("/auto missing/unknown resolve before reaching the source adapter",)
}
};
DispatchOutput {
lines: vec![OutputLine::command(line)],
..Default::default()
}
}
Err(e) => single_alert(format!("/auto — {e}")),
}
}
fn headless_cmd(ctx: &DispatchContext, action: &HeadlessAction) -> DispatchOutput {
let request = match action {
HeadlessAction::Start => SupervisorAction::Start,
HeadlessAction::Stop => SupervisorAction::Stop,
HeadlessAction::Status => SupervisorAction::Status,
HeadlessAction::Missing => {
return single_system(
"/headless — usage: /headless start | stop | status. The daemon is the operator-local supervisor (ADR-006).",
);
}
HeadlessAction::Unknown(tok) => {
return single_warn(format!(
"/headless — unknown action '{tok}'. usage: /headless start | stop | status."
));
}
};
let Some(source) = ctx.supervisor.as_ref() else {
return single_alert(
"/headless — supervisor unavailable (no headless adapter on this invocation).",
);
};
match source.act(request) {
Ok(reply) => {
let line = format_headless_reply(action, &reply);
DispatchOutput {
lines: vec![OutputLine::command(line)],
..Default::default()
}
}
Err(SupervisorError::Refused(msg)) => single_warn(format!("/headless — refused: {msg}")),
Err(e) => single_alert(format!("/headless — {e}")),
}
}
fn format_headless_reply(action: &HeadlessAction, reply: &SupervisorReply) -> String {
use crate::supervisor::SupervisorState;
let state = match &reply.state {
SupervisorState::Running => "running",
SupervisorState::Stopped => "stopped",
SupervisorState::Failed(reason) => {
return format!("/headless {} — failed: {reason}", headless_verb(action),);
}
};
let changed = if reply.changed { " (changed)" } else { "" };
let socket = reply
.socket
.as_deref()
.map(|s| format!(" socket={s}"))
.unwrap_or_default();
let pid = reply.pid.map(|p| format!(" pid={p}")).unwrap_or_default();
let uptime = reply
.uptime
.map(|d| format!(" uptime={}s", d.as_secs()))
.unwrap_or_default();
format!(
"/headless {} — state={state}{changed}{socket}{pid}{uptime}",
headless_verb(action),
)
}
const fn headless_verb(action: &HeadlessAction) -> &'static str {
match action {
HeadlessAction::Start => "start",
HeadlessAction::Stop => "stop",
HeadlessAction::Status => "status",
HeadlessAction::Missing | HeadlessAction::Unknown(_) => "(usage)",
}
}
fn sessions_cmd(ctx: &DispatchContext, limit: Option<u32>) -> DispatchOutput {
let Some(sessions) = ctx.sessions.as_ref() else {
return single_alert("/sessions — persistence disabled (no session store).");
};
let effective = limit
.unwrap_or_else(Command::default_sessions_limit)
.clamp(1, Command::max_sessions_limit());
let rows = match sessions.list(effective) {
Ok(rows) => rows,
Err(e) => return single_alert(format!("/sessions — {e}")),
};
if rows.is_empty() {
return DispatchOutput {
lines: vec![OutputLine::system(
"/sessions — no prior sessions on record.",
)],
..Default::default()
};
}
let current = sessions.current_ulid();
let mut lines = Vec::with_capacity(rows.len() + 1);
lines.push(OutputLine::command(format!(
"/sessions — {n} recent session(s)",
n = rows.len()
)));
for row in rows {
let marker = if Some(&row.ulid) == current.as_ref() {
"*"
} else {
" "
};
let started = format_ms_short(row.started_at_ms);
let state = if row.ended_at_ms.is_some() {
"ended"
} else {
"live/interrupted"
};
let parent = row
.parent_ulid
.as_deref()
.map(|p| format!(" parent:{p}"))
.unwrap_or_default();
let events = if row.n_events >= 0 {
format!(" {n} evt", n = row.n_events)
} else {
String::new()
};
lines.push(OutputLine::system(format!(
"{marker} {ulid} · {started} · {state}{events}{parent}",
ulid = row.ulid,
)));
}
DispatchOutput {
lines,
..Default::default()
}
}
fn resume_cmd(ctx: &DispatchContext, needle: Option<&str>) -> DispatchOutput {
fetch_and_paint_session(ctx, needle, SessionVerb::Resume)
}
fn replay_cmd(ctx: &DispatchContext, needle: Option<&str>) -> DispatchOutput {
fetch_and_paint_session(ctx, needle, SessionVerb::Replay)
}
#[derive(Debug, Clone, Copy)]
enum SessionVerb {
Resume,
Replay,
}
impl SessionVerb {
const fn name(self) -> &'static str {
match self {
Self::Resume => "/resume",
Self::Replay => "/replay",
}
}
const fn banner_prefix(self) -> &'static str {
match self {
Self::Resume => "resuming",
Self::Replay => "replaying",
}
}
}
fn fetch_and_paint_session(
ctx: &DispatchContext,
needle: Option<&str>,
verb: SessionVerb,
) -> DispatchOutput {
let name = verb.name();
let Some(sessions) = ctx.sessions.as_ref() else {
return single_alert(format!("{name} — persistence disabled (no session store)."));
};
let Some(needle) = needle else {
return DispatchOutput {
lines: vec![OutputLine::system(format!(
"{name} <ulid|label> — try /sessions for a list of ids."
))],
..Default::default()
};
};
let summary = match sessions.find(needle) {
Ok(s) => s,
Err(crate::session::SessionError::NotFound) => {
return single_alert(format!(
"{name} — no session matches '{needle}'. Try /sessions."
));
}
Err(e) => return single_alert(format!("{name} — {e}")),
};
let events = match sessions.list_events(&summary.ulid, 200) {
Ok(e) => e,
Err(e) => return single_alert(format!("{name} — {e}")),
};
let banner = format!(
"{prefix} {ulid} · {started} · {n} event(s)",
prefix = verb.banner_prefix(),
ulid = summary.ulid,
started = format_ms_short(summary.started_at_ms),
n = events.len(),
);
let replay_lines: Vec<ReplayLine> = events
.into_iter()
.map(|e| ReplayLine {
kind: e.kind,
at_ms: e.at_ms,
text: e.text,
})
.collect();
DispatchOutput {
lines: vec![OutputLine::command(banner)],
replay_lines,
..Default::default()
}
}
fn fork_cmd(ctx: &DispatchContext) -> DispatchOutput {
let Some(sessions) = ctx.sessions.as_ref() else {
return single_alert("/fork — persistence disabled (no session store).");
};
match sessions.fork_from_current() {
Ok(Some(child)) => DispatchOutput {
lines: vec![OutputLine::command(format!(
"/fork — new session {child}; parent carries over."
))],
..Default::default()
},
Ok(None) => single_alert("/fork — no current session to fork from."),
Err(e) => single_alert(format!("/fork — {e}")),
}
}
fn save_cmd(ctx: &DispatchContext, label: Option<&str>) -> DispatchOutput {
let Some(sessions) = ctx.sessions.as_ref() else {
return single_alert("/save — persistence disabled (no session store).");
};
let Some(label) = label else {
return DispatchOutput {
lines: vec![OutputLine::system(
"/save <label> — pick a short name you'll recognise later.",
)],
..Default::default()
};
};
let Some(ulid) = sessions.current_ulid() else {
return single_alert("/save — no active session to label.");
};
match sessions.save_label(&ulid, label) {
Ok(()) => DispatchOutput {
lines: vec![OutputLine::command(format!("/save — '{label}' → {ulid}"))],
..Default::default()
},
Err(e) => single_alert(format!("/save — {e}")),
}
}
fn share_cmd(ctx: &DispatchContext, needle: Option<&str>) -> DispatchOutput {
let Some(sessions) = ctx.sessions.as_ref() else {
return single_alert("/share — persistence disabled (no session store).");
};
let target = needle
.map(ToOwned::to_owned)
.or_else(|| sessions.current_ulid());
let Some(needle) = target else {
return single_alert("/share — no active session and no ulid/label given. Try /sessions.");
};
let summary = match sessions.find(&needle) {
Ok(s) => s,
Err(crate::session::SessionError::NotFound) => {
return single_alert(format!(
"/share — no session matches '{needle}'. Try /sessions."
));
}
Err(e) => return single_alert(format!("/share — {e}")),
};
let events = match sessions.list_events(&summary.ulid, 1000) {
Ok(e) => e,
Err(e) => return single_alert(format!("/share — {e}")),
};
let n = events.len();
let json = render_share_json(&summary, &events);
DispatchOutput {
lines: vec![
OutputLine::command(format!(
"/share — {ulid} · {n} event(s) · copy the block below",
ulid = summary.ulid,
)),
OutputLine::system(json),
],
..Default::default()
}
}
fn config_cmd(ctx: &DispatchContext, action: &ConfigAction) -> DispatchOutput {
match action {
ConfigAction::Missing => single_warn(
"/config <show|doctor> — show resolved values or diagnose config + secrets.",
),
ConfigAction::Unknown(other) => single_warn(format!(
"/config: unknown action '{other}'. Try /config show or /config doctor."
)),
ConfigAction::Show => {
let Some(source) = ctx.config.as_ref() else {
return single_alert("/config — config introspection unavailable.");
};
let rows = source.show();
if rows.is_empty() {
return DispatchOutput {
lines: vec![OutputLine::system(
"/config show — no config loaded. Run `zero init`.",
)],
..Default::default()
};
}
let mut out = DispatchOutput::default();
out.lines.push(OutputLine::command(format!(
"/config show — {n} field(s)",
n = rows.len()
)));
let label_width = rows.iter().map(|r| r.label.len()).max().unwrap_or(0);
for row in rows {
out.lines.push(OutputLine::system(format!(
" {label:<width$} {value}",
label = row.label,
width = label_width,
value = row.value,
)));
}
out
}
ConfigAction::Doctor => {
let Some(source) = ctx.config.as_ref() else {
return single_alert("/config — config introspection unavailable.");
};
let findings = source.doctor();
if findings.is_empty() {
return DispatchOutput {
lines: vec![OutputLine::system(
"/config doctor — no findings (adapter returned empty list).",
)],
..Default::default()
};
}
let mut out = DispatchOutput::default();
let n_err = findings
.iter()
.filter(|f| matches!(f.severity, DoctorSeverity::Error))
.count();
let n_warn = findings
.iter()
.filter(|f| matches!(f.severity, DoctorSeverity::Warn))
.count();
let header = format!(
"/config doctor — {total} check(s) errors={n_err} warnings={n_warn}",
total = findings.len(),
);
if n_err > 0 {
out.lines.push(OutputLine::alert(header));
} else {
out.lines.push(OutputLine::command(header));
}
for f in findings {
let prefix = match f.severity {
DoctorSeverity::Ok => " ok ",
DoctorSeverity::Warn => " warn ",
DoctorSeverity::Error => " ERROR ",
};
for emitted in wrap_doctor_row(prefix, &f.message, f.severity) {
out.lines.push(emitted);
}
}
out
}
}
}
const DOCTOR_ROW_WRAP_BODY_COLS: usize = 70;
const DOCTOR_ROW_PREFIX_COLS: usize = 8;
fn wrap_doctor_row(prefix: &str, message: &str, severity: DoctorSeverity) -> Vec<OutputLine> {
debug_assert_eq!(
prefix.len(),
DOCTOR_ROW_PREFIX_COLS,
"doctor row prefix must be exactly {DOCTOR_ROW_PREFIX_COLS} cols for continuation alignment"
);
let make_line = |text: String| match severity {
DoctorSeverity::Ok => OutputLine::system(text),
DoctorSeverity::Warn => OutputLine::warn(text),
DoctorSeverity::Error => OutputLine::alert(text),
};
let continuation_indent = " ".repeat(DOCTOR_ROW_PREFIX_COLS);
if message.chars().count() <= DOCTOR_ROW_WRAP_BODY_COLS {
return vec![make_line(format!("{prefix}{message}"))];
}
let mut lines = Vec::new();
let mut current = String::with_capacity(DOCTOR_ROW_WRAP_BODY_COLS);
let mut is_first = true;
for word in message.split_whitespace() {
let word_len = word.chars().count();
let current_len = current.chars().count();
let needs_space = !current.is_empty();
let prospective = current_len + usize::from(needs_space) + word_len;
if prospective > DOCTOR_ROW_WRAP_BODY_COLS && !current.is_empty() {
let pfx = if is_first {
prefix.to_owned()
} else {
continuation_indent.clone()
};
lines.push(make_line(format!("{pfx}{current}")));
is_first = false;
current.clear();
}
if !current.is_empty() {
current.push(' ');
}
current.push_str(word);
}
if !current.is_empty() {
let pfx = if is_first {
prefix.to_owned()
} else {
continuation_indent
};
lines.push(make_line(format!("{pfx}{current}")));
}
lines
}
fn verbose_cmd(ctx: &DispatchContext, action: &VerboseAction) -> DispatchOutput {
let target = match action {
VerboseAction::On => true,
VerboseAction::Off => false,
VerboseAction::Toggle => !ctx.verbose,
VerboseAction::Unknown(other) => {
return single_warn(format!(
"/verbose — unknown '{other}'. Use on|off|toggle (or no argument to toggle)."
));
}
};
let word = if target { "on" } else { "off" };
DispatchOutput {
lines: vec![OutputLine::system(format!("verbose {word}"))],
verbose_toggle: Some(target),
..Default::default()
}
}
fn state_override_cmd(label: Option<StateOverrideLabel>) -> DispatchOutput {
let Some(label) = label else {
return single_warn(
"/state-override <label> — one of FRESH | STEADY | ELEVATED | TILT | FATIGUED | RECOVERY",
);
};
DispatchOutput {
lines: vec![OutputLine::command(format!(
"/state-override — label declared: {name} (engine POST /operator/events pending)",
name = label.as_str(),
))],
..Default::default()
}
}
fn continue_cmd() -> DispatchOutput {
DispatchOutput {
lines: vec![OutputLine::system(
"/continue — acknowledged (coaching buffer pending; no notices queued right now)",
)],
..Default::default()
}
}
fn close_cmd(coin: Option<&str>) -> DispatchOutput {
let Some(raw) = coin else {
return DispatchOutput {
lines: vec![OutputLine::warn(
"/close <coin> — name the coin (try /pos to see open symbols; /flatten-all closes all)",
)],
..Default::default()
};
};
let coin = raw.trim();
if coin.is_empty() {
return DispatchOutput {
lines: vec![OutputLine::warn(
"/close <coin> — name the coin (try /pos to see open symbols; /flatten-all closes all)",
)],
..Default::default()
};
}
DispatchOutput {
lines: vec![OutputLine::system(format!(
"/close {coin} — noted (positions model + engine POST pending; no order was placed)"
))],
..Default::default()
}
}
fn wrap_off_cmd() -> DispatchOutput {
let body =
"/wrap-off — daily wrap skipped for this session (next session runs the wrap again)";
DispatchOutput {
lines: vec![OutputLine::system(body)],
wrap_off_toggle: Some(true),
..Default::default()
}
}
fn coaching_reset_cmd() -> DispatchOutput {
DispatchOutput {
lines: vec![OutputLine::system(
"/coaching reset — buffer cleared (coaching stream pending; nothing was queued)",
)],
coaching_reset: true,
..Default::default()
}
}
fn disclosure_override_cmd(confirmed: bool) -> DispatchOutput {
if !confirmed {
let phrase = DISCLOSURE_OVERRIDE_CONFIRM;
return DispatchOutput {
lines: vec![OutputLine::alert(format!(
"/disclosure-override — phrase required: `/disclosure-override {phrase}`",
))],
..Default::default()
};
}
DispatchOutput {
lines: vec![OutputLine::command(
"/disclosure-override — progressive disclosure bypassed for this session (disclosure store pending; no milestone was written)",
)],
..Default::default()
}
}
async fn rate_cmd(
ctx: &DispatchContext,
trade_id: Option<&str>,
rating: Option<u8>,
) -> DispatchOutput {
use chrono::Utc;
use zero_operator_state::{Event, EventKind};
let trade_id = trade_id.map(str::trim).filter(|s| !s.is_empty());
let Some(trade_id) = trade_id else {
return DispatchOutput {
lines: vec![OutputLine::warn(
"/rate <trade_id> <1..=10> — name the trade and a conviction rating (1 low, 10 high)",
)],
..Default::default()
};
};
let Some(rating) = rating else {
return DispatchOutput {
lines: vec![OutputLine::warn(format!(
"/rate {trade_id} <1..=10> — rating must be an integer in 1..=10 (1 low, 10 high)"
))],
..Default::default()
};
};
if !(1..=10).contains(&rating) {
return DispatchOutput {
lines: vec![OutputLine::warn(format!(
"/rate {trade_id} {rating} — rating must be an integer in 1..=10 (1 low, 10 high)"
))],
..Default::default()
};
}
let event = Event::new(
Utc::now(),
EventKind::Conviction {
trade_id: trade_id.to_string(),
rating,
},
);
let tail = post_operator_event_tail(ctx, &event).await;
DispatchOutput {
lines: vec![OutputLine::command(format!(
"/rate {trade_id} {rating} — recorded{tail}"
))],
..Default::default()
}
}
async fn post_operator_event_tail(
ctx: &DispatchContext,
event: &zero_operator_state::Event,
) -> String {
let Some(http) = &ctx.http else {
return " (engine client unavailable; not posted)".to_string();
};
match http.post_operator_event(event).await {
Ok(_) => ", posted to engine".to_string(),
Err(e) => {
tracing::debug!(error = %e, "operator-event POST failed");
", engine unreachable (kept locally)".to_string()
}
}
}
fn single_warn(msg: impl Into<String>) -> DispatchOutput {
DispatchOutput {
lines: vec![OutputLine::warn(msg.into())],
..Default::default()
}
}
fn render_share_json(
summary: &crate::session::SessionSummary,
events: &[crate::session::ReplayEvent],
) -> String {
use serde_json::{Value, json};
let events: Vec<Value> = events
.iter()
.map(|e| {
json!({
"kind": replay_kind_str(e.kind),
"at_ms": e.at_ms,
"text": e.text,
})
})
.collect();
let body = json!({
"ulid": summary.ulid,
"started_at_ms": summary.started_at_ms,
"ended_at_ms": summary.ended_at_ms,
"engine_base_url": summary.engine_base_url,
"cli_version": summary.cli_version,
"parent_ulid": summary.parent_ulid,
"n_events": summary.n_events,
"events": events,
});
serde_json::to_string_pretty(&body).unwrap_or_else(|_| "{}".into())
}
const fn replay_kind_str(k: ReplayKind) -> &'static str {
match k {
ReplayKind::Prompt => "prompt",
ReplayKind::System => "system",
ReplayKind::Command => "command",
ReplayKind::Warn => "warn",
ReplayKind::Alert => "alert",
}
}
fn single_alert(msg: impl Into<String>) -> DispatchOutput {
DispatchOutput {
lines: vec![OutputLine::alert(msg.into())],
..Default::default()
}
}
fn single_system(msg: impl Into<String>) -> DispatchOutput {
DispatchOutput {
lines: vec![OutputLine::system(msg.into())],
..Default::default()
}
}
fn format_ms_short(ms: i64) -> String {
use chrono::{DateTime, TimeZone, Utc};
let secs = ms.div_euclid(1000);
let nanos = u32::try_from(ms.rem_euclid(1000) * 1_000_000).unwrap_or(0);
let dt: DateTime<Utc> = Utc
.timestamp_opt(secs, nanos)
.single()
.unwrap_or_else(|| Utc.timestamp_opt(0, 0).single().unwrap_or_default());
dt.format("%Y-%m-%d %H:%M UTC").to_string()
}
async fn break_stub(ctx: &DispatchContext, minutes: Option<u32>) -> DispatchOutput {
use chrono::Utc;
use zero_operator_state::{Event, EventKind};
let planned_ms = minutes.map(|m| u64::from(m) * 60_000);
let event = Event::new(Utc::now(), EventKind::BreakStarted { planned_ms });
let tail = post_operator_event_tail(ctx, &event).await;
let note = minutes.map_or_else(
|| format!("/break — noted{tail}"),
|m| format!("/break {m}m — noted{tail}"),
);
DispatchOutput {
lines: vec![OutputLine::system(note)],
..Default::default()
}
}
#[derive(Debug, thiserror::Error)]
pub enum Never {}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::{DispatchContext, StaticLabel, dispatch};
use crate::command::Command;
use crate::friction::FrictionDecision;
use crate::risk::RiskDirection;
use zero_engine_client::EngineState;
use zero_operator_state::friction::FrictionLevel;
use zero_operator_state::label::Label;
fn ctx_with_label(l: Label) -> DispatchContext {
DispatchContext::new(None, EngineState::shared()).with_state(Arc::new(StaticLabel(l)))
}
#[tokio::test]
async fn empty_input_returns_none() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "").await.unwrap();
assert!(out.is_none());
}
#[tokio::test]
async fn help_renders_many_lines() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "/help").await.unwrap().unwrap();
assert!(out.lines.len() >= 6);
assert!(!out.quit);
assert!(out.mode_change.is_none());
}
#[test]
fn short_doctor_row_emits_single_line_unchanged() {
use super::{OutputLine, wrap_doctor_row};
use crate::config::DoctorSeverity;
let out = wrap_doctor_row(" ok ", "keychain reachable", DoctorSeverity::Ok);
assert_eq!(out.len(), 1);
match &out[0] {
OutputLine::System(s) => assert_eq!(s, " ok keychain reachable"),
other => panic!("expected System, got {other:?}"),
}
}
#[test]
fn long_doctor_row_wraps_preserving_all_text() {
use super::{DOCTOR_ROW_PREFIX_COLS, OutputLine, wrap_doctor_row};
use crate::config::DoctorSeverity;
let msg =
"engine token unset — pass --token, set ZERO_API_TOKEN, or run `zero init --force`";
let out = wrap_doctor_row(" ERROR ", msg, DoctorSeverity::Error);
assert!(
out.len() >= 2,
"expected wrap to produce ≥2 lines, got {} ({out:?})",
out.len(),
);
for line in &out {
assert!(
matches!(line, OutputLine::Alert(_)),
"expected Alert for every line of a wrapped ERROR, got {line:?}",
);
}
let joined: String = out
.iter()
.enumerate()
.map(|(i, line)| {
let OutputLine::Alert(s) = line else {
unreachable!()
};
let body = if i == 0 {
s.strip_prefix(" ERROR ").expect("first line keeps prefix")
} else {
s.strip_prefix(&" ".repeat(DOCTOR_ROW_PREFIX_COLS))
.expect("continuation uses indent")
};
body.to_owned()
})
.collect::<Vec<_>>()
.join(" ");
let normalize = |s: &str| s.split_whitespace().collect::<Vec<_>>().join(" ");
assert_eq!(
normalize(&joined),
normalize(msg),
"wrapped rows must preserve every word of the original message",
);
}
#[test]
fn doctor_row_continuation_aligns_under_message_column() {
use super::{DOCTOR_ROW_PREFIX_COLS, OutputLine, wrap_doctor_row};
use crate::config::DoctorSeverity;
let msg = "config file missing at /Users/forge/Library/Application Support/zero/config.toml — run `zero init`";
let out = wrap_doctor_row(" warn ", msg, DoctorSeverity::Warn);
assert!(out.len() >= 2);
for (i, line) in out.iter().enumerate() {
let OutputLine::Warn(s) = line else {
panic!("expected Warn, got {line:?}");
};
if i == 0 {
assert!(
s.starts_with(" warn "),
"first line must start with severity prefix, got {s:?}",
);
} else {
let expected_indent = " ".repeat(DOCTOR_ROW_PREFIX_COLS);
assert!(
s.starts_with(&expected_indent),
"continuation line {i} must align under message column (10 spaces), got {s:?}",
);
let at_col_10: Option<char> = s.chars().nth(DOCTOR_ROW_PREFIX_COLS);
assert!(
at_col_10.is_some_and(|c| !c.is_whitespace()),
"continuation line {i} body must start at col 10, got {s:?}",
);
}
}
}
#[test]
fn doctor_row_single_long_token_is_never_broken() {
use super::{OutputLine, wrap_doctor_row};
use crate::config::DoctorSeverity;
let url = "https://docs.getzero.dev/runbook/reconnecting-forever-after-rotating-your-token-thoroughly";
let out = wrap_doctor_row(" ERROR ", url, DoctorSeverity::Error);
let joined: String = out
.iter()
.map(|line| {
let OutputLine::Alert(s) = line else {
unreachable!()
};
s.trim_start().to_owned()
})
.collect::<String>();
let joined = joined.trim_start_matches("ERROR ").to_owned();
assert!(
joined.contains(url),
"URL token must survive un-broken across wrap boundaries; joined={joined:?}",
);
}
#[tokio::test]
async fn quit_sets_quit_flag() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "/quit").await.unwrap().unwrap();
assert!(out.quit);
}
#[tokio::test]
async fn state_sets_overlay_signal() {
use crate::command::OverlayTarget;
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "/state").await.unwrap().unwrap();
assert_eq!(out.show_overlay, Some(OverlayTarget::State));
assert!(!out.quit);
assert!(out.lines.is_empty(), "overlay command emits no lines");
assert_eq!(out.risk, Some(RiskDirection::Neutral));
}
#[tokio::test]
async fn state_under_tilt_still_opens_overlay() {
use crate::command::OverlayTarget;
let ctx = ctx_with_label(Label::Tilt);
let out = dispatch(&ctx, "/state").await.unwrap().unwrap();
assert_eq!(out.show_overlay, Some(OverlayTarget::State));
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
}
#[tokio::test]
async fn clear_sets_clear_flag() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "/clear").await.unwrap().unwrap();
assert!(out.clear_log);
}
#[tokio::test]
async fn unknown_emits_warn() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "/nope").await.unwrap().unwrap();
assert_eq!(out.lines.len(), 1);
matches!(out.lines[0], super::OutputLine::Warn(_));
}
#[tokio::test]
async fn status_without_http_emits_alert() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = dispatch(&ctx, "/status").await.unwrap().unwrap();
assert!(
matches!(&out.lines[0], super::OutputLine::Alert(s) if s.contains("engine client"))
);
}
#[tokio::test]
async fn execute_under_steady_proceeds() {
let ctx = ctx_with_label(Label::Steady);
let out = dispatch(&ctx, "/execute").await.unwrap().unwrap();
assert_eq!(out.risk, Some(RiskDirection::Increases));
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
assert!(matches!(
out.lines.first(),
Some(super::OutputLine::Warn(_))
));
}
#[tokio::test]
async fn execute_under_elevated_pauses_without_running() {
let ctx = ctx_with_label(Label::Elevated);
let out = dispatch(&ctx, "/execute").await.unwrap().unwrap();
assert_eq!(out.risk, Some(RiskDirection::Increases));
assert!(matches!(
out.friction,
Some(FrictionDecision::Pause {
level: FrictionLevel::L1,
..
})
));
let joined = join_lines(&out);
assert!(joined.contains("friction"), "{joined:?}");
assert!(!joined.contains("accepted"), "{joined:?}");
assert_eq!(out.pending_command, Some(Command::Execute));
}
#[tokio::test]
async fn execute_under_tilt_requires_typed_confirm() {
let ctx = ctx_with_label(Label::Tilt);
let out = dispatch(&ctx, "/execute").await.unwrap().unwrap();
assert!(matches!(
out.friction,
Some(FrictionDecision::TypedConfirm {
level: FrictionLevel::L2,
..
})
));
assert_eq!(
out.friction
.as_ref()
.and_then(FrictionDecision::confirm_word)
.as_deref(),
Some("execute")
);
let joined = join_lines(&out);
assert!(joined.contains("type 'execute'"), "{joined:?}");
assert!(!joined.contains("accepted"), "{joined:?}");
assert_eq!(out.pending_command, Some(Command::Execute));
}
#[tokio::test]
async fn proceed_path_leaves_pending_command_empty() {
let ctx = ctx_with_label(Label::Steady);
let out = dispatch(&ctx, "/execute").await.unwrap().unwrap();
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
assert!(
out.pending_command.is_none(),
"Proceed path must not carry pending_command"
);
}
#[tokio::test]
async fn bypass_friction_runs_command_ignoring_label() {
let ctx = ctx_with_label(Label::Tilt);
let out = super::run_bypass_friction(&ctx, Command::Execute).await;
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
assert_eq!(out.risk, Some(RiskDirection::Increases));
let joined = join_lines(&out);
assert!(
joined.contains("/execute <coin>"),
"expected usage: {joined}"
);
}
#[tokio::test]
async fn bypass_friction_on_neutral_command_is_harmless() {
let ctx = DispatchContext::new(None, EngineState::shared());
let out = super::run_bypass_friction(&ctx, Command::Help).await;
assert_eq!(out.risk, Some(RiskDirection::Neutral));
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
}
#[tokio::test]
async fn kill_under_tilt_still_proceeds() {
let ctx = ctx_with_label(Label::Tilt);
let out = dispatch(&ctx, "/kill").await.unwrap().unwrap();
assert_eq!(out.risk, Some(RiskDirection::Reduces));
assert_eq!(
out.friction,
Some(FrictionDecision::Proceed),
"Reduces commands MUST never be gated"
);
}
#[tokio::test]
async fn flatten_under_tilt_still_proceeds() {
let ctx = ctx_with_label(Label::Tilt);
let out = dispatch(&ctx, "/flatten-all").await.unwrap().unwrap();
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
}
#[tokio::test]
async fn status_under_tilt_still_proceeds() {
let ctx = ctx_with_label(Label::Tilt);
let out = dispatch(&ctx, "/status").await.unwrap().unwrap();
assert_eq!(out.risk, Some(RiskDirection::Neutral));
assert_eq!(out.friction, Some(FrictionDecision::Proceed));
}
fn join_lines(out: &super::DispatchOutput) -> String {
out.lines
.iter()
.map(|l| match l {
super::OutputLine::System(s)
| super::OutputLine::Command(s)
| super::OutputLine::Warn(s)
| super::OutputLine::Alert(s) => s.as_str(),
})
.collect::<Vec<_>>()
.join("\n")
}
}