use anyhow::{Context, Result, anyhow, bail};
use clap::{Parser, Subcommand};
use serde_json::{Value, json};
use crate::{
agent_card::{build_agent_card, sign_agent_card},
config,
signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
trust::{add_self_to_trust, empty_trust},
};
#[derive(Parser, Debug)]
#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Command,
}
#[derive(Subcommand, Debug)]
pub enum Command {
Init {
handle: String,
#[arg(long)]
name: Option<String>,
#[arg(long)]
relay: Option<String>,
#[arg(long)]
json: bool,
},
Whoami {
#[arg(long)]
json: bool,
},
Peers {
#[arg(long)]
json: bool,
},
Send {
peer: String,
kind_or_body: String,
body: Option<String>,
#[arg(long)]
deadline: Option<String>,
#[arg(long)]
json: bool,
},
Tail {
peer: Option<String>,
#[arg(long)]
json: bool,
#[arg(long, default_value_t = 0)]
limit: usize,
},
Monitor {
#[arg(long)]
peer: Option<String>,
#[arg(long)]
json: bool,
#[arg(long)]
include_handshake: bool,
#[arg(long, default_value_t = 500)]
interval_ms: u64,
#[arg(long, default_value_t = 0)]
replay: usize,
},
Verify {
path: String,
#[arg(long)]
json: bool,
},
Mcp,
RelayServer {
#[arg(long, default_value = "127.0.0.1:8770")]
bind: String,
},
BindRelay {
url: String,
#[arg(long)]
json: bool,
},
AddPeerSlot {
handle: String,
url: String,
slot_id: String,
slot_token: String,
#[arg(long)]
json: bool,
},
Push {
peer: Option<String>,
#[arg(long)]
json: bool,
},
Pull {
#[arg(long)]
json: bool,
},
Status {
#[arg(long)]
peer: Option<String>,
#[arg(long)]
json: bool,
},
Responder {
#[command(subcommand)]
command: ResponderCommand,
},
Pin {
card_file: String,
#[arg(long)]
json: bool,
},
RotateSlot {
#[arg(long)]
no_announce: bool,
#[arg(long)]
json: bool,
},
ForgetPeer {
handle: String,
#[arg(long)]
purge: bool,
#[arg(long)]
json: bool,
},
Daemon {
#[arg(long, default_value_t = 5)]
interval: u64,
#[arg(long)]
once: bool,
#[arg(long)]
json: bool,
},
PairHost {
#[arg(long)]
relay: String,
#[arg(long)]
yes: bool,
#[arg(long, default_value_t = 300)]
timeout: u64,
#[arg(long)]
detach: bool,
#[arg(long)]
json: bool,
},
#[command(alias = "join")]
PairJoin {
code_phrase: String,
#[arg(long)]
relay: String,
#[arg(long)]
yes: bool,
#[arg(long, default_value_t = 300)]
timeout: u64,
#[arg(long)]
detach: bool,
#[arg(long)]
json: bool,
},
PairConfirm {
code_phrase: String,
digits: String,
#[arg(long)]
json: bool,
},
PairList {
#[arg(long)]
json: bool,
#[arg(long)]
watch: bool,
#[arg(long, default_value_t = 1)]
watch_interval: u64,
},
PairCancel {
code_phrase: String,
#[arg(long)]
json: bool,
},
PairWatch {
code_phrase: String,
#[arg(long, default_value = "sas_ready")]
status: String,
#[arg(long, default_value_t = 300)]
timeout: u64,
#[arg(long)]
json: bool,
},
Pair {
handle: String,
#[arg(long)]
code: Option<String>,
#[arg(long, default_value = "https://wireup.net")]
relay: String,
#[arg(long)]
yes: bool,
#[arg(long, default_value_t = 300)]
timeout: u64,
#[arg(long)]
no_setup: bool,
#[arg(long)]
detach: bool,
},
PairAbandon {
code_phrase: String,
#[arg(long, default_value = "https://wireup.net")]
relay: String,
},
PairAccept {
peer: String,
#[arg(long)]
json: bool,
},
PairReject {
peer: String,
#[arg(long)]
json: bool,
},
PairListInbound {
#[arg(long)]
json: bool,
},
Setup {
#[arg(long)]
apply: bool,
},
Whois {
handle: Option<String>,
#[arg(long)]
json: bool,
#[arg(long)]
relay: Option<String>,
},
Add {
handle: String,
#[arg(long)]
relay: Option<String>,
#[arg(long)]
json: bool,
},
Up {
handle: String,
#[arg(long)]
name: Option<String>,
#[arg(long)]
json: bool,
},
Doctor {
#[arg(long)]
json: bool,
#[arg(long, default_value_t = 5)]
recent_rejections: usize,
},
Upgrade {
#[arg(long)]
check: bool,
#[arg(long)]
json: bool,
},
Service {
#[command(subcommand)]
action: ServiceAction,
},
Diag {
#[command(subcommand)]
action: DiagAction,
},
Claim {
nick: String,
#[arg(long)]
relay: Option<String>,
#[arg(long)]
public_url: Option<String>,
#[arg(long)]
json: bool,
},
Profile {
#[command(subcommand)]
action: ProfileAction,
},
Invite {
#[arg(long, default_value = "https://wireup.net")]
relay: String,
#[arg(long, default_value_t = 86_400)]
ttl: u64,
#[arg(long, default_value_t = 1)]
uses: u32,
#[arg(long)]
share: bool,
#[arg(long)]
json: bool,
},
Accept {
url: String,
#[arg(long)]
json: bool,
},
Reactor {
#[arg(long)]
on_event: String,
#[arg(long)]
peer: Option<String>,
#[arg(long)]
kind: Option<String>,
#[arg(long, default_value_t = true)]
verified_only: bool,
#[arg(long, default_value_t = 2)]
interval: u64,
#[arg(long)]
once: bool,
#[arg(long)]
dry_run: bool,
#[arg(long, default_value_t = 6)]
max_per_minute: u32,
#[arg(long, default_value_t = 1)]
max_chain_depth: u32,
},
Notify {
#[arg(long, default_value_t = 2)]
interval: u64,
#[arg(long)]
peer: Option<String>,
#[arg(long)]
once: bool,
#[arg(long)]
json: bool,
},
}
#[derive(Subcommand, Debug)]
pub enum DiagAction {
Tail {
#[arg(long, default_value_t = 20)]
limit: usize,
#[arg(long)]
json: bool,
},
Enable,
Disable,
Status {
#[arg(long)]
json: bool,
},
}
#[derive(Subcommand, Debug)]
pub enum ServiceAction {
Install {
#[arg(long)]
json: bool,
},
Uninstall {
#[arg(long)]
json: bool,
},
Status {
#[arg(long)]
json: bool,
},
}
#[derive(Subcommand, Debug)]
pub enum ResponderCommand {
Set {
status: String,
#[arg(long)]
reason: Option<String>,
#[arg(long)]
json: bool,
},
Get {
peer: Option<String>,
#[arg(long)]
json: bool,
},
}
#[derive(Subcommand, Debug)]
pub enum ProfileAction {
Set {
field: String,
value: String,
#[arg(long)]
json: bool,
},
Get {
#[arg(long)]
json: bool,
},
Clear {
field: String,
#[arg(long)]
json: bool,
},
}
pub fn run() -> Result<()> {
let cli = Cli::parse();
match cli.command {
Command::Init {
handle,
name,
relay,
json,
} => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
Command::Status { peer, json } => {
if let Some(peer) = peer {
cmd_status_peer(&peer, json)
} else {
cmd_status(json)
}
}
Command::Whoami { json } => cmd_whoami(json),
Command::Peers { json } => cmd_peers(json),
Command::Send {
peer,
kind_or_body,
body,
deadline,
json,
} => {
let (kind, body) = match body {
Some(real_body) => (kind_or_body, real_body),
None => ("claim".to_string(), kind_or_body),
};
cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
}
Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
Command::Monitor {
peer,
json,
include_handshake,
interval_ms,
replay,
} => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
Command::Verify { path, json } => cmd_verify(&path, json),
Command::Responder { command } => match command {
ResponderCommand::Set {
status,
reason,
json,
} => cmd_responder_set(&status, reason.as_deref(), json),
ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
},
Command::Mcp => cmd_mcp(),
Command::RelayServer { bind } => cmd_relay_server(&bind),
Command::BindRelay { url, json } => cmd_bind_relay(&url, json),
Command::AddPeerSlot {
handle,
url,
slot_id,
slot_token,
json,
} => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
Command::Pull { json } => cmd_pull(json),
Command::Pin { card_file, json } => cmd_pin(&card_file, json),
Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
Command::ForgetPeer {
handle,
purge,
json,
} => cmd_forget_peer(&handle, purge, json),
Command::Daemon {
interval,
once,
json,
} => cmd_daemon(interval, once, json),
Command::PairHost {
relay,
yes,
timeout,
detach,
json,
} => {
if detach {
cmd_pair_host_detach(&relay, json)
} else {
cmd_pair_host(&relay, yes, timeout)
}
}
Command::PairJoin {
code_phrase,
relay,
yes,
timeout,
detach,
json,
} => {
if detach {
cmd_pair_join_detach(&code_phrase, &relay, json)
} else {
cmd_pair_join(&code_phrase, &relay, yes, timeout)
}
}
Command::PairConfirm {
code_phrase,
digits,
json,
} => cmd_pair_confirm(&code_phrase, &digits, json),
Command::PairList {
json,
watch,
watch_interval,
} => cmd_pair_list(json, watch, watch_interval),
Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
Command::PairWatch {
code_phrase,
status,
timeout,
json,
} => cmd_pair_watch(&code_phrase, &status, timeout, json),
Command::Pair {
handle,
code,
relay,
yes,
timeout,
no_setup,
detach,
} => {
if handle.contains('@') && code.is_none() {
cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
} else if detach {
cmd_pair_detach(&handle, code.as_deref(), &relay)
} else {
cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
}
}
Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
Command::PairAccept { peer, json } => cmd_pair_accept(&peer, json),
Command::PairReject { peer, json } => cmd_pair_reject(&peer, json),
Command::PairListInbound { json } => cmd_pair_list_inbound(json),
Command::Invite {
relay,
ttl,
uses,
share,
json,
} => cmd_invite(&relay, ttl, uses, share, json),
Command::Accept { url, json } => cmd_accept(&url, json),
Command::Whois {
handle,
json,
relay,
} => cmd_whois(handle.as_deref(), json, relay.as_deref()),
Command::Add {
handle,
relay,
json,
} => cmd_add(&handle, relay.as_deref(), json),
Command::Up {
handle,
name,
json,
} => cmd_up(&handle, name.as_deref(), json),
Command::Doctor {
json,
recent_rejections,
} => cmd_doctor(json, recent_rejections),
Command::Upgrade { check, json } => cmd_upgrade(check, json),
Command::Service { action } => cmd_service(action),
Command::Diag { action } => cmd_diag(action),
Command::Claim {
nick,
relay,
public_url,
json,
} => cmd_claim(&nick, relay.as_deref(), public_url.as_deref(), json),
Command::Profile { action } => cmd_profile(action),
Command::Setup { apply } => cmd_setup(apply),
Command::Reactor {
on_event,
peer,
kind,
verified_only,
interval,
once,
dry_run,
max_per_minute,
max_chain_depth,
} => cmd_reactor(
&on_event,
peer.as_deref(),
kind.as_deref(),
verified_only,
interval,
once,
dry_run,
max_per_minute,
max_chain_depth,
),
Command::Notify {
interval,
peer,
once,
json,
} => cmd_notify(interval, peer.as_deref(), once, json),
}
}
fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
if !handle
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
{
bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
}
if config::is_initialized()? {
bail!(
"already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
config::config_dir()?
);
}
config::ensure_dirs()?;
let (sk_seed, pk_bytes) = generate_keypair();
config::write_private_key(&sk_seed)?;
let card = build_agent_card(handle, &pk_bytes, name, None, None);
let signed = sign_agent_card(&card, &sk_seed);
config::write_agent_card(&signed)?;
let mut trust = empty_trust();
add_self_to_trust(&mut trust, handle, &pk_bytes);
config::write_trust(&trust)?;
let fp = fingerprint(&pk_bytes);
let key_id = make_key_id(handle, &pk_bytes);
let mut relay_info: Option<(String, String)> = None;
if let Some(url) = relay {
let normalized = url.trim_end_matches('/');
let client = crate::relay_client::RelayClient::new(normalized);
client.check_healthz()?;
let alloc = client.allocate_slot(Some(handle))?;
let mut state = config::read_relay_state()?;
state["self"] = json!({
"relay_url": normalized,
"slot_id": alloc.slot_id.clone(),
"slot_token": alloc.slot_token,
});
config::write_relay_state(&state)?;
relay_info = Some((normalized.to_string(), alloc.slot_id));
}
let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
if as_json {
let mut out = json!({
"did": did_str.clone(),
"fingerprint": fp,
"key_id": key_id,
"config_dir": config::config_dir()?.to_string_lossy(),
});
if let Some((url, slot_id)) = &relay_info {
out["relay_url"] = json!(url);
out["slot_id"] = json!(slot_id);
}
println!("{}", serde_json::to_string(&out)?);
} else {
println!("generated {did_str} (ed25519:{key_id})");
println!(
"config written to {}",
config::config_dir()?.to_string_lossy()
);
if let Some((url, slot_id)) = &relay_info {
println!("bound to relay {url} (slot {slot_id})");
println!();
println!(
"next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
);
} else {
println!();
println!(
"next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
);
}
}
Ok(())
}
fn cmd_status(as_json: bool) -> Result<()> {
let initialized = config::is_initialized()?;
let mut summary = json!({
"initialized": initialized,
});
if initialized {
let card = config::read_agent_card()?;
let did = card
.get("did")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let handle = card
.get("handle")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
let pk_bytes = crate::signing::b64decode(pk_b64)?;
summary["did"] = json!(did);
summary["handle"] = json!(handle);
summary["fingerprint"] = json!(fingerprint(&pk_bytes));
summary["capabilities"] = card
.get("capabilities")
.cloned()
.unwrap_or_else(|| json!([]));
let trust = config::read_trust()?;
let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
let mut peers = Vec::new();
if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
for (peer_handle, _agent) in agents {
if peer_handle == &handle {
continue; }
peers.push(json!({
"handle": peer_handle,
"tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
}));
}
}
summary["peers"] = json!(peers);
let relay_state = config::read_relay_state()?;
summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
if !summary["self_relay"].is_null() {
if let Some(obj) = summary["self_relay"].as_object_mut() {
obj.remove("slot_token");
}
}
summary["peer_slots_count"] = json!(
relay_state
.get("peers")
.and_then(Value::as_object)
.map(|m| m.len())
.unwrap_or(0)
);
let outbox = config::outbox_dir()?;
let inbox = config::inbox_dir()?;
summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
let record = crate::ensure_up::read_pid_record("daemon");
let pidfile_pid = record.pid();
let pidfile_alive = pidfile_pid
.map(|pid| {
#[cfg(target_os = "linux")]
{
std::path::Path::new(&format!("/proc/{pid}")).exists()
}
#[cfg(not(target_os = "linux"))]
{
std::process::Command::new("kill")
.args(["-0", &pid.to_string()])
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
})
.unwrap_or(false);
let pgrep_pids: Vec<u32> = std::process::Command::new("pgrep")
.args(["-f", "wire daemon"])
.output()
.ok()
.filter(|o| o.status.success())
.map(|o| {
String::from_utf8_lossy(&o.stdout)
.split_whitespace()
.filter_map(|s| s.parse::<u32>().ok())
.collect()
})
.unwrap_or_default();
let orphan_pids: Vec<u32> = pgrep_pids
.iter()
.filter(|p| Some(**p) != pidfile_pid)
.copied()
.collect();
let mut daemon = json!({
"running": pidfile_alive,
"pid": pidfile_pid,
"all_running_pids": pgrep_pids,
"orphans": orphan_pids,
});
if let crate::ensure_up::PidRecord::Json(d) = &record {
daemon["version"] = json!(d.version);
daemon["bin_path"] = json!(d.bin_path);
daemon["did"] = json!(d.did);
daemon["relay_url"] = json!(d.relay_url);
daemon["started_at"] = json!(d.started_at);
daemon["schema"] = json!(d.schema);
if d.version != env!("CARGO_PKG_VERSION") {
daemon["version_mismatch"] = json!({
"daemon": d.version.clone(),
"cli": env!("CARGO_PKG_VERSION"),
});
}
} else if matches!(record, crate::ensure_up::PidRecord::LegacyInt(_)) {
daemon["pidfile_form"] = json!("legacy-int");
daemon["version_mismatch"] = json!({
"daemon": "<pre-0.5.11>",
"cli": env!("CARGO_PKG_VERSION"),
});
}
summary["daemon"] = daemon;
let pending = crate::pending_pair::list_pending().unwrap_or_default();
let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
for p in &pending {
*counts.entry(p.status.clone()).or_default() += 1;
}
let pending_inbound =
crate::pending_inbound_pair::list_pending_inbound().unwrap_or_default();
let inbound_handles: Vec<&str> = pending_inbound
.iter()
.map(|p| p.peer_handle.as_str())
.collect();
summary["pending_pairs"] = json!({
"total": pending.len(),
"by_status": counts,
"inbound_count": pending_inbound.len(),
"inbound_handles": inbound_handles,
});
}
if as_json {
println!("{}", serde_json::to_string(&summary)?);
} else if !initialized {
println!("not initialized — run `wire init <handle>` first");
} else {
println!("did: {}", summary["did"].as_str().unwrap_or("?"));
println!(
"fingerprint: {}",
summary["fingerprint"].as_str().unwrap_or("?")
);
println!("capabilities: {}", summary["capabilities"]);
if !summary["self_relay"].is_null() {
println!(
"self relay: {} (slot {})",
summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
);
} else {
println!("self relay: (not bound — run `wire pair-host --relay <url>` to bind)");
}
println!(
"peers: {}",
summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
);
for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
println!(
" - {:<20} tier={}",
p["handle"].as_str().unwrap_or(""),
p["tier"].as_str().unwrap_or("?")
);
}
println!(
"outbox: {} file(s), {} event(s) queued",
summary["outbox"]["files"].as_u64().unwrap_or(0),
summary["outbox"]["events"].as_u64().unwrap_or(0)
);
println!(
"inbox: {} file(s), {} event(s) received",
summary["inbox"]["files"].as_u64().unwrap_or(0),
summary["inbox"]["events"].as_u64().unwrap_or(0)
);
let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
let daemon_pid = summary["daemon"]["pid"]
.as_u64()
.map(|p| p.to_string())
.unwrap_or_else(|| "—".to_string());
let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
let version_suffix = if !daemon_version.is_empty() {
format!(" v{daemon_version}")
} else {
String::new()
};
println!(
"daemon: {} (pid {}{})",
if daemon_running { "running" } else { "DOWN" },
daemon_pid,
version_suffix,
);
if let Some(mm) = summary["daemon"].get("version_mismatch") {
println!(
" !! version mismatch: daemon={} CLI={}. \
run `wire upgrade` to swap atomically.",
mm["daemon"].as_str().unwrap_or("?"),
mm["cli"].as_str().unwrap_or("?"),
);
}
if let Some(orphans) = summary["daemon"]["orphans"].as_array()
&& !orphans.is_empty()
{
let pids: Vec<String> = orphans
.iter()
.filter_map(|v| v.as_u64().map(|p| p.to_string()))
.collect();
println!(
" !! orphan daemon process(es): pids {}. \
pgrep saw them but pidfile didn't — likely stale process from \
prior install. Multiple daemons race the relay cursor.",
pids.join(", ")
);
}
let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
let inbound_count = summary["pending_pairs"]["inbound_count"]
.as_u64()
.unwrap_or(0);
if pending_total > 0 {
print!("pending pairs: {pending_total}");
if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
let parts: Vec<String> = obj
.iter()
.map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
.collect();
if !parts.is_empty() {
print!(" ({})", parts.join(", "));
}
}
println!();
} else if inbound_count == 0 {
println!("pending pairs: none");
}
if inbound_count > 0 {
let handles: Vec<String> = summary["pending_pairs"]["inbound_handles"]
.as_array()
.map(|a| {
a.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
})
.unwrap_or_default();
println!(
"inbound pair requests ({inbound_count}): {} — `wire pair-list` to inspect, `wire pair-accept <peer>` to accept, `wire pair-reject <peer>` to refuse",
handles.join(", "),
);
}
}
Ok(())
}
fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
if !dir.exists() {
return Ok(json!({"files": 0, "events": 0}));
}
let mut files = 0usize;
let mut events = 0usize;
for entry in std::fs::read_dir(dir)? {
let path = entry?.path();
if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
files += 1;
if let Ok(body) = std::fs::read_to_string(&path) {
events += body.lines().filter(|l| !l.trim().is_empty()).count();
}
}
}
Ok(json!({"files": files, "events": events}))
}
fn responder_status_allowed(status: &str) -> bool {
matches!(
status,
"online" | "offline" | "oauth_locked" | "rate_limited" | "degraded"
)
}
fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
let state = config::read_relay_state()?;
let (label, slot_info) = match peer {
Some(peer) => (
peer.to_string(),
state
.get("peers")
.and_then(|p| p.get(peer))
.ok_or_else(|| {
anyhow!(
"unknown peer {peer:?} in relay state — pair with them first:\n \
wire add {peer}@wireup.net (or {peer}@<their-relay>)\n\
(`wire peers` lists who you've already paired with.)"
)
})?,
),
None => (
"self".to_string(),
state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
anyhow!("self slot not bound — run `wire bind-relay <url>` first")
})?,
),
};
let relay_url = slot_info["relay_url"]
.as_str()
.ok_or_else(|| anyhow!("{label} relay_url missing"))?
.to_string();
let slot_id = slot_info["slot_id"]
.as_str()
.ok_or_else(|| anyhow!("{label} slot_id missing"))?
.to_string();
let slot_token = slot_info["slot_token"]
.as_str()
.ok_or_else(|| anyhow!("{label} slot_token missing"))?
.to_string();
Ok((label, relay_url, slot_id, slot_token))
}
fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
if !responder_status_allowed(status) {
bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
}
let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
let mut record = json!({
"status": status,
"set_at": now,
});
if let Some(reason) = reason {
record["reason"] = json!(reason);
}
if status == "online" {
record["last_success_at"] = json!(now);
}
let client = crate::relay_client::RelayClient::new(&relay_url);
let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
if as_json {
println!("{}", serde_json::to_string(&saved)?);
} else {
let reason = saved
.get("reason")
.and_then(Value::as_str)
.map(|r| format!(" — {r}"))
.unwrap_or_default();
println!(
"responder {}{}",
saved
.get("status")
.and_then(Value::as_str)
.unwrap_or(status),
reason
);
}
Ok(())
}
fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
let client = crate::relay_client::RelayClient::new(&relay_url);
let health = client.responder_health_get(&slot_id, &slot_token)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"target": label,
"responder_health": health,
}))?
);
} else if health.is_null() {
println!("{label}: responder health not reported");
} else {
let status = health
.get("status")
.and_then(Value::as_str)
.unwrap_or("unknown");
let reason = health
.get("reason")
.and_then(Value::as_str)
.map(|r| format!(" — {r}"))
.unwrap_or_default();
let last_success = health
.get("last_success_at")
.and_then(Value::as_str)
.map(|t| format!(" (last_success: {t})"))
.unwrap_or_default();
println!("{label}: {status}{reason}{last_success}");
}
Ok(())
}
fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
let client = crate::relay_client::RelayClient::new(&relay_url);
let started = std::time::Instant::now();
let transport_ok = client.healthz().unwrap_or(false);
let latency_ms = started.elapsed().as_millis() as u64;
let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let attention = match last_pull_at_unix {
Some(last) if now.saturating_sub(last) <= 300 => json!({
"status": "ok",
"last_pull_at_unix": last,
"age_seconds": now.saturating_sub(last),
"event_count": event_count,
}),
Some(last) => json!({
"status": "stale",
"last_pull_at_unix": last,
"age_seconds": now.saturating_sub(last),
"event_count": event_count,
}),
None => json!({
"status": "never_pulled",
"last_pull_at_unix": Value::Null,
"event_count": event_count,
}),
};
let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
let responder = if responder_health.is_null() {
json!({"status": "not_reported", "record": Value::Null})
} else {
json!({
"status": responder_health
.get("status")
.and_then(Value::as_str)
.unwrap_or("unknown"),
"record": responder_health,
})
};
let report = json!({
"peer": peer,
"transport": {
"status": if transport_ok { "ok" } else { "error" },
"relay_url": relay_url,
"latency_ms": latency_ms,
},
"attention": attention,
"responder": responder,
});
if as_json {
println!("{}", serde_json::to_string(&report)?);
} else {
let transport_line = if transport_ok {
format!("ok relay reachable ({latency_ms}ms)")
} else {
"error relay unreachable".to_string()
};
println!("transport {transport_line}");
match report["attention"]["status"].as_str().unwrap_or("unknown") {
"ok" => println!(
"attention ok last pull {}s ago",
report["attention"]["age_seconds"].as_u64().unwrap_or(0)
),
"stale" => println!(
"attention stale last pull {}m ago",
report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
),
"never_pulled" => println!("attention never pulled since relay reset"),
other => println!("attention {other}"),
}
if report["responder"]["status"] == "not_reported" {
println!("auto-responder not reported");
} else {
let record = &report["responder"]["record"];
let status = record
.get("status")
.and_then(Value::as_str)
.unwrap_or("unknown");
let reason = record
.get("reason")
.and_then(Value::as_str)
.map(|r| format!(" — {r}"))
.unwrap_or_default();
println!("auto-responder {status}{reason}");
}
}
Ok(())
}
fn cmd_whoami(as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let card = config::read_agent_card()?;
let did = card
.get("did")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let handle = card
.get("handle")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
let pk_bytes = crate::signing::b64decode(pk_b64)?;
let fp = fingerprint(&pk_bytes);
let key_id = make_key_id(&handle, &pk_bytes);
let capabilities = card
.get("capabilities")
.cloned()
.unwrap_or_else(|| json!(["wire/v3.1"]));
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"did": did,
"handle": handle,
"fingerprint": fp,
"key_id": key_id,
"public_key_b64": pk_b64,
"capabilities": capabilities,
"config_dir": config::config_dir()?.to_string_lossy(),
}))?
);
} else {
println!("{did} (ed25519:{key_id})");
println!("fingerprint: {fp}");
println!("capabilities: {capabilities}");
}
Ok(())
}
fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
let raw = crate::trust::get_tier(trust, handle);
if raw != "VERIFIED" {
return raw.to_string();
}
let token = relay_state
.get("peers")
.and_then(|p| p.get(handle))
.and_then(|p| p.get("slot_token"))
.and_then(Value::as_str)
.unwrap_or("");
if token.is_empty() {
"PENDING_ACK".to_string()
} else {
raw.to_string()
}
}
fn cmd_peers(as_json: bool) -> Result<()> {
let trust = config::read_trust()?;
let agents = trust
.get("agents")
.and_then(Value::as_object)
.cloned()
.unwrap_or_default();
let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
let mut self_did: Option<String> = None;
if let Ok(card) = config::read_agent_card() {
self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
}
let mut peers = Vec::new();
for (handle, agent) in agents.iter() {
let did = agent
.get("did")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if Some(did.as_str()) == self_did.as_deref() {
continue; }
let tier = effective_peer_tier(&trust, &relay_state, handle);
let capabilities = agent
.get("card")
.and_then(|c| c.get("capabilities"))
.cloned()
.unwrap_or_else(|| json!([]));
peers.push(json!({
"handle": handle,
"did": did,
"tier": tier,
"capabilities": capabilities,
}));
}
if as_json {
println!("{}", serde_json::to_string(&peers)?);
} else if peers.is_empty() {
println!("no peers pinned (run `wire join <code>` to pair)");
} else {
for p in &peers {
println!(
"{:<20} {:<10} {}",
p["handle"].as_str().unwrap_or(""),
p["tier"].as_str().unwrap_or(""),
p["did"].as_str().unwrap_or(""),
);
}
}
Ok(())
}
fn maybe_warn_peer_attentiveness(peer: &str) {
let state = match config::read_relay_state() {
Ok(s) => s,
Err(_) => return,
};
let p = state.get("peers").and_then(|p| p.get(peer));
let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
Some(s) if !s.is_empty() => s,
_ => return,
};
let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
Some(s) if !s.is_empty() => s,
_ => return,
};
let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
Some(s) if !s.is_empty() => s.to_string(),
_ => match state
.get("self")
.and_then(|s| s.get("relay_url"))
.and_then(Value::as_str)
{
Some(s) if !s.is_empty() => s.to_string(),
_ => return,
},
};
let client = crate::relay_client::RelayClient::new(&relay_url);
let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
Ok(t) => t,
Err(_) => return,
};
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
match last_pull {
None => {
eprintln!(
"phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
);
}
Some(t) if now.saturating_sub(t) > 300 => {
let mins = now.saturating_sub(t) / 60;
eprintln!(
"phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
);
}
_ => {}
}
}
pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
let trimmed = input.trim();
if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
{
return Ok(trimmed.to_string());
}
let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
let n: i64 = amount
.parse()
.with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
if n <= 0 {
bail!("deadline duration must be positive: {input:?}");
}
let duration = match unit {
"m" => time::Duration::minutes(n),
"h" => time::Duration::hours(n),
"d" => time::Duration::days(n),
_ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
};
Ok((time::OffsetDateTime::now_utc() + duration)
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
}
fn cmd_send(
peer: &str,
kind: &str,
body_arg: &str,
deadline: Option<&str>,
as_json: bool,
) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let peer = crate::agent_card::bare_handle(peer);
let sk_seed = config::read_private_key()?;
let card = config::read_agent_card()?;
let did = card.get("did").and_then(Value::as_str).unwrap_or("");
let handle = crate::agent_card::display_handle_from_did(did).to_string();
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
let pk_bytes = crate::signing::b64decode(pk_b64)?;
let body_value: Value = if body_arg == "-" {
use std::io::Read;
let mut raw = String::new();
std::io::stdin()
.read_to_string(&mut raw)
.with_context(|| "reading body from stdin")?;
serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
} else if let Some(path) = body_arg.strip_prefix('@') {
let raw =
std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
serde_json::from_str(&raw).unwrap_or(Value::String(raw))
} else {
Value::String(body_arg.to_string())
};
let kind_id = parse_kind(kind)?;
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
let mut event = json!({
"schema_version": crate::signing::EVENT_SCHEMA_VERSION,
"timestamp": now,
"from": did,
"to": format!("did:wire:{peer}"),
"type": kind,
"kind": kind_id,
"body": body_value,
});
if let Some(deadline) = deadline {
event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
}
let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
maybe_warn_peer_attentiveness(peer);
let line = serde_json::to_vec(&signed)?;
let outbox = config::append_outbox_record(peer, &line)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"event_id": event_id,
"status": "queued",
"peer": peer,
"outbox": outbox.to_string_lossy(),
}))?
);
} else {
println!(
"queued event {event_id} → {peer} (outbox: {})",
outbox.display()
);
}
Ok(())
}
fn parse_kind(s: &str) -> Result<u32> {
if let Ok(n) = s.parse::<u32>() {
return Ok(n);
}
for (id, name) in crate::signing::kinds() {
if *name == s {
return Ok(*id);
}
}
Ok(1)
}
fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
let inbox = config::inbox_dir()?;
if !inbox.exists() {
if !as_json {
eprintln!("no inbox yet — daemon hasn't run, or no events received");
}
return Ok(());
}
let trust = config::read_trust()?;
let mut count = 0usize;
let entries: Vec<_> = std::fs::read_dir(&inbox)?
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| {
p.extension().map(|x| x == "jsonl").unwrap_or(false)
&& match peer {
Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
None => true,
}
})
.collect();
for path in entries {
let body = std::fs::read_to_string(&path)?;
for line in body.lines() {
let event: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let verified = verify_message_v31(&event, &trust).is_ok();
if as_json {
let mut event_with_meta = event.clone();
if let Some(obj) = event_with_meta.as_object_mut() {
obj.insert("verified".into(), json!(verified));
}
println!("{}", serde_json::to_string(&event_with_meta)?);
} else {
let ts = event
.get("timestamp")
.and_then(Value::as_str)
.unwrap_or("?");
let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
let summary = event
.get("body")
.map(|b| match b {
Value::String(s) => s.clone(),
_ => b.to_string(),
})
.unwrap_or_default();
let mark = if verified { "✓" } else { "✗" };
let deadline = event
.get("time_sensitive_until")
.and_then(Value::as_str)
.map(|d| format!(" deadline: {d}"))
.unwrap_or_default();
println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
}
count += 1;
if limit > 0 && count >= limit {
return Ok(());
}
}
}
Ok(())
}
fn monitor_is_noise_kind(kind: &str) -> bool {
matches!(kind, "pair_drop" | "pair_drop_ack" | "heartbeat")
}
fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
if as_json {
Ok(serde_json::to_string(e)?)
} else {
let eid_short: String = e.event_id.chars().take(12).collect();
let body = e.body_preview.replace('\n', " ");
let ts: String = e.timestamp.chars().take(19).collect();
Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
}
}
fn cmd_monitor(
peer_filter: Option<&str>,
as_json: bool,
include_handshake: bool,
interval_ms: u64,
replay: usize,
) -> Result<()> {
let inbox_dir = config::inbox_dir()?;
if !inbox_dir.exists() {
if !as_json {
eprintln!(
"wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
);
}
}
if replay > 0 && inbox_dir.exists() {
let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
let path = entry.path();
if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
continue;
}
let peer = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => continue,
};
if let Some(filter) = peer_filter {
if peer != filter {
continue;
}
}
let body = std::fs::read_to_string(&path).unwrap_or_default();
for line in body.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
let signed: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let ev = crate::inbox_watch::InboxEvent::from_signed(
&peer,
signed,
true,
);
if !include_handshake && monitor_is_noise_kind(&ev.kind) {
continue;
}
all.push(ev);
}
}
all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let start = all.len().saturating_sub(replay);
for ev in &all[start..] {
println!("{}", monitor_render(ev, as_json)?);
}
use std::io::Write;
std::io::stdout().flush().ok();
}
let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
loop {
let events = w.poll()?;
let mut wrote = false;
for ev in events {
if let Some(filter) = peer_filter {
if ev.peer != filter {
continue;
}
}
if !include_handshake && monitor_is_noise_kind(&ev.kind) {
continue;
}
println!("{}", monitor_render(&ev, as_json)?);
wrote = true;
}
if wrote {
use std::io::Write;
std::io::stdout().flush().ok();
}
std::thread::sleep(sleep_dur);
}
}
#[cfg(test)]
mod tier_tests {
use super::*;
use serde_json::json;
fn trust_with(handle: &str, tier: &str) -> Value {
json!({
"version": 1,
"agents": {
handle: {
"tier": tier,
"did": format!("did:wire:{handle}"),
"card": {"capabilities": ["wire/v3.1"]}
}
}
})
}
#[test]
fn pending_ack_when_verified_but_no_slot_token() {
let trust = trust_with("willard", "VERIFIED");
let relay_state = json!({
"peers": {
"willard": {
"relay_url": "https://relay",
"slot_id": "abc",
"slot_token": "",
}
}
});
assert_eq!(
effective_peer_tier(&trust, &relay_state, "willard"),
"PENDING_ACK"
);
}
#[test]
fn verified_when_slot_token_present() {
let trust = trust_with("willard", "VERIFIED");
let relay_state = json!({
"peers": {
"willard": {
"relay_url": "https://relay",
"slot_id": "abc",
"slot_token": "tok123",
}
}
});
assert_eq!(
effective_peer_tier(&trust, &relay_state, "willard"),
"VERIFIED"
);
}
#[test]
fn raw_tier_passes_through_for_non_verified() {
let trust = trust_with("willard", "UNTRUSTED");
let relay_state = json!({
"peers": {"willard": {"slot_token": ""}}
});
assert_eq!(
effective_peer_tier(&trust, &relay_state, "willard"),
"UNTRUSTED"
);
}
#[test]
fn pending_ack_when_relay_state_missing_peer() {
let trust = trust_with("willard", "VERIFIED");
let relay_state = json!({"peers": {}});
assert_eq!(
effective_peer_tier(&trust, &relay_state, "willard"),
"PENDING_ACK"
);
}
}
#[cfg(test)]
mod monitor_tests {
use super::*;
use crate::inbox_watch::InboxEvent;
use serde_json::Value;
fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
InboxEvent {
peer: peer.to_string(),
event_id: "abcd1234567890ef".to_string(),
kind: kind.to_string(),
body_preview: body.to_string(),
verified: true,
timestamp: "2026-05-15T23:14:07.123456Z".to_string(),
raw: Value::Null,
}
}
#[test]
fn monitor_filter_drops_handshake_kinds_by_default() {
assert!(monitor_is_noise_kind("pair_drop"));
assert!(monitor_is_noise_kind("pair_drop_ack"));
assert!(monitor_is_noise_kind("heartbeat"));
assert!(!monitor_is_noise_kind("claim"));
assert!(!monitor_is_noise_kind("decision"));
assert!(!monitor_is_noise_kind("ack"));
assert!(!monitor_is_noise_kind("request"));
assert!(!monitor_is_noise_kind("note"));
assert!(!monitor_is_noise_kind("future_kind_we_dont_know"));
}
#[test]
fn monitor_render_plain_is_one_short_line() {
let e = ev("willard", "claim", "real v8 train shipped 1350 steps");
let line = monitor_render(&e, false).unwrap();
assert!(!line.contains('\n'), "render must be one line: {line}");
assert!(line.contains("willard"));
assert!(line.contains("claim"));
assert!(line.contains("real v8 train"));
assert!(line.contains("abcd12345678"));
assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
assert!(line.contains("2026-05-15T23:14:07"));
}
#[test]
fn monitor_render_strips_newlines_from_body() {
let e = ev("spark", "claim", "line one\nline two\nline three");
let line = monitor_render(&e, false).unwrap();
assert!(!line.contains('\n'), "newlines must be stripped: {line}");
assert!(line.contains("line one line two line three"));
}
#[test]
fn monitor_render_json_is_valid_jsonl() {
let e = ev("spark", "claim", "hi");
let line = monitor_render(&e, true).unwrap();
assert!(!line.contains('\n'));
let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
assert_eq!(parsed["peer"], "spark");
assert_eq!(parsed["kind"], "claim");
assert_eq!(parsed["body_preview"], "hi");
}
#[test]
fn monitor_does_not_drop_on_verified_null() {
let mut e = ev("spark", "claim", "from disk with verified=null");
e.verified = false; let line = monitor_render(&e, false).unwrap();
assert!(line.contains("from disk with verified=null"));
assert!(!monitor_is_noise_kind("claim"));
}
}
fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
let body = if path == "-" {
let mut buf = String::new();
use std::io::Read;
std::io::stdin().read_to_string(&mut buf)?;
buf
} else {
std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
};
let event: Value = serde_json::from_str(&body)?;
let trust = config::read_trust()?;
match verify_message_v31(&event, &trust) {
Ok(()) => {
if as_json {
println!("{}", serde_json::to_string(&json!({"verified": true}))?);
} else {
println!("verified ✓");
}
Ok(())
}
Err(e) => {
let reason = e.to_string();
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"verified": false, "reason": reason}))?
);
} else {
eprintln!("FAILED: {reason}");
}
std::process::exit(1);
}
}
}
fn cmd_mcp() -> Result<()> {
crate::mcp::run()
}
fn cmd_relay_server(bind: &str) -> Result<()> {
let state_dir = if let Ok(home) = std::env::var("WIRE_HOME") {
std::path::PathBuf::from(home)
.join("state")
.join("wire-relay")
} else {
dirs::state_dir()
.or_else(dirs::data_local_dir)
.ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
.join("wire-relay")
};
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
runtime.block_on(crate::relay_server::serve(bind, state_dir))
}
fn cmd_bind_relay(url: &str, as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let card = config::read_agent_card()?;
let did = card.get("did").and_then(Value::as_str).unwrap_or("");
let handle = crate::agent_card::display_handle_from_did(did).to_string();
let normalized = url.trim_end_matches('/');
let client = crate::relay_client::RelayClient::new(normalized);
client.check_healthz()?;
let alloc = client.allocate_slot(Some(&handle))?;
let mut state = config::read_relay_state()?;
state["self"] = json!({
"relay_url": url,
"slot_id": alloc.slot_id,
"slot_token": alloc.slot_token,
});
config::write_relay_state(&state)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"relay_url": url,
"slot_id": alloc.slot_id,
"slot_token_present": true,
}))?
);
} else {
println!("bound to relay {url}");
println!("slot_id: {}", alloc.slot_id);
println!(
"(slot_token written to {} mode 0600)",
config::relay_state_path()?.display()
);
}
Ok(())
}
fn cmd_add_peer_slot(
handle: &str,
url: &str,
slot_id: &str,
slot_token: &str,
as_json: bool,
) -> Result<()> {
let mut state = config::read_relay_state()?;
let peers = state["peers"]
.as_object_mut()
.ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
peers.insert(
handle.to_string(),
json!({
"relay_url": url,
"slot_id": slot_id,
"slot_token": slot_token,
}),
);
config::write_relay_state(&state)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"handle": handle,
"relay_url": url,
"slot_id": slot_id,
"added": true,
}))?
);
} else {
println!("pinned peer slot for {handle} at {url} ({slot_id})");
}
Ok(())
}
fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
let state = config::read_relay_state()?;
let peers = state["peers"].as_object().cloned().unwrap_or_default();
if peers.is_empty() {
bail!(
"no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
);
}
let outbox_dir = config::outbox_dir()?;
if outbox_dir.exists() {
let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
let path = entry.path();
if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
continue;
}
let stem = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => continue,
};
if pinned.contains(&stem) {
continue;
}
let bare = crate::agent_card::bare_handle(&stem);
if pinned.contains(bare) {
eprintln!(
"wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
Merge with: `cat {} >> {}` then delete the FQDN file.",
stem,
path.display(),
outbox_dir.join(format!("{bare}.jsonl")).display(),
);
}
}
}
if !outbox_dir.exists() {
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
);
} else {
println!("phyllis: nothing to dial out — write a message first with `wire send`");
}
return Ok(());
}
let mut pushed = Vec::new();
let mut skipped = Vec::new();
for (peer_handle, slot_info) in peers.iter() {
if let Some(want) = peer_filter
&& peer_handle != want
{
continue;
}
let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
if !outbox.exists() {
continue;
}
let url = slot_info["relay_url"]
.as_str()
.ok_or_else(|| anyhow!("peer {peer_handle} missing relay_url"))?;
let slot_id = slot_info["slot_id"]
.as_str()
.ok_or_else(|| anyhow!("peer {peer_handle} missing slot_id"))?;
let slot_token = slot_info["slot_token"]
.as_str()
.ok_or_else(|| anyhow!("peer {peer_handle} missing slot_token"))?;
let client = crate::relay_client::RelayClient::new(url);
let body = std::fs::read_to_string(&outbox)?;
for line in body.lines() {
let event: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let event_id = event
.get("event_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
match client.post_event(slot_id, slot_token, &event) {
Ok(resp) => {
if resp.status == "duplicate" {
skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
} else {
pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
}
}
Err(e) => {
let reason = crate::relay_client::format_transport_error(&e);
skipped.push(
json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
);
}
}
}
}
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
);
} else {
println!(
"pushed {} event(s); skipped {} ({})",
pushed.len(),
skipped.len(),
if skipped.is_empty() {
"none"
} else {
"see --json for detail"
}
);
}
Ok(())
}
fn cmd_pull(as_json: bool) -> Result<()> {
let state = config::read_relay_state()?;
let self_state = state.get("self").cloned().unwrap_or(Value::Null);
if self_state.is_null() {
bail!("self slot not bound — run `wire bind-relay <url>` first");
}
let url = self_state["relay_url"]
.as_str()
.ok_or_else(|| anyhow!("self.relay_url missing"))?;
let slot_id = self_state["slot_id"]
.as_str()
.ok_or_else(|| anyhow!("self.slot_id missing"))?;
let slot_token = self_state["slot_token"]
.as_str()
.ok_or_else(|| anyhow!("self.slot_token missing"))?;
let last_event_id = self_state
.get("last_pulled_event_id")
.and_then(Value::as_str)
.map(str::to_string);
let client = crate::relay_client::RelayClient::new(url);
let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
let inbox_dir = config::inbox_dir()?;
config::ensure_dirs()?;
let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
if let Some(eid) = &result.advance_cursor_to {
let eid = eid.clone();
config::update_relay_state(|state| {
if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
}
Ok(())
})?;
}
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"written": result.written,
"rejected": result.rejected,
"total_seen": events.len(),
"cursor_blocked": result.blocked,
"cursor_advanced_to": result.advance_cursor_to,
}))?
);
} else {
let blocking = result
.rejected
.iter()
.filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
.count();
if blocking > 0 {
println!(
"pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
events.len(),
result.written.len(),
result.rejected.len(),
blocking,
);
} else {
println!(
"pulled {} event(s); wrote {}; rejected {}",
events.len(),
result.written.len(),
result.rejected.len(),
);
}
}
Ok(())
}
fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let mut state = config::read_relay_state()?;
let self_state = state.get("self").cloned().unwrap_or(Value::Null);
if self_state.is_null() {
bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
}
let url = self_state["relay_url"]
.as_str()
.ok_or_else(|| anyhow!("self.relay_url missing"))?
.to_string();
let old_slot_id = self_state["slot_id"]
.as_str()
.ok_or_else(|| anyhow!("self.slot_id missing"))?
.to_string();
let old_slot_token = self_state["slot_token"]
.as_str()
.ok_or_else(|| anyhow!("self.slot_token missing"))?
.to_string();
let card = config::read_agent_card()?;
let did = card
.get("did")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let handle = crate::agent_card::display_handle_from_did(&did).to_string();
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
.to_string();
let pk_bytes = crate::signing::b64decode(&pk_b64)?;
let sk_seed = config::read_private_key()?;
let normalized = url.trim_end_matches('/').to_string();
let client = crate::relay_client::RelayClient::new(&normalized);
client
.check_healthz()
.context("aborting rotation; old slot still valid")?;
let alloc = client.allocate_slot(Some(&handle))?;
let new_slot_id = alloc.slot_id.clone();
let new_slot_token = alloc.slot_token.clone();
let mut announced: Vec<String> = Vec::new();
if !no_announce {
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
let body = json!({
"reason": "operator-initiated slot rotation",
"new_relay_url": url,
"new_slot_id": new_slot_id,
});
let peers = state["peers"].as_object().cloned().unwrap_or_default();
for (peer_handle, _peer_info) in peers.iter() {
let event = json!({
"schema_version": crate::signing::EVENT_SCHEMA_VERSION,
"timestamp": now.clone(),
"from": did,
"to": format!("did:wire:{peer_handle}"),
"type": "wire_close",
"kind": 1201,
"body": body.clone(),
});
let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
Ok(s) => s,
Err(e) => {
eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
continue;
}
};
let peer_info = match state["peers"].get(peer_handle) {
Some(p) => p.clone(),
None => continue,
};
let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
continue;
}
let peer_client = if peer_url == url {
client.clone()
} else {
crate::relay_client::RelayClient::new(peer_url)
};
match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
Ok(_) => announced.push(peer_handle.clone()),
Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
}
}
}
state["self"] = json!({
"relay_url": url,
"slot_id": new_slot_id,
"slot_token": new_slot_token,
});
config::write_relay_state(&state)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"rotated": true,
"old_slot_id": old_slot_id,
"new_slot_id": new_slot_id,
"relay_url": url,
"announced_to": announced,
}))?
);
} else {
println!("rotated slot on {url}");
println!(
" old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
);
println!(" new slot_id: {new_slot_id}");
if !announced.is_empty() {
println!(
" announced wire_close (kind=1201) to: {}",
announced.join(", ")
);
}
println!();
println!("next steps:");
println!(" - peers see the wire_close event in their next `wire pull`");
println!(
" - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
);
println!(" (or full re-pair via `wire pair-host`/`wire join`)");
println!(" - until they do, you'll receive but they won't be able to reach you");
let _ = old_slot_token;
}
Ok(())
}
fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
let mut trust = config::read_trust()?;
let mut removed_from_trust = false;
if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
&& agents.remove(handle).is_some()
{
removed_from_trust = true;
}
config::write_trust(&trust)?;
let mut state = config::read_relay_state()?;
let mut removed_from_relay = false;
if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
&& peers.remove(handle).is_some()
{
removed_from_relay = true;
}
config::write_relay_state(&state)?;
let mut purged: Vec<String> = Vec::new();
if purge {
for dir in [config::inbox_dir()?, config::outbox_dir()?] {
let path = dir.join(format!("{handle}.jsonl"));
if path.exists() {
std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
purged.push(path.to_string_lossy().into());
}
}
}
if !removed_from_trust && !removed_from_relay {
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"removed": false,
"reason": format!("peer {handle:?} not pinned"),
}))?
);
} else {
eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
}
return Ok(());
}
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"handle": handle,
"removed_from_trust": removed_from_trust,
"removed_from_relay_state": removed_from_relay,
"purged_files": purged,
}))?
);
} else {
println!("forgot peer {handle:?}");
if removed_from_trust {
println!(" - removed from trust.json");
}
if removed_from_relay {
println!(" - removed from relay.json");
}
if !purged.is_empty() {
for p in &purged {
println!(" - deleted {p}");
}
} else if !purge {
println!(" (inbox/outbox files preserved; pass --purge to delete them)");
}
}
Ok(())
}
fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let interval = std::time::Duration::from_secs(interval_secs.max(1));
if !as_json {
if once {
eprintln!("wire daemon: single sync cycle, then exit");
} else {
eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
}
}
if let Err(e) = crate::pending_pair::cleanup_on_startup() {
eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
}
let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
if !once {
crate::daemon_stream::spawn_stream_subscriber(wake_tx);
}
loop {
let pushed = run_sync_push().unwrap_or_else(|e| {
eprintln!("daemon: push error: {e:#}");
json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
});
let pulled = run_sync_pull().unwrap_or_else(|e| {
eprintln!("daemon: pull error: {e:#}");
json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
});
let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
eprintln!("daemon: pending-pair tick error: {e:#}");
json!({"transitions": []})
});
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"ts": time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default(),
"push": pushed,
"pull": pulled,
"pairs": pairs,
}))?
);
} else {
let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
let pair_transitions = pairs["transitions"]
.as_array()
.map(|a| a.len())
.unwrap_or(0);
if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
eprintln!(
"daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
);
}
if let Some(arr) = pairs["transitions"].as_array() {
for t in arr {
eprintln!(
" pair {} : {} → {}",
t.get("code").and_then(Value::as_str).unwrap_or("?"),
t.get("from").and_then(Value::as_str).unwrap_or("?"),
t.get("to").and_then(Value::as_str).unwrap_or("?")
);
if let Some(sas) = t.get("sas").and_then(Value::as_str)
&& t.get("to").and_then(Value::as_str) == Some("sas_ready")
{
eprintln!(" SAS digits: {}-{}", &sas[..3], &sas[3..]);
eprintln!(
" Run: wire pair-confirm {} {}",
t.get("code").and_then(Value::as_str).unwrap_or("?"),
sas
);
}
}
}
}
if once {
return Ok(());
}
let _ = wake_rx.recv_timeout(interval);
while wake_rx.try_recv().is_ok() {}
}
}
fn run_sync_push() -> Result<Value> {
let state = config::read_relay_state()?;
let peers = state["peers"].as_object().cloned().unwrap_or_default();
if peers.is_empty() {
return Ok(json!({"pushed": [], "skipped": []}));
}
let outbox_dir = config::outbox_dir()?;
if !outbox_dir.exists() {
return Ok(json!({"pushed": [], "skipped": []}));
}
let mut pushed = Vec::new();
let mut skipped = Vec::new();
for (peer_handle, slot_info) in peers.iter() {
let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
if !outbox.exists() {
continue;
}
let url = slot_info["relay_url"].as_str().unwrap_or("");
let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
continue;
}
let client = crate::relay_client::RelayClient::new(url);
let body = std::fs::read_to_string(&outbox)?;
for line in body.lines() {
let event: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let event_id = event
.get("event_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
match client.post_event(slot_id, slot_token, &event) {
Ok(resp) => {
if resp.status == "duplicate" {
skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
} else {
pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
}
}
Err(e) => {
let reason = crate::relay_client::format_transport_error(&e);
skipped.push(
json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
);
}
}
}
}
Ok(json!({"pushed": pushed, "skipped": skipped}))
}
fn run_sync_pull() -> Result<Value> {
let state = config::read_relay_state()?;
let self_state = state.get("self").cloned().unwrap_or(Value::Null);
if self_state.is_null() {
return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
}
let url = self_state["relay_url"].as_str().unwrap_or("");
let slot_id = self_state["slot_id"].as_str().unwrap_or("");
let slot_token = self_state["slot_token"].as_str().unwrap_or("");
let last_event_id = self_state
.get("last_pulled_event_id")
.and_then(Value::as_str)
.map(str::to_string);
if url.is_empty() {
return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
}
let client = crate::relay_client::RelayClient::new(url);
let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
let inbox_dir = config::inbox_dir()?;
config::ensure_dirs()?;
let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
if let Some(eid) = &result.advance_cursor_to {
let eid = eid.clone();
config::update_relay_state(|state| {
if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
}
Ok(())
})?;
}
Ok(json!({
"written": result.written,
"rejected": result.rejected,
"total_seen": events.len(),
"cursor_blocked": result.blocked,
"cursor_advanced_to": result.advance_cursor_to,
}))
}
fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
let body =
std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
let card: Value =
serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
crate::agent_card::verify_agent_card(&card)
.map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
let mut trust = config::read_trust()?;
crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
let did = card.get("did").and_then(Value::as_str).unwrap_or("");
let handle = crate::agent_card::display_handle_from_did(did).to_string();
config::write_trust(&trust)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"handle": handle,
"did": did,
"tier": "VERIFIED",
"pinned": true,
}))?
);
} else {
println!("pinned {handle} ({did}) at tier VERIFIED");
}
Ok(())
}
fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
}
fn cmd_pair_join(
code_phrase: &str,
relay_url: &str,
auto_yes: bool,
timeout_secs: u64,
) -> Result<()> {
pair_orchestrate(
relay_url,
Some(code_phrase),
"guest",
auto_yes,
timeout_secs,
)
}
fn pair_orchestrate(
relay_url: &str,
code_in: Option<&str>,
role: &str,
auto_yes: bool,
timeout_secs: u64,
) -> Result<()> {
use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
let mut s = pair_session_open(role, relay_url, code_in)?;
if role == "host" {
eprintln!();
eprintln!("share this code phrase with your peer:");
eprintln!();
eprintln!(" {}", s.code);
eprintln!();
eprintln!(
"waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
s.code
);
} else {
eprintln!();
eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
}
const HEARTBEAT_SECS: u64 = 10;
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
let started = std::time::Instant::now();
let mut last_heartbeat = started;
let formatted = loop {
if let Some(sas) = pair_session_try_sas(&mut s)? {
break sas;
}
let now = std::time::Instant::now();
if now >= deadline {
return Err(anyhow!(
"timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
));
}
if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
let elapsed = now.duration_since(started).as_secs();
eprintln!(" ... still waiting ({elapsed}s / {timeout_secs}s)");
last_heartbeat = now;
}
std::thread::sleep(std::time::Duration::from_millis(250));
};
eprintln!();
eprintln!("SAS digits (must match peer's terminal):");
eprintln!();
eprintln!(" {formatted}");
eprintln!();
if !auto_yes {
eprint!("does this match your peer's terminal? [y/N]: ");
use std::io::Write;
std::io::stderr().flush().ok();
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
let trimmed = input.trim().to_lowercase();
if trimmed != "y" && trimmed != "yes" {
bail!("SAS confirmation declined — aborting pairing");
}
}
s.sas_confirmed = true;
let result = pair_session_finalize(&mut s, timeout_secs)?;
let peer_did = result["paired_with"].as_str().unwrap_or("");
let peer_role = if role == "host" { "guest" } else { "host" };
eprintln!("paired with {peer_did} (peer role: {peer_role})");
eprintln!("peer card pinned at tier VERIFIED");
eprintln!(
"peer relay slot saved to {}",
config::relay_state_path()?.display()
);
println!("{}", serde_json::to_string(&result)?);
Ok(())
}
fn cmd_pair(
handle: &str,
code: Option<&str>,
relay: &str,
auto_yes: bool,
timeout_secs: u64,
no_setup: bool,
) -> Result<()> {
let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
let did = init_result
.get("did")
.and_then(|v| v.as_str())
.unwrap_or("(unknown)")
.to_string();
let already = init_result
.get("already_initialized")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if already {
println!("(identity {did} already initialized — reusing)");
} else {
println!("initialized {did}");
}
println!();
match code {
None => {
println!("hosting pair on {relay} (no code = host) ...");
cmd_pair_host(relay, auto_yes, timeout_secs)?;
}
Some(c) => {
println!("joining pair with code {c} on {relay} ...");
cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
}
}
if !no_setup {
println!();
println!("registering wire as MCP server in detected client configs ...");
if let Err(e) = cmd_setup(true) {
eprintln!("warn: setup --apply failed: {e}");
eprintln!(" pair succeeded; you can re-run `wire setup --apply` manually.");
}
}
println!();
println!("pair complete. Next steps:");
println!(" wire daemon start # background sync of inbox/outbox vs relay");
println!(" wire send <peer> claim <msg> # send your peer something");
println!(" wire tail # watch incoming events");
Ok(())
}
fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
let did = init_result
.get("did")
.and_then(|v| v.as_str())
.unwrap_or("(unknown)")
.to_string();
let already = init_result
.get("already_initialized")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if already {
println!("(identity {did} already initialized — reusing)");
} else {
println!("initialized {did}");
}
println!();
match code {
None => cmd_pair_host_detach(relay, false),
Some(c) => cmd_pair_join_detach(c, relay, false),
}
}
fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
Ok(b) => b,
Err(e) => {
if !as_json {
eprintln!(
"warn: could not auto-start daemon: {e}; pair will queue but not advance"
);
}
false
}
};
let code = crate::sas::generate_code_phrase();
let code_hash = crate::pair_session::derive_code_hash(&code);
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default();
let p = crate::pending_pair::PendingPair {
code: code.clone(),
code_hash,
role: "host".to_string(),
relay_url: relay_url.to_string(),
status: "request_host".to_string(),
sas: None,
peer_did: None,
created_at: now,
last_error: None,
pair_id: None,
our_slot_id: None,
our_slot_token: None,
spake2_seed_b64: None,
};
crate::pending_pair::write_pending(&p)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"state": "queued",
"code_phrase": code,
"relay_url": relay_url,
"role": "host",
"daemon_spawned": daemon_spawned,
}))?
);
} else {
if daemon_spawned {
println!("(started wire daemon in background)");
}
println!("detached pair-host queued. Share this code with your peer:\n");
println!(" {code}\n");
println!("Next steps:");
println!(" wire pair-list # check status");
println!(" wire pair-confirm {code} <digits> # when SAS shows up");
println!(" wire pair-cancel {code} # to abort");
}
Ok(())
}
fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire init <handle>` first");
}
let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
Ok(b) => b,
Err(e) => {
if !as_json {
eprintln!(
"warn: could not auto-start daemon: {e}; pair will queue but not advance"
);
}
false
}
};
let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
let code_hash = crate::pair_session::derive_code_hash(&code);
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default();
let p = crate::pending_pair::PendingPair {
code: code.clone(),
code_hash,
role: "guest".to_string(),
relay_url: relay_url.to_string(),
status: "request_guest".to_string(),
sas: None,
peer_did: None,
created_at: now,
last_error: None,
pair_id: None,
our_slot_id: None,
our_slot_token: None,
spake2_seed_b64: None,
};
crate::pending_pair::write_pending(&p)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"state": "queued",
"code_phrase": code,
"relay_url": relay_url,
"role": "guest",
"daemon_spawned": daemon_spawned,
}))?
);
} else {
if daemon_spawned {
println!("(started wire daemon in background)");
}
println!("detached pair-join queued for code {code}.");
println!(
"Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
);
}
Ok(())
}
fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
let typed: String = typed_digits
.chars()
.filter(|c| c.is_ascii_digit())
.collect();
if typed.len() != 6 {
bail!(
"expected 6 digits (got {} after stripping non-digits)",
typed.len()
);
}
let mut p = crate::pending_pair::read_pending(&code)?
.ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
if p.status != "sas_ready" {
bail!(
"pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
p.status
);
}
let stored = p
.sas
.as_ref()
.ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
.clone();
if stored == typed {
p.status = "confirmed".to_string();
crate::pending_pair::write_pending(&p)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"state": "confirmed",
"code_phrase": code,
}))?
);
} else {
println!("digits match. Daemon will finalize the handshake on its next tick.");
println!("Run `wire peers` after a few seconds to confirm.");
}
} else {
p.status = "aborted".to_string();
p.last_error = Some(format!(
"SAS digit mismatch (typed {typed}, expected {stored})"
));
let client = crate::relay_client::RelayClient::new(&p.relay_url);
let _ = client.pair_abandon(&p.code_hash);
crate::pending_pair::write_pending(&p)?;
crate::os_notify::toast(
&format!("wire — pair aborted ({})", p.code),
p.last_error.as_deref().unwrap_or("digits mismatch"),
);
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"state": "aborted",
"code_phrase": code,
"error": "digits mismatch",
}))?
);
}
bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
}
Ok(())
}
fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
if watch {
return cmd_pair_list_watch(watch_interval_secs);
}
let spake2_items = crate::pending_pair::list_pending()?;
let inbound_items = crate::pending_inbound_pair::list_pending_inbound()?;
if as_json {
println!("{}", serde_json::to_string(&spake2_items)?);
return Ok(());
}
if spake2_items.is_empty() && inbound_items.is_empty() {
println!("no pending pair sessions.");
return Ok(());
}
if !inbound_items.is_empty() {
println!("PENDING INBOUND (v0.5.14 zero-paste pair_drop awaiting your accept)");
println!(
"{:<20} {:<35} {:<25} NEXT STEP",
"PEER", "RELAY", "RECEIVED"
);
for p in &inbound_items {
println!(
"{:<20} {:<35} {:<25} `wire pair-accept {peer}` to accept; `wire pair-reject {peer}` to refuse",
p.peer_handle,
p.peer_relay_url,
p.received_at,
peer = p.peer_handle,
);
}
println!();
}
if !spake2_items.is_empty() {
println!("SPAKE2 SESSIONS");
println!(
"{:<15} {:<8} {:<18} {:<10} NOTE",
"CODE", "ROLE", "STATUS", "SAS"
);
for p in spake2_items {
let sas = p
.sas
.as_ref()
.map(|d| format!("{}-{}", &d[..3], &d[3..]))
.unwrap_or_else(|| "—".to_string());
let note = p
.last_error
.as_deref()
.or(p.peer_did.as_deref())
.unwrap_or("");
println!(
"{:<15} {:<8} {:<18} {:<10} {}",
p.code, p.role, p.status, sas, note
);
}
}
Ok(())
}
fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
use std::collections::HashMap;
use std::io::Write;
let interval = std::time::Duration::from_secs(interval_secs.max(1));
let mut prev: HashMap<String, String> = HashMap::new();
{
let items = crate::pending_pair::list_pending()?;
for p in &items {
println!("{}", serde_json::to_string(&p)?);
prev.insert(p.code.clone(), p.status.clone());
}
let _ = std::io::stdout().flush();
}
loop {
std::thread::sleep(interval);
let items = match crate::pending_pair::list_pending() {
Ok(v) => v,
Err(_) => continue,
};
let mut cur: HashMap<String, String> = HashMap::new();
for p in &items {
cur.insert(p.code.clone(), p.status.clone());
match prev.get(&p.code) {
None => {
println!("{}", serde_json::to_string(&p)?);
}
Some(prev_status) if prev_status != &p.status => {
println!("{}", serde_json::to_string(&p)?);
}
_ => {}
}
}
for code in prev.keys() {
if !cur.contains_key(code) {
println!(
"{}",
serde_json::to_string(&json!({
"code": code,
"status": "removed",
"_synthetic": true,
}))?
);
}
}
let _ = std::io::stdout().flush();
prev = cur;
}
}
fn cmd_pair_watch(
code_phrase: &str,
target_status: &str,
timeout_secs: u64,
as_json: bool,
) -> Result<()> {
let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
let mut last_seen_status: Option<String> = None;
loop {
let p_opt = crate::pending_pair::read_pending(&code)?;
let now = std::time::Instant::now();
match p_opt {
None => {
if last_seen_status.is_some() {
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"state": "finalized", "code": code}))?
);
} else {
println!("pair {code} finalized (file removed)");
}
return Ok(());
} else {
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
);
}
std::process::exit(1);
}
}
Some(p) => {
let cur = p.status.clone();
if Some(cur.clone()) != last_seen_status {
if as_json {
println!("{}", serde_json::to_string(&p)?);
}
last_seen_status = Some(cur.clone());
}
if cur == target_status {
if !as_json {
let sas_str = p
.sas
.as_ref()
.map(|s| format!("{}-{}", &s[..3], &s[3..]))
.unwrap_or_else(|| "—".to_string());
println!("pair {code} reached {target_status} (SAS: {sas_str})");
}
return Ok(());
}
if cur == "aborted" || cur == "aborted_restart" {
if !as_json {
let err = p.last_error.as_deref().unwrap_or("(no detail)");
eprintln!("pair {code} {cur}: {err}");
}
std::process::exit(1);
}
}
}
if now >= deadline {
if !as_json {
eprintln!(
"timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
);
}
std::process::exit(2);
}
std::thread::sleep(std::time::Duration::from_millis(250));
}
}
fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
let p = crate::pending_pair::read_pending(&code)?
.ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
let client = crate::relay_client::RelayClient::new(&p.relay_url);
let _ = client.pair_abandon(&p.code_hash);
crate::pending_pair::delete_pending(&code)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"state": "cancelled",
"code_phrase": code,
}))?
);
} else {
println!("cancelled pending pair {code} (relay slot released, file removed).");
}
Ok(())
}
fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
let code = crate::sas::parse_code_phrase(code_phrase)?;
let code_hash = crate::pair_session::derive_code_hash(code);
let client = crate::relay_client::RelayClient::new(relay_url);
client.pair_abandon(&code_hash)?;
println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
println!("host can now issue a fresh code; guest can re-join.");
Ok(())
}
fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
let share_payload: Option<Value> = if share {
let client = reqwest::blocking::Client::new();
let single_use = if uses == 1 { Some(1u32) } else { None };
let body = json!({
"invite_url": url,
"ttl_seconds": ttl,
"uses": single_use,
});
let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
let resp = client.post(&endpoint).json(&body).send()?;
if !resp.status().is_success() {
let code = resp.status();
let txt = resp.text().unwrap_or_default();
bail!("relay {code} on /v1/invite/register: {txt}");
}
let parsed: Value = resp.json()?;
let token = parsed
.get("token")
.and_then(Value::as_str)
.ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
.to_string();
let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
let curl_line = format!("curl -fsSL {share_url} | sh");
Some(json!({
"token": token,
"share_url": share_url,
"curl": curl_line,
"expires_unix": parsed.get("expires_unix"),
}))
} else {
None
};
if as_json {
let mut out = json!({
"invite_url": url,
"ttl_secs": ttl,
"uses": uses,
"relay": relay,
});
if let Some(s) = &share_payload {
out["share"] = s.clone();
}
println!("{}", serde_json::to_string(&out)?);
} else if let Some(s) = share_payload {
let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
println!("{curl}");
} else {
eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
eprintln!("# TTL: {ttl}s. Uses: {uses}.");
println!("{url}");
}
Ok(())
}
fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
let resolved = if url.starts_with("http://") || url.starts_with("https://") {
let sep = if url.contains('?') { '&' } else { '?' };
let resolve_url = format!("{url}{sep}format=url");
let client = reqwest::blocking::Client::new();
let resp = client
.get(&resolve_url)
.send()
.with_context(|| format!("GET {resolve_url}"))?;
if !resp.status().is_success() {
bail!("could not resolve short URL {url} (HTTP {})", resp.status());
}
let body = resp.text().unwrap_or_default().trim().to_string();
if !body.starts_with("wire://pair?") {
bail!(
"short URL {url} did not resolve to a wire:// invite. \
(got: {}{})",
body.chars().take(80).collect::<String>(),
if body.chars().count() > 80 { "…" } else { "" }
);
}
body
} else {
url.to_string()
};
let result = crate::pair_invite::accept_invite(&resolved)?;
if as_json {
println!("{}", serde_json::to_string(&result)?);
} else {
let did = result
.get("paired_with")
.and_then(Value::as_str)
.unwrap_or("?");
println!("paired with {did}");
println!(
"you can now: wire send {} <kind> <body>",
crate::agent_card::display_handle_from_did(did)
);
}
Ok(())
}
fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
if let Some(h) = handle {
let parsed = crate::pair_profile::parse_handle(h)?;
if config::is_initialized()? {
let card = config::read_agent_card()?;
let local_handle = card
.get("profile")
.and_then(|p| p.get("handle"))
.and_then(Value::as_str)
.map(str::to_string);
if local_handle.as_deref() == Some(h) {
return cmd_whois(None, as_json, None);
}
}
let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
if as_json {
println!("{}", serde_json::to_string(&resolved)?);
} else {
print_resolved_profile(&resolved);
}
return Ok(());
}
let card = config::read_agent_card()?;
if as_json {
let profile = card.get("profile").cloned().unwrap_or(Value::Null);
println!(
"{}",
serde_json::to_string(&json!({
"did": card.get("did").cloned().unwrap_or(Value::Null),
"profile": profile,
}))?
);
} else {
print!("{}", crate::pair_profile::render_self_summary()?);
}
Ok(())
}
fn print_resolved_profile(resolved: &Value) {
let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
let relay = resolved
.get("relay_url")
.and_then(Value::as_str)
.unwrap_or("");
let slot = resolved
.get("slot_id")
.and_then(Value::as_str)
.unwrap_or("");
let profile = resolved
.get("card")
.and_then(|c| c.get("profile"))
.cloned()
.unwrap_or(Value::Null);
println!("{did}");
println!(" nick: {nick}");
if !relay.is_empty() {
println!(" relay_url: {relay}");
}
if !slot.is_empty() {
println!(" slot_id: {slot}");
}
let pick =
|k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
if let Some(s) = pick("display_name") {
println!(" display_name: {s}");
}
if let Some(s) = pick("emoji") {
println!(" emoji: {s}");
}
if let Some(s) = pick("motto") {
println!(" motto: {s}");
}
if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
let joined: Vec<String> = arr
.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect();
println!(" vibe: {}", joined.join(", "));
}
if let Some(s) = pick("pronouns") {
println!(" pronouns: {s}");
}
}
fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
let parsed = crate::pair_profile::parse_handle(handle_arg)?;
let (our_did, our_relay, our_slot_id, our_slot_token) =
crate::pair_invite::ensure_self_with_relay(relay_override)?;
if our_did == format!("did:wire:{}", parsed.nick) {
bail!("refusing to add self (handle matches own DID)");
}
if let Some(pending) = crate::pending_inbound_pair::read_pending_inbound(&parsed.nick)? {
return cmd_add_accept_pending(
handle_arg,
&parsed.nick,
&pending,
&our_relay,
&our_slot_id,
&our_slot_token,
as_json,
);
}
let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
let peer_card = resolved
.get("card")
.cloned()
.ok_or_else(|| anyhow!("resolved missing card"))?;
let peer_did = resolved
.get("did")
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("resolved missing did"))?
.to_string();
let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
let peer_slot_id = resolved
.get("slot_id")
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("resolved missing slot_id"))?
.to_string();
let peer_relay = resolved
.get("relay_url")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| relay_override.map(str::to_string))
.unwrap_or_else(|| format!("https://{}", parsed.domain));
let mut trust = config::read_trust()?;
crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
config::write_trust(&trust)?;
let mut relay_state = config::read_relay_state()?;
let existing_token = relay_state
.get("peers")
.and_then(|p| p.get(&peer_handle))
.and_then(|p| p.get("slot_token"))
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_default();
relay_state["peers"][&peer_handle] = json!({
"relay_url": peer_relay,
"slot_id": peer_slot_id,
"slot_token": existing_token, });
config::write_relay_state(&relay_state)?;
let our_card = config::read_agent_card()?;
let sk_seed = config::read_private_key()?;
let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
let pk_b64 = our_card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
let pk_bytes = crate::signing::b64decode(pk_b64)?;
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default();
let event = json!({
"schema_version": crate::signing::EVENT_SCHEMA_VERSION,
"timestamp": now,
"from": our_did,
"to": peer_did,
"type": "pair_drop",
"kind": 1100u32,
"body": {
"card": our_card,
"relay_url": our_relay,
"slot_id": our_slot_id,
"slot_token": our_slot_token,
},
});
let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
let client = crate::relay_client::RelayClient::new(&peer_relay);
let resp = client.handle_intro(&parsed.nick, &signed)?;
let event_id = signed
.get("event_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"handle": handle_arg,
"paired_with": peer_did,
"peer_handle": peer_handle,
"event_id": event_id,
"drop_response": resp,
"status": "drop_sent",
}))?
);
} else {
println!(
"→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
);
}
Ok(())
}
fn cmd_add_accept_pending(
handle_arg: &str,
peer_nick: &str,
pending: &crate::pending_inbound_pair::PendingInboundPair,
_our_relay: &str,
_our_slot_id: &str,
_our_slot_token: &str,
as_json: bool,
) -> Result<()> {
let mut trust = config::read_trust()?;
crate::trust::add_agent_card_pin(&mut trust, &pending.peer_card, Some("VERIFIED"));
config::write_trust(&trust)?;
let mut relay_state = config::read_relay_state()?;
relay_state["peers"][&pending.peer_handle] = json!({
"relay_url": pending.peer_relay_url,
"slot_id": pending.peer_slot_id,
"slot_token": pending.peer_slot_token,
});
config::write_relay_state(&relay_state)?;
crate::pair_invite::send_pair_drop_ack(
&pending.peer_handle,
&pending.peer_relay_url,
&pending.peer_slot_id,
&pending.peer_slot_token,
)
.with_context(|| {
format!(
"pair_drop_ack send to {} @ {} slot {} failed",
pending.peer_handle, pending.peer_relay_url, pending.peer_slot_id
)
})?;
crate::pending_inbound_pair::consume_pending_inbound(peer_nick)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"handle": handle_arg,
"paired_with": pending.peer_did,
"peer_handle": pending.peer_handle,
"status": "bilateral_accepted",
"via": "pending_inbound",
}))?
);
} else {
println!(
"→ accepted pending pair from {peer}\n→ pinned VERIFIED, slot_token recorded\n→ shipped our slot_token back via pair_drop_ack\nbilateral pair complete. Send with `wire send {peer} \"...\"`.",
peer = pending.peer_handle,
);
}
Ok(())
}
fn cmd_pair_accept(peer_nick: &str, as_json: bool) -> Result<()> {
let nick = crate::agent_card::bare_handle(peer_nick);
let pending = crate::pending_inbound_pair::read_pending_inbound(nick)?.ok_or_else(|| {
anyhow!(
"no pending pair request from {nick}. Run `wire pair-list-inbound` to see who is waiting, \
or use `wire add <peer>@<relay>` to send a fresh outbound pair request."
)
})?;
let (_our_did, our_relay, our_slot_id, our_slot_token) =
crate::pair_invite::ensure_self_with_relay(None)?;
let handle_arg = format!("{}@{}", pending.peer_handle, pending.peer_relay_url);
cmd_add_accept_pending(
&handle_arg,
nick,
&pending,
&our_relay,
&our_slot_id,
&our_slot_token,
as_json,
)
}
fn cmd_pair_list_inbound(as_json: bool) -> Result<()> {
let items = crate::pending_inbound_pair::list_pending_inbound()?;
if as_json {
println!("{}", serde_json::to_string(&items)?);
return Ok(());
}
if items.is_empty() {
println!("no pending inbound pair requests.");
return Ok(());
}
println!("{:<20} {:<35} {:<25} DID", "PEER", "RELAY", "RECEIVED");
for p in items {
println!(
"{:<20} {:<35} {:<25} {}",
p.peer_handle, p.peer_relay_url, p.received_at, p.peer_did,
);
}
println!(
"→ accept with `wire pair-accept <peer>`; refuse with `wire pair-reject <peer>`."
);
Ok(())
}
fn cmd_pair_reject(peer_nick: &str, as_json: bool) -> Result<()> {
let nick = crate::agent_card::bare_handle(peer_nick);
let existed = crate::pending_inbound_pair::read_pending_inbound(nick)?;
crate::pending_inbound_pair::consume_pending_inbound(nick)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"peer": nick,
"rejected": existed.is_some(),
"had_pending": existed.is_some(),
}))?
);
} else if existed.is_some() {
println!("→ rejected pending pair from {nick}\n→ pending-inbound record deleted; no ack sent.");
} else {
println!("no pending pair from {nick} — nothing to reject");
}
Ok(())
}
fn cmd_diag(action: DiagAction) -> Result<()> {
let state = config::state_dir()?;
let knob = state.join("diag.enabled");
let log_path = state.join("diag.jsonl");
match action {
DiagAction::Tail { limit, json } => {
let entries = crate::diag::tail(limit);
if json {
for e in entries {
println!("{}", serde_json::to_string(&e)?);
}
} else if entries.is_empty() {
println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
} else {
for e in entries {
let ts = e["ts"].as_u64().unwrap_or(0);
let ty = e["type"].as_str().unwrap_or("?");
let pid = e["pid"].as_u64().unwrap_or(0);
let payload = e["payload"].to_string();
println!("[{ts}] pid={pid} {ty} {payload}");
}
}
}
DiagAction::Enable => {
config::ensure_dirs()?;
std::fs::write(&knob, "1")?;
println!("wire diag: enabled at {knob:?}");
}
DiagAction::Disable => {
if knob.exists() {
std::fs::remove_file(&knob)?;
}
println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
}
DiagAction::Status { json } => {
let enabled = crate::diag::is_enabled();
let size = std::fs::metadata(&log_path)
.map(|m| m.len())
.unwrap_or(0);
if json {
println!(
"{}",
serde_json::to_string(&serde_json::json!({
"enabled": enabled,
"log_path": log_path,
"log_size_bytes": size,
}))?
);
} else {
println!("wire diag status");
println!(" enabled: {enabled}");
println!(" log: {log_path:?}");
println!(" log size: {size} bytes");
}
}
}
Ok(())
}
fn cmd_service(action: ServiceAction) -> Result<()> {
let (report, as_json) = match action {
ServiceAction::Install { json } => (crate::service::install()?, json),
ServiceAction::Uninstall { json } => (crate::service::uninstall()?, json),
ServiceAction::Status { json } => (crate::service::status()?, json),
};
if as_json {
println!("{}", serde_json::to_string(&report)?);
} else {
println!("wire service {}", report.action);
println!(" platform: {}", report.platform);
println!(" unit: {}", report.unit_path);
println!(" status: {}", report.status);
println!(" detail: {}", report.detail);
}
Ok(())
}
fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
let pgrep_out = std::process::Command::new("pgrep")
.args(["-f", "wire daemon"])
.output();
let running_pids: Vec<u32> = match pgrep_out {
Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
.split_whitespace()
.filter_map(|s| s.parse::<u32>().ok())
.collect(),
_ => Vec::new(),
};
let record = crate::ensure_up::read_pid_record("daemon");
let recorded_version: Option<String> = match &record {
crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
_ => None,
};
let cli_version = env!("CARGO_PKG_VERSION").to_string();
if check_only {
let report = json!({
"running_pids": running_pids,
"pidfile_version": recorded_version,
"cli_version": cli_version,
"would_kill": running_pids,
});
if as_json {
println!("{}", serde_json::to_string(&report)?);
} else {
println!("wire upgrade --check");
println!(" cli version: {cli_version}");
println!(" pidfile version: {}", recorded_version.as_deref().unwrap_or("(missing)"));
if running_pids.is_empty() {
println!(" running daemons: none");
} else {
let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
println!(" running daemons: pids {}", pids.join(", "));
println!(" would kill all + spawn fresh");
}
}
return Ok(());
}
let mut killed: Vec<u32> = Vec::new();
for pid in &running_pids {
let _ = std::process::Command::new("kill")
.args(["-15", &pid.to_string()])
.status();
killed.push(*pid);
}
if !killed.is_empty() {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
let still_alive: Vec<u32> = killed
.iter()
.copied()
.filter(|p| process_alive_pid(*p))
.collect();
if still_alive.is_empty() {
break;
}
if std::time::Instant::now() >= deadline {
for pid in still_alive {
let _ = std::process::Command::new("kill")
.args(["-9", &pid.to_string()])
.status();
}
break;
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
let pidfile = config::state_dir()?.join("daemon.pid");
if pidfile.exists() {
let _ = std::fs::remove_file(&pidfile);
}
let spawned = crate::ensure_up::ensure_daemon_running()?;
let new_record = crate::ensure_up::read_pid_record("daemon");
let new_pid = new_record.pid();
let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
Some(d.version.clone())
} else {
None
};
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"killed": killed,
"spawned_fresh_daemon": spawned,
"new_pid": new_pid,
"new_version": new_version,
"cli_version": cli_version,
}))?
);
} else {
if killed.is_empty() {
println!("wire upgrade: no stale daemons running");
} else {
println!("wire upgrade: killed {} daemon(s) (pids {})",
killed.len(),
killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
}
if spawned {
println!(
"wire upgrade: spawned fresh daemon (pid {} v{})",
new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
new_version.as_deref().unwrap_or(&cli_version),
);
} else {
println!("wire upgrade: daemon was already running on current binary");
}
}
Ok(())
}
fn process_alive_pid(pid: u32) -> bool {
#[cfg(target_os = "linux")]
{
std::path::Path::new(&format!("/proc/{pid}")).exists()
}
#[cfg(not(target_os = "linux"))]
{
std::process::Command::new("kill")
.args(["-0", &pid.to_string()])
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
}
#[derive(Clone, Debug, serde::Serialize)]
pub struct DoctorCheck {
pub id: String,
pub status: String,
pub detail: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub fix: Option<String>,
}
impl DoctorCheck {
fn pass(id: &str, detail: impl Into<String>) -> Self {
Self {
id: id.into(),
status: "PASS".into(),
detail: detail.into(),
fix: None,
}
}
fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
Self {
id: id.into(),
status: "WARN".into(),
detail: detail.into(),
fix: Some(fix.into()),
}
}
fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
Self {
id: id.into(),
status: "FAIL".into(),
detail: detail.into(),
fix: Some(fix.into()),
}
}
}
fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
let mut checks: Vec<DoctorCheck> = Vec::new();
checks.push(check_daemon_health());
checks.push(check_daemon_pid_consistency());
checks.push(check_relay_reachable());
checks.push(check_pair_rejections(recent_rejections));
checks.push(check_cursor_progress());
let fails = checks.iter().filter(|c| c.status == "FAIL").count();
let warns = checks.iter().filter(|c| c.status == "WARN").count();
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"checks": checks,
"fail_count": fails,
"warn_count": warns,
"ok": fails == 0,
}))?
);
} else {
println!("wire doctor — {} checks", checks.len());
for c in &checks {
let bullet = match c.status.as_str() {
"PASS" => "✓",
"WARN" => "!",
"FAIL" => "✗",
_ => "?",
};
println!(" {bullet} [{}] {}: {}", c.status, c.id, c.detail);
if let Some(fix) = &c.fix {
println!(" fix: {fix}");
}
}
println!();
if fails == 0 && warns == 0 {
println!("ALL GREEN");
} else {
println!("{fails} FAIL, {warns} WARN");
}
}
if fails > 0 {
std::process::exit(1);
}
Ok(())
}
fn check_daemon_health() -> DoctorCheck {
let output = std::process::Command::new("pgrep")
.args(["-f", "wire daemon"])
.output();
let pgrep_pids: Vec<u32> = match output {
Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
.split_whitespace()
.filter_map(|s| s.parse::<u32>().ok())
.collect(),
_ => Vec::new(),
};
let pidfile_pid = crate::ensure_up::read_pid_record("daemon").pid();
let pidfile_alive = pidfile_pid
.map(|pid| {
#[cfg(target_os = "linux")]
{
std::path::Path::new(&format!("/proc/{pid}")).exists()
}
#[cfg(not(target_os = "linux"))]
{
std::process::Command::new("kill")
.args(["-0", &pid.to_string()])
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
})
.unwrap_or(false);
let orphan_pids: Vec<u32> = pgrep_pids
.iter()
.filter(|p| Some(**p) != pidfile_pid)
.copied()
.collect();
let fmt_pids = |xs: &[u32]| -> String {
xs.iter()
.map(|p| p.to_string())
.collect::<Vec<_>>()
.join(", ")
};
match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
(0, _, _) => DoctorCheck::fail(
"daemon",
"no `wire daemon` process running — nothing pulling inbox or pushing outbox",
"`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
),
(1, true, true) => DoctorCheck::pass(
"daemon",
format!(
"one daemon running (pid {}, matches pidfile)",
pgrep_pids[0]
),
),
(n, true, false) => DoctorCheck::fail(
"daemon",
format!(
"{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
The orphans race the relay cursor — they advance past events your current binary can't process. \
(Issue #2 exact class.)",
fmt_pids(&pgrep_pids),
pidfile_pid.unwrap(),
fmt_pids(&orphan_pids),
),
"`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
),
(n, false, _) => DoctorCheck::fail(
"daemon",
format!(
"{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
(Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
fmt_pids(&pgrep_pids),
match pidfile_pid {
Some(p) => format!("claims pid {p} which is dead"),
None => "is missing".to_string(),
},
),
"`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
),
(n, true, true) => DoctorCheck::warn(
"daemon",
format!(
"{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
fmt_pids(&pgrep_pids)
),
"kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
),
}
}
fn check_daemon_pid_consistency() -> DoctorCheck {
let record = crate::ensure_up::read_pid_record("daemon");
match record {
crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
"daemon_pid_consistency",
"no daemon.pid yet — fresh box or daemon never started",
),
crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
"daemon_pid_consistency",
format!("daemon.pid is corrupt: {reason}"),
"delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
),
crate::ensure_up::PidRecord::LegacyInt(pid) => DoctorCheck::warn(
"daemon_pid_consistency",
format!(
"daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
Daemon was started by a pre-0.5.11 binary."
),
"run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
),
crate::ensure_up::PidRecord::Json(d) => {
let mut issues: Vec<String> = Vec::new();
if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
issues.push(format!(
"schema={} (expected {})",
d.schema,
crate::ensure_up::DAEMON_PID_SCHEMA
));
}
let cli_version = env!("CARGO_PKG_VERSION");
if d.version != cli_version {
issues.push(format!(
"version daemon={} cli={cli_version}",
d.version
));
}
if !std::path::Path::new(&d.bin_path).exists() {
issues.push(format!("bin_path {} missing on disk", d.bin_path));
}
if let Ok(card) = config::read_agent_card()
&& let Some(current_did) = card.get("did").and_then(Value::as_str)
&& let Some(recorded_did) = &d.did
&& recorded_did != current_did
{
issues.push(format!(
"did daemon={recorded_did} config={current_did} — identity drift"
));
}
if let Ok(state) = config::read_relay_state()
&& let Some(current_relay) = state
.get("self")
.and_then(|s| s.get("relay_url"))
.and_then(Value::as_str)
&& let Some(recorded_relay) = &d.relay_url
&& recorded_relay != current_relay
{
issues.push(format!(
"relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
));
}
if issues.is_empty() {
DoctorCheck::pass(
"daemon_pid_consistency",
format!(
"daemon v{} bound to {} as {}",
d.version,
d.relay_url.as_deref().unwrap_or("?"),
d.did.as_deref().unwrap_or("?")
),
)
} else {
DoctorCheck::warn(
"daemon_pid_consistency",
format!("daemon pidfile drift: {}", issues.join("; ")),
"`wire upgrade` to atomically restart daemon with current config".to_string(),
)
}
}
}
}
fn check_relay_reachable() -> DoctorCheck {
let state = match config::read_relay_state() {
Ok(s) => s,
Err(e) => return DoctorCheck::fail(
"relay",
format!("could not read relay state: {e}"),
"run `wire up <handle>@<relay>` to bootstrap",
),
};
let url = state
.get("self")
.and_then(|s| s.get("relay_url"))
.and_then(Value::as_str)
.unwrap_or("");
if url.is_empty() {
return DoctorCheck::warn(
"relay",
"no relay bound — wire send/pull will not work",
"run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
);
}
let client = crate::relay_client::RelayClient::new(url);
match client.check_healthz() {
Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
Err(e) => DoctorCheck::fail(
"relay",
format!("{url} unreachable: {e}"),
format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
),
}
}
fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
let path = match config::state_dir() {
Ok(d) => d.join("pair-rejected.jsonl"),
Err(e) => return DoctorCheck::warn(
"pair_rejections",
format!("could not resolve state dir: {e}"),
"set WIRE_HOME or fix XDG_STATE_HOME",
),
};
if !path.exists() {
return DoctorCheck::pass(
"pair_rejections",
"no pair-rejected.jsonl — no recorded pair failures",
);
}
let body = match std::fs::read_to_string(&path) {
Ok(b) => b,
Err(e) => return DoctorCheck::warn(
"pair_rejections",
format!("could not read {path:?}: {e}"),
"check file permissions",
),
};
let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
if lines.is_empty() {
return DoctorCheck::pass(
"pair_rejections",
"pair-rejected.jsonl present but empty",
);
}
let total = lines.len();
let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
let mut summary: Vec<String> = Vec::new();
for line in &recent {
if let Ok(rec) = serde_json::from_str::<Value>(line) {
let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
summary.push(format!("{peer}/{code}"));
}
}
DoctorCheck::warn(
"pair_rejections",
format!(
"{total} pair failures recorded. recent: [{}]",
summary.join(", ")
),
format!(
"inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
),
)
}
fn check_cursor_progress() -> DoctorCheck {
let state = match config::read_relay_state() {
Ok(s) => s,
Err(e) => return DoctorCheck::warn(
"cursor",
format!("could not read relay state: {e}"),
"check ~/Library/Application Support/wire/relay.json",
),
};
let cursor = state
.get("self")
.and_then(|s| s.get("last_pulled_event_id"))
.and_then(Value::as_str)
.map(|s| s.chars().take(16).collect::<String>())
.unwrap_or_else(|| "<none>".to_string());
DoctorCheck::pass(
"cursor",
format!(
"current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
),
)
}
#[cfg(test)]
mod doctor_tests {
use super::*;
#[test]
fn doctor_check_constructors_set_status_correctly() {
let p = DoctorCheck::pass("x", "ok");
assert_eq!(p.status, "PASS");
assert_eq!(p.fix, None);
let w = DoctorCheck::warn("x", "watch out", "do this");
assert_eq!(w.status, "WARN");
assert_eq!(w.fix, Some("do this".to_string()));
let f = DoctorCheck::fail("x", "broken", "fix it");
assert_eq!(f.status, "FAIL");
assert_eq!(f.fix, Some("fix it".to_string()));
}
#[test]
fn check_pair_rejections_no_file_is_pass() {
config::test_support::with_temp_home(|| {
config::ensure_dirs().unwrap();
let c = check_pair_rejections(5);
assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
});
}
#[test]
fn check_pair_rejections_with_entries_warns() {
config::test_support::with_temp_home(|| {
config::ensure_dirs().unwrap();
crate::pair_invite::record_pair_rejection(
"willard",
"pair_drop_ack_send_failed",
"POST 502",
);
let c = check_pair_rejections(5);
assert_eq!(c.status, "WARN");
assert!(c.detail.contains("1 pair failures"));
assert!(c.detail.contains("willard/pair_drop_ack_send_failed"));
});
}
}
fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
let (nick, relay_url) = match handle_arg.split_once('@') {
Some((n, host)) => {
let url = if host.starts_with("http://") || host.starts_with("https://") {
host.to_string()
} else {
format!("https://{host}")
};
(n.to_string(), url)
}
None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
};
let mut report: Vec<(String, String)> = Vec::new();
let mut step = |stage: &str, detail: String| {
report.push((stage.to_string(), detail.clone()));
if !as_json {
eprintln!("wire up: {stage} — {detail}");
}
};
if config::is_initialized()? {
let card = config::read_agent_card()?;
let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
let existing_handle =
crate::agent_card::display_handle_from_did(existing_did).to_string();
if existing_handle != nick {
bail!(
"wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
delete `{:?}` to start fresh.",
config::config_dir()?
);
}
step("init", format!("already initialized as {existing_handle}"));
} else {
cmd_init(&nick, name, Some(&relay_url), false)?;
step("init", format!("created identity {nick} bound to {relay_url}"));
}
let relay_state = config::read_relay_state()?;
let bound_relay = relay_state
.get("self")
.and_then(|s| s.get("relay_url"))
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if bound_relay.is_empty() {
cmd_bind_relay(&relay_url, false)?;
step("bind-relay", format!("bound to {relay_url}"));
} else if bound_relay != relay_url {
step(
"bind-relay",
format!(
"WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
),
);
} else {
step("bind-relay", format!("already bound to {bound_relay}"));
}
match cmd_claim(&nick, Some(&relay_url), None, false) {
Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
Err(e) => step(
"claim",
format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
),
}
match crate::ensure_up::ensure_daemon_running() {
Ok(true) => step("daemon", "started fresh background daemon".to_string()),
Ok(false) => step("daemon", "already running".to_string()),
Err(e) => step(
"daemon",
format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
),
}
let summary = format!(
"ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
`wire monitor` to watch incoming events."
);
step("ready", summary.clone());
if as_json {
let steps_json: Vec<_> = report
.iter()
.map(|(k, v)| json!({"stage": k, "detail": v}))
.collect();
println!(
"{}",
serde_json::to_string(&json!({
"nick": nick,
"relay": relay_url,
"steps": steps_json,
}))?
);
}
Ok(())
}
fn strip_proto(url: &str) -> String {
url.trim_start_matches("https://")
.trim_start_matches("http://")
.to_string()
}
fn cmd_pair_megacommand(
handle_arg: &str,
relay_override: Option<&str>,
timeout_secs: u64,
_as_json: bool,
) -> Result<()> {
let parsed = crate::pair_profile::parse_handle(handle_arg)?;
let peer_handle = parsed.nick.clone();
eprintln!("wire pair: resolving {handle_arg}...");
cmd_add(handle_arg, relay_override, false)?;
eprintln!(
"wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
to ack (their daemon must be running + pulling)..."
);
let _ = run_sync_pull();
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
let poll_interval = std::time::Duration::from_millis(500);
loop {
let _ = run_sync_pull();
let relay_state = config::read_relay_state()?;
let peer_entry = relay_state
.get("peers")
.and_then(|p| p.get(&peer_handle))
.cloned();
let token = peer_entry
.as_ref()
.and_then(|e| e.get("slot_token"))
.and_then(Value::as_str)
.unwrap_or("");
if !token.is_empty() {
let trust = config::read_trust()?;
let pinned_in_trust = trust
.get("agents")
.and_then(|a| a.get(&peer_handle))
.is_some();
println!(
"wire pair: paired with {peer_handle}.\n trust: {} bilateral: yes (slot_token recorded)\n next: `wire send {peer_handle} \"<msg>\"`",
if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
);
return Ok(());
}
if std::time::Instant::now() >= deadline {
bail!(
"wire pair: timed out after {timeout_secs}s. \
peer {peer_handle} never sent pair_drop_ack. \
likely causes: (a) their daemon is down — ask them to run \
`wire status` and `wire daemon &`; (b) their binary is older \
than 0.5.x and doesn't understand pair_drop events — ask \
them to `wire upgrade`; (c) network / relay blip — re-run \
`wire pair {handle_arg}` to retry."
);
}
std::thread::sleep(poll_interval);
}
}
fn cmd_claim(
nick: &str,
relay_override: Option<&str>,
public_url: Option<&str>,
as_json: bool,
) -> Result<()> {
if !crate::pair_profile::is_valid_nick(nick) {
bail!(
"phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
);
}
let (_did, relay_url, slot_id, slot_token) =
crate::pair_invite::ensure_self_with_relay(relay_override)?;
let card = config::read_agent_card()?;
let client = crate::relay_client::RelayClient::new(&relay_url);
let resp = client.handle_claim(nick, &slot_id, &slot_token, public_url, &card)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"nick": nick,
"relay": relay_url,
"response": resp,
}))?
);
} else {
let domain = public_url
.unwrap_or(&relay_url)
.trim_start_matches("https://")
.trim_start_matches("http://")
.trim_end_matches('/')
.split('/')
.next()
.unwrap_or("<this-relay-domain>")
.to_string();
println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
println!("verify with: wire whois {nick}@{domain}");
}
Ok(())
}
fn cmd_profile(action: ProfileAction) -> Result<()> {
match action {
ProfileAction::Set { field, value, json } => {
let parsed: Value =
serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
if json {
println!(
"{}",
serde_json::to_string(&json!({
"field": field,
"profile": new_profile,
}))?
);
} else {
println!("profile.{field} set");
}
}
ProfileAction::Get { json } => return cmd_whois(None, json, None),
ProfileAction::Clear { field, json } => {
let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
if json {
println!(
"{}",
serde_json::to_string(&json!({
"field": field,
"cleared": true,
"profile": new_profile,
}))?
);
} else {
println!("profile.{field} cleared");
}
}
}
Ok(())
}
fn cmd_setup(apply: bool) -> Result<()> {
use std::path::PathBuf;
let entry = json!({"command": "wire", "args": ["mcp"]});
let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
let mut targets: Vec<(&str, PathBuf)> = Vec::new();
if let Some(home) = dirs::home_dir() {
targets.push(("Claude Code", home.join(".claude.json")));
targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
#[cfg(target_os = "macos")]
targets.push((
"Claude Desktop (macOS)",
home.join("Library/Application Support/Claude/claude_desktop_config.json"),
));
#[cfg(target_os = "windows")]
if let Ok(appdata) = std::env::var("APPDATA") {
targets.push((
"Claude Desktop (Windows)",
PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
));
}
targets.push(("Cursor", home.join(".cursor/mcp.json")));
}
targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
println!("wire setup\n");
println!("MCP server snippet (add this to your client's mcpServers):");
println!();
println!("{entry_pretty}");
println!();
if !apply {
println!("Probable MCP host config locations on this machine:");
for (name, path) in &targets {
let marker = if path.exists() {
"✓ found"
} else {
" (would create)"
};
println!(" {marker:14} {name}: {}", path.display());
}
println!();
println!("Run `wire setup --apply` to merge wire into each config above.");
println!(
"Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
);
return Ok(());
}
let mut modified: Vec<String> = Vec::new();
let mut skipped: Vec<String> = Vec::new();
for (name, path) in &targets {
match upsert_mcp_entry(path, "wire", &entry) {
Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
Ok(false) => skipped.push(format!(" {name} ({}): already configured", path.display())),
Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
}
}
if !modified.is_empty() {
println!("Modified:");
for line in &modified {
println!(" {line}");
}
println!();
println!("Restart the app(s) above to load wire MCP.");
}
if !skipped.is_empty() {
println!();
println!("Skipped:");
for line in &skipped {
println!(" {line}");
}
}
Ok(())
}
fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
let mut cfg: Value = if path.exists() {
let body = std::fs::read_to_string(path).context("reading config")?;
serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
} else {
json!({})
};
if !cfg.is_object() {
cfg = json!({});
}
let root = cfg.as_object_mut().unwrap();
let servers = root
.entry("mcpServers".to_string())
.or_insert_with(|| json!({}));
if !servers.is_object() {
*servers = json!({});
}
let map = servers.as_object_mut().unwrap();
if map.get(server_name) == Some(entry) {
return Ok(false);
}
map.insert(server_name.to_string(), entry.clone());
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent).context("creating parent dir")?;
}
let out = serde_json::to_string_pretty(&cfg)? + "\n";
std::fs::write(path, out).context("writing config")?;
Ok(true)
}
#[allow(clippy::too_many_arguments)]
fn cmd_reactor(
on_event: &str,
peer_filter: Option<&str>,
kind_filter: Option<&str>,
verified_only: bool,
interval_secs: u64,
once: bool,
dry_run: bool,
max_per_minute: u32,
max_chain_depth: u32,
) -> Result<()> {
use crate::inbox_watch::{InboxEvent, InboxWatcher};
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::Write;
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
let cursor_path = config::state_dir()?.join("reactor.cursor");
let emitted_path = config::state_dir()?.join("reactor-emitted.log");
let mut emitted_ids: HashSet<String> = HashSet::new();
if emitted_path.exists()
&& let Ok(body) = std::fs::read_to_string(&emitted_path)
{
for line in body.lines() {
let t = line.trim();
if !t.is_empty() {
emitted_ids.insert(t.to_string());
}
}
}
let outbox_dir = config::outbox_dir()?;
let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
let kind_num: Option<u32> = match kind_filter {
Some(k) => Some(parse_kind(k)?),
None => None,
};
let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
let dispatch = |ev: &InboxEvent,
peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
emitted_ids: &HashSet<String>|
-> Result<bool> {
if let Some(p) = peer_filter
&& ev.peer != p
{
return Ok(false);
}
if verified_only && !ev.verified {
return Ok(false);
}
if let Some(want) = kind_num {
let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
if ev_kind != Some(want) {
return Ok(false);
}
}
if max_chain_depth > 0 {
let body_str = match &ev.raw["body"] {
Value::String(s) => s.clone(),
other => serde_json::to_string(other).unwrap_or_default(),
};
if let Some(referenced) = parse_re_marker(&body_str) {
let matched = emitted_ids.contains(&referenced)
|| emitted_ids.iter().any(|full| full.starts_with(&referenced));
if matched {
eprintln!(
"wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
ev.event_id, ev.peer, referenced
);
return Ok(false);
}
}
}
if max_per_minute > 0 {
let now = Instant::now();
let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
while let Some(&front) = win.front() {
if now.duration_since(front) > Duration::from_secs(60) {
win.pop_front();
} else {
break;
}
}
if win.len() as u32 >= max_per_minute {
eprintln!(
"wire reactor: skip {} from {} — rate-limit ({}/min reached)",
ev.event_id, ev.peer, max_per_minute
);
return Ok(false);
}
win.push_back(now);
}
if dry_run {
println!("{}", serde_json::to_string(&ev.raw)?);
return Ok(true);
}
let mut child = Command::new("sh")
.arg("-c")
.arg(on_event)
.stdin(Stdio::piped())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.env("WIRE_EVENT_PEER", &ev.peer)
.env("WIRE_EVENT_ID", &ev.event_id)
.env("WIRE_EVENT_KIND", &ev.kind)
.spawn()
.with_context(|| format!("spawning reactor handler: {on_event}"))?;
if let Some(mut stdin) = child.stdin.take() {
let body = serde_json::to_vec(&ev.raw)?;
let _ = stdin.write_all(&body);
let _ = stdin.write_all(b"\n");
}
std::mem::drop(child);
Ok(true)
};
let scan_outbox = |emitted_ids: &mut HashSet<String>,
outbox_cursors: &mut HashMap<String, u64>|
-> Result<usize> {
if !outbox_dir.exists() {
return Ok(0);
}
let mut added = 0;
let mut new_ids: Vec<String> = Vec::new();
for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
let path = entry.path();
if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
continue;
}
let peer = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => continue,
};
let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
let start = *outbox_cursors.get(&peer).unwrap_or(&0);
if cur_len <= start {
outbox_cursors.insert(peer, start);
continue;
}
let body = std::fs::read_to_string(&path).unwrap_or_default();
let tail = &body[start as usize..];
for line in tail.lines() {
if let Ok(v) = serde_json::from_str::<Value>(line)
&& let Some(eid) = v.get("event_id").and_then(Value::as_str)
&& emitted_ids.insert(eid.to_string())
{
new_ids.push(eid.to_string());
added += 1;
}
}
outbox_cursors.insert(peer, cur_len);
}
if !new_ids.is_empty() {
let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
if all.len() > 500 {
all.sort();
let drop_n = all.len() - 500;
let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
emitted_ids.retain(|x| !dropped.contains(x));
all = emitted_ids.iter().cloned().collect();
}
let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
}
Ok(added)
};
let sweep = |watcher: &mut InboxWatcher,
emitted_ids: &mut HashSet<String>,
outbox_cursors: &mut HashMap<String, u64>,
peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
-> Result<usize> {
let _ = scan_outbox(emitted_ids, outbox_cursors);
let events = watcher.poll()?;
let mut fired = 0usize;
for ev in &events {
match dispatch(ev, peer_dispatch_log, emitted_ids) {
Ok(true) => fired += 1,
Ok(false) => {}
Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
}
}
watcher.save_cursors(&cursor_path)?;
Ok(fired)
};
if once {
sweep(
&mut watcher,
&mut emitted_ids,
&mut outbox_cursors,
&mut peer_dispatch_log,
)?;
return Ok(());
}
let interval = std::time::Duration::from_secs(interval_secs.max(1));
loop {
if let Err(e) = sweep(
&mut watcher,
&mut emitted_ids,
&mut outbox_cursors,
&mut peer_dispatch_log,
) {
eprintln!("wire reactor: sweep error: {e}");
}
std::thread::sleep(interval);
}
}
fn parse_re_marker(body: &str) -> Option<String> {
let needle = "(re:";
let i = body.find(needle)?;
let rest = &body[i + needle.len()..];
let end = rest.find(')')?;
let id = rest[..end].trim().to_string();
if id.is_empty() {
return None;
}
Some(id)
}
fn cmd_notify(
interval_secs: u64,
peer_filter: Option<&str>,
once: bool,
as_json: bool,
) -> Result<()> {
use crate::inbox_watch::InboxWatcher;
let cursor_path = config::state_dir()?.join("notify.cursor");
let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
let events = watcher.poll()?;
for ev in events {
if let Some(p) = peer_filter
&& ev.peer != p
{
continue;
}
if as_json {
println!("{}", serde_json::to_string(&ev)?);
} else {
os_notify_inbox_event(&ev);
}
}
watcher.save_cursors(&cursor_path)?;
Ok(())
};
if once {
return sweep(&mut watcher);
}
let interval = std::time::Duration::from_secs(interval_secs.max(1));
loop {
if let Err(e) = sweep(&mut watcher) {
eprintln!("wire notify: sweep error: {e}");
}
std::thread::sleep(interval);
}
}
fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
let title = if ev.verified {
format!("wire ← {}", ev.peer)
} else {
format!("wire ← {} (UNVERIFIED)", ev.peer)
};
let body = format!("{}: {}", ev.kind, ev.body_preview);
crate::os_notify::toast(&title, &body);
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
eprintln!("[wire notify] {title}\n {body}");
}