use anyhow::{Context, Result, anyhow, bail};
use serde_json::{Value, json};
use super::setup;
use crate::{config, signing::sign_message_v31};
/// Runs the MCP entry point by delegating to `crate::mcp::run` and returning
/// its result unchanged.
pub(super) fn cmd_mcp() -> Result<()> {
crate::mcp::run()
}
/// Starts the relay server and blocks until it stops.
///
/// * `bind` – TCP bind address handed to `serve_with_mode` (unused in UDS mode).
/// * `local_only` – when set, `bind` must pass `validate_loopback_bind` and the
///   state directory gets a `local` suffix. Ignored in UDS mode.
/// * `uds` – when `Some`, serve on that Unix-domain-socket path instead of TCP.
///
/// State lives under `$WIRE_HOME/state/wire-relay` when `WIRE_HOME` is set,
/// otherwise under the platform state dir (falling back to the local data dir)
/// joined with `wire-relay`. UDS mode appends `uds` to that root.
///
/// # Errors
/// Fails when the bind address is rejected, when no state directory can be
/// resolved, when the tokio runtime cannot be built, or when the server future
/// itself returns an error.
pub(super) fn cmd_relay_server(
    bind: &str,
    local_only: bool,
    uds: Option<&std::path::Path>,
) -> Result<()> {
    // Resolves the `wire-relay` root shared by every serving mode.
    let relay_root = || -> Result<std::path::PathBuf> {
        let parent = match std::env::var("WIRE_HOME") {
            Ok(home) => std::path::PathBuf::from(home).join("state"),
            Err(_) => dirs::state_dir()
                .or_else(dirs::data_local_dir)
                .ok_or_else(|| {
                    anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME")
                })?,
        };
        Ok(parent.join("wire-relay"))
    };
    // Both modes run on the same kind of multi-threaded runtime.
    let new_runtime = || {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
    };

    match uds {
        Some(socket_path) => {
            let state_dir = relay_root()?.join("uds");
            let rt = new_runtime()?;
            rt.block_on(crate::relay_server::serve_uds(
                socket_path.to_path_buf(),
                state_dir,
            ))
        }
        None => {
            // Validate before touching the filesystem layout, as the bind check
            // is the cheaper and more likely failure.
            if local_only {
                validate_loopback_bind(bind)?;
            }
            let root = relay_root()?;
            let state_dir = if local_only { root.join("local") } else { root };
            let rt = new_runtime()?;
            rt.block_on(crate::relay_server::serve_with_mode(
                bind,
                state_dir,
                crate::relay_server::ServerMode { local_only },
            ))
        }
    }
}
fn validate_loopback_bind(bind: &str) -> Result<()> {
let host = if let Some(stripped) = bind.strip_prefix('[') {
let close = stripped
.find(']')
.ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
stripped[..close].to_string()
} else {
bind.rsplit_once(':')
.map(|(h, _)| h.to_string())
.unwrap_or_else(|| bind.to_string())
};
use std::net::{IpAddr, ToSocketAddrs};
let probe = format!("{host}:0");
let resolved: Vec<_> = probe
.to_socket_addrs()
.with_context(|| format!("resolving bind host {host:?}"))?
.collect();
if resolved.is_empty() {
bail!("--local-only: bind host {host:?} resolved to no addresses");
}
for addr in &resolved {
let ip = addr.ip();
let is_acceptable = match ip {
IpAddr::V4(v4) => {
v4.is_loopback() || v4.is_private() || {
let octets = v4.octets();
octets[0] == 100 && (64..=127).contains(&octets[1])
}
}
IpAddr::V6(v6) => v6.is_loopback(), };
if !is_acceptable {
bail!(
"--local-only refuses non-private bind: {host:?} resolves to {ip} \
which is not loopback (127/8, ::1), RFC 1918 private \
(10/8, 172.16/12, 192.168/16), or RFC 6598 CGNAT/Tailscale \
(100.64.0.0/10). Remove --local-only to bind publicly."
);
}
}
Ok(())
}
/// Parses a `--scope` argument into an `EndpointScope`.
///
/// Matching is done on the lowercased input. Accepted spellings are
/// `federation` (alias `fed`), `local`, `lan` and `uds`.
///
/// # Errors
/// Fails with a message listing the accepted values for any other input.
fn parse_scope(s: &str) -> Result<crate::endpoints::EndpointScope> {
    use crate::endpoints::EndpointScope as Scope;

    let lowered = s.to_lowercase();
    let scope = match lowered.as_str() {
        "federation" | "fed" => Scope::Federation,
        "local" => Scope::Local,
        "lan" => Scope::Lan,
        "uds" => Scope::Uds,
        other => bail!("unknown --scope `{other}` (expected federation|local|lan|uds)"),
    };
    Ok(scope)
}
/// Binds this agent to a relay by allocating a fresh slot on `url` and recording
/// it as one of the agent's own (`self`) endpoints in the relay state.
///
/// * `url` – relay base URL; trailing `/` characters are trimmed and the result
///   is passed through `setup::strip_relay_url_userinfo` before use.
/// * `scope` – explicit scope name (parsed by `parse_scope`); when `None` the
///   scope is inferred from the normalized URL.
/// * `replace` – when true, the existing `self` value is reset to `null` before
///   the new endpoint is upserted.
/// * `migrate_pinned` – lets a destructive bind proceed even though peers are
///   pinned.
/// * `as_json` – print one JSON object instead of human-readable lines.
///
/// A bind is treated as destructive when `replace` is set or when an existing
/// self endpoint already uses the same normalized relay URL.
///
/// # Errors
/// Fails when the config is not initialized, the agent card cannot be read, the
/// URL is rejected by `setup::assert_relay_url_clean_for_publish`, the scope is
/// unknown, a destructive bind is attempted with pinned peers and without
/// `migrate_pinned`, the relay health check or slot allocation fails, or the
/// relay state cannot be written.
pub(crate) fn cmd_bind_relay(
url: &str,
scope: Option<&str>,
replace: bool,
migrate_pinned: bool,
as_json: bool,
) -> Result<()> {
use crate::endpoints::{Endpoint, self_endpoints};
if !config::is_initialized()? {
bail!("not initialized — run `wire up` first");
}
let card = config::read_agent_card()?;
// A card without a string `did` yields an empty DID rather than an error.
let did = card.get("did").and_then(Value::as_str).unwrap_or("");
let handle = crate::agent_card::display_handle_from_did(did).to_string();
let normalized_raw = url.trim_end_matches('/');
let normalized_owned = setup::strip_relay_url_userinfo(normalized_raw);
let normalized = normalized_owned.as_str();
setup::assert_relay_url_clean_for_publish(normalized)?;
let new_scope = match scope {
Some(s) => parse_scope(s)?,
None => crate::endpoints::infer_scope_from_url(normalized),
};
// An unreadable relay state is treated as an empty one.
let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
// Handles of every pinned peer (the keys of the `peers` object).
let pinned: Vec<String> = existing
.get("peers")
.and_then(|p| p.as_object())
.map(|o| o.keys().cloned().collect())
.unwrap_or_default();
let existing_eps = self_endpoints(&existing);
let is_rebind_same = existing_eps.iter().any(|e| e.relay_url == normalized);
let destructive = replace || is_rebind_same;
// Refuse before any network call: nothing has been allocated or written yet.
if destructive && !pinned.is_empty() && !migrate_pinned {
let list = pinned.join(", ");
let why = if replace {
"`--replace` drops your other slot(s)"
} else {
"re-binding the same relay rotates its slot"
};
bail!(
"bind-relay would black-hole {n} pinned peer(s): {list}. {why}; they are \
pinned to your CURRENT slot and would keep pushing to a slot you no longer \
read.\n\n\
SAFE PATHS:\n\
• Default (omit `--replace`) ADDITIVELY binds a NEW relay, keeping existing \
slots — no black-hole.\n\
• `wire rotate-slot` — same-relay rotation that emits wire_close to peers.\n\
• `wire bind-relay {url} --migrate-pinned` — proceed anyway; re-pair each \
peer out-of-band.\n\n\
Issue #7 (silent black-hole on relay change) caught this.",
n = pinned.len(),
);
}
let client = crate::relay_client::RelayClient::new(normalized);
client.check_healthz()?;
let alloc = client.allocate_slot(Some(&handle))?;
// Reached only with `migrate_pinned` set: warn on stderr about affected peers.
if destructive && !pinned.is_empty() {
eprintln!(
"wire bind-relay: {mode} with {n} pinned peer(s) — they will black-hole \
until they re-pin: {peers}",
mode = if replace { "replacing" } else { "rotating" },
n = pinned.len(),
peers = pinned.join(", "),
);
}
let mut state = existing;
if replace {
// Drop everything previously stored under `self` before adding the new endpoint.
state["self"] = Value::Null;
}
crate::endpoints::upsert_self_endpoint(
&mut state,
Endpoint {
relay_url: normalized.to_string(),
slot_id: alloc.slot_id.clone(),
slot_token: alloc.slot_token.clone(),
scope: new_scope,
},
);
config::write_relay_state(&state)?;
let eps = self_endpoints(&state);
// Lowercased `Debug` rendering of the scope, used in both output modes.
let scope_str = format!("{new_scope:?}").to_lowercase();
if as_json {
// The slot token itself is never printed; only its presence is reported.
println!(
"{}",
serde_json::to_string(&json!({
"relay_url": normalized,
"slot_id": alloc.slot_id,
"scope": scope_str,
"endpoints": eps.len(),
"additive": !replace,
"slot_token_present": true,
}))?
);
} else {
println!(
"bound {scope_str} slot on {normalized} (slot {})",
alloc.slot_id
);
println!(
"self now has {n} endpoint(s): {list}",
n = eps.len(),
list = eps
.iter()
.map(|e| format!("{}({:?})", e.relay_url, e.scope))
.collect::<Vec<_>>()
.join(", "),
);
}
Ok(())
}
/// Pins (or re-pins) one relay slot for peer `handle` in the relay state.
///
/// The peer's current endpoint list is read from `peers.<handle>.endpoints`;
/// a missing or undecodable list counts as empty. An endpoint with the same
/// `relay_url` is replaced in place, otherwise the new one is appended. The
/// scope is inferred from `url`.
///
/// * `as_json` – print one JSON object instead of a human-readable line.
///
/// # Errors
/// Fails when the relay state cannot be read or written, or when
/// `pin_peer_endpoints` rejects the update.
pub(super) fn cmd_add_peer_slot(
    handle: &str,
    url: &str,
    slot_id: &str,
    slot_token: &str,
    as_json: bool,
) -> Result<()> {
    use crate::endpoints::{Endpoint, infer_scope_from_url, pin_peer_endpoints};

    let mut state = config::read_relay_state()?;
    let incoming = Endpoint {
        relay_url: url.to_string(),
        slot_id: slot_id.to_string(),
        slot_token: slot_token.to_string(),
        scope: infer_scope_from_url(url),
    };

    let stored = state
        .get("peers")
        .and_then(|all| all.get(handle))
        .and_then(|entry| entry.get("endpoints"));
    let mut endpoints: Vec<Endpoint> = stored
        .and_then(|raw| serde_json::from_value::<Vec<Endpoint>>(raw.clone()).ok())
        .unwrap_or_default();

    // One endpoint per relay URL: overwrite a match, append otherwise.
    match endpoints
        .iter()
        .position(|known| known.relay_url == incoming.relay_url)
    {
        Some(idx) => endpoints[idx] = incoming,
        None => endpoints.push(incoming),
    }

    let n = endpoints.len();
    pin_peer_endpoints(&mut state, handle, &endpoints)?;
    config::write_relay_state(&state)?;

    if !as_json {
        println!(
            "pinned peer slot for {handle} at {url} ({slot_id}) — peer now has {n} endpoint(s)"
        );
        return Ok(());
    }
    let summary = json!({
        "handle": handle,
        "relay_url": url,
        "slot_id": slot_id,
        "added": true,
        "endpoint_count": n,
    });
    println!("{}", serde_json::to_string(&summary)?);
    Ok(())
}
/// Pushes queued outbox events to pinned peers and prints a summary.
///
/// For every pinned peer (optionally restricted to `peer_filter`) that has an
/// `<handle>.jsonl` file in the outbox directory, each parseable line is posted
/// through `try_post_event_with_failover` over the peer's endpoints in priority
/// order. When every endpoint fails, `try_reresolve_peer_on_slot_4xx` is asked
/// whether the peer's slot can be re-resolved; if it reports a change, the
/// event is retried once against the refreshed endpoints.
///
/// * `peer_filter` – when `Some`, only the peer whose handle equals it is pushed.
/// * `as_json` – print `{"pushed": [...], "skipped": [...]}` instead of a
///   one-line summary.
///
/// Nothing in this function rewrites or truncates the outbox files.
///
/// # Errors
/// Fails when the relay state cannot be read, when no peers are pinned, when
/// the outbox directory cannot be resolved or listed, or when an outbox file
/// cannot be read. Per-event delivery failures are recorded under `skipped`
/// instead of aborting.
pub(super) fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
let mut state = config::read_relay_state()?;
let peers = state["peers"].as_object().cloned().unwrap_or_default();
if peers.is_empty() {
bail!(
"no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
);
}
let outbox_dir = config::outbox_dir()?;
// Diagnostic pass only: warn about `.jsonl` files whose stem is not a pinned
// handle but whose bare handle is. Such files are never pushed below.
if outbox_dir.exists() {
let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
// `.flatten()` silently skips directory entries that failed to read.
for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
let path = entry.path();
if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
continue;
}
let stem = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => continue,
};
if pinned.contains(&stem) {
continue;
}
let bare = crate::agent_card::bare_handle(&stem);
if pinned.contains(bare) {
eprintln!(
"wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
Merge with: `cat {} >> {}` then delete the FQDN file.",
stem,
path.display(),
outbox_dir.join(format!("{bare}.jsonl")).display(),
);
}
}
}
// No outbox directory at all: report an empty result and succeed.
if !outbox_dir.exists() {
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
);
} else {
println!("phyllis: nothing to dial out — write a message first with `wire send`");
}
return Ok(());
}
let mut pushed = Vec::new();
let mut skipped = Vec::new();
// Peers for which a re-resolve was already attempted during this invocation;
// passed to `try_reresolve_peer_on_slot_4xx` as its `already_tried` set.
let mut rotated_this_push: std::collections::HashSet<String> = std::collections::HashSet::new();
// Set once a re-resolve changed `state`, so it is persisted after the loop.
let mut state_dirty = false;
for (peer_handle, _) in peers.iter() {
if let Some(want) = peer_filter
&& peer_handle != want
{
continue;
}
let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
if !outbox.exists() {
continue;
}
let mut ordered_endpoints =
crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
// No usable endpoint: report every parseable queued event as skipped.
// An unreadable outbox file is treated as empty on this path.
if ordered_endpoints.is_empty() {
for line in std::fs::read_to_string(&outbox).unwrap_or_default().lines() {
let event: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let event_id = event
.get("event_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
skipped.push(json!({
"peer": peer_handle,
"event_id": event_id,
"reason": "no reachable endpoint pinned for peer",
}));
}
continue;
}
let body = std::fs::read_to_string(&outbox)?;
for line in body.lines() {
// Lines that are not valid JSON are ignored without being reported.
let event: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let event_id = event
.get("event_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
// Holds the formatted error of the most recent failing endpoint. A `RefCell`
// lets the posting closure update it while it is also read afterwards.
let last_err: std::cell::RefCell<Option<String>> = std::cell::RefCell::new(None);
match crate::relay_client::try_post_event_with_failover(
&ordered_endpoints,
&event,
|endpoint, ev| {
let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
match client.post_event(&endpoint.slot_id, &endpoint.slot_token, ev) {
Ok(resp) => Ok(resp),
Err(e) => {
*last_err.borrow_mut() =
Some(crate::relay_client::format_transport_error(&e));
Err(e)
}
}
},
) {
Ok((endpoint, resp)) => {
// A "duplicate" status counts as skipped rather than pushed.
if resp.status == "duplicate" {
skipped.push(json!({
"peer": peer_handle,
"event_id": event_id,
"reason": "duplicate",
"endpoint": endpoint.relay_url,
"scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
}));
} else {
pushed.push(json!({
"peer": peer_handle,
"event_id": event_id,
"endpoint": endpoint.relay_url,
"scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
}));
}
}
Err(_) => {
// Every endpoint failed. Decide from the last error text whether the peer's
// slot should be re-resolved and the event retried.
let last_err_text = last_err.borrow().clone().unwrap_or_default();
let mut delivered_via_retry: Option<(crate::endpoints::Endpoint, _)> = None;
match try_reresolve_peer_on_slot_4xx(
&mut state,
peer_handle,
&last_err_text,
&rotated_this_push,
) {
Ok(true) => {
// State changed: remember the peer, refresh its endpoints and retry once.
rotated_this_push.insert(peer_handle.clone());
state_dirty = true;
ordered_endpoints = crate::endpoints::peer_endpoints_in_priority_order(
&state,
peer_handle,
);
// Clear the stale error so a failed retry reports its own cause.
*last_err.borrow_mut() = None;
if let Ok((endpoint, resp)) =
crate::relay_client::try_post_event_with_failover(
&ordered_endpoints,
&event,
|endpoint, ev| {
let client = crate::relay_client::RelayClient::new(
&endpoint.relay_url,
);
match client.post_event(
&endpoint.slot_id,
&endpoint.slot_token,
ev,
) {
Ok(resp) => Ok(resp),
Err(e) => {
*last_err.borrow_mut() = Some(
crate::relay_client::format_transport_error(&e),
);
Err(e)
}
}
},
)
{
delivered_via_retry = Some((endpoint, resp));
}
}
Ok(false) => {
// Nothing was re-resolved; the original error is reported below.
}
Err(e) => {
// The re-resolve itself failed: append its error to the reported reason and
// mark the peer so the re-resolve is not attempted again in this run.
*last_err.borrow_mut() = Some(format!(
"{}; re-resolve also failed: {e:#}",
last_err.borrow().clone().unwrap_or_default()
));
rotated_this_push.insert(peer_handle.clone());
}
}
if let Some((endpoint, resp)) = delivered_via_retry {
// Same pushed/skipped split as the first attempt, tagged with `via`.
if resp.status == "duplicate" {
skipped.push(json!({
"peer": peer_handle,
"event_id": event_id,
"reason": "duplicate",
"endpoint": endpoint.relay_url,
"scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
"via": "slot_reresolve_retry",
}));
} else {
pushed.push(json!({
"peer": peer_handle,
"event_id": event_id,
"endpoint": endpoint.relay_url,
"scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
"via": "slot_reresolve_retry",
}));
}
} else {
skipped.push(json!({
"peer": peer_handle,
"event_id": event_id,
"reason": last_err
.borrow()
.clone()
.unwrap_or_else(|| "all endpoints failed".to_string()),
}));
}
}
}
}
}
// Persisting re-resolved slots is best-effort: a failure is only a warning.
if state_dirty && let Err(e) = config::write_relay_state(&state) {
eprintln!(
"wire push: WARN failed to persist rotated peer slots: {e:#}. \
Slot rotation will be re-attempted on next push."
);
}
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
);
} else {
println!(
"pushed {} event(s); skipped {} ({})",
pushed.len(),
skipped.len(),
if skipped.is_empty() {
"none"
} else {
"see --json for detail"
}
);
}
Ok(())
}
/// Pulls new events from every self endpoint into the inbox and prints a summary.
///
/// Each endpoint is listed from its own cursor (stored under `self` with a key
/// chosen by `endpoint_cursor_key` for the endpoint's scope), with a limit of
/// 1000 events per call. The events go through `crate::pull::process_events`,
/// and any cursor advance is persisted immediately via
/// `config::update_relay_state`.
///
/// * `as_json` – print one JSON object instead of a human-readable line.
///
/// # Errors
/// Fails when the relay state cannot be read, when `self` is missing/null or
/// yields no endpoints, when directories cannot be prepared, when
/// `process_events` fails, or when a cursor update cannot be persisted. A
/// listing failure on one endpoint is logged to stderr and that endpoint is
/// skipped.
pub(super) fn cmd_pull(as_json: bool) -> Result<()> {
let state = config::read_relay_state()?;
let self_state = state.get("self").cloned().unwrap_or(Value::Null);
if self_state.is_null() {
bail!("self slot not bound — run `wire bind-relay <url>` first");
}
let endpoints = crate::endpoints::self_endpoints(&state);
if endpoints.is_empty() {
bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
}
let inbox_dir = config::inbox_dir()?;
config::ensure_dirs()?;
// Accumulators merged across all endpoints for the final report.
let mut total_seen = 0usize;
let mut all_written: Vec<Value> = Vec::new();
let mut all_rejected: Vec<Value> = Vec::new();
let mut all_blocked = false;
let mut all_advance_cursor_to: Option<String> = None;
for endpoint in &endpoints {
let cursor_key = endpoint_cursor_key(endpoint.scope);
// Cursors come from the snapshot read at the top of the function, not from
// the updates written inside this loop.
let last_event_id = self_state
.get(&cursor_key)
.and_then(Value::as_str)
.map(str::to_string);
let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
let events = match client.list_events(
&endpoint.slot_id,
&endpoint.slot_token,
last_event_id.as_deref(),
Some(1000),
) {
Ok(ev) => ev,
Err(e) => {
// One failing endpoint must not stop the others from being pulled.
eprintln!(
"wire pull: endpoint {} ({:?}) errored: {}; continuing",
endpoint.relay_url,
endpoint.scope,
crate::relay_client::format_transport_error(&e),
);
continue;
}
};
total_seen += events.len();
let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
all_written.extend(result.written.iter().cloned());
all_rejected.extend(result.rejected.iter().cloned());
if result.blocked {
all_blocked = true;
}
if let Some(eid) = result.advance_cursor_to.clone() {
// Only a Federation-scope advance is reported as `cursor_advanced_to`.
if endpoint.scope == crate::endpoints::EndpointScope::Federation {
all_advance_cursor_to = Some(eid.clone());
}
let key = cursor_key.clone();
// Persist this endpoint's cursor right away; skipped silently when `self`
// is no longer an object in the stored state.
config::update_relay_state(|state| {
if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
self_obj.insert(key, Value::String(eid));
}
Ok(())
})?;
}
}
let result = crate::pull::PullResult {
written: all_written,
rejected: all_rejected,
blocked: all_blocked,
advance_cursor_to: all_advance_cursor_to,
};
let events_len = total_seen;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"written": result.written,
"rejected": result.rejected,
"total_seen": events_len,
"cursor_blocked": result.blocked,
"cursor_advanced_to": result.advance_cursor_to,
}))?
);
} else {
// Count rejections flagged with `blocks_cursor: true` for the louder message.
let blocking = result
.rejected
.iter()
.filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
.count();
if blocking > 0 {
println!(
"pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
events_len,
result.written.len(),
result.rejected.len(),
blocking,
);
} else {
println!(
"pulled {} event(s); wrote {}; rejected {}",
events_len,
result.written.len(),
result.rejected.len(),
);
}
}
Ok(())
}
fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
match scope {
crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
crate::endpoints::EndpointScope::Lan => "last_pulled_event_id_lan".to_string(),
crate::endpoints::EndpointScope::Uds => "last_pulled_event_id_uds".to_string(),
}
}
/// Rotates this agent's primary inbound slot on its current relay.
///
/// A new slot is allocated on the same relay URL as the primary self endpoint.
/// Unless `no_announce` is set, a signed `wire_close` event (kind 1201) naming
/// the new relay URL and slot id is posted to each pinned peer's slot. The
/// `self` entry of the relay state is then replaced with the new slot and the
/// state is written.
///
/// * `no_announce` – skip sending `wire_close` events to peers.
/// * `as_json` – print one JSON object instead of human-readable lines.
///
/// # Errors
/// Fails when the config is not initialized, `self` is unbound or has no
/// primary endpoint, the agent card or private key cannot be read or lacks a
/// verify key, the relay health check or allocation fails, or the state cannot
/// be written. Signing and announce failures for individual peers are only
/// warnings on stderr.
pub(super) fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire up` first");
}
let mut state = config::read_relay_state()?;
let self_state = state.get("self").cloned().unwrap_or(Value::Null);
if self_state.is_null() {
bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
}
let primary = crate::endpoints::self_primary_endpoint(&state)
.ok_or_else(|| anyhow!("self has no resolvable inbound endpoint to rotate"))?;
let url = primary.relay_url.clone();
let old_slot_id = primary.slot_id.clone();
let old_slot_token = primary.slot_token.clone();
let card = config::read_agent_card()?;
// A card without a string `did` yields an empty DID rather than an error.
let did = card
.get("did")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let handle = crate::agent_card::display_handle_from_did(&did).to_string();
// Uses whichever entry of `verify_keys` the map yields first.
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
.to_string();
let pk_bytes = crate::signing::b64decode(&pk_b64)?;
let sk_seed = config::read_private_key()?;
let normalized = url.trim_end_matches('/').to_string();
let client = crate::relay_client::RelayClient::new(&normalized);
// Check the relay first: if it is unhealthy nothing has been changed yet.
client
.check_healthz()
.context("aborting rotation; old slot still valid")?;
let alloc = client.allocate_slot(Some(&handle))?;
let new_slot_id = alloc.slot_id.clone();
let new_slot_token = alloc.slot_token.clone();
// Handles of peers that accepted the announcement.
let mut announced: Vec<String> = Vec::new();
if !no_announce {
// Falls back to the Unix epoch string if RFC 3339 formatting fails.
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
// The announcement carries the new slot id but not the new slot token.
let body = json!({
"reason": "operator-initiated slot rotation",
"new_relay_url": url,
"new_slot_id": new_slot_id,
});
let peers = state["peers"].as_object().cloned().unwrap_or_default();
for (peer_handle, _peer_info) in peers.iter() {
let event = json!({
"schema_version": crate::signing::EVENT_SCHEMA_VERSION,
"timestamp": now.clone(),
"from": did,
"to": format!("did:wire:{peer_handle}"),
"type": "wire_close",
"kind": 1201,
"body": body.clone(),
});
let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
Ok(s) => s,
Err(e) => {
eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
continue;
}
};
let peer_info = match state["peers"].get(peer_handle) {
Some(p) => p.clone(),
None => continue,
};
// Reads the flat `relay_url` / `slot_id` / `slot_token` fields of the peer
// entry; a missing relay URL falls back to our own relay URL.
let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
// Peers without a flat slot id or token cannot be announced to.
if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
continue;
}
// Reuse the existing client when the peer is on the same relay URL.
let peer_client = if peer_url == url {
client.clone()
} else {
crate::relay_client::RelayClient::new(peer_url)
};
match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
Ok(_) => announced.push(peer_handle.clone()),
Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
}
}
}
// NOTE(review): this replaces the whole `self` value with just these three
// keys, so anything else previously stored under `self` is not carried over —
// confirm that is intended.
state["self"] = json!({
"relay_url": url,
"slot_id": new_slot_id,
"slot_token": new_slot_token,
});
config::write_relay_state(&state)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"rotated": true,
"old_slot_id": old_slot_id,
"new_slot_id": new_slot_id,
"relay_url": url,
"announced_to": announced,
}))?
);
} else {
println!("rotated slot on {url}");
println!(
" old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
);
println!(" new slot_id: {new_slot_id}");
if !announced.is_empty() {
println!(
" announced wire_close (kind=1201) to: {}",
announced.join(", ")
);
}
println!();
println!("next steps:");
println!(" - peers see the wire_close event in their next `wire pull`");
println!(
" - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
);
println!(" (or full re-pair via `wire dial <handle>@<relay>`)");
println!(" - until they do, you'll receive but they won't be able to reach you");
// The old token is read above but otherwise unused; discard it explicitly.
let _ = old_slot_token;
}
Ok(())
}
/// Removes peer `handle` from the trust store and from the relay state, and
/// optionally deletes its inbox/outbox files.
///
/// * `handle` – key looked up under `agents` in trust and `peers` in relay state.
/// * `purge` – also delete `<handle>.jsonl` from the inbox and outbox directories.
/// * `as_json` – print one JSON object instead of human-readable lines.
///
/// Both stores are written back even when nothing was removed. When the peer is
/// in neither store the command still succeeds and reports `"removed": false`;
/// any files deleted by `purge` are listed in that report too, so a purge of
/// leftover files is never silent.
///
/// # Errors
/// Fails when trust or relay state cannot be read or written, when the inbox or
/// outbox directory cannot be resolved, or when deleting an existing file fails.
pub(super) fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
    let mut trust = config::read_trust()?;
    let removed_from_trust = trust
        .get_mut("agents")
        .and_then(Value::as_object_mut)
        .is_some_and(|agents| agents.remove(handle).is_some());
    config::write_trust(&trust)?;

    let mut state = config::read_relay_state()?;
    let removed_from_relay = state
        .get_mut("peers")
        .and_then(Value::as_object_mut)
        .is_some_and(|peers| peers.remove(handle).is_some());
    config::write_relay_state(&state)?;

    // Paths of the files actually deleted, in inbox-then-outbox order.
    let mut purged: Vec<String> = Vec::new();
    if purge {
        for dir in [config::inbox_dir()?, config::outbox_dir()?] {
            let path = dir.join(format!("{handle}.jsonl"));
            if path.exists() {
                std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
                purged.push(path.to_string_lossy().into());
            }
        }
    }

    if !removed_from_trust && !removed_from_relay {
        // The peer was not pinned anywhere, but `--purge` may still have removed
        // leftover files; report them instead of claiming nothing happened.
        if as_json {
            println!(
                "{}",
                serde_json::to_string(&json!({
                    "removed": false,
                    "reason": format!("peer {handle:?} not pinned"),
                    "purged_files": purged,
                }))?
            );
        } else if purged.is_empty() {
            eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
        } else {
            eprintln!(
                "peer {handle:?} not found in trust or relay state — deleted leftover inbox/outbox file(s):"
            );
            for p in &purged {
                println!(" - deleted {p}");
            }
        }
        return Ok(());
    }

    if as_json {
        println!(
            "{}",
            serde_json::to_string(&json!({
                "handle": handle,
                "removed_from_trust": removed_from_trust,
                "removed_from_relay_state": removed_from_relay,
                "purged_files": purged,
            }))?
        );
    } else {
        println!("forgot peer {handle:?}");
        if removed_from_trust {
            println!(" - removed from trust.json");
        }
        if removed_from_relay {
            println!(" - removed from relay.json");
        }
        if !purged.is_empty() {
            for p in &purged {
                println!(" - deleted {p}");
            }
        } else if !purge {
            println!(" (inbox/outbox files preserved; pass --purge to delete them)");
        }
    }
    Ok(())
}
/// Runs the sync daemon: repeated push + pull cycles, optionally once only.
///
/// * `interval_secs` – seconds to wait between cycles; values below 1 are
///   raised to 1 for the wait (the startup message prints the raw value).
/// * `once` – run a single cycle and return; also disables the singleton
///   check, the stream subscriber and the inbox notify watcher.
/// * `all_sessions` – hand off to `crate::daemon_supervisor::run_supervisor`;
///   mutually exclusive with `once` and `session`.
/// * `session` – pin the daemon to a named session by setting `WIRE_HOME` to
///   that session's home directory.
/// * `as_json` – emit one JSON object per cycle on stdout instead of a terse
///   stderr line.
///
/// # Errors
/// Fails on conflicting flags, an unknown session, an uninitialized config, a
/// failure to claim the daemon singleton, a failure to resolve the state
/// directory, or a JSON serialization error. Errors from individual push and
/// pull cycles are logged and folded into the cycle report instead.
pub(super) fn cmd_daemon(
interval_secs: u64,
once: bool,
all_sessions: bool,
session: Option<String>,
as_json: bool,
) -> Result<()> {
if all_sessions {
if once {
bail!("--all-sessions and --once are mutually exclusive (supervisor runs forever)");
}
if session.is_some() {
bail!(
"--all-sessions and --session are mutually exclusive (supervisor manages every session, not a single named one)"
);
}
return crate::daemon_supervisor::run_supervisor(interval_secs, as_json);
}
if let Some(ref name) = session {
let home = crate::session::find_session_home_by_name(name)
.with_context(|| format!("resolving session home for --session {name}"))?
.ok_or_else(|| {
anyhow!(
"session '{name}' not found — run `wire session list` to see initialized sessions"
)
})?;
// NOTE(review): `set_var` is unsafe because changing the process environment
// is not thread-safe. This presumably runs before any other thread reads the
// environment — confirm nothing has been spawned by this point.
unsafe {
std::env::set_var("WIRE_HOME", &home);
}
if !as_json {
eprintln!(
"wire daemon: pinned to session '{name}' (WIRE_HOME={})",
home.display()
);
}
}
if !config::is_initialized()? {
bail!("not initialized — run `wire up` first");
}
// Singleton guard for the long-running mode, skipped when
// `WIRE_DAEMON_NO_SINGLETON` can be read from the environment. The claim is
// kept in `_pid_guard` until this function returns.
let _pid_guard = if !once && std::env::var("WIRE_DAEMON_NO_SINGLETON").is_err() {
if let Some(holder_pid) = crate::ensure_up::daemon_singleton_holder() {
// Another daemon holds the singleton: report it and exit successfully.
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"status": "skipped",
"reason": "daemon already running",
"holder_pid": holder_pid,
}))?
);
} else {
eprintln!(
"wire daemon: another daemon is already running (pid {holder_pid}); not starting a second polling loop. Set WIRE_DAEMON_NO_SINGLETON=1 to override."
);
}
return Ok(());
}
Some(crate::ensure_up::claim_daemon_singleton()?)
} else {
None
};
if !once {
crate::session::warn_on_identity_collision(std::process::id(), "daemon");
}
let interval = std::time::Duration::from_secs(interval_secs.max(1));
if !as_json {
if once {
eprintln!("wire daemon: single sync cycle, then exit");
} else {
eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
}
}
// A pidfile write failure is logged but does not stop the daemon.
if let Err(e) = crate::ensure_up::write_self_daemon_pid() {
eprintln!("daemon: pidfile write error: {e:#}");
}
// Channel used to cut the inter-cycle wait short; the sender is handed to the
// stream subscriber in long-running mode.
let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
if !once {
crate::daemon_stream::spawn_stream_subscriber(wake_tx);
}
// Inbox watcher plus its cursor file; `None` disables notifications.
let mut notify_state: Option<(crate::inbox_watch::InboxWatcher, std::path::PathBuf)> = if once {
None
} else {
let cursor_path = config::state_dir()?.join("notify.cursor");
match crate::inbox_watch::InboxWatcher::from_cursor_file(&cursor_path) {
Ok(w) => Some((w, cursor_path)),
Err(e) => {
eprintln!("daemon: notify watcher init failed, toasts disabled: {e:#}");
None
}
}
};
loop {
// Push and pull errors are logged and turned into a JSON report so that one
// bad cycle does not end the loop.
let pushed = run_sync_push().unwrap_or_else(|e| {
eprintln!("daemon: push error: {e:#}");
json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
});
let pulled = run_sync_pull().unwrap_or_else(|e| {
eprintln!("daemon: pull error: {e:#}");
json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
});
if let Some((ref mut watcher, ref cursor_path)) = notify_state {
match super::comms::notify_sweep_new_events(watcher, cursor_path) {
Ok(events) => super::comms::toast_inbox_events(&events),
Err(e) => eprintln!("daemon: notify sweep error: {e:#}"),
}
}
let cycle_push_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
let cycle_pull_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
let cycle_rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
crate::ensure_up::write_last_sync_record(cycle_push_n, cycle_pull_n, cycle_rejected_n);
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"ts": time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default(),
"push": pushed,
"pull": pulled,
}))?
);
} else if cycle_push_n > 0 || cycle_pull_n > 0 || cycle_rejected_n > 0 {
// Human mode stays quiet on cycles where nothing happened.
eprintln!(
"daemon: pushed={cycle_push_n} pulled={cycle_pull_n} rejected={cycle_rejected_n}"
);
}
if once {
return Ok(());
}
// Wait for a wake-up or the interval, whichever comes first.
match wake_rx.recv_timeout(interval) {
Ok(()) | Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
// With every sender gone `recv_timeout` returns at once, so sleep
// explicitly to keep the loop from spinning.
std::thread::sleep(interval);
}
}
// Drain queued wake-ups so a burst triggers only one extra cycle.
while wake_rx.try_recv().is_ok() {}
}
}
/// Runs one push cycle: posts every queued outbox event to its peer's slot.
///
/// For each pinned peer with an `<handle>.jsonl` outbox file, the flat
/// `relay_url` / `slot_id` / `slot_token` fields of the peer entry are used to
/// post each parseable line. Peers missing any of the three fields are skipped,
/// as are lines that are not valid JSON.
///
/// Returns `{"pushed": [...], "skipped": [...]}`. Events the relay reports as
/// `"duplicate"` and events whose post failed both land in `skipped`, with a
/// `reason`. Every successful post (duplicate or not) is also appended to the
/// pushed log; a failure there is only logged.
///
/// # Errors
/// Fails when the relay state cannot be read, the outbox directory cannot be
/// resolved, or an existing outbox file cannot be read.
pub fn run_sync_push() -> Result<Value> {
    let state = config::read_relay_state()?;
    let peers = state["peers"].as_object().cloned().unwrap_or_default();
    if peers.is_empty() {
        return Ok(json!({"pushed": [], "skipped": []}));
    }
    let outbox_dir = config::outbox_dir()?;
    if !outbox_dir.exists() {
        return Ok(json!({"pushed": [], "skipped": []}));
    }

    let mut pushed = Vec::new();
    let mut skipped = Vec::new();
    for (peer_handle, slot_info) in &peers {
        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
        if !outbox.exists() {
            continue;
        }

        // Missing or non-string fields collapse to "" and disqualify the peer.
        let url = slot_info
            .get("relay_url")
            .and_then(Value::as_str)
            .unwrap_or_default();
        let slot_id = slot_info
            .get("slot_id")
            .and_then(Value::as_str)
            .unwrap_or_default();
        let slot_token = slot_info
            .get("slot_token")
            .and_then(Value::as_str)
            .unwrap_or_default();
        if [url, slot_id, slot_token].contains(&"") {
            continue;
        }

        let client = crate::relay_client::RelayClient::new(url);
        let body = std::fs::read_to_string(&outbox)?;
        for line in body.lines() {
            let Ok(event) = serde_json::from_str::<Value>(line) else {
                continue;
            };
            let event_id = event
                .get("event_id")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string();

            let resp = match client.post_event(slot_id, slot_token, &event) {
                Ok(resp) => resp,
                Err(e) => {
                    let reason = crate::relay_client::format_transport_error(&e);
                    skipped.push(
                        json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
                    );
                    continue;
                }
            };

            // The relay accepted the request: log it before classifying it.
            let now = time::OffsetDateTime::now_utc()
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap_or_default();
            if let Err(e) = config::append_pushed_log(peer_handle, &event_id, &now) {
                eprintln!(
                    "daemon: pushed-log append for {peer_handle}/{event_id} failed (non-fatal): {e:#}"
                );
            }

            let already_delivered = resp.status == "duplicate";
            if already_delivered {
                skipped.push(
                    json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}),
                );
            } else {
                pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
            }
        }
    }
    Ok(json!({"pushed": pushed, "skipped": skipped}))
}
/// Runs one pull cycle across every self endpoint and persists the new cursors.
///
/// Cursors are tracked per slot id in `self.cursors`. The older single
/// `self.last_pulled_event_id` value is used as a fallback cursor for the
/// primary endpoint's slot only, and is kept in sync with that slot's cursor
/// when the state is written back.
///
/// Returns a JSON object with `written`, `rejected`, `total_seen`,
/// `cursor_blocked` and `endpoints_pulled`. When `self` is missing/null or has
/// no endpoints, only the first three keys are returned, all empty/zero.
///
/// # Errors
/// Fails when the relay state cannot be read or updated, when directories
/// cannot be prepared, or when `process_events` fails. That last failure
/// returns before any cursor from this cycle is persisted. A listing failure
/// on one endpoint is logged and that endpoint is skipped.
pub fn run_sync_pull() -> Result<Value> {
let state = config::read_relay_state()?;
if state.get("self").map(Value::is_null).unwrap_or(true) {
return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
}
let endpoints = crate::endpoints::self_endpoints(&state);
if endpoints.is_empty() {
return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
}
let inbox_dir = config::inbox_dir()?;
config::ensure_dirs()?;
let self_obj = state.get("self").cloned().unwrap_or(Value::Null);
// Single-cursor field from before per-slot cursors; see the fallback below.
let legacy_cursor = self_obj
.get("last_pulled_event_id")
.and_then(Value::as_str)
.map(str::to_string);
let primary_slot = crate::endpoints::self_primary_endpoint(&state).map(|e| e.slot_id);
// Working copy of the per-slot cursor map, written back at the end.
let mut cursors: serde_json::Map<String, Value> = self_obj
.get("cursors")
.and_then(Value::as_object)
.cloned()
.unwrap_or_default();
let mut all_written: Vec<Value> = Vec::new();
let mut all_rejected: Vec<Value> = Vec::new();
let mut total_seen = 0usize;
let mut blocked_any = false;
for ep in &endpoints {
if ep.relay_url.is_empty() {
continue;
}
// Prefer this slot's own cursor; only the primary slot may fall back to the
// legacy cursor.
let cursor = cursors
.get(&ep.slot_id)
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
if Some(&ep.slot_id) == primary_slot.as_ref() {
legacy_cursor.clone()
} else {
None
}
});
let client = crate::relay_client::RelayClient::new(&ep.relay_url);
let events =
match client.list_events(&ep.slot_id, &ep.slot_token, cursor.as_deref(), Some(1000)) {
Ok(e) => e,
Err(e) => {
// One failing endpoint must not stop the others from being pulled.
eprintln!(
"daemon: pull error on {} slot {} (continuing): {e:#}",
ep.relay_url, ep.slot_id
);
continue;
}
};
total_seen += events.len();
let result = crate::pull::process_events(&events, cursor, &inbox_dir)?;
if let Some(eid) = &result.advance_cursor_to {
cursors.insert(ep.slot_id.clone(), Value::String(eid.clone()));
}
blocked_any |= result.blocked;
all_written.extend(result.written);
all_rejected.extend(result.rejected);
}
// Cursor of the primary slot after this cycle, mirrored to the legacy field.
let primary_cursor = primary_slot
.as_ref()
.and_then(|s| cursors.get(s))
.and_then(Value::as_str)
.map(str::to_string);
// Per sender, the greatest `timestamp` among the events written this cycle.
// Timestamps are compared as plain strings.
let mut latest_inbound: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
for w in &all_written {
let from = match w.get("from").and_then(Value::as_str) {
Some(s) => s.to_string(),
None => continue,
};
let ts = match w.get("timestamp").and_then(Value::as_str) {
Some(s) if !s.is_empty() => s.to_string(),
_ => continue,
};
latest_inbound
.entry(from)
.and_modify(|existing| {
if ts > *existing {
*existing = ts.clone();
}
})
.or_insert(ts);
}
config::update_relay_state(|state| {
if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
self_obj.insert("cursors".into(), Value::Object(cursors.clone()));
if let Some(pc) = &primary_cursor {
self_obj.insert("last_pulled_event_id".into(), Value::String(pc.clone()));
}
}
if !latest_inbound.is_empty()
&& let Some(peers_obj) = state.get_mut("peers").and_then(Value::as_object_mut)
{
for (handle, ts) in &latest_inbound {
// Creates an empty peer entry when the sender has none yet.
let entry = peers_obj.entry(handle.clone()).or_insert_with(|| json!({}));
if let Some(obj) = entry.as_object_mut() {
obj.insert("last_inbound_event_at".into(), Value::String(ts.clone()));
}
}
}
Ok(())
})?;
Ok(json!({
"written": all_written,
"rejected": all_rejected,
"total_seen": total_seen,
"cursor_blocked": blocked_any,
"endpoints_pulled": endpoints.len(),
}))
}
/// Reports whether `last_err` mentions HTTP status `410` or `404` as a
/// standalone token.
///
/// A code counts only when it is delimited on both sides by the start/end of
/// the string or by one of: space, `:`, tab, `\n`, `\r`. Digits embedded in a
/// longer number or word (for example `4041` or `slot404`) do not match.
pub fn error_smells_like_slot_4xx(last_err: &str) -> bool {
    let is_delimiter = |byte: u8| matches!(byte, b' ' | b':' | b'\t' | b'\n' | b'\r');
    let haystack = last_err.as_bytes();

    ["410", "404"].iter().any(|code| {
        let needle = code.as_bytes();
        // Slide a code-sized window over the bytes. The codes are ASCII, so a
        // byte-level match is always a genuine substring match.
        haystack
            .windows(needle.len())
            .enumerate()
            .any(|(start, window)| {
                if window != needle {
                    return false;
                }
                let end = start + needle.len();
                let left_ok = start == 0 || is_delimiter(haystack[start - 1]);
                let right_ok = end == haystack.len() || is_delimiter(haystack[end]);
                left_ok && right_ok
            })
    })
}
/// Re-resolves a peer's relay slot after a push failed with a 404/410-shaped
/// error, and updates the pinned slot in `state` if it changed.
///
/// # Parameters
/// - `state`: the relay state JSON; the peer is looked up at
///   `state["peers"][peer_handle]` and mutated in place on rotation.
/// - `peer_handle`: key of the peer under `peers`; also used as the `nick` of
///   the handle that is re-resolved.
/// - `last_err`: the error text of the failed push; gated through
///   [`error_smells_like_slot_4xx`].
/// - `already_tried`: peers for which a re-resolve must not be attempted
///   again. This function only reads the set.
///
/// # Returns
/// - `Ok(false)` when nothing was changed: the error is not 404/410-shaped,
///   the peer is in `already_tried`, or the re-resolved `slot_id` equals the
///   one currently pinned.
/// - `Ok(true)` when the pinned `slot_id` was replaced. Every federation
///   endpoint and the flat `slot_id` field get the new id, the matching
///   `slot_token` fields are cleared to `""`, and a notice is written to
///   stderr.
///
/// # Errors
/// Fails when the peer is absent from `state`, when it has neither a
/// federation endpoint nor a flat `relay_url`, when `resolve_handle` fails,
/// or when the resolved payload carries no string `slot_id`.
fn try_reresolve_peer_on_slot_4xx(
    state: &mut Value,
    peer_handle: &str,
    last_err: &str,
    already_tried: &std::collections::HashSet<String>,
) -> Result<bool> {
    // The pinned scope string is accepted in either casing.
    fn is_federation(endpoint: &Value) -> bool {
        matches!(
            endpoint.get("scope").and_then(Value::as_str),
            Some("federation" | "Federation")
        )
    }

    if !error_smells_like_slot_4xx(last_err) || already_tried.contains(peer_handle) {
        return Ok(false);
    }

    let peer_entry = state
        .get("peers")
        .and_then(|p| p.get(peer_handle))
        .ok_or_else(|| anyhow!("peer `{peer_handle}` not in relay_state"))?;
    // Prefer the federation endpoint's relay; fall back to the flat field.
    let peer_relay = peer_entry
        .get("endpoints")
        .and_then(Value::as_array)
        .and_then(|arr| arr.iter().find(|e| is_federation(e)))
        .and_then(|e| e.get("relay_url").and_then(Value::as_str))
        .or_else(|| peer_entry.get("relay_url").and_then(Value::as_str))
        .ok_or_else(|| {
            anyhow!("peer `{peer_handle}` has no federation endpoint to re-resolve against")
        })?
        .to_string();

    // Host[:port] part of the relay URL: scheme stripped, path dropped.
    let domain = peer_relay
        .trim_start_matches("https://")
        .trim_start_matches("http://")
        .split('/')
        .next()
        .unwrap_or(&peer_relay)
        .to_string();
    let handle = crate::pair_profile::Handle {
        nick: peer_handle.to_string(),
        domain,
    };
    let resolved = crate::pair_profile::resolve_handle(&handle, Some(&peer_relay))?;
    let new_slot_id = resolved
        .get("slot_id")
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("re-resolved payload missing slot_id"))?
        .to_string();

    let peers = state
        .get_mut("peers")
        .and_then(Value::as_object_mut)
        .ok_or_else(|| anyhow!("relay_state.peers missing or wrong shape"))?;
    let peer_entry = peers
        .get_mut(peer_handle)
        .ok_or_else(|| anyhow!("peer `{peer_handle}` disappeared from state mid-resolve"))?;

    // Mirror the relay lookup above: when there is no federation endpoint
    // slot, compare against the flat `slot_id`. Without this fallback a peer
    // pinned only via the flat fields always looks "rotated", and its
    // slot_token gets wiped even though the slot did not change.
    let current_slot_id = {
        let pinned: &Value = &*peer_entry;
        pinned
            .get("endpoints")
            .and_then(Value::as_array)
            .and_then(|arr| arr.iter().find(|e| is_federation(e)))
            .and_then(|e| e.get("slot_id").and_then(Value::as_str))
            .or_else(|| pinned.get("slot_id").and_then(Value::as_str))
            .unwrap_or("")
            .to_string()
    };
    if current_slot_id == new_slot_id {
        return Ok(false);
    }

    // The old token belongs to the old slot, so it is blanked alongside the
    // slot_id update on every federation endpoint.
    if let Some(endpoints) = peer_entry
        .get_mut("endpoints")
        .and_then(Value::as_array_mut)
    {
        for ep in endpoints.iter_mut().filter(|ep| is_federation(ep)) {
            ep["slot_id"] = Value::String(new_slot_id.clone());
            ep["slot_token"] = Value::String(String::new());
        }
    }
    // Keep the flat fields in step with the endpoint entries.
    peer_entry["slot_id"] = Value::String(new_slot_id.clone());
    peer_entry["slot_token"] = Value::String(String::new());

    eprintln!(
        "wire push: peer `{peer_handle}` rotated their relay slot (was `{current_slot_id}`, \
         now `{new_slot_id}`); pin updated in place. Re-pair via `wire add \
         {peer_handle}@<relay>` to refresh the slot_token."
    );
    Ok(true)
}
#[cfg(test)]
mod slot_reresolve_tests {
    //! Tests for the slot re-resolve gate: the status-code sniffing in
    //! `error_smells_like_slot_4xx` and the early exits / early errors of
    //! `try_reresolve_peer_on_slot_4xx` that fire before any resolve call.
    use super::*;

    /// Error text that passes the 404/410 gate.
    const GONE: &str = "post failed: 410 Gone";

    /// An empty "already attempted" set.
    fn untried() -> std::collections::HashSet<String> {
        std::collections::HashSet::new()
    }

    /// State holding a single peer, `some-peer`, with no endpoints.
    fn state_with_bare_peer() -> Value {
        json!({"peers": {"some-peer": {"endpoints": []}}})
    }

    /// Runs a re-resolve that is expected to fail and returns the error text.
    fn reresolve_failure(state: &mut Value, peer: &str) -> String {
        try_reresolve_peer_on_slot_4xx(state, peer, GONE, &untried())
            .unwrap_err()
            .to_string()
    }

    #[test]
    fn try_reresolve_skips_when_error_is_not_4xx_shape() {
        let mut state = state_with_bare_peer();
        let nobody_tried = untried();
        // (error text, reason it must not trigger a re-resolve)
        let cases = [
            ("post failed: 502", "502 must NOT trigger a re-resolve"),
            (
                "connection refused",
                "transport errors must NOT trigger a re-resolve",
            ),
            (
                "post failed: 401 Unauthorized",
                "401 (auth) is a token problem, not a slot rotation — must NOT trigger a re-resolve",
            ),
        ];
        for (err_text, why) in cases {
            let attempted =
                try_reresolve_peer_on_slot_4xx(&mut state, "some-peer", err_text, &nobody_tried)
                    .unwrap();
            assert!(!attempted, "{why}");
        }
    }

    #[test]
    fn try_reresolve_rate_limits_one_attempt_per_peer_per_push() {
        let mut state = state_with_bare_peer();
        let tried = std::collections::HashSet::from(["some-peer".to_string()]);
        let attempted =
            try_reresolve_peer_on_slot_4xx(&mut state, "some-peer", GONE, &tried).unwrap();
        assert!(
            !attempted,
            "peer already in `already_tried` must NOT trigger another re-resolve in the same push"
        );
    }

    #[test]
    fn try_reresolve_errors_when_peer_missing_from_state() {
        let mut state = json!({"peers": {}});
        let err = reresolve_failure(&mut state, "missing-peer");
        let names_peer = err.contains("missing-peer");
        let names_failure = err.contains("not in relay_state");
        assert!(
            names_peer && names_failure,
            "missing-peer error must name the peer + the failure: {err}"
        );
    }

    #[test]
    fn try_reresolve_errors_when_peer_has_no_federation_endpoint() {
        let local_endpoint = json!({
            "scope": "Local",
            "relay_url": "http://127.0.0.1:8771",
            "slot_id": "loc",
            "slot_token": "tok"
        });
        let mut state = json!({
            "peers": {
                "local-only": {
                    "endpoints": [local_endpoint]
                }
            }
        });
        let err = reresolve_failure(&mut state, "local-only");
        assert!(
            err.contains("federation endpoint"),
            "no-federation error must name the problem: {err}"
        );
    }

    #[test]
    fn error_smells_like_slot_4xx_matches_reqwest_status_display_shape() {
        for msg in [
            "post_event failed: 410 Gone: slot rotated by peer",
            "post_event failed: 404 Not Found: handle no longer claimed",
        ] {
            assert!(error_smells_like_slot_4xx(msg), "expected a match: {msg:?}");
        }
    }

    #[test]
    fn error_smells_like_slot_4xx_matches_uds_bare_u16_shape() {
        for msg in [
            "post_event (uds /tmp/wire-relay.sock) failed: 410: gone",
            "post_event (uds /tmp/wire-relay.sock) failed: 404: not found",
        ] {
            assert!(error_smells_like_slot_4xx(msg), "expected a match: {msg:?}");
        }
    }

    #[test]
    fn error_smells_like_slot_4xx_rejects_substring_lookalikes() {
        const LOOKALIKES: [&str; 7] = [
            "push aborted: slot 4101 expired",
            "post_event failed: 502 Bad Gateway: request_id=410abc-deadbeef",
            "post_event failed: 500: received 4040 bytes, expected envelope",
            "post_event failed: 500: event 0x4104 malformed",
            "post_event failed: 503: backlog=4102 entries pending",
            "post_event failed: 500: tx_id=4044beef",
            "post_event failed: 500: hash=abc410def",
        ];
        for lookalike in LOOKALIKES {
            assert!(
                !error_smells_like_slot_4xx(lookalike),
                "must NOT trigger re-resolve on substring lookalike: {lookalike:?}"
            );
        }
    }

    #[test]
    fn error_smells_like_slot_4xx_handles_edge_positions() {
        // Code at the start, at the end, between tabs/newlines, or alone.
        let should_match = [
            "410 Gone",
            "404 Not Found",
            "got 410",
            "got 404",
            "post_event failed:\t410\tGone",
            "post_event failed:\n410\nGone",
            "410",
            "404",
        ];
        for msg in should_match {
            assert!(error_smells_like_slot_4xx(msg), "expected a match: {msg:?}");
        }

        // Empty input, no status at all, and neighbouring 4xx codes.
        let should_not_match = [
            "",
            "no relevant status",
            "post_event failed: 401 Unauthorized",
            "post_event failed: 403 Forbidden",
            "post_event failed: 411 Length Required",
        ];
        for msg in should_not_match {
            assert!(!error_smells_like_slot_4xx(msg), "expected no match: {msg:?}");
        }
    }
}