use anyhow::{Context, Result, anyhow, bail};
use serde_json::{Value, json};
use crate::{config, signing::sign_message_v31};
pub(super) fn cmd_peers(as_json: bool) -> Result<()> {
let trust = config::read_trust()?;
let agents = trust
.get("agents")
.and_then(Value::as_object)
.cloned()
.unwrap_or_default();
let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
let mut self_did: Option<String> = None;
if let Ok(card) = config::read_agent_card() {
self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
}
let mut peers = Vec::new();
for (handle, agent) in agents.iter() {
let did = agent
.get("did")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if Some(did.as_str()) == self_did.as_deref() {
continue; }
let tier = super::effective_peer_tier(&trust, &relay_state, handle);
let capabilities = agent
.get("card")
.and_then(|c| c.get("capabilities"))
.cloned()
.unwrap_or_else(|| json!([]));
let character = if did.is_empty() {
None
} else {
let card_obj = agent.get("card");
Some(match card_obj {
Some(card) => crate::character::Character::from_card(card),
None => crate::character::Character::from_did(&did),
})
};
let peer_op_claims = agent
.get("card")
.map(super::op_claims_from_card)
.unwrap_or_default();
let mut row = serde_json::Map::new();
row.insert("handle".into(), json!(handle));
row.insert("did".into(), json!(did));
row.insert("tier".into(), json!(tier));
row.insert("capabilities".into(), capabilities);
row.insert("persona".into(), serde_json::to_value(&character)?);
for (k, v) in peer_op_claims {
row.insert(k, v);
}
peers.push(Value::Object(row));
}
if as_json {
println!("{}", serde_json::to_string(&peers)?);
} else if peers.is_empty() {
println!("no peers pinned (run `wire join <code>` to pair)");
} else {
for p in &peers {
let char_json = &p["persona"];
let (colored_char, plain_len): (String, usize) = match char_json {
serde_json::Value::Null => ("?".to_string(), 1),
v => match serde_json::from_value::<crate::character::Character>(v.clone()) {
Ok(c) => {
let plain = c.short().chars().count() + 1; (c.colored(), plain)
}
Err(_) => ("?".to_string(), 1),
},
};
let pad = 22usize.saturating_sub(plain_len);
println!(
"{}{} {:<20} {:<10} {}",
colored_char,
" ".repeat(pad),
p["handle"].as_str().unwrap_or(""),
p["tier"].as_str().unwrap_or(""),
p["did"].as_str().unwrap_or(""),
);
}
}
Ok(())
}
/// Best-effort stderr warning when `peer` does not seem to be pulling from its
/// relay slot.
///
/// Every failure on the way (unreadable relay state, no endpoint with a slot id,
/// slot token and relay URL, relay query error) silently skips the warning.
fn maybe_warn_peer_attentiveness(peer: &str) {
    let Ok(state) = config::read_relay_state() else {
        return;
    };
    let Some(ep) = crate::endpoints::peer_primary_endpoint(&state, peer) else {
        return;
    };
    if ep.slot_id.is_empty() || ep.slot_token.is_empty() || ep.relay_url.is_empty() {
        return;
    }
    let client = crate::relay_client::RelayClient::new(&ep.relay_url);
    let Ok((_count, last_pull)) = client.slot_state(&ep.slot_id, &ep.slot_token) else {
        return;
    };
    // A clock set before the Unix epoch yields 0, which suppresses the "away" warning.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_secs());

    let Some(pulled_at) = last_pull else {
        eprintln!(
            "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
        );
        return;
    };
    let idle_secs = now.saturating_sub(pulled_at);
    // Only speak up after more than five minutes without a pull.
    if idle_secs > 300 {
        let mins = idle_secs / 60;
        eprintln!(
            "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
        );
    }
}
pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
let trimmed = input.trim();
if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
{
return Ok(trimmed.to_string());
}
let unit_start = trimmed.char_indices().next_back().map_or(0, |(i, _)| i);
let (amount, unit) = trimmed.split_at(unit_start);
let n: i64 = amount
.parse()
.with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
if n <= 0 {
bail!("deadline duration must be positive: {input:?}");
}
let duration = match unit {
"m" => time::Duration::minutes(n),
"h" => time::Duration::hours(n),
"d" => time::Duration::days(n),
_ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
};
Ok((time::OffsetDateTime::now_utc() + duration)
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
}
#[cfg(test)]
mod deadline_tests {
    use super::*;

    /// Each supported relative unit is accepted.
    #[test]
    fn duration_shorthand_parses() {
        for input in ["30m", "2h", "1d"] {
            assert!(parse_deadline_until(input).is_ok());
        }
    }

    /// A well-formed RFC 3339 timestamp comes back unchanged.
    #[test]
    fn rfc3339_passes_through() {
        let stamp = "2030-01-02T03:04:05Z";
        assert_eq!(parse_deadline_until(stamp).unwrap(), stamp);
    }

    /// Unparseable, empty, negative and zero durations are rejected.
    #[test]
    fn garbage_is_an_error_not_a_panic() {
        for input in ["soon", "", "-5m", "0h"] {
            assert!(parse_deadline_until(input).is_err());
        }
    }

    /// A multibyte last character must be reported as an error, not split mid-sequence.
    #[test]
    fn multibyte_final_char_is_an_error_not_a_panic() {
        for input in ["30分", "5µ", "日"] {
            assert!(parse_deadline_until(input).is_err());
        }
    }
}
/// Builds, signs and sends one event to `peer`.
///
/// By default the event is delivered synchronously via
/// `crate::send::attempt_deliver`; with `queue` it is appended to the outbox
/// instead.
///
/// - `peer`: handle or nickname; reduced with `bare_handle`, then resolved
///   through `resolve_peer_handle`.
/// - `kind`: event kind name, translated by `parse_kind`.
/// - `body_arg`: `-` reads the body from stdin, `@path` reads it from a file,
///   anything else is used as a literal string.
/// - `deadline`: optional value normalized by `parse_deadline_until` and stored
///   under `time_sensitive_until`.
/// - `no_auto_pair`: refuse the implicit local-sister pairing step.
/// - `queue`: write to the outbox rather than delivering now.
/// - `as_json`: print machine-readable output instead of prose.
///
/// Returns an error when wire is not initialized, the nickname is ambiguous,
/// auto-pairing is refused or fails, or any read/parse/seal/sign/deliver step
/// fails. On the synchronous path the process exits with status 2 when the
/// outcome did not reach the relay.
pub(super) fn cmd_send(
peer: &str,
kind: &str,
body_arg: &str,
deadline: Option<&str>,
no_auto_pair: bool,
queue: bool,
as_json: bool,
) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire up` first");
}
// Resolve what the user typed to a peer handle. An unresolvable name is
// passed through unchanged; only an ambiguous nickname is a hard error.
let peer_in = crate::agent_card::bare_handle(peer).to_string();
let peer = match super::resolve_peer_handle(&peer_in) {
Ok(Some(resolved)) if resolved != peer_in => {
eprintln!("wire send: resolved nickname `{peer_in}` → peer `{resolved}`");
resolved
}
Ok(Some(canonical)) => canonical, Ok(None) => peer_in, Err(super::ResolveError::Ambiguous(candidates)) => bail!(
"nickname `{peer_in}` is ambiguous — matches {} pinned peers: {}. \
Disambiguate by passing the peer handle (one of those listed) instead of the nickname.",
candidates.len(),
candidates.join(", ")
),
Err(super::ResolveError::NotFound) => peer_in, };
// "Pinned" here means the handle is a key of `peers` in the relay state;
// an unreadable relay state counts as not pinned.
let peer_is_pinned = config::read_relay_state()
.ok()
.and_then(|s| s.get("peers").and_then(Value::as_object).cloned())
.map(|peers| peers.contains_key(&peer))
.unwrap_or(false);
// An unpinned peer that resolves to a local sister session is paired
// implicitly, unless the caller opted out with `no_auto_pair`.
if !peer_is_pinned && let Some(sister_name) = crate::session::resolve_local_sister(&peer) {
if no_auto_pair {
bail!(
"wire send: `{peer}` resolves to local sister `{sister_name}` but is not pinned, \
and --no-auto-pair was passed. Run `wire dial {peer}` first, \
then re-run send."
);
}
eprintln!(
"wire send: `{peer}` not pinned yet — auto-pairing via local-sister `{sister_name}` first. \
Pass --no-auto-pair to refuse implicit dialing."
);
super::cmd_add_local_sister(&sister_name, true).map_err(|e| {
anyhow!("wire send: auto-pair to local sister `{sister_name}` failed: {e:#}")
})?;
}
let peer = peer.as_str();
// Load our signing material: private key seed, DID, display handle and the
// first entry of the card's `verify_keys`.
let sk_seed = config::read_private_key()?;
let card = config::read_agent_card()?;
let did = card.get("did").and_then(Value::as_str).unwrap_or("");
let handle = crate::agent_card::display_handle_from_did(did).to_string();
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
let pk_bytes = crate::signing::b64decode(pk_b64)?;
// Stdin and file bodies are parsed as JSON when possible and otherwise sent
// as the raw text; a literal argument is always sent as a string.
let body_value: Value = if body_arg == "-" {
use std::io::Read;
let mut raw = String::new();
std::io::stdin()
.read_to_string(&mut raw)
.with_context(|| "reading body from stdin")?;
serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
} else if let Some(path) = body_arg.strip_prefix('@') {
let raw =
std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
serde_json::from_str(&raw).unwrap_or(Value::String(raw))
} else {
Value::String(body_arg.to_string())
};
let kind_id = super::parse_kind(kind)?;
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
// An unreadable trust store degrades to an empty agent table rather than failing the send.
let trust_for_did = config::read_trust().unwrap_or_else(|_| json!({"agents": {}}));
let to_did = crate::trust::resolve_peer_did(&trust_for_did, peer);
let mut event = json!({
"schema_version": crate::signing::EVENT_SCHEMA_VERSION,
"timestamp": now,
"from": did,
"to": to_did,
"type": kind,
"kind": kind_id,
"body": body_value,
});
if let Some(deadline) = deadline {
event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
}
// The body is sealed only when a DH public key is known for the peer;
// sealing happens before signing.
if let Some(peer_dh) = crate::enc::wire_x25519::peer_dh_pubkey(&trust_for_did, peer) {
crate::enc::wire_x25519::seal_event_body(&mut event, &peer_dh, &sk_seed)?;
}
let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
maybe_warn_peer_attentiveness(peer);
// Synchronous path: deliver now, report the outcome, and exit with status 2
// when it did not reach the relay.
if !queue {
let outcome = crate::send::attempt_deliver(peer, &signed)?;
if as_json {
println!(
"{}",
serde_json::to_string(&crate::send::delivery_json(&outcome, peer))?
);
} else {
use crate::send::SyncDelivery;
match &outcome {
SyncDelivery::Delivered {
event_id,
relay_url,
slot_id,
} => println!("delivered {event_id} → {peer} (relay {relay_url} slot {slot_id})"),
SyncDelivery::Duplicate {
event_id,
relay_url,
slot_id,
} => println!(
"duplicate {event_id} → {peer} (already on relay {relay_url} slot {slot_id} — change the body to send a distinct event)"
),
SyncDelivery::PeerUnknown { event_id } => println!(
"FAILED {event_id} → {peer}: peer not pinned. Run `wire dial {peer}` to pair, or `wire send --queue {peer} ...` to write to outbox for the daemon to retry later."
),
SyncDelivery::SlotStale {
event_id, detail, ..
} => println!(
"FAILED {event_id} → {peer}: relay says slot is stale ({detail}). Run `wire dial {peer}` to re-pair."
),
SyncDelivery::TransportError {
event_id, detail, ..
} => println!(
"FAILED {event_id} → {peer}: transport error ({detail}). Retry, or pass --queue to outbox the event for daemon retry."
),
}
}
if !outcome.reached_relay() {
std::process::exit(2);
}
return Ok(());
}
// Queue path: warn when the peer is in neither the trust store nor the
// relay state and has no pending inbound pair request.
let peer_pinned_in_trust = trust_for_did
.get("agents")
.and_then(Value::as_object)
.map(|a| a.contains_key(peer))
.unwrap_or(false);
if !peer_pinned_in_trust && !peer_is_pinned {
let pending_inbound = crate::pending_inbound_pair::list_pending_inbound()
.ok()
.map(|v| v.iter().any(|p| p.peer_handle == peer))
.unwrap_or(false);
if !pending_inbound {
eprintln!(
"wire send: WARN — `{peer}` is not pinned and has no pending pair. \
The event will sit in outbox forever unless you pair first \
(`wire dial {peer}` or accept an inbound invite)."
);
}
}
// Append the signed event to the peer's outbox and report where it went.
let line = serde_json::to_vec(&signed)?;
let outbox = config::append_outbox_record(peer, &line)?;
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"event_id": event_id,
"status": "queued",
"peer": peer,
"outbox": outbox.to_string_lossy(),
}))?
);
} else {
println!(
"queued event {event_id} → {peer} (outbox: {}; daemon will push)",
outbox.display()
);
}
Ok(())
}
/// Fans one event out to every recipient that
/// `crate::trust::project_recipients` returns for `project`.
///
/// - `project`: project tag used to select recipients.
/// - `kind`: event kind name, translated by `parse_kind`.
/// - `body_arg`: `-` reads the body from stdin, `@path` reads it from a file,
///   anything else is used as a literal string.
/// - `deadline`: optional value normalized once by `parse_deadline_until` and
///   attached to every event as `time_sensitive_until`.
/// - `as_json`: print one JSON summary instead of per-recipient lines.
///
/// Each recipient gets its own timestamped, individually sealed (when a DH key
/// is known) and signed event, delivered synchronously. With no recipients the
/// function reports that and returns `Ok(())`. The process exits with status 2
/// when fewer events reached the relay than there were recipients.
// NOTE(review): this function takes five parameters, so the allow below may be
// stale — confirm before removing.
#[allow(clippy::too_many_arguments)]
pub(super) fn cmd_send_project(
project: &str,
kind: &str,
body_arg: &str,
deadline: Option<&str>,
as_json: bool,
) -> Result<()> {
if !config::is_initialized()? {
bail!("not initialized — run `wire up` first");
}
// Load our signing material: private key seed, DID, display handle and the
// first entry of the card's `verify_keys`.
let sk_seed = config::read_private_key()?;
let card = config::read_agent_card()?;
let did = card.get("did").and_then(Value::as_str).unwrap_or("");
let handle = crate::agent_card::display_handle_from_did(did).to_string();
let pk_b64 = card
.get("verify_keys")
.and_then(Value::as_object)
.and_then(|m| m.values().next())
.and_then(|v| v.get("key"))
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
let pk_bytes = crate::signing::b64decode(pk_b64)?;
// Unreadable trust / relay state degrade to empty tables instead of failing.
let trust = config::read_trust().unwrap_or_else(|_| json!({"agents": {}}));
let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
let recipients = crate::trust::project_recipients(&trust, &relay_state, &handle, project);
// Nothing to send: report it and return before the body is read.
if recipients.is_empty() {
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"project": project,
"recipients": [],
"delivered": 0,
"note": "no peers at ORG_VERIFIED+ tagged with this project",
}))?
);
} else {
println!(
"no fan-out recipients: no pinned peer is at ORG_VERIFIED+ AND tagged \
project={project}. Check `wire peers` (tier) and that org-mates publish \
the same project tag on their card."
);
}
return Ok(());
}
// Stdin and file bodies are parsed as JSON when possible and otherwise sent
// as the raw text; a literal argument is always sent as a string.
let body_value: Value = if body_arg == "-" {
use std::io::Read;
let mut raw = String::new();
std::io::stdin()
.read_to_string(&mut raw)
.with_context(|| "reading body from stdin")?;
serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
} else if let Some(path) = body_arg.strip_prefix('@') {
let raw =
std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
serde_json::from_str(&raw).unwrap_or(Value::String(raw))
} else {
Value::String(body_arg.to_string())
};
let kind_id = super::parse_kind(kind)?;
// Resolve the deadline once so every recipient receives the same value.
let deadline_until = match deadline {
Some(d) => Some(parse_deadline_until(d)?),
None => None,
};
let mut results: Vec<Value> = Vec::new();
let mut delivered = 0usize;
// Build, seal, sign and deliver a separate event per recipient. Any `?`
// failure inside the loop aborts the remaining recipients.
for peer in &recipients {
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
let to_did = crate::trust::resolve_peer_did(&trust, peer);
let mut event = json!({
"schema_version": crate::signing::EVENT_SCHEMA_VERSION,
"timestamp": now,
"from": did,
"to": to_did,
"type": kind,
"kind": kind_id,
"body": body_value.clone(),
});
if let Some(until) = &deadline_until {
event["time_sensitive_until"] = json!(until);
}
if let Some(peer_dh) = crate::enc::wire_x25519::peer_dh_pubkey(&trust, peer) {
crate::enc::wire_x25519::seal_event_body(&mut event, &peer_dh, &sk_seed)?;
}
let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
let outcome = crate::send::attempt_deliver(peer, &signed)?;
if outcome.reached_relay() {
delivered += 1;
}
if !as_json {
use crate::send::SyncDelivery;
match &outcome {
SyncDelivery::Delivered { event_id, .. } => {
println!(" delivered {event_id} → {peer}")
}
SyncDelivery::Duplicate { event_id, .. } => {
println!(" duplicate {event_id} → {peer} (change body for a distinct event)")
}
// Every other outcome is shown using the `status` field of its JSON form.
other => println!(
" FAILED → {peer}: {}",
crate::send::delivery_json(other, peer)
.get("status")
.and_then(Value::as_str)
.unwrap_or("error")
),
}
}
results.push(crate::send::delivery_json(&outcome, peer));
}
if as_json {
println!(
"{}",
serde_json::to_string(&json!({
"project": project,
"recipients": recipients,
"delivered": delivered,
"results": results,
}))?
);
} else {
println!(
"fan-out project={project}: {delivered}/{} reached the relay.",
recipients.len()
);
}
// A partial fan-out is reported through the exit status.
if delivered < recipients.len() {
std::process::exit(2);
}
Ok(())
}
pub(crate) fn here_summary() -> Result<Value> {
let initialized = config::is_initialized().unwrap_or(false);
let (self_did, self_handle, self_character) = if initialized {
let card = config::read_agent_card().ok();
let did = card
.as_ref()
.and_then(|c| c.get("did").and_then(Value::as_str))
.unwrap_or("")
.to_string();
let handle = if did.is_empty() {
String::new()
} else {
crate::agent_card::display_handle_from_did(&did).to_string()
};
let character = if did.is_empty() {
None
} else {
Some(crate::character::Character::from_did(&did))
};
(did, handle, character)
} else {
(String::new(), String::new(), None)
};
let cwd = std::env::current_dir()
.map(|p| p.to_string_lossy().into_owned())
.unwrap_or_default();
let wire_home = std::env::var("WIRE_HOME").unwrap_or_default();
let mut sisters: Vec<Value> = Vec::new();
if let Ok(listing) = crate::session::list_local_sessions() {
for group in listing.local.values() {
for s in group {
if s.handle.as_deref() == Some(self_handle.as_str()) {
continue; }
let ch = s.did.as_deref().map(crate::character::Character::from_did);
sisters.push(json!({
"session": s.name,
"handle": s.handle,
"persona": ch,
}));
}
}
}
let mut peers: Vec<Value> = Vec::new();
if initialized
&& let Ok(trust) = config::read_trust()
&& let Some(agents) = trust.get("agents").and_then(Value::as_object)
{
let relay_state =
config::read_relay_state().unwrap_or_else(|_| json!({"self": null, "peers": {}}));
for (handle, agent) in agents {
if handle == &self_handle {
continue; }
let did = agent.get("did").and_then(Value::as_str).unwrap_or("");
let ch = if did.is_empty() {
None
} else {
Some(crate::character::Character::from_did(did))
};
peers.push(json!({
"handle": handle,
"did": did,
"tier": crate::trust::effective_tier(&trust, &relay_state, handle),
"persona": ch,
}));
}
}
Ok(json!({
"self": {
"handle": self_handle,
"did": self_did,
"persona": self_character,
"cwd": cwd,
"wire_home": wire_home,
},
"sister_sessions": sisters,
"pinned_peers": peers,
}))
}
/// Prints the `here_summary` snapshot: as raw JSON with `as_json`, otherwise as
/// a short human-readable overview of ourselves, sister sessions and pinned
/// peers.
///
/// When the summary carries no handle for us, only a "not initialized" hint is
/// printed.
pub(super) fn cmd_here(as_json: bool) -> Result<()> {
    /// Picks the glyph for a serialized persona: the emoji itself when the
    /// terminal supports it, otherwise whatever `emoji_with_fallback` chooses
    /// for a throwaway character carrying the same emoji and nickname.
    fn glyph_for(persona: &Value) -> String {
        let emoji = persona.get("emoji").and_then(Value::as_str).unwrap_or("?");
        let nickname = persona
            .get("nickname")
            .and_then(Value::as_str)
            .unwrap_or("?");
        if crate::character::terminal_supports_emoji() {
            return emoji.to_string();
        }
        let placeholder = crate::character::Character {
            nickname: nickname.to_string(),
            emoji: emoji.to_string(),
            palette: crate::character::Palette {
                primary_hex: String::new(),
                accent_hex: String::new(),
                ansi256_primary: 0,
                ansi256_accent: 0,
            },
        };
        crate::character::emoji_with_fallback(&placeholder)
    }

    let summary = here_summary()?;
    if as_json {
        println!("{}", serde_json::to_string(&summary)?);
        return Ok(());
    }

    let me = &summary["self"];
    let self_handle = me["handle"].as_str().unwrap_or("");
    if self_handle.is_empty() {
        println!("not initialized — run `wire up` to bootstrap.");
        return Ok(());
    }
    let own: Option<crate::character::Character> =
        serde_json::from_value(me["persona"].clone()).ok();
    let (glyph, nick) = match &own {
        Some(ch) => (
            crate::character::emoji_with_fallback(ch),
            ch.nickname.clone(),
        ),
        None => ("?".to_string(), String::new()),
    };
    println!("you are {glyph} {nick} ({self_handle})");
    match me["cwd"].as_str() {
        Some(cwd) if !cwd.is_empty() => println!(" cwd: {cwd}"),
        _ => {}
    }

    // A missing or non-array section is treated as empty.
    let sisters: &[Value] = summary["sister_sessions"]
        .as_array()
        .map(Vec::as_slice)
        .unwrap_or_default();
    let peers: &[Value] = summary["pinned_peers"]
        .as_array()
        .map(Vec::as_slice)
        .unwrap_or_default();

    if !sisters.is_empty() {
        println!();
        println!("sister sessions on this machine:");
        for sister in sisters {
            let persona = &sister["persona"];
            let session = sister["session"].as_str().unwrap_or("?");
            let ch_nick = persona["nickname"].as_str().unwrap_or("?");
            let glyph = glyph_for(persona);
            println!(" {glyph} {ch_nick} ({session})");
        }
    }
    if !peers.is_empty() {
        println!();
        println!("pinned peers:");
        for pinned in peers {
            let persona = &pinned["persona"];
            let handle = pinned["handle"].as_str().unwrap_or("?");
            let tier = pinned["tier"].as_str().unwrap_or("");
            let ch_nick = persona["nickname"].as_str().unwrap_or("?");
            let glyph = glyph_for(persona);
            println!(" {glyph} {ch_nick} ({handle}) [{tier}]");
        }
    }
    if sisters.is_empty() && peers.is_empty() {
        println!();
        println!(
            "no neighbors yet — `wire session new` to add a sister, or `wire dial <peer>` to reach out."
        );
    }
    Ok(())
}
/// Prints events stored in the inbox `*.jsonl` files, ordered by timestamp.
///
/// - `peer`: when set, only the file whose stem equals this name is read.
/// - `as_json`: emit one JSON object per event (with an added `verified` flag)
///   instead of a formatted line.
/// - `limit`: maximum number of events to show; `0` shows all of them.
/// - `oldest`: take the window from the start of the ordering instead of the end.
///
/// When nothing is selected in plain mode, a diagnostic about available
/// channels and the last daemon sync is written to stderr.
pub(super) fn cmd_tail(
    peer: Option<&str>,
    as_json: bool,
    limit: usize,
    oldest: bool,
) -> Result<()> {
    let inbox = config::inbox_dir()?;
    if !inbox.exists() {
        if !as_json {
            eprintln!("no inbox yet — daemon hasn't run, or no events received");
        }
        return Ok(());
    }
    let trust = config::read_trust()?;
    let is_jsonl = |p: &std::path::Path| p.extension().is_some_and(|ext| ext == "jsonl");

    // Pass 1: pick the channel files (optionally just the requested peer's).
    let mut channel_files = Vec::new();
    for entry in std::fs::read_dir(&inbox)?.flatten() {
        let path = entry.path();
        let stem = path.file_stem().and_then(|s| s.to_str());
        if is_jsonl(&path) && peer.map_or(true, |want| stem == Some(want)) {
            channel_files.push(path);
        }
    }

    // Pass 2: parse every line that is valid JSON, remembering its line index
    // so events with equal timestamps keep a deterministic order.
    let mut events: Vec<(String, usize, Value)> = Vec::new();
    for path in &channel_files {
        let contents = std::fs::read_to_string(path)?;
        events.extend(contents.lines().enumerate().filter_map(|(idx, line)| {
            let event: Value = serde_json::from_str(line).ok()?;
            let ts = event
                .get("timestamp")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string();
            Some((ts, idx, event))
        }));
    }
    events.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));

    let total = events.len();
    let window: &[(String, usize, Value)] = match (limit, oldest) {
        (0, _) => &events[..],
        (n, true) => &events[..n.min(total)],
        (n, false) => &events[total.saturating_sub(n)..],
    };

    if !as_json && window.is_empty() {
        let mut channels: Vec<String> = Vec::new();
        for entry in std::fs::read_dir(&inbox)?.flatten() {
            let path = entry.path();
            if !is_jsonl(&path) {
                continue;
            }
            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                channels.push(stem.to_string());
            }
        }
        let synced = crate::ensure_up::last_sync_age_seconds().map_or_else(
            || "daemon has not recorded a sync here yet".to_string(),
            |age| format!("daemon last synced {age}s ago"),
        );
        // A peer was requested but there is no channel file with that name.
        let unknown_channel = peer.filter(|want| !channels.iter().any(|c| c.as_str() == *want));
        if let Some(want) = unknown_channel {
            eprintln!("no inbox channel for '{want}'. {synced}.");
            if channels.is_empty() {
                eprintln!(
                    " no peers have messaged you yet — `wire dial <name> \"hi\"` to start a line, or `wire doctor` if you expected traffic."
                );
            } else {
                eprintln!(" channels you can tail: {}", channels.join(", "));
            }
        } else {
            let scope = peer.map(|p| format!(" from '{p}'")).unwrap_or_default();
            eprintln!("0 events{scope} ({total} matched). {synced}.");
            eprintln!(" expected a message? check sync: `wire doctor` / `wire status`.");
        }
        return Ok(());
    }

    let seed = crate::enc::wire_x25519::self_seed_for_read();
    for (_, _, stored) in window {
        // The signature is checked on the event as stored, before any decryption.
        let verified = crate::signing::verify_message_v31(stored, &trust).is_ok();
        let shown = seed.as_ref().map_or_else(
            || stored.clone(),
            |s| crate::enc::wire_x25519::decrypt_event_for_read(stored, &trust, s),
        );
        if as_json {
            let mut with_meta = shown.clone();
            if let Some(obj) = with_meta.as_object_mut() {
                obj.insert("verified".into(), json!(verified));
            }
            println!("{}", serde_json::to_string(&with_meta)?);
            continue;
        }
        let text_field = |key: &str| shown.get(key).and_then(Value::as_str);
        let ts = text_field("timestamp").unwrap_or("?");
        let from = text_field("from").unwrap_or("?");
        let kind_name = text_field("type").unwrap_or("?");
        let kind = shown.get("kind").and_then(Value::as_u64).unwrap_or(0);
        // String bodies print as-is; anything else prints as compact JSON.
        let summary = match shown.get("body") {
            Some(Value::String(s)) => s.clone(),
            Some(other) => other.to_string(),
            None => String::new(),
        };
        let mark = if verified { "✓" } else { "✗" };
        let deadline = text_field("time_sensitive_until")
            .map(|d| format!(" deadline: {d}"))
            .unwrap_or_default();
        println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
    }
    Ok(())
}
/// Returns `true` for the event kinds the monitor hides unless handshake
/// traffic is explicitly requested.
fn monitor_is_noise_kind(kind: &str) -> bool {
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.contains(&kind)
}
fn resolve_persona(peer_handle: &str) -> Option<crate::character::Character> {
let trust = config::read_trust().ok()?;
let agent = trust.get("agents").and_then(|a| a.get(peer_handle))?;
if let Some(card) = agent.get("card") {
Some(crate::character::Character::from_card(card))
} else {
let did = agent.get("did").and_then(Value::as_str)?;
Some(crate::character::Character::from_did(did))
}
}
/// Display label for a peer: `"<emoji> <nickname>"` when a persona can be
/// resolved, otherwise the handle itself.
pub(super) fn persona_label(peer_handle: &str) -> String {
    resolve_persona(peer_handle).map_or_else(
        || peer_handle.to_string(),
        |ch| format!("{} {}", ch.emoji, ch.nickname),
    )
}
/// Renders one inbox event as a single output line.
///
/// JSON mode serializes the whole event. Plain mode prints the timestamp cut to
/// 19 characters, `peer/kind`, the event id cut to 12 characters, and the body
/// preview with newlines replaced by spaces.
fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
    if as_json {
        return Ok(serde_json::to_string(e)?);
    }
    // Truncate by characters, not bytes, so multibyte text is never split.
    let clip = |text: &str, max_chars: usize| text.chars().take(max_chars).collect::<String>();
    let ts = clip(&e.timestamp, 19);
    let eid_short = clip(&e.event_id, 12);
    let body = e.body_preview.replace('\n', " ");
    Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
}
/// Streams inbox events to stdout until the process is stopped.
///
/// - `peer_filter`: when set, only events for this peer are shown.
/// - `as_json`: one JSON object per line instead of the plain one-line format.
/// - `include_handshake`: also show the kinds `monitor_is_noise_kind` hides.
/// - `interval_ms`: poll interval; values below 50 ms are raised to 50 ms.
/// - `replay`: when greater than zero, first print that many of the most
///   recent stored events (after filtering), ordered by timestamp.
///
/// The live loop never returns `Ok`; the function only returns on an error
/// from reading the inbox directory, creating the watcher, or rendering.
pub(super) fn cmd_monitor(
peer_filter: Option<&str>,
as_json: bool,
include_handshake: bool,
interval_ms: u64,
replay: usize,
) -> Result<()> {
let inbox_dir = config::inbox_dir()?;
// A missing inbox is only reported (in plain mode); watching continues.
if !inbox_dir.exists() && !as_json {
eprintln!("wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?");
}
crate::session::warn_on_identity_collision(std::process::id(), "monitor");
// Replay phase: gather stored events from every matching `*.jsonl` file,
// then print the last `replay` of them.
if replay > 0 && inbox_dir.exists() {
let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
let path = entry.path();
if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
continue;
}
// The file stem is used as the peer name for every event in the file.
let peer = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => continue,
};
if let Some(filter) = peer_filter
&& peer != filter
{
continue;
}
// An unreadable file is treated as empty; blank and non-JSON lines are skipped.
let body = std::fs::read_to_string(&path).unwrap_or_default();
for line in body.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
let signed: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let ev = crate::inbox_watch::InboxEvent::from_signed(
&peer, signed, true,
);
if !include_handshake && monitor_is_noise_kind(&ev.kind) {
continue;
}
all.push(ev);
}
}
all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let start = all.len().saturating_sub(replay);
for ev in &all[start..] {
println!("{}", monitor_render(ev, as_json)?);
}
use std::io::Write;
std::io::stdout().flush().ok();
}
// Live phase: the watcher is created only after the replay has been printed.
let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
loop {
// A poll failure is logged and retried after the usual sleep.
let events = match w.poll() {
Ok(evs) => evs,
Err(e) => {
eprintln!("wire monitor: poll error (continuing to watch): {e:#}");
std::thread::sleep(sleep_dur);
continue;
}
};
let mut wrote = false;
for ev in events {
if let Some(filter) = peer_filter
&& ev.peer != filter
{
continue;
}
if !include_handshake && monitor_is_noise_kind(&ev.kind) {
continue;
}
println!("{}", monitor_render(&ev, as_json)?);
wrote = true;
}
// Flush once per poll that printed something; flush errors are ignored.
if wrote {
use std::io::Write;
std::io::stdout().flush().ok();
}
std::thread::sleep(sleep_dur);
}
}
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Fixture: a verified event with a fixed id and timestamp.
    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: peer.into(),
            event_id: "abcd1234567890ef".into(),
            kind: kind.into(),
            body_preview: body.into(),
            verified: true,
            timestamp: "2026-05-15T23:14:07.123456Z".into(),
            raw: Value::Null,
        }
    }

    /// Only the three handshake kinds count as noise; everything else is shown.
    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        for noisy in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(monitor_is_noise_kind(noisy));
        }
        for shown in [
            "claim",
            "decision",
            "ack",
            "request",
            "note",
            "future_kind_we_dont_know",
        ] {
            assert!(!monitor_is_noise_kind(shown));
        }
    }

    /// Plain rendering is one line carrying peer, kind, body, a shortened id
    /// and a shortened timestamp.
    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let event = ev("willard", "claim", "real v8 train shipped 1350 steps");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "render must be one line: {line}");
        for fragment in ["willard", "claim", "real v8 train", "abcd12345678"] {
            assert!(line.contains(fragment));
        }
        assert!(
            !line.contains("abcd1234567890ef"),
            "should truncate full id"
        );
        assert!(line.contains("2026-05-15T23:14:07"));
    }

    /// Newlines inside the body are replaced by spaces.
    #[test]
    fn monitor_render_strips_newlines_from_body() {
        let event = ev("spark", "claim", "line one\nline two\nline three");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    /// JSON rendering is a single line that parses back to the same fields.
    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let event = ev("spark", "claim", "hi");
        let line = monitor_render(&event, true).unwrap();
        assert!(!line.contains('\n'));
        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        assert_eq!(parsed["peer"], "spark");
        assert_eq!(parsed["kind"], "claim");
        assert_eq!(parsed["body_preview"], "hi");
    }

    /// An unverified event is still rendered and is not classified as noise.
    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        let event = InboxEvent {
            verified: false,
            ..ev("spark", "claim", "from disk with verified=null")
        };
        let line = monitor_render(&event, false).unwrap();
        assert!(line.contains("from disk with verified=null"));
        assert!(!monitor_is_noise_kind("claim"));
    }
}
pub(super) fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
let body = if path == "-" {
let mut buf = String::new();
use std::io::Read;
std::io::stdin().read_to_string(&mut buf)?;
buf
} else {
std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
};
let event: Value = serde_json::from_str(&body)?;
let trust = config::read_trust()?;
match crate::signing::verify_message_v31(&event, &trust) {
Ok(()) => {
if as_json {
println!("{}", serde_json::to_string(&json!({"verified": true}))?);
} else {
println!("verified ✓");
}
Ok(())
}
Err(e) => {
let reason = e.to_string();
if as_json {
println!(
"{}",
serde_json::to_string(&json!({"verified": false, "reason": reason}))?
);
} else {
eprintln!("FAILED: {reason}");
}
std::process::exit(1);
}
}
}
/// Surfaces new inbox events, either once or in a polling loop.
///
/// - `interval_secs`: pause between sweeps in loop mode (at least one second).
/// - `peer_filter`: when set, events from other peers are skipped.
/// - `once`: run a single sweep and return its result.
/// - `as_json`: print each event as JSON instead of raising an OS notification.
///
/// Progress is tracked through the `notify.cursor` file in the state
/// directory, which is saved at the end of every successful sweep. In loop
/// mode a failed sweep is logged to stderr and retried after the pause.
pub(super) fn cmd_notify(
    interval_secs: u64,
    peer_filter: Option<&str>,
    once: bool,
    as_json: bool,
) -> Result<()> {
    use crate::inbox_watch::InboxWatcher;

    let cursor_path = config::state_dir()?.join("notify.cursor");
    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
    if !once {
        crate::session::warn_on_identity_collision(std::process::id(), "notify");
    }

    // One sweep: poll, surface every matching event, then persist the cursors.
    let sweep = |w: &mut InboxWatcher| -> Result<()> {
        for ev in w.poll()? {
            match peer_filter {
                Some(wanted) if ev.peer != wanted => continue,
                _ => {}
            }
            if as_json {
                println!("{}", serde_json::to_string(&ev)?);
            } else {
                os_notify_inbox_event(&ev);
            }
        }
        w.save_cursors(&cursor_path)?;
        Ok(())
    };

    if !once {
        let pause = std::time::Duration::from_secs(interval_secs.max(1));
        loop {
            sweep(&mut watcher).unwrap_or_else(|e| eprintln!("wire notify: sweep error: {e}"));
            std::thread::sleep(pause);
        }
    }
    sweep(&mut watcher)
}
/// Polls `watcher` once, saves its cursors to `cursor_path`, and returns the
/// events that poll produced.
///
/// An error from either the poll or the cursor save is returned to the caller;
/// in the latter case the polled events are not returned.
pub(super) fn notify_sweep_new_events(
    watcher: &mut crate::inbox_watch::InboxWatcher,
    cursor_path: &std::path::Path,
) -> Result<Vec<crate::inbox_watch::InboxEvent>> {
    let fresh = watcher.poll()?;
    watcher.save_cursors(cursor_path)?;
    Ok(fresh)
}
pub(super) fn toast_inbox_events(events: &[crate::inbox_watch::InboxEvent]) {
for ev in events {
os_notify_inbox_event(ev);
}
}
/// Raises one de-duplicated OS notification for an inbox event.
///
/// The title names the sender via `persona_label` and flags unverified events;
/// the body is `"<kind>: <preview>"`. The dedup key is built from the peer and
/// the event id, falling back to the body preview when the id is empty.
fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
    let who = persona_label(&ev.peer);
    let title = match ev.verified {
        true => format!("wire ← {who}"),
        false => format!("wire ← {who} (UNVERIFIED)"),
    };
    let body = format!("{}: {}", ev.kind, ev.body_preview);
    let id = match ev.event_id.as_str() {
        "" => ev.body_preview.as_str(),
        event_id => event_id,
    };
    let dedup_key = format!("inbox:{}:{}", ev.peer, id);
    crate::os_notify::toast_dedup(&dedup_key, &title, &body);
}
/// Stderr stand-in for a desktop notification, compiled only on targets other
/// than Linux, macOS and Windows.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    eprintln!("[wire notify] {}\n {}", title, body);
}
#[cfg(test)]
mod notify_sweep_tests {
    use super::*;
    use crate::inbox_watch::InboxWatcher;
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Scratch directory path under the system temp dir, made distinct by the
    /// process id and the current sub-second nanoseconds.
    fn tmp_base(tag: &str) -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        let pid = std::process::id();
        std::env::temp_dir().join(format!("wire-{tag}-{pid}-{nanos}"))
    }

    /// Appends one event line to `<inbox>/<peer>.jsonl`, creating the directory
    /// and the file as needed.
    fn append_event(inbox: &Path, peer: &str, body: &str) {
        std::fs::create_dir_all(inbox).unwrap();
        let event = serde_json::json!({
            "event_id": format!("evt-{body}"),
            "from": format!("did:wire:{peer}"),
            "to": "did:wire:self",
            "type": "decision",
            "kind": 1,
            "timestamp": "2026-06-11T00:00:00Z",
            "body": body,
            "sig": "x",
        });
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(inbox.join(format!("{peer}.jsonl")))
            .unwrap();
        writeln!(file, "{}", serde_json::to_string(&event).unwrap()).unwrap();
    }

    /// A new event is reported exactly once, and the saved cursor keeps a fresh
    /// watcher from reporting it again.
    #[test]
    fn notify_sweep_reports_new_events_once_and_persists_cursor() {
        let base = tmp_base("notifysweep");
        let (inbox, cursor) = (base.join("inbox"), base.join("notify.cursor"));
        // Every sweep uses a brand-new watcher, so only the cursor file carries
        // state from one sweep to the next.
        let sweep = || {
            let mut watcher = InboxWatcher::from_dir_and_cursor(inbox.clone(), &cursor).unwrap();
            notify_sweep_new_events(&mut watcher, &cursor).unwrap()
        };

        append_event(&inbox, "paul", "first");
        let got = sweep();
        assert_eq!(got.len(), 1, "first sweep sees the one new event");
        assert!(got[0].body_preview.contains("first"));

        assert!(
            sweep().is_empty(),
            "persisted cursor prevents re-firing the same event"
        );

        append_event(&inbox, "paul", "second");
        let third = sweep();
        assert_eq!(third.len(), 1);
        assert!(third[0].body_preview.contains("second"));

        let _ = std::fs::remove_dir_all(&base);
    }
}