1use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21 agent_card::{build_agent_card, sign_agent_card},
22 config,
23 signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24 trust::{add_self_to_trust, empty_trust},
25};
26
// Top-level argument parser for the `wire` binary. `run` calls `Cli::parse()`
// and dispatches on `command`.
// (Plain `//` comments are used here on purpose: clap turns `///` doc comments
// into help text, which would change what `--help` prints.)
#[derive(Parser, Debug)]
#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
pub struct Cli {
    // The selected subcommand; exactly one `Command` variant per invocation.
    #[command(subcommand)]
    pub command: Command,
}
34
35#[derive(Subcommand, Debug)]
36pub enum Command {
37 Init {
39 handle: String,
41 #[arg(long)]
43 name: Option<String>,
44 #[arg(long)]
47 relay: Option<String>,
48 #[arg(long)]
50 json: bool,
51 },
52 Whoami {
56 #[arg(long)]
57 json: bool,
58 },
59 Peers {
61 #[arg(long)]
62 json: bool,
63 },
64 Send {
72 peer: String,
74 kind_or_body: String,
79 body: Option<String>,
83 #[arg(long)]
85 deadline: Option<String>,
86 #[arg(long)]
88 json: bool,
89 },
90 Tail {
92 peer: Option<String>,
94 #[arg(long)]
96 json: bool,
97 #[arg(long, default_value_t = 0)]
99 limit: usize,
100 },
101 Monitor {
112 #[arg(long)]
114 peer: Option<String>,
115 #[arg(long)]
117 json: bool,
118 #[arg(long)]
121 include_handshake: bool,
122 #[arg(long, default_value_t = 500)]
124 interval_ms: u64,
125 #[arg(long, default_value_t = 0)]
127 replay: usize,
128 },
129 Verify {
131 path: String,
133 #[arg(long)]
135 json: bool,
136 },
137 Mcp,
141 RelayServer {
143 #[arg(long, default_value = "127.0.0.1:8770")]
145 bind: String,
146 #[arg(long)]
154 local_only: bool,
155 },
156 BindRelay {
165 url: String,
167 #[arg(long)]
172 migrate_pinned: bool,
173 #[arg(long)]
174 json: bool,
175 },
176 AddPeerSlot {
179 handle: String,
181 url: String,
183 slot_id: String,
185 slot_token: String,
187 #[arg(long)]
188 json: bool,
189 },
190 Push {
192 peer: Option<String>,
194 #[arg(long)]
195 json: bool,
196 },
197 Pull {
199 #[arg(long)]
200 json: bool,
201 },
202 Status {
205 #[arg(long)]
207 peer: Option<String>,
208 #[arg(long)]
209 json: bool,
210 },
211 Responder {
213 #[command(subcommand)]
214 command: ResponderCommand,
215 },
216 Pin {
219 card_file: String,
221 #[arg(long)]
222 json: bool,
223 },
224 RotateSlot {
235 #[arg(long)]
238 no_announce: bool,
239 #[arg(long)]
240 json: bool,
241 },
242 ForgetPeer {
246 handle: String,
248 #[arg(long)]
250 purge: bool,
251 #[arg(long)]
252 json: bool,
253 },
254 Daemon {
258 #[arg(long, default_value_t = 5)]
260 interval: u64,
261 #[arg(long)]
263 once: bool,
264 #[arg(long)]
265 json: bool,
266 },
267 PairHost {
272 #[arg(long)]
274 relay: String,
275 #[arg(long)]
279 yes: bool,
280 #[arg(long, default_value_t = 300)]
282 timeout: u64,
283 #[arg(long)]
289 detach: bool,
290 #[arg(long)]
292 json: bool,
293 },
294 #[command(alias = "join")]
298 PairJoin {
299 code_phrase: String,
301 #[arg(long)]
303 relay: String,
304 #[arg(long)]
305 yes: bool,
306 #[arg(long, default_value_t = 300)]
307 timeout: u64,
308 #[arg(long)]
310 detach: bool,
311 #[arg(long)]
313 json: bool,
314 },
315 PairConfirm {
319 code_phrase: String,
321 digits: String,
323 #[arg(long)]
325 json: bool,
326 },
327 PairList {
329 #[arg(long)]
331 json: bool,
332 #[arg(long)]
336 watch: bool,
337 #[arg(long, default_value_t = 1)]
339 watch_interval: u64,
340 },
341 PairCancel {
343 code_phrase: String,
344 #[arg(long)]
345 json: bool,
346 },
347 PairWatch {
357 code_phrase: String,
358 #[arg(long, default_value = "sas_ready")]
360 status: String,
361 #[arg(long, default_value_t = 300)]
363 timeout: u64,
364 #[arg(long)]
366 json: bool,
367 },
368 Pair {
377 handle: String,
380 #[arg(long)]
383 code: Option<String>,
384 #[arg(long, default_value = "https://wireup.net")]
386 relay: String,
387 #[arg(long)]
389 yes: bool,
390 #[arg(long, default_value_t = 300)]
392 timeout: u64,
393 #[arg(long)]
396 no_setup: bool,
397 #[arg(long)]
402 detach: bool,
403 },
404 PairAbandon {
410 code_phrase: String,
412 #[arg(long, default_value = "https://wireup.net")]
414 relay: String,
415 },
416 PairAccept {
422 peer: String,
424 #[arg(long)]
426 json: bool,
427 },
428 PairReject {
435 peer: String,
437 #[arg(long)]
439 json: bool,
440 },
441 PairListInbound {
447 #[arg(long)]
449 json: bool,
450 },
451 #[command(subcommand)]
461 Session(SessionCommand),
462 #[command(subcommand)]
467 Mesh(MeshCommand),
468 Setup {
473 #[arg(long)]
475 apply: bool,
476 },
477 Whois {
481 handle: Option<String>,
483 #[arg(long)]
484 json: bool,
485 #[arg(long)]
488 relay: Option<String>,
489 },
490 Add {
496 handle: String,
499 #[arg(long)]
501 relay: Option<String>,
502 #[arg(long)]
510 local_sister: bool,
511 #[arg(long)]
512 json: bool,
513 },
514 Up {
524 handle: String,
527 #[arg(long)]
529 name: Option<String>,
530 #[arg(long)]
531 json: bool,
532 },
533 Doctor {
540 #[arg(long)]
542 json: bool,
543 #[arg(long, default_value_t = 5)]
545 recent_rejections: usize,
546 },
547 Upgrade {
552 #[arg(long)]
555 check: bool,
556 #[arg(long)]
557 json: bool,
558 },
559 Service {
564 #[command(subcommand)]
565 action: ServiceAction,
566 },
567 Diag {
572 #[command(subcommand)]
573 action: DiagAction,
574 },
575 Claim {
579 nick: String,
580 #[arg(long)]
582 relay: Option<String>,
583 #[arg(long)]
585 public_url: Option<String>,
586 #[arg(long)]
594 hidden: bool,
595 #[arg(long)]
596 json: bool,
597 },
598 Profile {
608 #[command(subcommand)]
609 action: ProfileAction,
610 },
611 Invite {
615 #[arg(long, default_value = "https://wireup.net")]
617 relay: String,
618 #[arg(long, default_value_t = 86_400)]
620 ttl: u64,
621 #[arg(long, default_value_t = 1)]
624 uses: u32,
625 #[arg(long)]
629 share: bool,
630 #[arg(long)]
632 json: bool,
633 },
634 Accept {
637 url: String,
639 #[arg(long)]
641 json: bool,
642 },
643 Reactor {
649 #[arg(long)]
651 on_event: String,
652 #[arg(long)]
654 peer: Option<String>,
655 #[arg(long)]
657 kind: Option<String>,
658 #[arg(long, default_value_t = true)]
660 verified_only: bool,
661 #[arg(long, default_value_t = 2)]
663 interval: u64,
664 #[arg(long)]
666 once: bool,
667 #[arg(long)]
669 dry_run: bool,
670 #[arg(long, default_value_t = 6)]
674 max_per_minute: u32,
675 #[arg(long, default_value_t = 1)]
679 max_chain_depth: u32,
680 },
681 Notify {
686 #[arg(long, default_value_t = 2)]
688 interval: u64,
689 #[arg(long)]
691 peer: Option<String>,
692 #[arg(long)]
694 once: bool,
695 #[arg(long)]
699 json: bool,
700 },
701}
702
// Sub-actions of `wire diag`. `run` forwards the parsed value unchanged to
// `cmd_diag`. (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum DiagAction {
    Tail {
        // Defaults to 20 when `--limit` is not given.
        #[arg(long, default_value_t = 20)]
        limit: usize,
        #[arg(long)]
        json: bool,
    },
    // Unit variants: these actions take no arguments.
    Enable,
    Disable,
    Status {
        #[arg(long)]
        json: bool,
    },
}
723
// Sub-actions of `wire session`. `run` forwards the parsed value unchanged to
// `cmd_session`. (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum SessionCommand {
    New {
        // Optional positional argument.
        name: Option<String>,
        #[arg(long, default_value = "https://wireup.net")]
        relay: String,
        #[arg(long)]
        with_local: bool,
        #[arg(long, default_value = "http://127.0.0.1:8771")]
        local_relay: String,
        #[arg(long)]
        no_daemon: bool,
        #[arg(long)]
        local_only: bool,
        #[arg(long)]
        json: bool,
    },
    List {
        #[arg(long)]
        json: bool,
    },
    ListLocal {
        #[arg(long)]
        json: bool,
    },
    PairAllLocal {
        // Defaults to 1; presumably seconds, going by the name — confirm in
        // `cmd_session`.
        #[arg(long, default_value_t = 1)]
        settle_secs: u64,
        #[arg(long, default_value = "https://wireup.net")]
        federation_relay: String,
        #[arg(long)]
        json: bool,
    },
    MeshStatus {
        // Defaults to 300; same default as `MeshCommand::Status`.
        #[arg(long, default_value_t = 300)]
        stale_secs: u64,
        #[arg(long)]
        json: bool,
    },
    Env {
        // Optional positional argument.
        name: Option<String>,
        #[arg(long)]
        json: bool,
    },
    Current {
        #[arg(long)]
        json: bool,
    },
    Destroy {
        // Required positional argument.
        name: String,
        #[arg(long)]
        force: bool,
        #[arg(long)]
        json: bool,
    },
}
866
// Sub-actions of `wire mesh`. `run` forwards the parsed value unchanged to
// `cmd_mesh`. (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum MeshCommand {
    Status {
        #[arg(long, default_value_t = 300)]
        stale_secs: u64,
        #[arg(long)]
        json: bool,
    },
    Broadcast {
        #[arg(long, default_value = "claim")]
        kind: String,
        #[arg(long, default_value = "local")]
        scope: String,
        // `Vec<String>` with `long`: the flag may be repeated, one value each.
        #[arg(long)]
        exclude: Vec<String>,
        #[arg(long)]
        noreply: bool,
        // The only positional argument of this variant.
        body: String,
        #[arg(long)]
        json: bool,
    },
    Role {
        #[command(subcommand)]
        action: MeshRoleAction,
    },
    Route {
        // First positional argument.
        role: String,
        #[arg(long, default_value = "round-robin")]
        strategy: String,
        #[arg(long)]
        exclude: Vec<String>,
        #[arg(long, default_value = "claim")]
        kind: String,
        // Second positional argument (after `role`).
        body: String,
        #[arg(long)]
        json: bool,
    },
}
967
// Sub-actions nested under `MeshCommand::Role`.
// (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum MeshRoleAction {
    Set {
        // Required positional argument.
        role: String,
        #[arg(long)]
        json: bool,
    },
    Get {
        // Optional positional argument.
        peer: Option<String>,
        #[arg(long)]
        json: bool,
    },
    List {
        #[arg(long)]
        json: bool,
    },
    Clear {
        #[arg(long)]
        json: bool,
    },
}
1000
// Sub-actions of `wire service`. `run` forwards the parsed value unchanged to
// `cmd_service`. All three variants accept the same two flags.
// (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum ServiceAction {
    Install {
        #[arg(long)]
        local_relay: bool,
        #[arg(long)]
        json: bool,
    },
    Uninstall {
        #[arg(long)]
        local_relay: bool,
        #[arg(long)]
        json: bool,
    },
    Status {
        #[arg(long)]
        local_relay: bool,
        #[arg(long)]
        json: bool,
    },
}
1038
// Sub-actions of `wire responder`. `run` dispatches `Set` to
// `cmd_responder_set` and `Get` to `cmd_responder_get`.
// (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum ResponderCommand {
    Set {
        // Free-form here; `cmd_responder_set` rejects anything other than
        // online / offline / oauth_locked / rate_limited / degraded.
        status: String,
        #[arg(long)]
        reason: Option<String>,
        #[arg(long)]
        json: bool,
    },
    Get {
        // Optional positional; when absent the handler looks up our own slot.
        peer: Option<String>,
        #[arg(long)]
        json: bool,
    },
}
1061
// Sub-actions of `wire profile`. `run` forwards the parsed value unchanged to
// `cmd_profile`. (Plain `//` comments: `///` would become clap help text.)
#[derive(Subcommand, Debug)]
pub enum ProfileAction {
    Set {
        // Two required positionals, in this order: field, then value.
        field: String,
        value: String,
        #[arg(long)]
        json: bool,
    },
    Get {
        #[arg(long)]
        json: bool,
    },
    Clear {
        // Required positional argument.
        field: String,
        #[arg(long)]
        json: bool,
    },
}
1085
1086pub fn run() -> Result<()> {
1088 crate::session::maybe_adopt_session_wire_home("cli");
1099 let cli = Cli::parse();
1100 match cli.command {
1101 Command::Init {
1102 handle,
1103 name,
1104 relay,
1105 json,
1106 } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
1107 Command::Status { peer, json } => {
1108 if let Some(peer) = peer {
1109 cmd_status_peer(&peer, json)
1110 } else {
1111 cmd_status(json)
1112 }
1113 }
1114 Command::Whoami { json } => cmd_whoami(json),
1115 Command::Peers { json } => cmd_peers(json),
1116 Command::Send {
1117 peer,
1118 kind_or_body,
1119 body,
1120 deadline,
1121 json,
1122 } => {
1123 let (kind, body) = match body {
1126 Some(real_body) => (kind_or_body, real_body),
1127 None => ("claim".to_string(), kind_or_body),
1128 };
1129 cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
1130 }
1131 Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
1132 Command::Monitor {
1133 peer,
1134 json,
1135 include_handshake,
1136 interval_ms,
1137 replay,
1138 } => cmd_monitor(
1139 peer.as_deref(),
1140 json,
1141 include_handshake,
1142 interval_ms,
1143 replay,
1144 ),
1145 Command::Verify { path, json } => cmd_verify(&path, json),
1146 Command::Responder { command } => match command {
1147 ResponderCommand::Set {
1148 status,
1149 reason,
1150 json,
1151 } => cmd_responder_set(&status, reason.as_deref(), json),
1152 ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
1153 },
1154 Command::Mcp => cmd_mcp(),
1155 Command::RelayServer { bind, local_only } => cmd_relay_server(&bind, local_only),
1156 Command::BindRelay {
1157 url,
1158 migrate_pinned,
1159 json,
1160 } => cmd_bind_relay(&url, migrate_pinned, json),
1161 Command::AddPeerSlot {
1162 handle,
1163 url,
1164 slot_id,
1165 slot_token,
1166 json,
1167 } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
1168 Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
1169 Command::Pull { json } => cmd_pull(json),
1170 Command::Pin { card_file, json } => cmd_pin(&card_file, json),
1171 Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
1172 Command::ForgetPeer {
1173 handle,
1174 purge,
1175 json,
1176 } => cmd_forget_peer(&handle, purge, json),
1177 Command::Daemon {
1178 interval,
1179 once,
1180 json,
1181 } => cmd_daemon(interval, once, json),
1182 Command::PairHost {
1183 relay,
1184 yes,
1185 timeout,
1186 detach,
1187 json,
1188 } => {
1189 if detach {
1190 cmd_pair_host_detach(&relay, json)
1191 } else {
1192 cmd_pair_host(&relay, yes, timeout)
1193 }
1194 }
1195 Command::PairJoin {
1196 code_phrase,
1197 relay,
1198 yes,
1199 timeout,
1200 detach,
1201 json,
1202 } => {
1203 if detach {
1204 cmd_pair_join_detach(&code_phrase, &relay, json)
1205 } else {
1206 cmd_pair_join(&code_phrase, &relay, yes, timeout)
1207 }
1208 }
1209 Command::PairConfirm {
1210 code_phrase,
1211 digits,
1212 json,
1213 } => cmd_pair_confirm(&code_phrase, &digits, json),
1214 Command::PairList {
1215 json,
1216 watch,
1217 watch_interval,
1218 } => cmd_pair_list(json, watch, watch_interval),
1219 Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
1220 Command::PairWatch {
1221 code_phrase,
1222 status,
1223 timeout,
1224 json,
1225 } => cmd_pair_watch(&code_phrase, &status, timeout, json),
1226 Command::Pair {
1227 handle,
1228 code,
1229 relay,
1230 yes,
1231 timeout,
1232 no_setup,
1233 detach,
1234 } => {
1235 if handle.contains('@') && code.is_none() {
1242 cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
1243 } else if detach {
1244 cmd_pair_detach(&handle, code.as_deref(), &relay)
1245 } else {
1246 cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
1247 }
1248 }
1249 Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
1250 Command::PairAccept { peer, json } => cmd_pair_accept(&peer, json),
1251 Command::PairReject { peer, json } => cmd_pair_reject(&peer, json),
1252 Command::PairListInbound { json } => cmd_pair_list_inbound(json),
1253 Command::Session(cmd) => cmd_session(cmd),
1254 Command::Mesh(cmd) => cmd_mesh(cmd),
1255 Command::Invite {
1256 relay,
1257 ttl,
1258 uses,
1259 share,
1260 json,
1261 } => cmd_invite(&relay, ttl, uses, share, json),
1262 Command::Accept { url, json } => cmd_accept(&url, json),
1263 Command::Whois {
1264 handle,
1265 json,
1266 relay,
1267 } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
1268 Command::Add {
1269 handle,
1270 relay,
1271 local_sister,
1272 json,
1273 } => cmd_add(&handle, relay.as_deref(), local_sister, json),
1274 Command::Up { handle, name, json } => cmd_up(&handle, name.as_deref(), json),
1275 Command::Doctor {
1276 json,
1277 recent_rejections,
1278 } => cmd_doctor(json, recent_rejections),
1279 Command::Upgrade { check, json } => cmd_upgrade(check, json),
1280 Command::Service { action } => cmd_service(action),
1281 Command::Diag { action } => cmd_diag(action),
1282 Command::Claim {
1283 nick,
1284 relay,
1285 public_url,
1286 hidden,
1287 json,
1288 } => cmd_claim(&nick, relay.as_deref(), public_url.as_deref(), hidden, json),
1289 Command::Profile { action } => cmd_profile(action),
1290 Command::Setup { apply } => cmd_setup(apply),
1291 Command::Reactor {
1292 on_event,
1293 peer,
1294 kind,
1295 verified_only,
1296 interval,
1297 once,
1298 dry_run,
1299 max_per_minute,
1300 max_chain_depth,
1301 } => cmd_reactor(
1302 &on_event,
1303 peer.as_deref(),
1304 kind.as_deref(),
1305 verified_only,
1306 interval,
1307 once,
1308 dry_run,
1309 max_per_minute,
1310 max_chain_depth,
1311 ),
1312 Command::Notify {
1313 interval,
1314 peer,
1315 once,
1316 json,
1317 } => cmd_notify(interval, peer.as_deref(), once, json),
1318 }
1319}
1320
1321fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
1324 if !handle
1325 .chars()
1326 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1327 {
1328 bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
1329 }
1330 if config::is_initialized()? {
1331 bail!(
1332 "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
1333 config::config_dir()?
1334 );
1335 }
1336
1337 config::ensure_dirs()?;
1338 let (sk_seed, pk_bytes) = generate_keypair();
1339 config::write_private_key(&sk_seed)?;
1340
1341 let card = build_agent_card(handle, &pk_bytes, name, None, None);
1342 let signed = sign_agent_card(&card, &sk_seed);
1343 config::write_agent_card(&signed)?;
1344
1345 let mut trust = empty_trust();
1346 add_self_to_trust(&mut trust, handle, &pk_bytes);
1347 config::write_trust(&trust)?;
1348
1349 let fp = fingerprint(&pk_bytes);
1350 let key_id = make_key_id(handle, &pk_bytes);
1351
1352 let mut relay_info: Option<(String, String)> = None;
1354 if let Some(url) = relay {
1355 let normalized = url.trim_end_matches('/');
1356 let client = crate::relay_client::RelayClient::new(normalized);
1357 client.check_healthz()?;
1358 let alloc = client.allocate_slot(Some(handle))?;
1359 let mut state = config::read_relay_state()?;
1360 state["self"] = json!({
1361 "relay_url": normalized,
1362 "slot_id": alloc.slot_id.clone(),
1363 "slot_token": alloc.slot_token,
1364 });
1365 config::write_relay_state(&state)?;
1366 relay_info = Some((normalized.to_string(), alloc.slot_id));
1367 }
1368
1369 let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
1370 if as_json {
1371 let mut out = json!({
1372 "did": did_str.clone(),
1373 "fingerprint": fp,
1374 "key_id": key_id,
1375 "config_dir": config::config_dir()?.to_string_lossy(),
1376 });
1377 if let Some((url, slot_id)) = &relay_info {
1378 out["relay_url"] = json!(url);
1379 out["slot_id"] = json!(slot_id);
1380 }
1381 println!("{}", serde_json::to_string(&out)?);
1382 } else {
1383 println!("generated {did_str} (ed25519:{key_id})");
1384 println!(
1385 "config written to {}",
1386 config::config_dir()?.to_string_lossy()
1387 );
1388 if let Some((url, slot_id)) = &relay_info {
1389 println!("bound to relay {url} (slot {slot_id})");
1390 println!();
1391 println!(
1392 "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
1393 );
1394 } else {
1395 println!();
1396 println!(
1397 "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
1398 );
1399 }
1400 }
1401 Ok(())
1402}
1403
1404fn cmd_status(as_json: bool) -> Result<()> {
1407 let initialized = config::is_initialized()?;
1408
1409 let mut summary = json!({
1410 "initialized": initialized,
1411 });
1412
1413 if initialized {
1414 let card = config::read_agent_card()?;
1415 let did = card
1416 .get("did")
1417 .and_then(Value::as_str)
1418 .unwrap_or("")
1419 .to_string();
1420 let handle = card
1424 .get("handle")
1425 .and_then(Value::as_str)
1426 .map(str::to_string)
1427 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1428 let pk_b64 = card
1429 .get("verify_keys")
1430 .and_then(Value::as_object)
1431 .and_then(|m| m.values().next())
1432 .and_then(|v| v.get("key"))
1433 .and_then(Value::as_str)
1434 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1435 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1436 summary["did"] = json!(did);
1437 summary["handle"] = json!(handle);
1438 summary["fingerprint"] = json!(fingerprint(&pk_bytes));
1439 summary["capabilities"] = card
1440 .get("capabilities")
1441 .cloned()
1442 .unwrap_or_else(|| json!([]));
1443
1444 let trust = config::read_trust()?;
1445 let relay_state_for_tier =
1446 config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1447 let mut peers = Vec::new();
1448 if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
1449 for (peer_handle, _agent) in agents {
1450 if peer_handle == &handle {
1451 continue; }
1453 peers.push(json!({
1458 "handle": peer_handle,
1459 "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
1460 }));
1461 }
1462 }
1463 summary["peers"] = json!(peers);
1464
1465 let relay_state = config::read_relay_state()?;
1466 summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
1467 if !summary["self_relay"].is_null() {
1468 if let Some(obj) = summary["self_relay"].as_object_mut() {
1470 obj.remove("slot_token");
1471 }
1472 }
1473 summary["peer_slots_count"] = json!(
1474 relay_state
1475 .get("peers")
1476 .and_then(Value::as_object)
1477 .map(|m| m.len())
1478 .unwrap_or(0)
1479 );
1480
1481 let outbox = config::outbox_dir()?;
1483 let inbox = config::inbox_dir()?;
1484 summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
1485 summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
1486
1487 let snap = crate::ensure_up::daemon_liveness();
1493 let mut daemon = json!({
1494 "running": snap.pidfile_alive,
1495 "pid": snap.pidfile_pid,
1496 "all_running_pids": snap.pgrep_pids,
1497 "orphans": snap.orphan_pids,
1498 });
1499 if let crate::ensure_up::PidRecord::Json(d) = &snap.record {
1500 daemon["version"] = json!(d.version);
1501 daemon["bin_path"] = json!(d.bin_path);
1502 daemon["did"] = json!(d.did);
1503 daemon["relay_url"] = json!(d.relay_url);
1504 daemon["started_at"] = json!(d.started_at);
1505 daemon["schema"] = json!(d.schema);
1506 if d.version != env!("CARGO_PKG_VERSION") {
1507 daemon["version_mismatch"] = json!({
1508 "daemon": d.version.clone(),
1509 "cli": env!("CARGO_PKG_VERSION"),
1510 });
1511 }
1512 } else if matches!(snap.record, crate::ensure_up::PidRecord::LegacyInt(_)) {
1513 daemon["pidfile_form"] = json!("legacy-int");
1514 daemon["version_mismatch"] = json!({
1515 "daemon": "<pre-0.5.11>",
1516 "cli": env!("CARGO_PKG_VERSION"),
1517 });
1518 }
1519 summary["daemon"] = daemon;
1520
1521 let pending = crate::pending_pair::list_pending().unwrap_or_default();
1523 let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
1524 for p in &pending {
1525 *counts.entry(p.status.clone()).or_default() += 1;
1526 }
1527 let pending_inbound =
1529 crate::pending_inbound_pair::list_pending_inbound().unwrap_or_default();
1530 let inbound_handles: Vec<&str> = pending_inbound
1531 .iter()
1532 .map(|p| p.peer_handle.as_str())
1533 .collect();
1534 summary["pending_pairs"] = json!({
1535 "total": pending.len(),
1536 "by_status": counts,
1537 "inbound_count": pending_inbound.len(),
1538 "inbound_handles": inbound_handles,
1539 });
1540 }
1541
1542 if as_json {
1543 println!("{}", serde_json::to_string(&summary)?);
1544 } else if !initialized {
1545 println!("not initialized — run `wire init <handle>` first");
1546 } else {
1547 println!("did: {}", summary["did"].as_str().unwrap_or("?"));
1548 println!(
1549 "fingerprint: {}",
1550 summary["fingerprint"].as_str().unwrap_or("?")
1551 );
1552 println!("capabilities: {}", summary["capabilities"]);
1553 if !summary["self_relay"].is_null() {
1554 println!(
1555 "self relay: {} (slot {})",
1556 summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
1557 summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
1558 );
1559 } else {
1560 println!("self relay: (not bound — run `wire pair-host --relay <url>` to bind)");
1561 }
1562 println!(
1563 "peers: {}",
1564 summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
1565 );
1566 for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
1567 println!(
1568 " - {:<20} tier={}",
1569 p["handle"].as_str().unwrap_or(""),
1570 p["tier"].as_str().unwrap_or("?")
1571 );
1572 }
1573 println!(
1574 "outbox: {} file(s), {} event(s) queued",
1575 summary["outbox"]["files"].as_u64().unwrap_or(0),
1576 summary["outbox"]["events"].as_u64().unwrap_or(0)
1577 );
1578 println!(
1579 "inbox: {} file(s), {} event(s) received",
1580 summary["inbox"]["files"].as_u64().unwrap_or(0),
1581 summary["inbox"]["events"].as_u64().unwrap_or(0)
1582 );
1583 let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
1584 let daemon_pid = summary["daemon"]["pid"]
1585 .as_u64()
1586 .map(|p| p.to_string())
1587 .unwrap_or_else(|| "—".to_string());
1588 let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
1589 let version_suffix = if !daemon_version.is_empty() {
1590 format!(" v{daemon_version}")
1591 } else {
1592 String::new()
1593 };
1594 println!(
1595 "daemon: {} (pid {}{})",
1596 if daemon_running { "running" } else { "DOWN" },
1597 daemon_pid,
1598 version_suffix,
1599 );
1600 if let Some(mm) = summary["daemon"].get("version_mismatch") {
1602 println!(
1603 " !! version mismatch: daemon={} CLI={}. \
1604 run `wire upgrade` to swap atomically.",
1605 mm["daemon"].as_str().unwrap_or("?"),
1606 mm["cli"].as_str().unwrap_or("?"),
1607 );
1608 }
1609 if let Some(orphans) = summary["daemon"]["orphans"].as_array()
1610 && !orphans.is_empty()
1611 {
1612 let pids: Vec<String> = orphans
1613 .iter()
1614 .filter_map(|v| v.as_u64().map(|p| p.to_string()))
1615 .collect();
1616 println!(
1617 " !! orphan daemon process(es): pids {}. \
1618 pgrep saw them but pidfile didn't — likely stale process from \
1619 prior install. Multiple daemons race the relay cursor.",
1620 pids.join(", ")
1621 );
1622 }
1623 let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
1624 let inbound_count = summary["pending_pairs"]["inbound_count"]
1625 .as_u64()
1626 .unwrap_or(0);
1627 if pending_total > 0 {
1628 print!("pending pairs: {pending_total}");
1629 if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
1630 let parts: Vec<String> = obj
1631 .iter()
1632 .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
1633 .collect();
1634 if !parts.is_empty() {
1635 print!(" ({})", parts.join(", "));
1636 }
1637 }
1638 println!();
1639 } else if inbound_count == 0 {
1640 println!("pending pairs: none");
1641 }
1642 if inbound_count > 0 {
1646 let handles: Vec<String> = summary["pending_pairs"]["inbound_handles"]
1647 .as_array()
1648 .map(|a| {
1649 a.iter()
1650 .filter_map(|v| v.as_str().map(str::to_string))
1651 .collect()
1652 })
1653 .unwrap_or_default();
1654 println!(
1655 "inbound pair requests ({inbound_count}): {} — `wire pair-list` to inspect, `wire pair-accept <peer>` to accept, `wire pair-reject <peer>` to refuse",
1656 handles.join(", "),
1657 );
1658 }
1659 }
1660 Ok(())
1661}
1662
1663fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1664 if !dir.exists() {
1665 return Ok(json!({"files": 0, "events": 0}));
1666 }
1667 let mut files = 0usize;
1668 let mut events = 0usize;
1669 for entry in std::fs::read_dir(dir)? {
1670 let path = entry?.path();
1671 if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1672 files += 1;
1673 if let Ok(body) = std::fs::read_to_string(&path) {
1674 events += body.lines().filter(|l| !l.trim().is_empty()).count();
1675 }
1676 }
1677 }
1678 Ok(json!({"files": files, "events": events}))
1679}
1680
/// Whether `status` is one of the accepted responder states. The comparison is
/// exact: case-sensitive, no trimming.
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.contains(&status)
}
1689
1690fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1691 let state = config::read_relay_state()?;
1692 let (label, slot_info) = match peer {
1693 Some(peer) => (
1694 peer.to_string(),
1695 state
1696 .get("peers")
1697 .and_then(|p| p.get(peer))
1698 .ok_or_else(|| {
1699 anyhow!(
1700 "unknown peer {peer:?} in relay state — pair with them first:\n \
1701 wire add {peer}@wireup.net (or {peer}@<their-relay>)\n\
1702 (`wire peers` lists who you've already paired with.)"
1703 )
1704 })?,
1705 ),
1706 None => (
1707 "self".to_string(),
1708 state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1709 anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1710 })?,
1711 ),
1712 };
1713 let relay_url = slot_info["relay_url"]
1714 .as_str()
1715 .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1716 .to_string();
1717 let slot_id = slot_info["slot_id"]
1718 .as_str()
1719 .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1720 .to_string();
1721 let slot_token = slot_info["slot_token"]
1722 .as_str()
1723 .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1724 .to_string();
1725 Ok((label, relay_url, slot_id, slot_token))
1726}
1727
1728fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1729 if !responder_status_allowed(status) {
1730 bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1731 }
1732 let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1733 let now = time::OffsetDateTime::now_utc()
1734 .format(&time::format_description::well_known::Rfc3339)
1735 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1736 let mut record = json!({
1737 "status": status,
1738 "set_at": now,
1739 });
1740 if let Some(reason) = reason {
1741 record["reason"] = json!(reason);
1742 }
1743 if status == "online" {
1744 record["last_success_at"] = json!(now);
1745 }
1746 let client = crate::relay_client::RelayClient::new(&relay_url);
1747 let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1748 if as_json {
1749 println!("{}", serde_json::to_string(&saved)?);
1750 } else {
1751 let reason = saved
1752 .get("reason")
1753 .and_then(Value::as_str)
1754 .map(|r| format!(" — {r}"))
1755 .unwrap_or_default();
1756 println!(
1757 "responder {}{}",
1758 saved
1759 .get("status")
1760 .and_then(Value::as_str)
1761 .unwrap_or(status),
1762 reason
1763 );
1764 }
1765 Ok(())
1766}
1767
1768fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1769 let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1770 let client = crate::relay_client::RelayClient::new(&relay_url);
1771 let health = client.responder_health_get(&slot_id, &slot_token)?;
1772 if as_json {
1773 println!(
1774 "{}",
1775 serde_json::to_string(&json!({
1776 "target": label,
1777 "responder_health": health,
1778 }))?
1779 );
1780 } else if health.is_null() {
1781 println!("{label}: responder health not reported");
1782 } else {
1783 let status = health
1784 .get("status")
1785 .and_then(Value::as_str)
1786 .unwrap_or("unknown");
1787 let reason = health
1788 .get("reason")
1789 .and_then(Value::as_str)
1790 .map(|r| format!(" — {r}"))
1791 .unwrap_or_default();
1792 let last_success = health
1793 .get("last_success_at")
1794 .and_then(Value::as_str)
1795 .map(|t| format!(" (last_success: {t})"))
1796 .unwrap_or_default();
1797 println!("{label}: {status}{reason}{last_success}");
1798 }
1799 Ok(())
1800}
1801
1802fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1803 let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1804 let client = crate::relay_client::RelayClient::new(&relay_url);
1805
1806 let started = std::time::Instant::now();
1807 let transport_ok = client.healthz().unwrap_or(false);
1808 let latency_ms = started.elapsed().as_millis() as u64;
1809
1810 let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1811 let now = std::time::SystemTime::now()
1812 .duration_since(std::time::UNIX_EPOCH)
1813 .map(|d| d.as_secs())
1814 .unwrap_or(0);
1815 let attention = match last_pull_at_unix {
1816 Some(last) if now.saturating_sub(last) <= 300 => json!({
1817 "status": "ok",
1818 "last_pull_at_unix": last,
1819 "age_seconds": now.saturating_sub(last),
1820 "event_count": event_count,
1821 }),
1822 Some(last) => json!({
1823 "status": "stale",
1824 "last_pull_at_unix": last,
1825 "age_seconds": now.saturating_sub(last),
1826 "event_count": event_count,
1827 }),
1828 None => json!({
1829 "status": "never_pulled",
1830 "last_pull_at_unix": Value::Null,
1831 "event_count": event_count,
1832 }),
1833 };
1834
1835 let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1836 let responder = if responder_health.is_null() {
1837 json!({"status": "not_reported", "record": Value::Null})
1838 } else {
1839 json!({
1840 "status": responder_health
1841 .get("status")
1842 .and_then(Value::as_str)
1843 .unwrap_or("unknown"),
1844 "record": responder_health,
1845 })
1846 };
1847
1848 let report = json!({
1849 "peer": peer,
1850 "transport": {
1851 "status": if transport_ok { "ok" } else { "error" },
1852 "relay_url": relay_url,
1853 "latency_ms": latency_ms,
1854 },
1855 "attention": attention,
1856 "responder": responder,
1857 });
1858
1859 if as_json {
1860 println!("{}", serde_json::to_string(&report)?);
1861 } else {
1862 let transport_line = if transport_ok {
1863 format!("ok relay reachable ({latency_ms}ms)")
1864 } else {
1865 "error relay unreachable".to_string()
1866 };
1867 println!("transport {transport_line}");
1868 match report["attention"]["status"].as_str().unwrap_or("unknown") {
1869 "ok" => println!(
1870 "attention ok last pull {}s ago",
1871 report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1872 ),
1873 "stale" => println!(
1874 "attention stale last pull {}m ago",
1875 report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1876 ),
1877 "never_pulled" => println!("attention never pulled since relay reset"),
1878 other => println!("attention {other}"),
1879 }
1880 if report["responder"]["status"] == "not_reported" {
1881 println!("auto-responder not reported");
1882 } else {
1883 let record = &report["responder"]["record"];
1884 let status = record
1885 .get("status")
1886 .and_then(Value::as_str)
1887 .unwrap_or("unknown");
1888 let reason = record
1889 .get("reason")
1890 .and_then(Value::as_str)
1891 .map(|r| format!(" — {r}"))
1892 .unwrap_or_default();
1893 println!("auto-responder {status}{reason}");
1894 }
1895 }
1896 Ok(())
1897}
1898
1899fn cmd_whoami(as_json: bool) -> Result<()> {
1904 if !config::is_initialized()? {
1905 bail!("not initialized — run `wire init <handle>` first");
1906 }
1907 let card = config::read_agent_card()?;
1908 let did = card
1909 .get("did")
1910 .and_then(Value::as_str)
1911 .unwrap_or("")
1912 .to_string();
1913 let handle = card
1914 .get("handle")
1915 .and_then(Value::as_str)
1916 .map(str::to_string)
1917 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1918 let pk_b64 = card
1919 .get("verify_keys")
1920 .and_then(Value::as_object)
1921 .and_then(|m| m.values().next())
1922 .and_then(|v| v.get("key"))
1923 .and_then(Value::as_str)
1924 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1925 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1926 let fp = fingerprint(&pk_bytes);
1927 let key_id = make_key_id(&handle, &pk_bytes);
1928 let capabilities = card
1929 .get("capabilities")
1930 .cloned()
1931 .unwrap_or_else(|| json!(["wire/v3.1"]));
1932
1933 if as_json {
1934 println!(
1935 "{}",
1936 serde_json::to_string(&json!({
1937 "did": did,
1938 "handle": handle,
1939 "fingerprint": fp,
1940 "key_id": key_id,
1941 "public_key_b64": pk_b64,
1942 "capabilities": capabilities,
1943 "config_dir": config::config_dir()?.to_string_lossy(),
1944 }))?
1945 );
1946 } else {
1947 println!("{did} (ed25519:{key_id})");
1948 println!("fingerprint: {fp}");
1949 println!("capabilities: {capabilities}");
1950 }
1951 Ok(())
1952}
1953
1954fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1969 let raw = crate::trust::get_tier(trust, handle);
1970 if raw != "VERIFIED" {
1971 return raw.to_string();
1972 }
1973 let token = relay_state
1974 .get("peers")
1975 .and_then(|p| p.get(handle))
1976 .and_then(|p| p.get("slot_token"))
1977 .and_then(Value::as_str)
1978 .unwrap_or("");
1979 if token.is_empty() {
1980 "PENDING_ACK".to_string()
1981 } else {
1982 raw.to_string()
1983 }
1984}
1985
1986fn cmd_peers(as_json: bool) -> Result<()> {
1987 let trust = config::read_trust()?;
1988 let agents = trust
1989 .get("agents")
1990 .and_then(Value::as_object)
1991 .cloned()
1992 .unwrap_or_default();
1993 let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1994
1995 let mut self_did: Option<String> = None;
1996 if let Ok(card) = config::read_agent_card() {
1997 self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1998 }
1999
2000 let mut peers = Vec::new();
2001 for (handle, agent) in agents.iter() {
2002 let did = agent
2003 .get("did")
2004 .and_then(Value::as_str)
2005 .unwrap_or("")
2006 .to_string();
2007 if Some(did.as_str()) == self_did.as_deref() {
2008 continue; }
2010 let tier = effective_peer_tier(&trust, &relay_state, handle);
2011 let capabilities = agent
2012 .get("card")
2013 .and_then(|c| c.get("capabilities"))
2014 .cloned()
2015 .unwrap_or_else(|| json!([]));
2016 peers.push(json!({
2017 "handle": handle,
2018 "did": did,
2019 "tier": tier,
2020 "capabilities": capabilities,
2021 }));
2022 }
2023
2024 if as_json {
2025 println!("{}", serde_json::to_string(&peers)?);
2026 } else if peers.is_empty() {
2027 println!("no peers pinned (run `wire join <code>` to pair)");
2028 } else {
2029 for p in &peers {
2030 println!(
2031 "{:<20} {:<10} {}",
2032 p["handle"].as_str().unwrap_or(""),
2033 p["tier"].as_str().unwrap_or(""),
2034 p["did"].as_str().unwrap_or(""),
2035 );
2036 }
2037 }
2038 Ok(())
2039}
2040
2041fn maybe_warn_peer_attentiveness(peer: &str) {
2051 let state = match config::read_relay_state() {
2052 Ok(s) => s,
2053 Err(_) => return,
2054 };
2055 let p = state.get("peers").and_then(|p| p.get(peer));
2056 let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
2057 Some(s) if !s.is_empty() => s,
2058 _ => return,
2059 };
2060 let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
2061 Some(s) if !s.is_empty() => s,
2062 _ => return,
2063 };
2064 let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
2065 Some(s) if !s.is_empty() => s.to_string(),
2066 _ => match state
2067 .get("self")
2068 .and_then(|s| s.get("relay_url"))
2069 .and_then(Value::as_str)
2070 {
2071 Some(s) if !s.is_empty() => s.to_string(),
2072 _ => return,
2073 },
2074 };
2075 let client = crate::relay_client::RelayClient::new(&relay_url);
2076 let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
2077 Ok(t) => t,
2078 Err(_) => return,
2079 };
2080 let now = std::time::SystemTime::now()
2081 .duration_since(std::time::UNIX_EPOCH)
2082 .map(|d| d.as_secs())
2083 .unwrap_or(0);
2084 match last_pull {
2085 None => {
2086 eprintln!(
2087 "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
2088 );
2089 }
2090 Some(t) if now.saturating_sub(t) > 300 => {
2091 let mins = now.saturating_sub(t) / 60;
2092 eprintln!(
2093 "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
2094 );
2095 }
2096 _ => {}
2097 }
2098}
2099
2100pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
2101 let trimmed = input.trim();
2102 if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
2103 {
2104 return Ok(trimmed.to_string());
2105 }
2106 let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
2107 let n: i64 = amount
2108 .parse()
2109 .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
2110 if n <= 0 {
2111 bail!("deadline duration must be positive: {input:?}");
2112 }
2113 let duration = match unit {
2114 "m" => time::Duration::minutes(n),
2115 "h" => time::Duration::hours(n),
2116 "d" => time::Duration::days(n),
2117 _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
2118 };
2119 Ok((time::OffsetDateTime::now_utc() + duration)
2120 .format(&time::format_description::well_known::Rfc3339)
2121 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
2122}
2123
2124fn cmd_send(
2125 peer: &str,
2126 kind: &str,
2127 body_arg: &str,
2128 deadline: Option<&str>,
2129 as_json: bool,
2130) -> Result<()> {
2131 if !config::is_initialized()? {
2132 bail!("not initialized — run `wire init <handle>` first");
2133 }
2134 let peer = crate::agent_card::bare_handle(peer);
2135 let sk_seed = config::read_private_key()?;
2136 let card = config::read_agent_card()?;
2137 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2138 let handle = crate::agent_card::display_handle_from_did(did).to_string();
2139 let pk_b64 = card
2140 .get("verify_keys")
2141 .and_then(Value::as_object)
2142 .and_then(|m| m.values().next())
2143 .and_then(|v| v.get("key"))
2144 .and_then(Value::as_str)
2145 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
2146 let pk_bytes = crate::signing::b64decode(pk_b64)?;
2147
2148 let body_value: Value = if body_arg == "-" {
2153 use std::io::Read;
2154 let mut raw = String::new();
2155 std::io::stdin()
2156 .read_to_string(&mut raw)
2157 .with_context(|| "reading body from stdin")?;
2158 serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
2161 } else if let Some(path) = body_arg.strip_prefix('@') {
2162 let raw =
2163 std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
2164 serde_json::from_str(&raw).unwrap_or(Value::String(raw))
2165 } else {
2166 Value::String(body_arg.to_string())
2167 };
2168
2169 let kind_id = parse_kind(kind)?;
2170
2171 let now = time::OffsetDateTime::now_utc()
2172 .format(&time::format_description::well_known::Rfc3339)
2173 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
2174
2175 let mut event = json!({
2176 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
2177 "timestamp": now,
2178 "from": did,
2179 "to": format!("did:wire:{peer}"),
2180 "type": kind,
2181 "kind": kind_id,
2182 "body": body_value,
2183 });
2184 if let Some(deadline) = deadline {
2185 event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
2186 }
2187 let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
2188 let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
2189
2190 maybe_warn_peer_attentiveness(peer);
2195
2196 let line = serde_json::to_vec(&signed)?;
2201 let outbox = config::append_outbox_record(peer, &line)?;
2202
2203 if as_json {
2204 println!(
2205 "{}",
2206 serde_json::to_string(&json!({
2207 "event_id": event_id,
2208 "status": "queued",
2209 "peer": peer,
2210 "outbox": outbox.to_string_lossy(),
2211 }))?
2212 );
2213 } else {
2214 println!(
2215 "queued event {event_id} → {peer} (outbox: {})",
2216 outbox.display()
2217 );
2218 }
2219 Ok(())
2220}
2221
2222fn parse_kind(s: &str) -> Result<u32> {
2223 if let Ok(n) = s.parse::<u32>() {
2224 return Ok(n);
2225 }
2226 for (id, name) in crate::signing::kinds() {
2227 if *name == s {
2228 return Ok(*id);
2229 }
2230 }
2231 Ok(1)
2233}
2234
2235fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
2238 let inbox = config::inbox_dir()?;
2239 if !inbox.exists() {
2240 if !as_json {
2241 eprintln!("no inbox yet — daemon hasn't run, or no events received");
2242 }
2243 return Ok(());
2244 }
2245 let trust = config::read_trust()?;
2246 let mut count = 0usize;
2247
2248 let entries: Vec<_> = std::fs::read_dir(&inbox)?
2249 .filter_map(|e| e.ok())
2250 .map(|e| e.path())
2251 .filter(|p| {
2252 p.extension().map(|x| x == "jsonl").unwrap_or(false)
2253 && match peer {
2254 Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
2255 None => true,
2256 }
2257 })
2258 .collect();
2259
2260 for path in entries {
2261 let body = std::fs::read_to_string(&path)?;
2262 for line in body.lines() {
2263 let event: Value = match serde_json::from_str(line) {
2264 Ok(v) => v,
2265 Err(_) => continue,
2266 };
2267 let verified = verify_message_v31(&event, &trust).is_ok();
2268 if as_json {
2269 let mut event_with_meta = event.clone();
2270 if let Some(obj) = event_with_meta.as_object_mut() {
2271 obj.insert("verified".into(), json!(verified));
2272 }
2273 println!("{}", serde_json::to_string(&event_with_meta)?);
2274 } else {
2275 let ts = event
2276 .get("timestamp")
2277 .and_then(Value::as_str)
2278 .unwrap_or("?");
2279 let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
2280 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
2281 let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
2282 let summary = event
2283 .get("body")
2284 .map(|b| match b {
2285 Value::String(s) => s.clone(),
2286 _ => b.to_string(),
2287 })
2288 .unwrap_or_default();
2289 let mark = if verified { "✓" } else { "✗" };
2290 let deadline = event
2291 .get("time_sensitive_until")
2292 .and_then(Value::as_str)
2293 .map(|d| format!(" deadline: {d}"))
2294 .unwrap_or_default();
2295 println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
2296 }
2297 count += 1;
2298 if limit > 0 && count >= limit {
2299 return Ok(());
2300 }
2301 }
2302 }
2303 Ok(())
2304}
2305
/// Whether `kind` is one of the event kinds the monitor hides by default:
/// `pair_drop`, `pair_drop_ack` or `heartbeat`. Matching is exact and
/// case-sensitive; every other kind is treated as signal.
fn monitor_is_noise_kind(kind: &str) -> bool {
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.contains(&kind)
}
2314
2315fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
2319 if as_json {
2320 Ok(serde_json::to_string(e)?)
2321 } else {
2322 let eid_short: String = e.event_id.chars().take(12).collect();
2323 let body = e.body_preview.replace('\n', " ");
2324 let ts: String = e.timestamp.chars().take(19).collect();
2325 Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
2326 }
2327}
2328
2329fn cmd_monitor(
2345 peer_filter: Option<&str>,
2346 as_json: bool,
2347 include_handshake: bool,
2348 interval_ms: u64,
2349 replay: usize,
2350) -> Result<()> {
2351 let inbox_dir = config::inbox_dir()?;
2352 if !inbox_dir.exists() && !as_json {
2353 eprintln!("wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?");
2354 }
2355 if replay > 0 && inbox_dir.exists() {
2361 let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
2362 for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
2363 let path = entry.path();
2364 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2365 continue;
2366 }
2367 let peer = match path.file_stem().and_then(|s| s.to_str()) {
2368 Some(s) => s.to_string(),
2369 None => continue,
2370 };
2371 if let Some(filter) = peer_filter
2372 && peer != filter
2373 {
2374 continue;
2375 }
2376 let body = std::fs::read_to_string(&path).unwrap_or_default();
2377 for line in body.lines() {
2378 let line = line.trim();
2379 if line.is_empty() {
2380 continue;
2381 }
2382 let signed: Value = match serde_json::from_str(line) {
2383 Ok(v) => v,
2384 Err(_) => continue,
2385 };
2386 let ev = crate::inbox_watch::InboxEvent::from_signed(
2387 &peer, signed, true,
2388 );
2389 if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2390 continue;
2391 }
2392 all.push(ev);
2393 }
2394 }
2395 all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2398 let start = all.len().saturating_sub(replay);
2399 for ev in &all[start..] {
2400 println!("{}", monitor_render(ev, as_json)?);
2401 }
2402 use std::io::Write;
2403 std::io::stdout().flush().ok();
2404 }
2405
2406 let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2409 let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2410
2411 loop {
2412 let events = w.poll()?;
2413 let mut wrote = false;
2414 for ev in events {
2415 if let Some(filter) = peer_filter
2416 && ev.peer != filter
2417 {
2418 continue;
2419 }
2420 if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2421 continue;
2422 }
2423 println!("{}", monitor_render(&ev, as_json)?);
2424 wrote = true;
2425 }
2426 if wrote {
2427 use std::io::Write;
2428 std::io::stdout().flush().ok();
2429 }
2430 std::thread::sleep(sleep_dur);
2431 }
2432}
2433
/// Tests for `effective_peer_tier`: how the stored trust tier and the relay
/// state's `slot_token` combine into the displayed tier.
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Trust store containing a single agent `handle` pinned at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        let did = format!("did:wire:{handle}");
        json!({
            "version": 1,
            "agents": {
                handle: {
                    "tier": tier,
                    "did": did,
                    "card": {"capabilities": ["wire/v3.1"]}
                }
            }
        })
    }

    /// Relay state with a fully populated `willard` entry whose `slot_token`
    /// is `token`.
    fn relay_state_with_token(token: &str) -> Value {
        json!({
            "peers": {
                "willard": {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": token,
                }
            }
        })
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // VERIFIED in trust, but the slot token is an empty string.
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = relay_state_with_token("");
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = relay_state_with_token("tok123");
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // A non-VERIFIED tier is returned untouched even without a token.
        let trust = trust_with("willard", "UNTRUSTED");
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // No relay-state entry at all counts as "no token".
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = json!({"peers": {}});
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "PENDING_ACK");
    }
}
2518
/// Tests for the monitor helpers: the noise-kind filter and line rendering.
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Verified event with a fixed id and timestamp and the given
    /// peer, kind and body preview.
    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: peer.to_string(),
            event_id: "abcd1234567890ef".to_string(),
            kind: kind.to_string(),
            body_preview: body.to_string(),
            verified: true,
            timestamp: "2026-05-15T23:14:07.123456Z".to_string(),
            raw: Value::Null,
        }
    }

    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        for noisy in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(monitor_is_noise_kind(noisy));
        }

        // Everything else, including kinds we have never heard of, is signal.
        for signal in [
            "claim",
            "decision",
            "ack",
            "request",
            "note",
            "future_kind_we_dont_know",
        ] {
            assert!(!monitor_is_noise_kind(signal));
        }
    }

    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let event = ev("willard", "claim", "real v8 train shipped 1350 steps");
        let line = monitor_render(&event, false).unwrap();

        assert!(!line.contains('\n'), "render must be one line: {line}");
        for expected in ["willard", "claim", "real v8 train", "abcd12345678"] {
            assert!(line.contains(expected));
        }
        // Only the first 12 characters of the event id are shown.
        assert!(
            !line.contains("abcd1234567890ef"),
            "should truncate full id"
        );
        // Timestamp is cut to whole seconds.
        assert!(line.contains("2026-05-15T23:14:07"));
    }

    #[test]
    fn monitor_render_strips_newlines_from_body() {
        let event = ev("spark", "claim", "line one\nline two\nline three");
        let line = monitor_render(&event, false).unwrap();

        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let event = ev("spark", "claim", "hi");
        let line = monitor_render(&event, true).unwrap();

        assert!(!line.contains('\n'));
        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        assert_eq!(parsed["peer"], "spark");
        assert_eq!(parsed["kind"], "claim");
        assert_eq!(parsed["body_preview"], "hi");
    }

    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        // An unverified event must still render and must not count as noise.
        let event = InboxEvent {
            verified: false,
            ..ev("spark", "claim", "from disk with verified=null")
        };
        let line = monitor_render(&event, false).unwrap();

        assert!(line.contains("from disk with verified=null"));
        assert!(!monitor_is_noise_kind("claim"));
    }
}
2622
2623fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2626 let body = if path == "-" {
2627 let mut buf = String::new();
2628 use std::io::Read;
2629 std::io::stdin().read_to_string(&mut buf)?;
2630 buf
2631 } else {
2632 std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2633 };
2634 let event: Value = serde_json::from_str(&body)?;
2635 let trust = config::read_trust()?;
2636 match verify_message_v31(&event, &trust) {
2637 Ok(()) => {
2638 if as_json {
2639 println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2640 } else {
2641 println!("verified ✓");
2642 }
2643 Ok(())
2644 }
2645 Err(e) => {
2646 let reason = e.to_string();
2647 if as_json {
2648 println!(
2649 "{}",
2650 serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2651 );
2652 } else {
2653 eprintln!("FAILED: {reason}");
2654 }
2655 std::process::exit(1);
2656 }
2657 }
2658}
2659
/// Handler for the MCP command (presumably dispatched for `Command::Mcp`).
///
/// Delegates entirely to `crate::mcp::run` and returns its result unchanged;
/// no setup or output happens here.
fn cmd_mcp() -> Result<()> {
    crate::mcp::run()
}
2665
2666fn cmd_relay_server(bind: &str, local_only: bool) -> Result<()> {
2667 if local_only {
2671 validate_loopback_bind(bind)?;
2672 }
2673 let base = if let Ok(home) = std::env::var("WIRE_HOME") {
2679 std::path::PathBuf::from(home)
2680 .join("state")
2681 .join("wire-relay")
2682 } else {
2683 dirs::state_dir()
2684 .or_else(dirs::data_local_dir)
2685 .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2686 .join("wire-relay")
2687 };
2688 let state_dir = if local_only { base.join("local") } else { base };
2689 let runtime = tokio::runtime::Builder::new_multi_thread()
2690 .enable_all()
2691 .build()?;
2692 runtime.block_on(crate::relay_server::serve_with_mode(
2693 bind,
2694 state_dir,
2695 crate::relay_server::ServerMode { local_only },
2696 ))
2697}
2698
2699fn validate_loopback_bind(bind: &str) -> Result<()> {
2705 let host = if let Some(stripped) = bind.strip_prefix('[') {
2707 let close = stripped
2708 .find(']')
2709 .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
2710 stripped[..close].to_string()
2711 } else {
2712 bind.rsplit_once(':')
2713 .map(|(h, _)| h.to_string())
2714 .unwrap_or_else(|| bind.to_string())
2715 };
2716 use std::net::ToSocketAddrs;
2717 let probe = format!("{host}:0");
2718 let resolved: Vec<_> = probe
2719 .to_socket_addrs()
2720 .with_context(|| format!("resolving bind host {host:?}"))?
2721 .collect();
2722 if resolved.is_empty() {
2723 bail!("--local-only: bind host {host:?} resolved to no addresses");
2724 }
2725 for addr in &resolved {
2726 if !addr.ip().is_loopback() {
2727 bail!(
2728 "--local-only refuses non-loopback bind: {host:?} resolves to {} \
2729 which is not in 127.0.0.0/8 or [::1]. Remove --local-only to bind \
2730 publicly, or use 127.0.0.1 / [::1] / localhost.",
2731 addr.ip()
2732 );
2733 }
2734 }
2735 Ok(())
2736}
2737
2738fn cmd_bind_relay(url: &str, migrate_pinned: bool, as_json: bool) -> Result<()> {
2741 if !config::is_initialized()? {
2742 bail!("not initialized — run `wire init <handle>` first");
2743 }
2744 let card = config::read_agent_card()?;
2745 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2746 let handle = crate::agent_card::display_handle_from_did(did).to_string();
2747
2748 let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
2755 let pinned: Vec<String> = existing
2756 .get("peers")
2757 .and_then(|p| p.as_object())
2758 .map(|o| o.keys().cloned().collect())
2759 .unwrap_or_default();
2760 if !pinned.is_empty() && !migrate_pinned {
2761 let list = pinned.join(", ");
2762 bail!(
2763 "bind-relay would silently black-hole {n} pinned peer(s): {list}. \
2764 They are pinned to your CURRENT slot; without coordination they will keep \
2765 pushing to a slot you no longer read.\n\n\
2766 SAFE PATHS:\n\
2767 • `wire rotate-slot` — rotates slot on the SAME relay and emits a \
2768 wire_close event to every pinned peer so their daemons drop the stale \
2769 coords cleanly. This is the supported migration path.\n\
2770 • `wire bind-relay {url} --migrate-pinned` — acknowledges that pinned \
2771 peers will need to re-pin manually (you must notify them out-of-band, \
2772 via a fresh `wire add` from each peer or a re-shared invite). Use this \
2773 only when the current slot is unreachable so rotate-slot can't ack.\n\n\
2774 Issue #7 (silent black-hole on relay change) caught this — proceed only \
2775 if you understand the consequences.",
2776 n = pinned.len(),
2777 );
2778 }
2779
2780 let normalized = url.trim_end_matches('/');
2781 let client = crate::relay_client::RelayClient::new(normalized);
2782 client.check_healthz()?;
2783 let alloc = client.allocate_slot(Some(&handle))?;
2784 let mut state = existing;
2785 if !pinned.is_empty() {
2786 eprintln!(
2790 "wire bind-relay: migrating with {n} pinned peer(s) — they will black-hole \
2791 until they re-pin: {peers}",
2792 n = pinned.len(),
2793 peers = pinned.join(", "),
2794 );
2795 }
2796 state["self"] = json!({
2797 "relay_url": url,
2798 "slot_id": alloc.slot_id,
2799 "slot_token": alloc.slot_token,
2800 });
2801 config::write_relay_state(&state)?;
2802
2803 if as_json {
2804 println!(
2805 "{}",
2806 serde_json::to_string(&json!({
2807 "relay_url": url,
2808 "slot_id": alloc.slot_id,
2809 "slot_token_present": true,
2810 }))?
2811 );
2812 } else {
2813 println!("bound to relay {url}");
2814 println!("slot_id: {}", alloc.slot_id);
2815 println!(
2816 "(slot_token written to {} mode 0600)",
2817 config::relay_state_path()?.display()
2818 );
2819 }
2820 Ok(())
2821}
2822
2823fn cmd_add_peer_slot(
2826 handle: &str,
2827 url: &str,
2828 slot_id: &str,
2829 slot_token: &str,
2830 as_json: bool,
2831) -> Result<()> {
2832 let mut state = config::read_relay_state()?;
2833 let peers = state["peers"]
2834 .as_object_mut()
2835 .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2836 peers.insert(
2837 handle.to_string(),
2838 json!({
2839 "relay_url": url,
2840 "slot_id": slot_id,
2841 "slot_token": slot_token,
2842 }),
2843 );
2844 config::write_relay_state(&state)?;
2845 if as_json {
2846 println!(
2847 "{}",
2848 serde_json::to_string(&json!({
2849 "handle": handle,
2850 "relay_url": url,
2851 "slot_id": slot_id,
2852 "added": true,
2853 }))?
2854 );
2855 } else {
2856 println!("pinned peer slot for {handle} at {url} ({slot_id})");
2857 }
2858 Ok(())
2859}
2860
2861fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
2864 let state = config::read_relay_state()?;
2865 let peers = state["peers"].as_object().cloned().unwrap_or_default();
2866 if peers.is_empty() {
2867 bail!(
2868 "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
2869 );
2870 }
2871 let outbox_dir = config::outbox_dir()?;
2872 if outbox_dir.exists() {
2877 let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
2878 for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
2879 let path = entry.path();
2880 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2881 continue;
2882 }
2883 let stem = match path.file_stem().and_then(|s| s.to_str()) {
2884 Some(s) => s.to_string(),
2885 None => continue,
2886 };
2887 if pinned.contains(&stem) {
2888 continue;
2889 }
2890 let bare = crate::agent_card::bare_handle(&stem);
2893 if pinned.contains(bare) {
2894 eprintln!(
2895 "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
2896 Merge with: `cat {} >> {}` then delete the FQDN file.",
2897 stem,
2898 path.display(),
2899 outbox_dir.join(format!("{bare}.jsonl")).display(),
2900 );
2901 }
2902 }
2903 }
2904 if !outbox_dir.exists() {
2905 if as_json {
2906 println!(
2907 "{}",
2908 serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
2909 );
2910 } else {
2911 println!("phyllis: nothing to dial out — write a message first with `wire send`");
2912 }
2913 return Ok(());
2914 }
2915
2916 let mut pushed = Vec::new();
2917 let mut skipped = Vec::new();
2918
2919 for (peer_handle, _) in peers.iter() {
2925 if let Some(want) = peer_filter
2926 && peer_handle != want
2927 {
2928 continue;
2929 }
2930 let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2931 if !outbox.exists() {
2932 continue;
2933 }
2934 let ordered_endpoints =
2935 crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
2936 if ordered_endpoints.is_empty() {
2937 for line in std::fs::read_to_string(&outbox).unwrap_or_default().lines() {
2941 let event: Value = match serde_json::from_str(line) {
2942 Ok(v) => v,
2943 Err(_) => continue,
2944 };
2945 let event_id = event
2946 .get("event_id")
2947 .and_then(Value::as_str)
2948 .unwrap_or("")
2949 .to_string();
2950 skipped.push(json!({
2951 "peer": peer_handle,
2952 "event_id": event_id,
2953 "reason": "no reachable endpoint pinned for peer",
2954 }));
2955 }
2956 continue;
2957 }
2958 let body = std::fs::read_to_string(&outbox)?;
2959 for line in body.lines() {
2960 let event: Value = match serde_json::from_str(line) {
2961 Ok(v) => v,
2962 Err(_) => continue,
2963 };
2964 let event_id = event
2965 .get("event_id")
2966 .and_then(Value::as_str)
2967 .unwrap_or("")
2968 .to_string();
2969
2970 let mut delivered = false;
2971 let mut last_err_reason: Option<String> = None;
2972 for endpoint in &ordered_endpoints {
2973 let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2974 match client.post_event(&endpoint.slot_id, &endpoint.slot_token, &event) {
2975 Ok(resp) => {
2976 if resp.status == "duplicate" {
2977 skipped.push(json!({
2978 "peer": peer_handle,
2979 "event_id": event_id,
2980 "reason": "duplicate",
2981 "endpoint": endpoint.relay_url,
2982 "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2983 }));
2984 } else {
2985 pushed.push(json!({
2986 "peer": peer_handle,
2987 "event_id": event_id,
2988 "endpoint": endpoint.relay_url,
2989 "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2990 }));
2991 }
2992 delivered = true;
2993 break;
2994 }
2995 Err(e) => {
2996 last_err_reason = Some(crate::relay_client::format_transport_error(&e));
3001 }
3002 }
3003 }
3004 if !delivered {
3005 skipped.push(json!({
3006 "peer": peer_handle,
3007 "event_id": event_id,
3008 "reason": last_err_reason.unwrap_or_else(|| "all endpoints failed".to_string()),
3009 }));
3010 }
3011 }
3012 }
3013
3014 if as_json {
3015 println!(
3016 "{}",
3017 serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
3018 );
3019 } else {
3020 println!(
3021 "pushed {} event(s); skipped {} ({})",
3022 pushed.len(),
3023 skipped.len(),
3024 if skipped.is_empty() {
3025 "none"
3026 } else {
3027 "see --json for detail"
3028 }
3029 );
3030 }
3031 Ok(())
3032}
3033
3034fn cmd_pull(as_json: bool) -> Result<()> {
3037 let state = config::read_relay_state()?;
3038 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3039 if self_state.is_null() {
3040 bail!("self slot not bound — run `wire bind-relay <url>` first");
3041 }
3042
3043 let endpoints = crate::endpoints::self_endpoints(&state);
3052 if endpoints.is_empty() {
3053 bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
3054 }
3055
3056 let inbox_dir = config::inbox_dir()?;
3057 config::ensure_dirs()?;
3058
3059 let mut total_seen = 0usize;
3060 let mut all_written: Vec<Value> = Vec::new();
3061 let mut all_rejected: Vec<Value> = Vec::new();
3062 let mut all_blocked = false;
3063 let mut all_advance_cursor_to: Option<String> = None;
3064
3065 for endpoint in &endpoints {
3066 let cursor_key = endpoint_cursor_key(endpoint.scope);
3067 let last_event_id = self_state
3068 .get(&cursor_key)
3069 .and_then(Value::as_str)
3070 .map(str::to_string);
3071 let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
3072 let events = match client.list_events(
3073 &endpoint.slot_id,
3074 &endpoint.slot_token,
3075 last_event_id.as_deref(),
3076 Some(1000),
3077 ) {
3078 Ok(ev) => ev,
3079 Err(e) => {
3080 eprintln!(
3084 "wire pull: endpoint {} ({:?}) errored: {}; continuing",
3085 endpoint.relay_url,
3086 endpoint.scope,
3087 crate::relay_client::format_transport_error(&e),
3088 );
3089 continue;
3090 }
3091 };
3092 total_seen += events.len();
3093 let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
3094 all_written.extend(result.written.iter().cloned());
3095 all_rejected.extend(result.rejected.iter().cloned());
3096 if result.blocked {
3097 all_blocked = true;
3098 }
3099 if let Some(eid) = result.advance_cursor_to.clone() {
3102 if endpoint.scope == crate::endpoints::EndpointScope::Federation {
3103 all_advance_cursor_to = Some(eid.clone());
3104 }
3105 let key = cursor_key.clone();
3106 config::update_relay_state(|state| {
3107 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3108 self_obj.insert(key, Value::String(eid));
3109 }
3110 Ok(())
3111 })?;
3112 }
3113 }
3114
3115 let result = crate::pull::PullResult {
3120 written: all_written,
3121 rejected: all_rejected,
3122 blocked: all_blocked,
3123 advance_cursor_to: all_advance_cursor_to,
3124 };
3125 let events_len = total_seen;
3126
3127 if as_json {
3131 println!(
3132 "{}",
3133 serde_json::to_string(&json!({
3134 "written": result.written,
3135 "rejected": result.rejected,
3136 "total_seen": events_len,
3137 "cursor_blocked": result.blocked,
3138 "cursor_advanced_to": result.advance_cursor_to,
3139 }))?
3140 );
3141 } else {
3142 let blocking = result
3143 .rejected
3144 .iter()
3145 .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
3146 .count();
3147 if blocking > 0 {
3148 println!(
3149 "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
3150 events_len,
3151 result.written.len(),
3152 result.rejected.len(),
3153 blocking,
3154 );
3155 } else {
3156 println!(
3157 "pulled {} event(s); wrote {}; rejected {}",
3158 events_len,
3159 result.written.len(),
3160 result.rejected.len(),
3161 );
3162 }
3163 }
3164 Ok(())
3165}
3166
3167fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
3172 match scope {
3173 crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
3174 crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
3175 }
3176}
3177
3178fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
3181 if !config::is_initialized()? {
3182 bail!("not initialized — run `wire init <handle>` first");
3183 }
3184 let mut state = config::read_relay_state()?;
3185 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3186 if self_state.is_null() {
3187 bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
3188 }
3189 let url = self_state["relay_url"]
3190 .as_str()
3191 .ok_or_else(|| anyhow!("self.relay_url missing"))?
3192 .to_string();
3193 let old_slot_id = self_state["slot_id"]
3194 .as_str()
3195 .ok_or_else(|| anyhow!("self.slot_id missing"))?
3196 .to_string();
3197 let old_slot_token = self_state["slot_token"]
3198 .as_str()
3199 .ok_or_else(|| anyhow!("self.slot_token missing"))?
3200 .to_string();
3201
3202 let card = config::read_agent_card()?;
3204 let did = card
3205 .get("did")
3206 .and_then(Value::as_str)
3207 .unwrap_or("")
3208 .to_string();
3209 let handle = crate::agent_card::display_handle_from_did(&did).to_string();
3210 let pk_b64 = card
3211 .get("verify_keys")
3212 .and_then(Value::as_object)
3213 .and_then(|m| m.values().next())
3214 .and_then(|v| v.get("key"))
3215 .and_then(Value::as_str)
3216 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
3217 .to_string();
3218 let pk_bytes = crate::signing::b64decode(&pk_b64)?;
3219 let sk_seed = config::read_private_key()?;
3220
3221 let normalized = url.trim_end_matches('/').to_string();
3223 let client = crate::relay_client::RelayClient::new(&normalized);
3224 client
3225 .check_healthz()
3226 .context("aborting rotation; old slot still valid")?;
3227 let alloc = client.allocate_slot(Some(&handle))?;
3228 let new_slot_id = alloc.slot_id.clone();
3229 let new_slot_token = alloc.slot_token.clone();
3230
3231 let mut announced: Vec<String> = Vec::new();
3238 if !no_announce {
3239 let now = time::OffsetDateTime::now_utc()
3240 .format(&time::format_description::well_known::Rfc3339)
3241 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
3242 let body = json!({
3243 "reason": "operator-initiated slot rotation",
3244 "new_relay_url": url,
3245 "new_slot_id": new_slot_id,
3246 });
3250 let peers = state["peers"].as_object().cloned().unwrap_or_default();
3251 for (peer_handle, _peer_info) in peers.iter() {
3252 let event = json!({
3253 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
3254 "timestamp": now.clone(),
3255 "from": did,
3256 "to": format!("did:wire:{peer_handle}"),
3257 "type": "wire_close",
3258 "kind": 1201,
3259 "body": body.clone(),
3260 });
3261 let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
3262 Ok(s) => s,
3263 Err(e) => {
3264 eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
3265 continue;
3266 }
3267 };
3268 let peer_info = match state["peers"].get(peer_handle) {
3273 Some(p) => p.clone(),
3274 None => continue,
3275 };
3276 let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
3277 let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
3278 let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
3279 if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
3280 continue;
3281 }
3282 let peer_client = if peer_url == url {
3283 client.clone()
3284 } else {
3285 crate::relay_client::RelayClient::new(peer_url)
3286 };
3287 match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
3288 Ok(_) => announced.push(peer_handle.clone()),
3289 Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
3290 }
3291 }
3292 }
3293
3294 state["self"] = json!({
3296 "relay_url": url,
3297 "slot_id": new_slot_id,
3298 "slot_token": new_slot_token,
3299 });
3300 config::write_relay_state(&state)?;
3301
3302 if as_json {
3303 println!(
3304 "{}",
3305 serde_json::to_string(&json!({
3306 "rotated": true,
3307 "old_slot_id": old_slot_id,
3308 "new_slot_id": new_slot_id,
3309 "relay_url": url,
3310 "announced_to": announced,
3311 }))?
3312 );
3313 } else {
3314 println!("rotated slot on {url}");
3315 println!(
3316 " old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
3317 );
3318 println!(" new slot_id: {new_slot_id}");
3319 if !announced.is_empty() {
3320 println!(
3321 " announced wire_close (kind=1201) to: {}",
3322 announced.join(", ")
3323 );
3324 }
3325 println!();
3326 println!("next steps:");
3327 println!(" - peers see the wire_close event in their next `wire pull`");
3328 println!(
3329 " - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
3330 );
3331 println!(" (or full re-pair via `wire pair-host`/`wire join`)");
3332 println!(" - until they do, you'll receive but they won't be able to reach you");
3333 let _ = old_slot_token;
3335 }
3336 Ok(())
3337}
3338
3339fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
3342 let mut trust = config::read_trust()?;
3343 let mut removed_from_trust = false;
3344 if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
3345 && agents.remove(handle).is_some()
3346 {
3347 removed_from_trust = true;
3348 }
3349 config::write_trust(&trust)?;
3350
3351 let mut state = config::read_relay_state()?;
3352 let mut removed_from_relay = false;
3353 if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
3354 && peers.remove(handle).is_some()
3355 {
3356 removed_from_relay = true;
3357 }
3358 config::write_relay_state(&state)?;
3359
3360 let mut purged: Vec<String> = Vec::new();
3361 if purge {
3362 for dir in [config::inbox_dir()?, config::outbox_dir()?] {
3363 let path = dir.join(format!("{handle}.jsonl"));
3364 if path.exists() {
3365 std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
3366 purged.push(path.to_string_lossy().into());
3367 }
3368 }
3369 }
3370
3371 if !removed_from_trust && !removed_from_relay {
3372 if as_json {
3373 println!(
3374 "{}",
3375 serde_json::to_string(&json!({
3376 "removed": false,
3377 "reason": format!("peer {handle:?} not pinned"),
3378 }))?
3379 );
3380 } else {
3381 eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
3382 }
3383 return Ok(());
3384 }
3385
3386 if as_json {
3387 println!(
3388 "{}",
3389 serde_json::to_string(&json!({
3390 "handle": handle,
3391 "removed_from_trust": removed_from_trust,
3392 "removed_from_relay_state": removed_from_relay,
3393 "purged_files": purged,
3394 }))?
3395 );
3396 } else {
3397 println!("forgot peer {handle:?}");
3398 if removed_from_trust {
3399 println!(" - removed from trust.json");
3400 }
3401 if removed_from_relay {
3402 println!(" - removed from relay.json");
3403 }
3404 if !purged.is_empty() {
3405 for p in &purged {
3406 println!(" - deleted {p}");
3407 }
3408 } else if !purge {
3409 println!(" (inbox/outbox files preserved; pass --purge to delete them)");
3410 }
3411 }
3412 Ok(())
3413}
3414
3415fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
3418 if !config::is_initialized()? {
3419 bail!("not initialized — run `wire init <handle>` first");
3420 }
3421 let interval = std::time::Duration::from_secs(interval_secs.max(1));
3422
3423 if !as_json {
3424 if once {
3425 eprintln!("wire daemon: single sync cycle, then exit");
3426 } else {
3427 eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
3428 }
3429 }
3430
3431 if let Err(e) = crate::pending_pair::cleanup_on_startup() {
3435 eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
3436 }
3437
3438 let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
3444 if !once {
3445 crate::daemon_stream::spawn_stream_subscriber(wake_tx);
3446 }
3447
3448 loop {
3449 let pushed = run_sync_push().unwrap_or_else(|e| {
3450 eprintln!("daemon: push error: {e:#}");
3451 json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
3452 });
3453 let pulled = run_sync_pull().unwrap_or_else(|e| {
3454 eprintln!("daemon: pull error: {e:#}");
3455 json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
3456 });
3457 let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
3458 eprintln!("daemon: pending-pair tick error: {e:#}");
3459 json!({"transitions": []})
3460 });
3461
3462 if as_json {
3463 println!(
3464 "{}",
3465 serde_json::to_string(&json!({
3466 "ts": time::OffsetDateTime::now_utc()
3467 .format(&time::format_description::well_known::Rfc3339)
3468 .unwrap_or_default(),
3469 "push": pushed,
3470 "pull": pulled,
3471 "pairs": pairs,
3472 }))?
3473 );
3474 } else {
3475 let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
3476 let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
3477 let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
3478 let pair_transitions = pairs["transitions"]
3479 .as_array()
3480 .map(|a| a.len())
3481 .unwrap_or(0);
3482 if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
3483 eprintln!(
3484 "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
3485 );
3486 }
3487 if let Some(arr) = pairs["transitions"].as_array() {
3489 for t in arr {
3490 eprintln!(
3491 " pair {} : {} → {}",
3492 t.get("code").and_then(Value::as_str).unwrap_or("?"),
3493 t.get("from").and_then(Value::as_str).unwrap_or("?"),
3494 t.get("to").and_then(Value::as_str).unwrap_or("?")
3495 );
3496 if let Some(sas) = t.get("sas").and_then(Value::as_str)
3497 && t.get("to").and_then(Value::as_str) == Some("sas_ready")
3498 {
3499 eprintln!(" SAS digits: {}-{}", &sas[..3], &sas[3..]);
3500 eprintln!(
3501 " Run: wire pair-confirm {} {}",
3502 t.get("code").and_then(Value::as_str).unwrap_or("?"),
3503 sas
3504 );
3505 }
3506 }
3507 }
3508 }
3509
3510 if once {
3511 return Ok(());
3512 }
3513 let _ = wake_rx.recv_timeout(interval);
3518 while wake_rx.try_recv().is_ok() {}
3519 }
3520}
3521
3522fn run_sync_push() -> Result<Value> {
3525 let state = config::read_relay_state()?;
3526 let peers = state["peers"].as_object().cloned().unwrap_or_default();
3527 if peers.is_empty() {
3528 return Ok(json!({"pushed": [], "skipped": []}));
3529 }
3530 let outbox_dir = config::outbox_dir()?;
3531 if !outbox_dir.exists() {
3532 return Ok(json!({"pushed": [], "skipped": []}));
3533 }
3534 let mut pushed = Vec::new();
3535 let mut skipped = Vec::new();
3536 for (peer_handle, slot_info) in peers.iter() {
3537 let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
3538 if !outbox.exists() {
3539 continue;
3540 }
3541 let url = slot_info["relay_url"].as_str().unwrap_or("");
3542 let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
3543 let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
3544 if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
3545 continue;
3546 }
3547 let client = crate::relay_client::RelayClient::new(url);
3548 let body = std::fs::read_to_string(&outbox)?;
3549 for line in body.lines() {
3550 let event: Value = match serde_json::from_str(line) {
3551 Ok(v) => v,
3552 Err(_) => continue,
3553 };
3554 let event_id = event
3555 .get("event_id")
3556 .and_then(Value::as_str)
3557 .unwrap_or("")
3558 .to_string();
3559 match client.post_event(slot_id, slot_token, &event) {
3560 Ok(resp) => {
3561 if resp.status == "duplicate" {
3562 skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
3563 } else {
3564 pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
3565 }
3566 }
3567 Err(e) => {
3568 let reason = crate::relay_client::format_transport_error(&e);
3572 skipped
3573 .push(json!({"peer": peer_handle, "event_id": event_id, "reason": reason}));
3574 }
3575 }
3576 }
3577 }
3578 Ok(json!({"pushed": pushed, "skipped": skipped}))
3579}
3580
3581fn run_sync_pull() -> Result<Value> {
3583 let state = config::read_relay_state()?;
3584 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3585 if self_state.is_null() {
3586 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3587 }
3588 let url = self_state["relay_url"].as_str().unwrap_or("");
3589 let slot_id = self_state["slot_id"].as_str().unwrap_or("");
3590 let slot_token = self_state["slot_token"].as_str().unwrap_or("");
3591 let last_event_id = self_state
3592 .get("last_pulled_event_id")
3593 .and_then(Value::as_str)
3594 .map(str::to_string);
3595 if url.is_empty() {
3596 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3597 }
3598 let client = crate::relay_client::RelayClient::new(url);
3599 let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
3600 let inbox_dir = config::inbox_dir()?;
3601 config::ensure_dirs()?;
3602
3603 let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
3607
3608 if let Some(eid) = &result.advance_cursor_to {
3610 let eid = eid.clone();
3611 config::update_relay_state(|state| {
3612 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3613 self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
3614 }
3615 Ok(())
3616 })?;
3617 }
3618
3619 Ok(json!({
3620 "written": result.written,
3621 "rejected": result.rejected,
3622 "total_seen": events.len(),
3623 "cursor_blocked": result.blocked,
3624 "cursor_advanced_to": result.advance_cursor_to,
3625 }))
3626}
3627
3628fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
3631 let body =
3632 std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
3633 let card: Value =
3634 serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
3635 crate::agent_card::verify_agent_card(&card)
3636 .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3637
3638 let mut trust = config::read_trust()?;
3639 crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3640
3641 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3642 let handle = crate::agent_card::display_handle_from_did(did).to_string();
3643 config::write_trust(&trust)?;
3644
3645 if as_json {
3646 println!(
3647 "{}",
3648 serde_json::to_string(&json!({
3649 "handle": handle,
3650 "did": did,
3651 "tier": "VERIFIED",
3652 "pinned": true,
3653 }))?
3654 );
3655 } else {
3656 println!("pinned {handle} ({did}) at tier VERIFIED");
3657 }
3658 Ok(())
3659}
3660
3661fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3664 pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3665}
3666
3667fn cmd_pair_join(
3668 code_phrase: &str,
3669 relay_url: &str,
3670 auto_yes: bool,
3671 timeout_secs: u64,
3672) -> Result<()> {
3673 pair_orchestrate(
3674 relay_url,
3675 Some(code_phrase),
3676 "guest",
3677 auto_yes,
3678 timeout_secs,
3679 )
3680}
3681
3682fn pair_orchestrate(
3688 relay_url: &str,
3689 code_in: Option<&str>,
3690 role: &str,
3691 auto_yes: bool,
3692 timeout_secs: u64,
3693) -> Result<()> {
3694 use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3695
3696 let mut s = pair_session_open(role, relay_url, code_in)?;
3697
3698 if role == "host" {
3699 eprintln!();
3700 eprintln!("share this code phrase with your peer:");
3701 eprintln!();
3702 eprintln!(" {}", s.code);
3703 eprintln!();
3704 eprintln!(
3705 "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3706 s.code
3707 );
3708 } else {
3709 eprintln!();
3710 eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3711 }
3712
3713 const HEARTBEAT_SECS: u64 = 10;
3718 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3719 let started = std::time::Instant::now();
3720 let mut last_heartbeat = started;
3721 let formatted = loop {
3722 if let Some(sas) = pair_session_try_sas(&mut s)? {
3723 break sas;
3724 }
3725 let now = std::time::Instant::now();
3726 if now >= deadline {
3727 return Err(anyhow!(
3728 "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3729 ));
3730 }
3731 if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3732 let elapsed = now.duration_since(started).as_secs();
3733 eprintln!(" ... still waiting ({elapsed}s / {timeout_secs}s)");
3734 last_heartbeat = now;
3735 }
3736 std::thread::sleep(std::time::Duration::from_millis(250));
3737 };
3738
3739 eprintln!();
3740 eprintln!("SAS digits (must match peer's terminal):");
3741 eprintln!();
3742 eprintln!(" {formatted}");
3743 eprintln!();
3744
3745 if !auto_yes {
3748 eprint!("does this match your peer's terminal? [y/N]: ");
3749 use std::io::Write;
3750 std::io::stderr().flush().ok();
3751 let mut input = String::new();
3752 std::io::stdin().read_line(&mut input)?;
3753 let trimmed = input.trim().to_lowercase();
3754 if trimmed != "y" && trimmed != "yes" {
3755 bail!("SAS confirmation declined — aborting pairing");
3756 }
3757 }
3758 s.sas_confirmed = true;
3759
3760 let result = pair_session_finalize(&mut s, timeout_secs)?;
3762
3763 let peer_did = result["paired_with"].as_str().unwrap_or("");
3764 let peer_role = if role == "host" { "guest" } else { "host" };
3765 eprintln!("paired with {peer_did} (peer role: {peer_role})");
3766 eprintln!("peer card pinned at tier VERIFIED");
3767 eprintln!(
3768 "peer relay slot saved to {}",
3769 config::relay_state_path()?.display()
3770 );
3771
3772 println!("{}", serde_json::to_string(&result)?);
3773 Ok(())
3774}
3775
3776fn cmd_pair(
3782 handle: &str,
3783 code: Option<&str>,
3784 relay: &str,
3785 auto_yes: bool,
3786 timeout_secs: u64,
3787 no_setup: bool,
3788) -> Result<()> {
3789 let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3792 let did = init_result
3793 .get("did")
3794 .and_then(|v| v.as_str())
3795 .unwrap_or("(unknown)")
3796 .to_string();
3797 let already = init_result
3798 .get("already_initialized")
3799 .and_then(|v| v.as_bool())
3800 .unwrap_or(false);
3801 if already {
3802 println!("(identity {did} already initialized — reusing)");
3803 } else {
3804 println!("initialized {did}");
3805 }
3806 println!();
3807
3808 match code {
3810 None => {
3811 println!("hosting pair on {relay} (no code = host) ...");
3812 cmd_pair_host(relay, auto_yes, timeout_secs)?;
3813 }
3814 Some(c) => {
3815 println!("joining pair with code {c} on {relay} ...");
3816 cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3817 }
3818 }
3819
3820 if !no_setup {
3822 println!();
3823 println!("registering wire as MCP server in detected client configs ...");
3824 if let Err(e) = cmd_setup(true) {
3825 eprintln!("warn: setup --apply failed: {e}");
3827 eprintln!(" pair succeeded; you can re-run `wire setup --apply` manually.");
3828 }
3829 }
3830
3831 println!();
3832 println!("pair complete. Next steps:");
3833 println!(" wire daemon start # background sync of inbox/outbox vs relay");
3834 println!(" wire send <peer> claim <msg> # send your peer something");
3835 println!(" wire tail # watch incoming events");
3836 Ok(())
3837}
3838
3839fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3845 let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3846 let did = init_result
3847 .get("did")
3848 .and_then(|v| v.as_str())
3849 .unwrap_or("(unknown)")
3850 .to_string();
3851 let already = init_result
3852 .get("already_initialized")
3853 .and_then(|v| v.as_bool())
3854 .unwrap_or(false);
3855 if already {
3856 println!("(identity {did} already initialized — reusing)");
3857 } else {
3858 println!("initialized {did}");
3859 }
3860 println!();
3861 match code {
3862 None => cmd_pair_host_detach(relay, false),
3863 Some(c) => cmd_pair_join_detach(c, relay, false),
3864 }
3865}
3866
3867fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3868 if !config::is_initialized()? {
3869 bail!("not initialized — run `wire init <handle>` first");
3870 }
3871 let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3872 Ok(b) => b,
3873 Err(e) => {
3874 if !as_json {
3875 eprintln!(
3876 "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3877 );
3878 }
3879 false
3880 }
3881 };
3882 let code = crate::sas::generate_code_phrase();
3883 let code_hash = crate::pair_session::derive_code_hash(&code);
3884 let now = time::OffsetDateTime::now_utc()
3885 .format(&time::format_description::well_known::Rfc3339)
3886 .unwrap_or_default();
3887 let p = crate::pending_pair::PendingPair {
3888 code: code.clone(),
3889 code_hash,
3890 role: "host".to_string(),
3891 relay_url: relay_url.to_string(),
3892 status: "request_host".to_string(),
3893 sas: None,
3894 peer_did: None,
3895 created_at: now,
3896 last_error: None,
3897 pair_id: None,
3898 our_slot_id: None,
3899 our_slot_token: None,
3900 spake2_seed_b64: None,
3901 };
3902 crate::pending_pair::write_pending(&p)?;
3903 if as_json {
3904 println!(
3905 "{}",
3906 serde_json::to_string(&json!({
3907 "state": "queued",
3908 "code_phrase": code,
3909 "relay_url": relay_url,
3910 "role": "host",
3911 "daemon_spawned": daemon_spawned,
3912 }))?
3913 );
3914 } else {
3915 if daemon_spawned {
3916 println!("(started wire daemon in background)");
3917 }
3918 println!("detached pair-host queued. Share this code with your peer:\n");
3919 println!(" {code}\n");
3920 println!("Next steps:");
3921 println!(" wire pair-list # check status");
3922 println!(" wire pair-confirm {code} <digits> # when SAS shows up");
3923 println!(" wire pair-cancel {code} # to abort");
3924 }
3925 Ok(())
3926}
3927
3928fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3929 if !config::is_initialized()? {
3930 bail!("not initialized — run `wire init <handle>` first");
3931 }
3932 let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3933 Ok(b) => b,
3934 Err(e) => {
3935 if !as_json {
3936 eprintln!(
3937 "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3938 );
3939 }
3940 false
3941 }
3942 };
3943 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3944 let code_hash = crate::pair_session::derive_code_hash(&code);
3945 let now = time::OffsetDateTime::now_utc()
3946 .format(&time::format_description::well_known::Rfc3339)
3947 .unwrap_or_default();
3948 let p = crate::pending_pair::PendingPair {
3949 code: code.clone(),
3950 code_hash,
3951 role: "guest".to_string(),
3952 relay_url: relay_url.to_string(),
3953 status: "request_guest".to_string(),
3954 sas: None,
3955 peer_did: None,
3956 created_at: now,
3957 last_error: None,
3958 pair_id: None,
3959 our_slot_id: None,
3960 our_slot_token: None,
3961 spake2_seed_b64: None,
3962 };
3963 crate::pending_pair::write_pending(&p)?;
3964 if as_json {
3965 println!(
3966 "{}",
3967 serde_json::to_string(&json!({
3968 "state": "queued",
3969 "code_phrase": code,
3970 "relay_url": relay_url,
3971 "role": "guest",
3972 "daemon_spawned": daemon_spawned,
3973 }))?
3974 );
3975 } else {
3976 if daemon_spawned {
3977 println!("(started wire daemon in background)");
3978 }
3979 println!("detached pair-join queued for code {code}.");
3980 println!(
3981 "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3982 );
3983 }
3984 Ok(())
3985}
3986
3987fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3988 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3989 let typed: String = typed_digits
3990 .chars()
3991 .filter(|c| c.is_ascii_digit())
3992 .collect();
3993 if typed.len() != 6 {
3994 bail!(
3995 "expected 6 digits (got {} after stripping non-digits)",
3996 typed.len()
3997 );
3998 }
3999 let mut p = crate::pending_pair::read_pending(&code)?
4000 .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
4001 if p.status != "sas_ready" {
4002 bail!(
4003 "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
4004 p.status
4005 );
4006 }
4007 let stored = p
4008 .sas
4009 .as_ref()
4010 .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
4011 .clone();
4012 if stored == typed {
4013 p.status = "confirmed".to_string();
4014 crate::pending_pair::write_pending(&p)?;
4015 if as_json {
4016 println!(
4017 "{}",
4018 serde_json::to_string(&json!({
4019 "state": "confirmed",
4020 "code_phrase": code,
4021 }))?
4022 );
4023 } else {
4024 println!("digits match. Daemon will finalize the handshake on its next tick.");
4025 println!("Run `wire peers` after a few seconds to confirm.");
4026 }
4027 } else {
4028 p.status = "aborted".to_string();
4029 p.last_error = Some(format!(
4030 "SAS digit mismatch (typed {typed}, expected {stored})"
4031 ));
4032 let client = crate::relay_client::RelayClient::new(&p.relay_url);
4033 let _ = client.pair_abandon(&p.code_hash);
4034 crate::pending_pair::write_pending(&p)?;
4035 crate::os_notify::toast(
4036 &format!("wire — pair aborted ({})", p.code),
4037 p.last_error.as_deref().unwrap_or("digits mismatch"),
4038 );
4039 if as_json {
4040 println!(
4041 "{}",
4042 serde_json::to_string(&json!({
4043 "state": "aborted",
4044 "code_phrase": code,
4045 "error": "digits mismatch",
4046 }))?
4047 );
4048 }
4049 bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
4050 }
4051 Ok(())
4052}
4053
4054fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
4055 if watch {
4056 return cmd_pair_list_watch(watch_interval_secs);
4057 }
4058 let spake2_items = crate::pending_pair::list_pending()?;
4059 let inbound_items = crate::pending_inbound_pair::list_pending_inbound()?;
4060 if as_json {
4061 println!("{}", serde_json::to_string(&spake2_items)?);
4066 return Ok(());
4067 }
4068 if spake2_items.is_empty() && inbound_items.is_empty() {
4069 println!("no pending pair sessions.");
4070 return Ok(());
4071 }
4072 if !inbound_items.is_empty() {
4075 println!("PENDING INBOUND (v0.5.14 zero-paste pair_drop awaiting your accept)");
4076 println!(
4077 "{:<20} {:<35} {:<25} NEXT STEP",
4078 "PEER", "RELAY", "RECEIVED"
4079 );
4080 for p in &inbound_items {
4081 println!(
4082 "{:<20} {:<35} {:<25} `wire pair-accept {peer}` to accept; `wire pair-reject {peer}` to refuse",
4083 p.peer_handle,
4084 p.peer_relay_url,
4085 p.received_at,
4086 peer = p.peer_handle,
4087 );
4088 }
4089 println!();
4090 }
4091 if !spake2_items.is_empty() {
4092 println!("SPAKE2 SESSIONS");
4093 println!(
4094 "{:<15} {:<8} {:<18} {:<10} NOTE",
4095 "CODE", "ROLE", "STATUS", "SAS"
4096 );
4097 for p in spake2_items {
4098 let sas = p
4099 .sas
4100 .as_ref()
4101 .map(|d| format!("{}-{}", &d[..3], &d[3..]))
4102 .unwrap_or_else(|| "—".to_string());
4103 let note = p
4104 .last_error
4105 .as_deref()
4106 .or(p.peer_did.as_deref())
4107 .unwrap_or("");
4108 println!(
4109 "{:<15} {:<8} {:<18} {:<10} {}",
4110 p.code, p.role, p.status, sas, note
4111 );
4112 }
4113 }
4114 Ok(())
4115}
4116
4117fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
4129 use std::collections::HashMap;
4130 use std::io::Write;
4131 let interval = std::time::Duration::from_secs(interval_secs.max(1));
4132 let mut prev: HashMap<String, String> = HashMap::new();
4135 {
4136 let items = crate::pending_pair::list_pending()?;
4137 for p in &items {
4138 println!("{}", serde_json::to_string(&p)?);
4139 prev.insert(p.code.clone(), p.status.clone());
4140 }
4141 let _ = std::io::stdout().flush();
4143 }
4144 loop {
4145 std::thread::sleep(interval);
4146 let items = match crate::pending_pair::list_pending() {
4147 Ok(v) => v,
4148 Err(_) => continue,
4149 };
4150 let mut cur: HashMap<String, String> = HashMap::new();
4151 for p in &items {
4152 cur.insert(p.code.clone(), p.status.clone());
4153 match prev.get(&p.code) {
4154 None => {
4155 println!("{}", serde_json::to_string(&p)?);
4157 }
4158 Some(prev_status) if prev_status != &p.status => {
4159 println!("{}", serde_json::to_string(&p)?);
4161 }
4162 _ => {}
4163 }
4164 }
4165 for code in prev.keys() {
4166 if !cur.contains_key(code) {
4167 println!(
4170 "{}",
4171 serde_json::to_string(&json!({
4172 "code": code,
4173 "status": "removed",
4174 "_synthetic": true,
4175 }))?
4176 );
4177 }
4178 }
4179 let _ = std::io::stdout().flush();
4180 prev = cur;
4181 }
4182}
4183
4184fn cmd_pair_watch(
4188 code_phrase: &str,
4189 target_status: &str,
4190 timeout_secs: u64,
4191 as_json: bool,
4192) -> Result<()> {
4193 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
4194 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
4195 let mut last_seen_status: Option<String> = None;
4196 loop {
4197 let p_opt = crate::pending_pair::read_pending(&code)?;
4198 let now = std::time::Instant::now();
4199 match p_opt {
4200 None => {
4201 if last_seen_status.is_some() {
4205 if as_json {
4206 println!(
4207 "{}",
4208 serde_json::to_string(&json!({"state": "finalized", "code": code}))?
4209 );
4210 } else {
4211 println!("pair {code} finalized (file removed)");
4212 }
4213 return Ok(());
4214 } else {
4215 if as_json {
4216 println!(
4217 "{}",
4218 serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
4219 );
4220 }
4221 std::process::exit(1);
4222 }
4223 }
4224 Some(p) => {
4225 let cur = p.status.clone();
4226 if Some(cur.clone()) != last_seen_status {
4227 if as_json {
4228 println!("{}", serde_json::to_string(&p)?);
4230 }
4231 last_seen_status = Some(cur.clone());
4232 }
4233 if cur == target_status {
4234 if !as_json {
4235 let sas_str = p
4236 .sas
4237 .as_ref()
4238 .map(|s| format!("{}-{}", &s[..3], &s[3..]))
4239 .unwrap_or_else(|| "—".to_string());
4240 println!("pair {code} reached {target_status} (SAS: {sas_str})");
4241 }
4242 return Ok(());
4243 }
4244 if cur == "aborted" || cur == "aborted_restart" {
4245 if !as_json {
4246 let err = p.last_error.as_deref().unwrap_or("(no detail)");
4247 eprintln!("pair {code} {cur}: {err}");
4248 }
4249 std::process::exit(1);
4250 }
4251 }
4252 }
4253 if now >= deadline {
4254 if !as_json {
4255 eprintln!(
4256 "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
4257 );
4258 }
4259 std::process::exit(2);
4260 }
4261 std::thread::sleep(std::time::Duration::from_millis(250));
4262 }
4263}
4264
4265fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
4266 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
4267 let p = crate::pending_pair::read_pending(&code)?
4268 .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
4269 let client = crate::relay_client::RelayClient::new(&p.relay_url);
4270 let _ = client.pair_abandon(&p.code_hash);
4271 crate::pending_pair::delete_pending(&code)?;
4272 if as_json {
4273 println!(
4274 "{}",
4275 serde_json::to_string(&json!({
4276 "state": "cancelled",
4277 "code_phrase": code,
4278 }))?
4279 );
4280 } else {
4281 println!("cancelled pending pair {code} (relay slot released, file removed).");
4282 }
4283 Ok(())
4284}
4285
4286fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
4289 let code = crate::sas::parse_code_phrase(code_phrase)?;
4292 let code_hash = crate::pair_session::derive_code_hash(code);
4293 let client = crate::relay_client::RelayClient::new(relay_url);
4294 client.pair_abandon(&code_hash)?;
4295 println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
4296 println!("host can now issue a fresh code; guest can re-join.");
4297 Ok(())
4298}
4299
4300fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
4303 let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
4304
4305 let share_payload: Option<Value> = if share {
4308 let client = reqwest::blocking::Client::new();
4309 let single_use = if uses == 1 { Some(1u32) } else { None };
4310 let body = json!({
4311 "invite_url": url,
4312 "ttl_seconds": ttl,
4313 "uses": single_use,
4314 });
4315 let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
4316 let resp = client.post(&endpoint).json(&body).send()?;
4317 if !resp.status().is_success() {
4318 let code = resp.status();
4319 let txt = resp.text().unwrap_or_default();
4320 bail!("relay {code} on /v1/invite/register: {txt}");
4321 }
4322 let parsed: Value = resp.json()?;
4323 let token = parsed
4324 .get("token")
4325 .and_then(Value::as_str)
4326 .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
4327 .to_string();
4328 let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
4329 let curl_line = format!("curl -fsSL {share_url} | sh");
4330 Some(json!({
4331 "token": token,
4332 "share_url": share_url,
4333 "curl": curl_line,
4334 "expires_unix": parsed.get("expires_unix"),
4335 }))
4336 } else {
4337 None
4338 };
4339
4340 if as_json {
4341 let mut out = json!({
4342 "invite_url": url,
4343 "ttl_secs": ttl,
4344 "uses": uses,
4345 "relay": relay,
4346 });
4347 if let Some(s) = &share_payload {
4348 out["share"] = s.clone();
4349 }
4350 println!("{}", serde_json::to_string(&out)?);
4351 } else if let Some(s) = share_payload {
4352 let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
4353 eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
4354 eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
4355 println!("{curl}");
4356 } else {
4357 eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
4358 eprintln!("# TTL: {ttl}s. Uses: {uses}.");
4359 println!("{url}");
4360 }
4361 Ok(())
4362}
4363
4364fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
4365 let resolved = if url.starts_with("http://") || url.starts_with("https://") {
4369 let sep = if url.contains('?') { '&' } else { '?' };
4370 let resolve_url = format!("{url}{sep}format=url");
4371 let client = reqwest::blocking::Client::new();
4372 let resp = client
4373 .get(&resolve_url)
4374 .send()
4375 .with_context(|| format!("GET {resolve_url}"))?;
4376 if !resp.status().is_success() {
4377 bail!("could not resolve short URL {url} (HTTP {})", resp.status());
4378 }
4379 let body = resp.text().unwrap_or_default().trim().to_string();
4380 if !body.starts_with("wire://pair?") {
4381 bail!(
4382 "short URL {url} did not resolve to a wire:// invite. \
4383 (got: {}{})",
4384 body.chars().take(80).collect::<String>(),
4385 if body.chars().count() > 80 { "…" } else { "" }
4386 );
4387 }
4388 body
4389 } else {
4390 url.to_string()
4391 };
4392
4393 let result = crate::pair_invite::accept_invite(&resolved)?;
4394 if as_json {
4395 println!("{}", serde_json::to_string(&result)?);
4396 } else {
4397 let did = result
4398 .get("paired_with")
4399 .and_then(Value::as_str)
4400 .unwrap_or("?");
4401 println!("paired with {did}");
4402 println!(
4403 "you can now: wire send {} <kind> <body>",
4404 crate::agent_card::display_handle_from_did(did)
4405 );
4406 }
4407 Ok(())
4408}
4409
4410fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
4413 if let Some(h) = handle {
4414 let parsed = crate::pair_profile::parse_handle(h)?;
4415 if config::is_initialized()? {
4418 let card = config::read_agent_card()?;
4419 let local_handle = card
4420 .get("profile")
4421 .and_then(|p| p.get("handle"))
4422 .and_then(Value::as_str)
4423 .map(str::to_string);
4424 if local_handle.as_deref() == Some(h) {
4425 return cmd_whois(None, as_json, None);
4426 }
4427 }
4428 let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4430 if as_json {
4431 println!("{}", serde_json::to_string(&resolved)?);
4432 } else {
4433 print_resolved_profile(&resolved);
4434 }
4435 return Ok(());
4436 }
4437 let card = config::read_agent_card()?;
4438 if as_json {
4439 let profile = card.get("profile").cloned().unwrap_or(Value::Null);
4440 println!(
4441 "{}",
4442 serde_json::to_string(&json!({
4443 "did": card.get("did").cloned().unwrap_or(Value::Null),
4444 "profile": profile,
4445 }))?
4446 );
4447 } else {
4448 print!("{}", crate::pair_profile::render_self_summary()?);
4449 }
4450 Ok(())
4451}
4452
4453fn print_resolved_profile(resolved: &Value) {
4454 let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
4455 let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
4456 let relay = resolved
4457 .get("relay_url")
4458 .and_then(Value::as_str)
4459 .unwrap_or("");
4460 let slot = resolved
4461 .get("slot_id")
4462 .and_then(Value::as_str)
4463 .unwrap_or("");
4464 let profile = resolved
4465 .get("card")
4466 .and_then(|c| c.get("profile"))
4467 .cloned()
4468 .unwrap_or(Value::Null);
4469 println!("{did}");
4470 println!(" nick: {nick}");
4471 if !relay.is_empty() {
4472 println!(" relay_url: {relay}");
4473 }
4474 if !slot.is_empty() {
4475 println!(" slot_id: {slot}");
4476 }
4477 let pick =
4478 |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
4479 if let Some(s) = pick("display_name") {
4480 println!(" display_name: {s}");
4481 }
4482 if let Some(s) = pick("emoji") {
4483 println!(" emoji: {s}");
4484 }
4485 if let Some(s) = pick("motto") {
4486 println!(" motto: {s}");
4487 }
4488 if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
4489 let joined: Vec<String> = arr
4490 .iter()
4491 .filter_map(|v| v.as_str().map(str::to_string))
4492 .collect();
4493 println!(" vibe: {}", joined.join(", "));
4494 }
4495 if let Some(s) = pick("pronouns") {
4496 println!(" pronouns: {s}");
4497 }
4498}
4499
/// Extract the host part of `url` without pulling in a URL parser.
///
/// Handles an optional `http://` / `https://` scheme, an optional
/// `user[:password]@` prefix, an optional `:port`, and bracketed IPv6 literals
/// (returned with their brackets, e.g. `[::1]`). The authority ends at the
/// first `/`, `?` or `#`. Returns an empty string when no host is present.
fn host_of_url(url: &str) -> String {
    let trimmed = url.trim();
    let rest = trimmed
        .strip_prefix("https://")
        .or_else(|| trimmed.strip_prefix("http://"))
        .unwrap_or(trimmed);
    let authority = rest.split(['/', '?', '#']).next().unwrap_or("");
    // Everything up to the last '@' is userinfo, not host. Without this,
    // `user:pw@host` would report `user` as the host.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);
    // A bracketed IPv6 literal contains ':' itself, so it cannot be split on
    // the port separator.
    if let Some((addr, _)) = host_port
        .strip_prefix('[')
        .and_then(|r| r.split_once(']'))
    {
        return format!("[{addr}]");
    }
    host_port.split(':').next().unwrap_or("").to_string()
}
4520
4521fn is_known_relay_domain(peer_domain: &str, our_relay_url: &str) -> bool {
4525 const KNOWN_GOOD: &[&str] = &["wireup.net", "wire.laulpogan.com"];
4527 let peer_domain = peer_domain.trim().to_ascii_lowercase();
4528 if KNOWN_GOOD.iter().any(|k| *k == peer_domain) {
4529 return true;
4530 }
4531 let our_host = host_of_url(our_relay_url).to_ascii_lowercase();
4534 if !our_host.is_empty() && our_host == peer_domain {
4535 return true;
4536 }
4537 false
4538}
4539
4540fn cmd_add_local_sister(sister_name: &str, as_json: bool) -> Result<()> {
4548 let sessions = crate::session::list_sessions()?;
4550 let sister = sessions
4551 .iter()
4552 .find(|s| s.name == sister_name)
4553 .ok_or_else(|| {
4554 anyhow!(
4555 "no sister session named `{sister_name}` (run `wire session list` to see what's available)"
4556 )
4557 })?;
4558
4559 let our_card = config::read_agent_card()
4562 .map_err(|_| anyhow!("not initialized — run `wire init <handle>` first"))?;
4563 let our_did = our_card
4564 .get("did")
4565 .and_then(Value::as_str)
4566 .ok_or_else(|| anyhow!("agent-card missing did"))?
4567 .to_string();
4568 if let Some(sister_did) = sister.did.as_deref()
4569 && sister_did == our_did
4570 {
4571 bail!("refusing to add self (`{sister_name}` is this very session)");
4572 }
4573
4574 let sister_card_path = sister
4576 .home_dir
4577 .join("config")
4578 .join("wire")
4579 .join("agent-card.json");
4580 let sister_card: Value = serde_json::from_slice(
4581 &std::fs::read(&sister_card_path)
4582 .with_context(|| format!("reading sister card {sister_card_path:?}"))?,
4583 )
4584 .with_context(|| format!("parsing sister card {sister_card_path:?}"))?;
4585 let sister_relay_state: Value = std::fs::read(
4586 sister
4587 .home_dir
4588 .join("config")
4589 .join("wire")
4590 .join("relay.json"),
4591 )
4592 .ok()
4593 .and_then(|b| serde_json::from_slice(&b).ok())
4594 .unwrap_or_else(|| json!({"self": Value::Null, "peers": {}}));
4595
4596 let sister_did = sister_card
4597 .get("did")
4598 .and_then(Value::as_str)
4599 .ok_or_else(|| anyhow!("sister card missing did"))?
4600 .to_string();
4601 let sister_handle = crate::agent_card::display_handle_from_did(&sister_did).to_string();
4602
4603 let sister_endpoints = crate::endpoints::self_endpoints(&sister_relay_state);
4607 if sister_endpoints.is_empty() {
4608 bail!(
4609 "sister `{sister_name}` has no endpoints in its relay.json — recreate with `wire session new --local-only` or `--with-local`"
4610 );
4611 }
4612 let sister_local = sister_endpoints
4613 .iter()
4614 .find(|e| e.scope == crate::endpoints::EndpointScope::Local);
4615 let delivery_endpoint = match sister_local {
4616 Some(e) => e.clone(),
4617 None => sister_endpoints[0].clone(),
4618 };
4619
4620 let our_relay_state = config::read_relay_state()?;
4626 let our_endpoints = crate::endpoints::self_endpoints(&our_relay_state);
4627 if our_endpoints.is_empty() {
4628 bail!(
4629 "this session has no endpoints — run `wire session new --local-only` or `wire bind-relay` first"
4630 );
4631 }
4632 let our_advertised = our_endpoints
4633 .iter()
4634 .find(|e| e.scope == crate::endpoints::EndpointScope::Federation)
4635 .cloned()
4636 .unwrap_or_else(|| our_endpoints[0].clone());
4637
4638 let mut trust = config::read_trust()?;
4642 crate::trust::add_agent_card_pin(&mut trust, &sister_card, Some("VERIFIED"));
4643 config::write_trust(&trust)?;
4644 let mut relay_state = config::read_relay_state()?;
4645 crate::endpoints::pin_peer_endpoints(&mut relay_state, &sister_handle, &sister_endpoints)?;
4646 config::write_relay_state(&relay_state)?;
4647
4648 let sk_seed = config::read_private_key()?;
4651 let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
4652 let pk_b64 = our_card
4653 .get("verify_keys")
4654 .and_then(Value::as_object)
4655 .and_then(|m| m.values().next())
4656 .and_then(|v| v.get("key"))
4657 .and_then(Value::as_str)
4658 .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
4659 let pk_bytes = crate::signing::b64decode(pk_b64)?;
4660 let now = time::OffsetDateTime::now_utc()
4661 .format(&time::format_description::well_known::Rfc3339)
4662 .unwrap_or_default();
4663 let mut body = json!({
4664 "card": our_card,
4665 "relay_url": our_advertised.relay_url,
4666 "slot_id": our_advertised.slot_id,
4667 "slot_token": our_advertised.slot_token,
4668 });
4669 body["endpoints"] = serde_json::to_value(&our_endpoints).unwrap_or(json!([]));
4670 let event = json!({
4671 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
4672 "timestamp": now,
4673 "from": our_did,
4674 "to": sister_did,
4675 "type": "pair_drop",
4676 "kind": 1100u32,
4677 "body": body,
4678 });
4679 let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
4680 let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
4681
4682 let client = crate::relay_client::RelayClient::new(&delivery_endpoint.relay_url);
4686 client
4687 .post_event(
4688 &delivery_endpoint.slot_id,
4689 &delivery_endpoint.slot_token,
4690 &signed,
4691 )
4692 .with_context(|| format!("delivering pair_drop to `{sister_name}`'s local slot"))?;
4693
4694 if as_json {
4695 println!(
4696 "{}",
4697 serde_json::to_string(&json!({
4698 "handle": sister_name,
4699 "paired_with": sister_did,
4700 "peer_handle": sister_handle,
4701 "event_id": event_id,
4702 "delivered_via": match delivery_endpoint.scope {
4703 crate::endpoints::EndpointScope::Local => "local",
4704 crate::endpoints::EndpointScope::Federation => "federation",
4705 },
4706 "status": "drop_sent",
4707 }))?
4708 );
4709 } else {
4710 let scope = match delivery_endpoint.scope {
4711 crate::endpoints::EndpointScope::Local => "local",
4712 crate::endpoints::EndpointScope::Federation => "federation",
4713 };
4714 println!(
4715 "→ found sister `{sister_name}` (did={sister_did})\n→ pinned peer locally\n→ pair_drop delivered to {scope} slot on {}\nawaiting pair_drop_ack from {sister_handle} to complete bilateral pin.",
4716 delivery_endpoint.relay_url
4717 );
4718 }
4719 Ok(())
4720}
4721
4722fn cmd_add(
4723 handle_arg: &str,
4724 relay_override: Option<&str>,
4725 local_sister: bool,
4726 as_json: bool,
4727) -> Result<()> {
4728 if local_sister {
4729 return cmd_add_local_sister(handle_arg, as_json);
4730 }
4731 let parsed = crate::pair_profile::parse_handle(handle_arg)?;
4732
4733 let (our_did, our_relay, our_slot_id, our_slot_token) =
4735 crate::pair_invite::ensure_self_with_relay(relay_override)?;
4736 if our_did == format!("did:wire:{}", parsed.nick) {
4737 bail!("refusing to add self (handle matches own DID)");
4739 }
4740
4741 if let Some(pending) = crate::pending_inbound_pair::read_pending_inbound(&parsed.nick)? {
4751 return cmd_add_accept_pending(
4752 handle_arg,
4753 &parsed.nick,
4754 &pending,
4755 &our_relay,
4756 &our_slot_id,
4757 &our_slot_token,
4758 as_json,
4759 );
4760 }
4761
4762 if !is_known_relay_domain(&parsed.domain, &our_relay) {
4779 eprintln!(
4780 "wire add: WARN unfamiliar relay domain `{}`.",
4781 parsed.domain
4782 );
4783 eprintln!(
4784 " This is NOT `wireup.net` (the default), NOT your own relay (`{}`), ",
4785 host_of_url(&our_relay)
4786 );
4787 eprintln!(
4788 " and not on the known-good list. If you meant `{}@wireup.net`, ",
4789 parsed.nick
4790 );
4791 eprintln!(
4792 " run `wire add {}@wireup.net` instead. Otherwise verify with your",
4793 parsed.nick
4794 );
4795 eprintln!(" peer out-of-band that they actually run a relay at this domain");
4796 eprintln!(" before relying on the pair. (See issue #9.4.)");
4797 }
4798
4799 let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4801 let peer_card = resolved
4802 .get("card")
4803 .cloned()
4804 .ok_or_else(|| anyhow!("resolved missing card"))?;
4805 let peer_did = resolved
4806 .get("did")
4807 .and_then(Value::as_str)
4808 .ok_or_else(|| anyhow!("resolved missing did"))?
4809 .to_string();
4810 let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
4811 let peer_slot_id = resolved
4812 .get("slot_id")
4813 .and_then(Value::as_str)
4814 .ok_or_else(|| anyhow!("resolved missing slot_id"))?
4815 .to_string();
4816 let peer_relay = resolved
4817 .get("relay_url")
4818 .and_then(Value::as_str)
4819 .map(str::to_string)
4820 .or_else(|| relay_override.map(str::to_string))
4821 .unwrap_or_else(|| format!("https://{}", parsed.domain));
4822
4823 let mut trust = config::read_trust()?;
4825 crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
4826 config::write_trust(&trust)?;
4827 let mut relay_state = config::read_relay_state()?;
4828 let existing_token = relay_state
4829 .get("peers")
4830 .and_then(|p| p.get(&peer_handle))
4831 .and_then(|p| p.get("slot_token"))
4832 .and_then(Value::as_str)
4833 .map(str::to_string)
4834 .unwrap_or_default();
4835 relay_state["peers"][&peer_handle] = json!({
4836 "relay_url": peer_relay,
4837 "slot_id": peer_slot_id,
4838 "slot_token": existing_token, });
4840 config::write_relay_state(&relay_state)?;
4841
4842 let our_card = config::read_agent_card()?;
4845 let sk_seed = config::read_private_key()?;
4846 let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
4847 let pk_b64 = our_card
4848 .get("verify_keys")
4849 .and_then(Value::as_object)
4850 .and_then(|m| m.values().next())
4851 .and_then(|v| v.get("key"))
4852 .and_then(Value::as_str)
4853 .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
4854 let pk_bytes = crate::signing::b64decode(pk_b64)?;
4855 let now = time::OffsetDateTime::now_utc()
4856 .format(&time::format_description::well_known::Rfc3339)
4857 .unwrap_or_default();
4858 let our_relay_state = config::read_relay_state().unwrap_or_else(|_| json!({}));
4863 let our_endpoints = crate::endpoints::self_endpoints(&our_relay_state);
4864 let mut body = json!({
4865 "card": our_card,
4866 "relay_url": our_relay,
4867 "slot_id": our_slot_id,
4868 "slot_token": our_slot_token,
4869 });
4870 if !our_endpoints.is_empty() {
4871 body["endpoints"] = serde_json::to_value(&our_endpoints).unwrap_or(json!([]));
4872 }
4873 let event = json!({
4874 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
4875 "timestamp": now,
4876 "from": our_did,
4877 "to": peer_did,
4878 "type": "pair_drop",
4879 "kind": 1100u32,
4880 "body": body,
4881 });
4882 let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
4883
4884 let client = crate::relay_client::RelayClient::new(&peer_relay);
4886 let resp = client.handle_intro(&parsed.nick, &signed)?;
4887 let event_id = signed
4888 .get("event_id")
4889 .and_then(Value::as_str)
4890 .unwrap_or("")
4891 .to_string();
4892
4893 if as_json {
4894 println!(
4895 "{}",
4896 serde_json::to_string(&json!({
4897 "handle": handle_arg,
4898 "paired_with": peer_did,
4899 "peer_handle": peer_handle,
4900 "event_id": event_id,
4901 "drop_response": resp,
4902 "status": "drop_sent",
4903 }))?
4904 );
4905 } else {
4906 println!(
4907 "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
4908 );
4909 }
4910 Ok(())
4911}
4912
4913fn cmd_add_accept_pending(
4920 handle_arg: &str,
4921 peer_nick: &str,
4922 pending: &crate::pending_inbound_pair::PendingInboundPair,
4923 _our_relay: &str,
4924 _our_slot_id: &str,
4925 _our_slot_token: &str,
4926 as_json: bool,
4927) -> Result<()> {
4928 let mut trust = config::read_trust()?;
4931 crate::trust::add_agent_card_pin(&mut trust, &pending.peer_card, Some("VERIFIED"));
4932 config::write_trust(&trust)?;
4933
4934 let mut relay_state = config::read_relay_state()?;
4940 let endpoints_to_pin = if pending.peer_endpoints.is_empty() {
4941 vec![crate::endpoints::Endpoint::federation(
4942 pending.peer_relay_url.clone(),
4943 pending.peer_slot_id.clone(),
4944 pending.peer_slot_token.clone(),
4945 )]
4946 } else {
4947 pending.peer_endpoints.clone()
4948 };
4949 crate::endpoints::pin_peer_endpoints(
4950 &mut relay_state,
4951 &pending.peer_handle,
4952 &endpoints_to_pin,
4953 )?;
4954 config::write_relay_state(&relay_state)?;
4955
4956 crate::pair_invite::send_pair_drop_ack(
4958 &pending.peer_handle,
4959 &pending.peer_relay_url,
4960 &pending.peer_slot_id,
4961 &pending.peer_slot_token,
4962 )
4963 .with_context(|| {
4964 format!(
4965 "pair_drop_ack send to {} @ {} slot {} failed",
4966 pending.peer_handle, pending.peer_relay_url, pending.peer_slot_id
4967 )
4968 })?;
4969
4970 crate::pending_inbound_pair::consume_pending_inbound(peer_nick)?;
4972
4973 if as_json {
4974 println!(
4975 "{}",
4976 serde_json::to_string(&json!({
4977 "handle": handle_arg,
4978 "paired_with": pending.peer_did,
4979 "peer_handle": pending.peer_handle,
4980 "status": "bilateral_accepted",
4981 "via": "pending_inbound",
4982 }))?
4983 );
4984 } else {
4985 println!(
4986 "→ accepted pending pair from {peer}\n→ pinned VERIFIED, slot_token recorded\n→ shipped our slot_token back via pair_drop_ack\nbilateral pair complete. Send with `wire send {peer} \"...\"`.",
4987 peer = pending.peer_handle,
4988 );
4989 }
4990 Ok(())
4991}
4992
4993fn cmd_pair_accept(peer_nick: &str, as_json: bool) -> Result<()> {
5000 let nick = crate::agent_card::bare_handle(peer_nick);
5001 let pending = crate::pending_inbound_pair::read_pending_inbound(nick)?.ok_or_else(|| {
5002 anyhow!(
5003 "no pending pair request from {nick}. Run `wire pair-list-inbound` to see who is waiting, \
5004 or use `wire add <peer>@<relay>` to send a fresh outbound pair request."
5005 )
5006 })?;
5007 let (_our_did, our_relay, our_slot_id, our_slot_token) =
5008 crate::pair_invite::ensure_self_with_relay(None)?;
5009 let handle_arg = format!("{}@{}", pending.peer_handle, pending.peer_relay_url);
5010 cmd_add_accept_pending(
5011 &handle_arg,
5012 nick,
5013 &pending,
5014 &our_relay,
5015 &our_slot_id,
5016 &our_slot_token,
5017 as_json,
5018 )
5019}
5020
5021fn cmd_pair_list_inbound(as_json: bool) -> Result<()> {
5024 let items = crate::pending_inbound_pair::list_pending_inbound()?;
5025 if as_json {
5026 println!("{}", serde_json::to_string(&items)?);
5027 return Ok(());
5028 }
5029 if items.is_empty() {
5030 println!("no pending inbound pair requests.");
5031 return Ok(());
5032 }
5033 println!("{:<20} {:<35} {:<25} DID", "PEER", "RELAY", "RECEIVED");
5034 for p in items {
5035 println!(
5036 "{:<20} {:<35} {:<25} {}",
5037 p.peer_handle, p.peer_relay_url, p.received_at, p.peer_did,
5038 );
5039 }
5040 println!("→ accept with `wire pair-accept <peer>`; refuse with `wire pair-reject <peer>`.");
5041 Ok(())
5042}
5043
5044fn cmd_pair_reject(peer_nick: &str, as_json: bool) -> Result<()> {
5048 let nick = crate::agent_card::bare_handle(peer_nick);
5049 let existed = crate::pending_inbound_pair::read_pending_inbound(nick)?;
5050 crate::pending_inbound_pair::consume_pending_inbound(nick)?;
5051
5052 if as_json {
5053 println!(
5054 "{}",
5055 serde_json::to_string(&json!({
5056 "peer": nick,
5057 "rejected": existed.is_some(),
5058 "had_pending": existed.is_some(),
5059 }))?
5060 );
5061 } else if existed.is_some() {
5062 println!(
5063 "→ rejected pending pair from {nick}\n→ pending-inbound record deleted; no ack sent."
5064 );
5065 } else {
5066 println!("no pending pair from {nick} — nothing to reject");
5067 }
5068 Ok(())
5069}
5070
5071fn cmd_mesh(cmd: MeshCommand) -> Result<()> {
5082 match cmd {
5083 MeshCommand::Status { stale_secs, json } => cmd_session_mesh_status(stale_secs, json),
5084 MeshCommand::Broadcast {
5085 kind,
5086 scope,
5087 exclude,
5088 noreply,
5089 body,
5090 json,
5091 } => cmd_mesh_broadcast(&kind, &scope, &exclude, noreply, &body, json),
5092 MeshCommand::Role { action } => cmd_mesh_role(action),
5093 MeshCommand::Route {
5094 role,
5095 strategy,
5096 exclude,
5097 kind,
5098 body,
5099 json,
5100 } => cmd_mesh_route(&role, &strategy, &exclude, &kind, &body, json),
5101 }
5102}
5103
5104fn cmd_mesh_route(
5109 role: &str,
5110 strategy: &str,
5111 exclude: &[String],
5112 kind: &str,
5113 body_arg: &str,
5114 as_json: bool,
5115) -> Result<()> {
5116 use std::time::Instant;
5117
5118 if !config::is_initialized()? {
5119 bail!("not initialized — run `wire init <handle>` first");
5120 }
5121 let strategy = strategy.to_ascii_lowercase();
5122 if !matches!(strategy.as_str(), "round-robin" | "first" | "random") {
5123 bail!("unknown strategy `{strategy}` — use round-robin | first | random");
5124 }
5125
5126 let state = config::read_relay_state()?;
5129 let pinned: std::collections::BTreeSet<String> = state["peers"]
5130 .as_object()
5131 .map(|m| m.keys().cloned().collect())
5132 .unwrap_or_default();
5133
5134 let exclude_set: std::collections::HashSet<&str> = exclude.iter().map(String::as_str).collect();
5135
5136 let sessions = crate::session::list_sessions()?;
5141 let mut candidates: Vec<(String, Option<String>)> = Vec::new(); for s in &sessions {
5143 let handle = match s.handle.as_ref() {
5144 Some(h) => h.clone(),
5145 None => continue,
5146 };
5147 if exclude_set.contains(handle.as_str()) {
5148 continue;
5149 }
5150 if !pinned.contains(&handle) {
5151 continue;
5152 }
5153 let card_path = s
5154 .home_dir
5155 .join("config")
5156 .join("wire")
5157 .join("agent-card.json");
5158 let card_role = std::fs::read(&card_path)
5159 .ok()
5160 .and_then(|b| serde_json::from_slice::<Value>(&b).ok())
5161 .and_then(|c| {
5162 c.get("profile")
5163 .and_then(|p| p.get("role"))
5164 .and_then(Value::as_str)
5165 .map(str::to_string)
5166 });
5167 if card_role.as_deref() == Some(role) {
5168 candidates.push((handle, s.did.clone()));
5169 }
5170 }
5171
5172 candidates.sort_by(|a, b| a.0.cmp(&b.0));
5173 candidates.dedup_by(|a, b| a.0 == b.0);
5174
5175 if candidates.is_empty() {
5176 bail!(
5177 "no pinned sister with role=`{role}` (run `wire mesh role list` to see what's available)"
5178 );
5179 }
5180
5181 let chosen = match strategy.as_str() {
5182 "first" => candidates[0].clone(),
5183 "random" => {
5184 use rand::Rng;
5185 let idx = rand::thread_rng().gen_range(0..candidates.len());
5186 candidates[idx].clone()
5187 }
5188 "round-robin" => {
5189 let cursor_path = mesh_route_cursor_path()?;
5194 let mut cursors: std::collections::BTreeMap<String, String> =
5195 read_mesh_route_cursors(&cursor_path);
5196 let last = cursors.get(role).cloned();
5197 let pick = match last {
5198 None => candidates[0].clone(),
5199 Some(last_h) => candidates
5200 .iter()
5201 .find(|(h, _)| h.as_str() > last_h.as_str())
5202 .cloned()
5203 .unwrap_or_else(|| candidates[0].clone()),
5204 };
5205 cursors.insert(role.to_string(), pick.0.clone());
5206 write_mesh_route_cursors(&cursor_path, &cursors)?;
5207 pick
5208 }
5209 _ => unreachable!(),
5210 };
5211
5212 let (chosen_handle, _chosen_did) = chosen;
5213
5214 let body_value: Value = if body_arg == "-" {
5216 use std::io::Read;
5217 let mut raw = String::new();
5218 std::io::stdin()
5219 .read_to_string(&mut raw)
5220 .with_context(|| "reading body from stdin")?;
5221 serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
5222 } else if let Some(path) = body_arg.strip_prefix('@') {
5223 let raw =
5224 std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
5225 serde_json::from_str(&raw).unwrap_or(Value::String(raw))
5226 } else {
5227 Value::String(body_arg.to_string())
5228 };
5229
5230 let sk_seed = config::read_private_key()?;
5231 let card = config::read_agent_card()?;
5232 let did = card
5233 .get("did")
5234 .and_then(Value::as_str)
5235 .ok_or_else(|| anyhow!("agent-card missing did"))?
5236 .to_string();
5237 let handle = crate::agent_card::display_handle_from_did(&did).to_string();
5238 let pk_b64 = card
5239 .get("verify_keys")
5240 .and_then(Value::as_object)
5241 .and_then(|m| m.values().next())
5242 .and_then(|v| v.get("key"))
5243 .and_then(Value::as_str)
5244 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
5245 let pk_bytes = crate::signing::b64decode(pk_b64)?;
5246
5247 let kind_id = parse_kind(kind)?;
5248 let now_iso = time::OffsetDateTime::now_utc()
5249 .format(&time::format_description::well_known::Rfc3339)
5250 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
5251
5252 let event = json!({
5253 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
5254 "timestamp": now_iso,
5255 "from": did,
5256 "to": format!("did:wire:{chosen_handle}"),
5257 "type": kind,
5258 "kind": kind_id,
5259 "body": json!({
5260 "content": body_value,
5261 "routed_via": {
5262 "role": role,
5263 "strategy": strategy,
5264 },
5265 }),
5266 });
5267 let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)
5268 .map_err(|e| anyhow!("sign_message_v31 failed: {e:?}"))?;
5269 let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
5270
5271 let line = serde_json::to_vec(&signed)?;
5272 config::append_outbox_record(&chosen_handle, &line)?;
5273
5274 let endpoints = crate::endpoints::peer_endpoints_in_priority_order(&state, &chosen_handle);
5275 if endpoints.is_empty() {
5276 bail!(
5277 "no reachable endpoint pinned for `{chosen_handle}` (the role matched, but we can't push)"
5278 );
5279 }
5280 let start = Instant::now();
5281 let mut delivered = false;
5282 let mut last_err: Option<String> = None;
5283 let mut via_scope: Option<String> = None;
5284 for ep in &endpoints {
5285 let client = crate::relay_client::RelayClient::new(&ep.relay_url);
5286 match client.post_event(&ep.slot_id, &ep.slot_token, &signed) {
5287 Ok(_) => {
5288 delivered = true;
5289 via_scope = Some(
5290 match ep.scope {
5291 crate::endpoints::EndpointScope::Local => "local",
5292 crate::endpoints::EndpointScope::Federation => "federation",
5293 }
5294 .to_string(),
5295 );
5296 break;
5297 }
5298 Err(e) => last_err = Some(format!("{e:#}")),
5299 }
5300 }
5301 let rtt_ms = start.elapsed().as_millis() as u64;
5302
5303 let summary = json!({
5304 "role": role,
5305 "strategy": strategy,
5306 "routed_to": chosen_handle,
5307 "event_id": event_id,
5308 "delivered": delivered,
5309 "delivered_via": via_scope,
5310 "rtt_ms": rtt_ms,
5311 "candidates": candidates.iter().map(|(h, _)| h.clone()).collect::<Vec<_>>(),
5312 "error": last_err,
5313 });
5314
5315 if as_json {
5316 println!("{}", serde_json::to_string(&summary)?);
5317 } else if delivered {
5318 let via = via_scope.as_deref().unwrap_or("?");
5319 println!("wire mesh route: {role} → {chosen_handle} ({rtt_ms}ms, {via})");
5320 } else {
5321 let err = last_err.as_deref().unwrap_or("no endpoints reachable");
5322 bail!("delivery to `{chosen_handle}` failed: {err}");
5323 }
5324 Ok(())
5325}
5326
5327fn mesh_route_cursor_path() -> Result<std::path::PathBuf> {
5328 Ok(config::state_dir()?.join("mesh-route-cursor.json"))
5329}
5330
5331fn read_mesh_route_cursors(path: &std::path::Path) -> std::collections::BTreeMap<String, String> {
5332 std::fs::read(path)
5333 .ok()
5334 .and_then(|b| serde_json::from_slice(&b).ok())
5335 .unwrap_or_default()
5336}
5337
5338fn write_mesh_route_cursors(
5339 path: &std::path::Path,
5340 cursors: &std::collections::BTreeMap<String, String>,
5341) -> Result<()> {
5342 if let Some(parent) = path.parent() {
5343 std::fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
5344 }
5345 let body = serde_json::to_vec_pretty(cursors)?;
5346 std::fs::write(path, body).with_context(|| format!("writing {path:?}"))?;
5347 Ok(())
5348}
5349
5350fn cmd_mesh_role(action: MeshRoleAction) -> Result<()> {
5355 match action {
5356 MeshRoleAction::Set { role, json } => {
5357 validate_role_tag(&role)?;
5358 let new_profile =
5359 crate::pair_profile::write_profile_field("role", Value::String(role.clone()))?;
5360 if json {
5361 println!(
5362 "{}",
5363 serde_json::to_string(&json!({
5364 "role": role,
5365 "profile": new_profile,
5366 }))?
5367 );
5368 } else {
5369 println!("self role = {role} (signed into agent-card)");
5370 }
5371 }
5372 MeshRoleAction::Get { peer, json } => {
5373 let (who, role) = match peer.as_deref() {
5374 None => {
5375 let card = config::read_agent_card()?;
5376 let role = card
5377 .get("profile")
5378 .and_then(|p| p.get("role"))
5379 .and_then(Value::as_str)
5380 .map(str::to_string);
5381 let who = card
5382 .get("did")
5383 .and_then(Value::as_str)
5384 .map(|d| crate::agent_card::display_handle_from_did(d).to_string())
5385 .unwrap_or_else(|| "self".to_string());
5386 (who, role)
5387 }
5388 Some(handle) => {
5389 let bare = crate::agent_card::bare_handle(handle).to_string();
5390 let trust = config::read_trust()?;
5391 let role = trust
5392 .get("agents")
5393 .and_then(|a| a.get(&bare))
5394 .and_then(|a| a.get("card"))
5395 .and_then(|c| c.get("profile"))
5396 .and_then(|p| p.get("role"))
5397 .and_then(Value::as_str)
5398 .map(str::to_string);
5399 (bare, role)
5400 }
5401 };
5402 if json {
5403 println!(
5404 "{}",
5405 serde_json::to_string(&json!({
5406 "handle": who,
5407 "role": role,
5408 }))?
5409 );
5410 } else {
5411 match role {
5412 Some(r) => println!("{who}: {r}"),
5413 None => println!("{who}: (unset)"),
5414 }
5415 }
5416 }
5417 MeshRoleAction::List { json } => {
5418 let mut self_did: Option<String> = None;
5419 if let Ok(card) = config::read_agent_card() {
5420 self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
5421 }
5422 let sessions = crate::session::list_sessions()?;
5423 let mut rows: Vec<Value> = Vec::new();
5424 for s in &sessions {
5425 let card_path = s
5426 .home_dir
5427 .join("config")
5428 .join("wire")
5429 .join("agent-card.json");
5430 let role = std::fs::read(&card_path)
5431 .ok()
5432 .and_then(|b| serde_json::from_slice::<Value>(&b).ok())
5433 .and_then(|c| {
5434 c.get("profile")
5435 .and_then(|p| p.get("role"))
5436 .and_then(Value::as_str)
5437 .map(str::to_string)
5438 });
5439 let is_self = match (&self_did, &s.did) {
5440 (Some(a), Some(b)) => a == b,
5441 _ => false,
5442 };
5443 rows.push(json!({
5444 "name": s.name,
5445 "handle": s.handle,
5446 "role": role,
5447 "self": is_self,
5448 }));
5449 }
5450 rows.sort_by(|a, b| {
5451 a["name"]
5452 .as_str()
5453 .unwrap_or("")
5454 .cmp(b["name"].as_str().unwrap_or(""))
5455 });
5456 if json {
5457 println!("{}", serde_json::to_string(&json!({"sessions": rows}))?);
5458 } else if rows.is_empty() {
5459 println!("no sister sessions on this machine.");
5460 } else {
5461 println!("SISTER ROLES (this machine):");
5462 for r in &rows {
5463 let name = r["name"].as_str().unwrap_or("?");
5464 let role = r["role"].as_str().unwrap_or("(unset)");
5465 let marker = if r["self"].as_bool().unwrap_or(false) {
5466 " ← you"
5467 } else {
5468 ""
5469 };
5470 println!(" {name:<24} {role}{marker}");
5471 }
5472 }
5473 }
5474 MeshRoleAction::Clear { json } => {
5475 let new_profile = crate::pair_profile::write_profile_field("role", Value::Null)?;
5476 if json {
5477 println!(
5478 "{}",
5479 serde_json::to_string(&json!({
5480 "cleared": true,
5481 "profile": new_profile,
5482 }))?
5483 );
5484 } else {
5485 println!("self role cleared");
5486 }
5487 }
5488 }
5489 Ok(())
5490}
5491
5492fn validate_role_tag(role: &str) -> Result<()> {
5497 if role.is_empty() {
5498 bail!("role must not be empty (use `wire mesh role --clear` to unset)");
5499 }
5500 if role.len() > 32 {
5501 bail!("role too long ({} chars; max 32)", role.len());
5502 }
5503 for c in role.chars() {
5504 if !(c.is_ascii_alphanumeric() || c == '-' || c == '_') {
5505 bail!("role contains illegal char {c:?} (allowed: A-Z a-z 0-9 - _)");
5506 }
5507 }
5508 Ok(())
5509}
5510
/// Sign one event per pinned peer and deliver them all in parallel.
///
/// Steps, in order:
/// 1. Resolve the scope filter and collect each non-excluded peer's
///    endpoints that match it.
/// 2. Load our signing material and build the body from `body_arg`
///    (`-` = stdin, `@path` = file, anything else = literal string).
/// 3. Sign one event per target; all share a `broadcast_id` and the
///    target count.
/// 4. Append every signed event to the outbox, then post each one from its
///    own scoped thread, trying that peer's endpoints in order until one
///    succeeds.
/// 5. Print a JSON summary or a human-readable report.
///
/// `_noreply` is accepted but not read by this function.
///
/// # Errors
/// Fails when wire is not initialized, the scope is unknown, no peers are
/// pinned, no peer survives filtering, or signing material, body input,
/// kind parsing, signing, or outbox appends fail. Per-peer delivery
/// failures are reported in the results and do not fail the command.
fn cmd_mesh_broadcast(
    kind: &str,
    scope_str: &str,
    exclude: &[String],
    _noreply: bool,
    body_arg: &str,
    as_json: bool,
) -> Result<()> {
    use std::time::Instant;

    if !config::is_initialized()? {
        bail!("not initialized — run `wire init <handle>` first");
    }

    let scope = match scope_str {
        "local" => crate::endpoints::EndpointScope::Local,
        "federation" => crate::endpoints::EndpointScope::Federation,
        "both" => {
            // Placeholder only: `any_scope` below bypasses the scope
            // comparison entirely for "both".
            crate::endpoints::EndpointScope::Local
        }
        other => bail!("unknown scope `{other}` — use local | federation | both"),
    };
    let any_scope = scope_str == "both";

    let state = config::read_relay_state()?;
    // A missing or non-object `peers` entry is treated as "no peers".
    let peers = state["peers"].as_object().cloned().unwrap_or_default();
    if peers.is_empty() {
        bail!("no peers pinned — run `wire accept <invite-url>` or `wire pair-accept` first");
    }

    let exclude_set: std::collections::HashSet<&str> = exclude.iter().map(String::as_str).collect();

    // One delivery target: a peer handle plus the endpoints that passed the
    // scope filter, in the order `peer_endpoints_in_priority_order` returned.
    struct Target {
        handle: String,
        endpoints: Vec<crate::endpoints::Endpoint>,
    }
    let mut targets: Vec<Target> = Vec::new();
    let mut skipped_wrong_scope: Vec<String> = Vec::new();
    let mut skipped_excluded: Vec<String> = Vec::new();
    for handle in peers.keys() {
        if exclude_set.contains(handle.as_str()) {
            skipped_excluded.push(handle.clone());
            continue;
        }
        let ordered = crate::endpoints::peer_endpoints_in_priority_order(&state, handle);
        let filtered: Vec<crate::endpoints::Endpoint> = ordered
            .into_iter()
            .filter(|ep| any_scope || ep.scope == scope)
            .collect();
        if filtered.is_empty() {
            skipped_wrong_scope.push(handle.clone());
            continue;
        }
        targets.push(Target {
            handle: handle.clone(),
            endpoints: filtered,
        });
    }

    if targets.is_empty() {
        bail!(
            "no peers matched scope=`{scope_str}` after exclude filter ({} excluded, {} wrong-scope)",
            skipped_excluded.len(),
            skipped_wrong_scope.len()
        );
    }

    // Signing material: private key seed, our DID/handle, and the first
    // verify key listed in the agent-card.
    let sk_seed = config::read_private_key()?;
    let card = config::read_agent_card()?;
    let did = card
        .get("did")
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("agent-card missing did"))?
        .to_string();
    let handle = crate::agent_card::display_handle_from_did(&did).to_string();
    let pk_b64 = card
        .get("verify_keys")
        .and_then(Value::as_object)
        .and_then(|m| m.values().next())
        .and_then(|v| v.get("key"))
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
    let pk_bytes = crate::signing::b64decode(pk_b64)?;

    // Body: parsed as JSON when possible, otherwise kept as a raw string.
    // The stdin fallback keeps the untrimmed input, including any trailing
    // newline.
    let body_value: Value = if body_arg == "-" {
        use std::io::Read;
        let mut raw = String::new();
        std::io::stdin()
            .read_to_string(&mut raw)
            .with_context(|| "reading body from stdin")?;
        serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
    } else if let Some(path) = body_arg.strip_prefix('@') {
        let raw =
            std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
        serde_json::from_str(&raw).unwrap_or(Value::String(raw))
    } else {
        Value::String(body_arg.to_string())
    };

    let kind_id = parse_kind(kind)?;
    // If RFC 3339 formatting fails, fall back to the epoch string.
    let now_iso = time::OffsetDateTime::now_utc()
        .format(&time::format_description::well_known::Rfc3339)
        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());

    let broadcast_id = generate_broadcast_id();
    let target_count = targets.len();

    // (peer handle, endpoints, signed event, event id), one tuple per target.
    // Every event is signed before anything is written or sent, so a signing
    // failure aborts the whole broadcast.
    let mut signed_per_peer: Vec<(String, Vec<crate::endpoints::Endpoint>, Value, String)> =
        Vec::with_capacity(targets.len());
    for t in &targets {
        let body = json!({
            "content": body_value,
            "broadcast_id": broadcast_id,
            "broadcast_target_count": target_count,
        });
        let event = json!({
            "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
            "timestamp": now_iso,
            "from": did,
            "to": format!("did:wire:{}", t.handle),
            "type": kind,
            "kind": kind_id,
            "body": body,
        });
        let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)
            .map_err(|e| anyhow!("sign_message_v31 failed for `{}`: {e:?}", t.handle))?;
        // A missing `event_id` is recorded as the empty string.
        let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
        signed_per_peer.push((t.handle.clone(), t.endpoints.clone(), signed, event_id));
    }

    // Record every signed event in the outbox before any network delivery.
    for (peer, _, signed, _) in &signed_per_peer {
        let line = serde_json::to_vec(signed)?;
        config::append_outbox_record(peer, &line)?;
    }

    // Fan-out: one scoped thread per peer; each reports a result over `tx`.
    use std::sync::mpsc;
    let (tx, rx) = mpsc::channel::<Value>();
    std::thread::scope(|s| {
        for (peer, endpoints, signed, event_id) in &signed_per_peer {
            let tx = tx.clone();
            let peer = peer.clone();
            let event_id = event_id.clone();
            let endpoints = endpoints.clone();
            let signed = signed.clone();
            s.spawn(move || {
                let start = Instant::now();
                let mut delivered = false;
                let mut last_err: Option<String> = None;
                let mut delivered_via: Option<String> = None;
                // Try endpoints in order and stop at the first success.
                // `last_err` keeps the most recent failure even when a later
                // endpoint succeeds.
                for ep in &endpoints {
                    let client = crate::relay_client::RelayClient::new(&ep.relay_url);
                    match client.post_event(&ep.slot_id, &ep.slot_token, &signed) {
                        Ok(_) => {
                            delivered = true;
                            delivered_via = Some(
                                match ep.scope {
                                    crate::endpoints::EndpointScope::Local => "local",
                                    crate::endpoints::EndpointScope::Federation => "federation",
                                }
                                .to_string(),
                            );
                            break;
                        }
                        Err(e) => last_err = Some(format!("{e:#}")),
                    }
                }
                // Elapsed time covers every endpoint attempt for this peer.
                let rtt_ms = start.elapsed().as_millis() as u64;
                // A send failure is ignored.
                let _ = tx.send(json!({
                    "peer": peer,
                    "event_id": event_id,
                    "delivered": delivered,
                    "delivered_via": delivered_via,
                    "rtt_ms": rtt_ms,
                    "error": last_err,
                }));
            });
        }
    });
    // Drop the original sender so `rx.iter()` ends once the buffered
    // results are drained.
    drop(tx);

    // Sort by peer so the output does not depend on thread completion order.
    let mut results: Vec<Value> = rx.iter().collect();
    results.sort_by(|a, b| {
        a["peer"]
            .as_str()
            .unwrap_or("")
            .cmp(b["peer"].as_str().unwrap_or(""))
    });

    let delivered = results
        .iter()
        .filter(|r| r["delivered"].as_bool().unwrap_or(false))
        .count();
    let failed = results.len() - delivered;

    let summary = json!({
        "broadcast_id": broadcast_id,
        "kind": kind,
        "scope": scope_str,
        "target_count": target_count,
        "delivered": delivered,
        "failed": failed,
        "skipped_excluded": skipped_excluded,
        "skipped_wrong_scope": skipped_wrong_scope,
        "results": results,
    });

    // JSON mode returns Ok even when some or all deliveries failed; callers
    // must inspect `failed` / `results`.
    if as_json {
        println!("{}", serde_json::to_string(&summary)?);
        return Ok(());
    }

    println!("wire mesh broadcast: scope={scope_str} → {target_count} pinned peer(s)");
    for r in &results {
        let peer = r["peer"].as_str().unwrap_or("?");
        let delivered = r["delivered"].as_bool().unwrap_or(false);
        let rtt = r["rtt_ms"].as_u64().unwrap_or(0);
        let via = r["delivered_via"].as_str().unwrap_or("");
        if delivered {
            println!(" {peer:<24} ✓ delivered ({rtt}ms, {via})");
        } else {
            let err = r["error"].as_str().unwrap_or("?");
            println!(" {peer:<24} ✗ failed — {err}");
        }
    }
    if !skipped_excluded.is_empty() {
        println!(" excluded: {}", skipped_excluded.join(", "));
    }
    if !skipped_wrong_scope.is_empty() {
        println!(
            " skipped (wrong scope): {}",
            skipped_wrong_scope.join(", ")
        );
    }
    println!("broadcast_id: {broadcast_id}");
    Ok(())
}
5782
5783fn generate_broadcast_id() -> String {
5787 use rand::RngCore;
5788 let mut buf = [0u8; 16];
5789 rand::thread_rng().fill_bytes(&mut buf);
5790 let h = hex::encode(buf);
5791 format!(
5792 "{}-{}-{}-{}-{}",
5793 &h[0..8],
5794 &h[8..12],
5795 &h[12..16],
5796 &h[16..20],
5797 &h[20..32],
5798 )
5799}
5800
5801fn cmd_session(cmd: SessionCommand) -> Result<()> {
5802 match cmd {
5803 SessionCommand::New {
5804 name,
5805 relay,
5806 with_local,
5807 local_relay,
5808 no_daemon,
5809 local_only,
5810 json,
5811 } => cmd_session_new(
5812 name.as_deref(),
5813 &relay,
5814 with_local,
5815 &local_relay,
5816 no_daemon,
5817 local_only,
5818 json,
5819 ),
5820 SessionCommand::List { json } => cmd_session_list(json),
5821 SessionCommand::ListLocal { json } => cmd_session_list_local(json),
5822 SessionCommand::PairAllLocal {
5823 settle_secs,
5824 federation_relay,
5825 json,
5826 } => cmd_session_pair_all_local(settle_secs, &federation_relay, json),
5827 SessionCommand::MeshStatus { stale_secs, json } => {
5828 cmd_session_mesh_status(stale_secs, json)
5829 }
5830 SessionCommand::Env { name, json } => cmd_session_env(name.as_deref(), json),
5831 SessionCommand::Current { json } => cmd_session_current(json),
5832 SessionCommand::Destroy { name, force, json } => cmd_session_destroy(&name, force, json),
5833 }
5834}
5835
5836fn resolve_session_name(name: Option<&str>) -> Result<String> {
5837 if let Some(n) = name {
5838 return Ok(crate::session::sanitize_name(n));
5839 }
5840 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
5841 let registry = crate::session::read_registry().unwrap_or_default();
5842 Ok(crate::session::derive_name_from_cwd(&cwd, ®istry))
5843}
5844
5845fn cmd_session_new(
5846 name_arg: Option<&str>,
5847 relay: &str,
5848 with_local: bool,
5849 local_relay: &str,
5850 no_daemon: bool,
5851 local_only: bool,
5852 as_json: bool,
5853) -> Result<()> {
5854 let with_local = with_local || local_only;
5857 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
5858 let mut registry = crate::session::read_registry().unwrap_or_default();
5859 let name = match name_arg {
5860 Some(n) => crate::session::sanitize_name(n),
5861 None => crate::session::derive_name_from_cwd(&cwd, ®istry),
5862 };
5863 let session_home = crate::session::session_dir(&name)?;
5864
5865 let already_exists = session_home.exists()
5866 && session_home
5867 .join("config")
5868 .join("wire")
5869 .join("agent-card.json")
5870 .exists();
5871 if already_exists {
5872 registry
5876 .by_cwd
5877 .insert(cwd.to_string_lossy().into_owned(), name.clone());
5878 crate::session::write_registry(®istry)?;
5879 let info = render_session_info(&name, &session_home, &cwd)?;
5880 emit_session_new_result(&info, "already_exists", as_json)?;
5881 if !no_daemon {
5882 ensure_session_daemon(&session_home)?;
5883 }
5884 return Ok(());
5885 }
5886
5887 std::fs::create_dir_all(&session_home)
5888 .with_context(|| format!("creating session dir {session_home:?}"))?;
5889
5890 let init_args: Vec<&str> = if local_only {
5895 vec!["init", &name]
5896 } else {
5897 vec!["init", &name, "--relay", relay]
5898 };
5899 let init_status = run_wire_with_home(&session_home, &init_args)?;
5900 if !init_status.success() {
5901 let how = if local_only {
5902 format!("`wire init {name}` (local-only)")
5903 } else {
5904 format!("`wire init {name} --relay {relay}`")
5905 };
5906 bail!("{how} failed inside session dir {session_home:?}");
5907 }
5908
5909 let effective_handle = if local_only {
5914 name.clone()
5915 } else {
5916 let mut claim_attempt = 0u32;
5917 let mut effective = name.clone();
5918 loop {
5919 claim_attempt += 1;
5920 let status =
5921 run_wire_with_home(&session_home, &["claim", &effective, "--relay", relay])?;
5922 if status.success() {
5923 break;
5924 }
5925 if claim_attempt >= 5 {
5926 bail!(
5927 "5 failed attempts to claim a handle on {relay} for session {name}. \
5928 Try `wire session destroy {name} --force` and re-run with a different name, \
5929 or use `--local-only` if you don't need a federation address."
5930 );
5931 }
5932 let attempt_path = cwd.join(format!("__attempt_{claim_attempt}"));
5933 let suffix = crate::session::derive_name_from_cwd(&attempt_path, ®istry);
5934 let token = suffix
5935 .rsplit('-')
5936 .next()
5937 .filter(|t| t.len() == 4)
5938 .map(str::to_string)
5939 .unwrap_or_else(|| format!("{claim_attempt}"));
5940 effective = format!("{name}-{token}");
5941 }
5942 effective
5943 };
5944
5945 registry
5948 .by_cwd
5949 .insert(cwd.to_string_lossy().into_owned(), name.clone());
5950 crate::session::write_registry(®istry)?;
5951
5952 if with_local {
5963 try_allocate_local_slot(&session_home, &effective_handle, relay, local_relay);
5964 if local_only {
5965 let relay_state_path = session_home.join("config").join("wire").join("relay.json");
5970 let state: Value = std::fs::read(&relay_state_path)
5971 .ok()
5972 .and_then(|b| serde_json::from_slice(&b).ok())
5973 .unwrap_or_else(|| json!({"self": Value::Null, "peers": {}}));
5974 let endpoints = crate::endpoints::self_endpoints(&state);
5975 let has_local = endpoints
5976 .iter()
5977 .any(|e| e.scope == crate::endpoints::EndpointScope::Local);
5978 if !has_local {
5979 bail!(
5980 "--local-only requested but local-relay probe at {local_relay} failed — \
5981 ensure the local relay is running (`wire service install --local-relay`), \
5982 then re-run `wire session new {name} --local-only`."
5983 );
5984 }
5985 }
5986 }
5987
5988 if !no_daemon {
5989 ensure_session_daemon(&session_home)?;
5990 }
5991
5992 let info = render_session_info(&name, &session_home, &cwd)?;
5993 emit_session_new_result(&info, "created", as_json)
5994}
5995
5996fn try_allocate_local_slot(
6004 session_home: &std::path::Path,
6005 handle: &str,
6006 _federation_relay: &str,
6007 local_relay: &str,
6008) {
6009 let probe = match crate::relay_client::build_blocking_client(Some(
6012 std::time::Duration::from_millis(500),
6013 )) {
6014 Ok(c) => c,
6015 Err(e) => {
6016 eprintln!("wire session new: cannot build probe client for {local_relay}: {e:#}");
6017 return;
6018 }
6019 };
6020 let healthz_url = format!("{}/healthz", local_relay.trim_end_matches('/'));
6021 match probe.get(&healthz_url).send() {
6022 Ok(resp) if resp.status().is_success() => {}
6023 Ok(resp) => {
6024 eprintln!(
6025 "wire session new: local relay probe at {healthz_url} returned {} — staying federation-only",
6026 resp.status()
6027 );
6028 return;
6029 }
6030 Err(e) => {
6031 eprintln!(
6032 "wire session new: local relay at {local_relay} unreachable ({}) — staying federation-only. \
6033 Start one with `wire relay-server --bind 127.0.0.1:8771 --local-only`.",
6034 crate::relay_client::format_transport_error(&anyhow::Error::new(e))
6035 );
6036 return;
6037 }
6038 };
6039
6040 let local_client = crate::relay_client::RelayClient::new(local_relay);
6042 let alloc = match local_client.allocate_slot(Some(handle)) {
6043 Ok(a) => a,
6044 Err(e) => {
6045 eprintln!(
6046 "wire session new: local relay slot allocation failed: {e:#} — staying federation-only"
6047 );
6048 return;
6049 }
6050 };
6051
6052 let state_path = session_home.join("config").join("wire").join("relay.json");
6067 let mut state: serde_json::Value = std::fs::read(&state_path)
6068 .ok()
6069 .and_then(|b| serde_json::from_slice(&b).ok())
6070 .unwrap_or_else(|| serde_json::json!({}));
6071 let fed_endpoint = state.get("self").and_then(|s| {
6074 let url = s.get("relay_url").and_then(serde_json::Value::as_str)?;
6075 let slot_id = s.get("slot_id").and_then(serde_json::Value::as_str)?;
6076 let slot_token = s.get("slot_token").and_then(serde_json::Value::as_str)?;
6077 Some(crate::endpoints::Endpoint::federation(
6078 url.to_string(),
6079 slot_id.to_string(),
6080 slot_token.to_string(),
6081 ))
6082 });
6083
6084 let local_endpoint = crate::endpoints::Endpoint::local(
6085 local_relay.trim_end_matches('/').to_string(),
6086 alloc.slot_id.clone(),
6087 alloc.slot_token.clone(),
6088 );
6089
6090 let mut endpoints: Vec<crate::endpoints::Endpoint> = Vec::new();
6091 if let Some(f) = fed_endpoint.clone() {
6092 endpoints.push(f);
6093 }
6094 endpoints.push(local_endpoint);
6095
6096 let (legacy_relay, legacy_slot_id, legacy_slot_token) = match fed_endpoint.clone() {
6106 Some(f) => (f.relay_url, f.slot_id, f.slot_token),
6107 None => (
6108 local_relay.trim_end_matches('/').to_string(),
6109 alloc.slot_id.clone(),
6110 alloc.slot_token.clone(),
6111 ),
6112 };
6113 let self_obj = state
6114 .as_object_mut()
6115 .expect("relay_state root is an object")
6116 .entry("self")
6117 .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
6118 if !self_obj.is_object() {
6121 *self_obj = serde_json::Value::Object(serde_json::Map::new());
6122 }
6123 if let Some(obj) = self_obj.as_object_mut() {
6124 obj.insert("relay_url".into(), serde_json::Value::String(legacy_relay));
6125 obj.insert("slot_id".into(), serde_json::Value::String(legacy_slot_id));
6126 obj.insert(
6127 "slot_token".into(),
6128 serde_json::Value::String(legacy_slot_token),
6129 );
6130 obj.insert(
6131 "endpoints".into(),
6132 serde_json::to_value(&endpoints).unwrap_or(serde_json::Value::Null),
6133 );
6134 }
6135
6136 if let Err(e) = std::fs::write(
6137 &state_path,
6138 serde_json::to_vec_pretty(&state).unwrap_or_default(),
6139 ) {
6140 eprintln!(
6141 "wire session new: persisting dual-slot relay_state at {state_path:?} failed: {e}"
6142 );
6143 return;
6144 }
6145 eprintln!(
6146 "wire session new: local slot allocated on {local_relay} (slot_id={})",
6147 alloc.slot_id
6148 );
6149}
6150
6151fn render_session_info(
6152 name: &str,
6153 session_home: &std::path::Path,
6154 cwd: &std::path::Path,
6155) -> Result<serde_json::Value> {
6156 let card_path = session_home
6157 .join("config")
6158 .join("wire")
6159 .join("agent-card.json");
6160 let (did, handle) = if card_path.exists() {
6161 let card: Value = serde_json::from_slice(&std::fs::read(&card_path)?)?;
6162 let did = card
6163 .get("did")
6164 .and_then(Value::as_str)
6165 .unwrap_or("")
6166 .to_string();
6167 let handle = card
6168 .get("handle")
6169 .and_then(Value::as_str)
6170 .map(str::to_string)
6171 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
6172 (did, handle)
6173 } else {
6174 (String::new(), String::new())
6175 };
6176 Ok(json!({
6177 "name": name,
6178 "home_dir": session_home.to_string_lossy(),
6179 "cwd": cwd.to_string_lossy(),
6180 "did": did,
6181 "handle": handle,
6182 "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
6183 }))
6184}
6185
6186fn emit_session_new_result(info: &serde_json::Value, status: &str, as_json: bool) -> Result<()> {
6187 if as_json {
6188 let mut obj = info.clone();
6189 obj["status"] = json!(status);
6190 println!("{}", serde_json::to_string(&obj)?);
6191 } else {
6192 let name = info["name"].as_str().unwrap_or("?");
6193 let handle = info["handle"].as_str().unwrap_or("?");
6194 let home = info["home_dir"].as_str().unwrap_or("?");
6195 let did = info["did"].as_str().unwrap_or("?");
6196 let export = info["export"].as_str().unwrap_or("?");
6197 let prefix = if status == "already_exists" {
6198 "session already exists (re-registered cwd)"
6199 } else {
6200 "session created"
6201 };
6202 println!(
6203 "{prefix}\n name: {name}\n handle: {handle}\n did: {did}\n home: {home}\n\nactivate with:\n {export}"
6204 );
6205 }
6206 Ok(())
6207}
6208
6209fn run_wire_with_home(
6210 session_home: &std::path::Path,
6211 args: &[&str],
6212) -> Result<std::process::ExitStatus> {
6213 let bin = std::env::current_exe().with_context(|| "locating self exe")?;
6214 let status = std::process::Command::new(&bin)
6215 .env("WIRE_HOME", session_home)
6216 .env_remove("RUST_LOG")
6217 .args(args)
6218 .status()
6219 .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
6220 Ok(status)
6221}
6222
6223fn ensure_session_daemon(session_home: &std::path::Path) -> Result<()> {
6224 let pidfile = session_home.join("state").join("wire").join("daemon.pid");
6227 if pidfile.exists() {
6228 let bytes = std::fs::read(&pidfile).unwrap_or_default();
6229 let pid: Option<u32> = if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
6230 v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
6231 } else {
6232 String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
6233 };
6234 if let Some(p) = pid {
6235 let alive = {
6236 #[cfg(target_os = "linux")]
6237 {
6238 std::path::Path::new(&format!("/proc/{p}")).exists()
6239 }
6240 #[cfg(not(target_os = "linux"))]
6241 {
6242 std::process::Command::new("kill")
6243 .args(["-0", &p.to_string()])
6244 .output()
6245 .map(|o| o.status.success())
6246 .unwrap_or(false)
6247 }
6248 };
6249 if alive {
6250 return Ok(());
6251 }
6252 }
6253 }
6254
6255 let bin = std::env::current_exe().with_context(|| "locating self exe")?;
6258 let log_path = session_home.join("state").join("wire").join("daemon.log");
6259 if let Some(parent) = log_path.parent() {
6260 std::fs::create_dir_all(parent).ok();
6261 }
6262 let log_file = std::fs::OpenOptions::new()
6263 .create(true)
6264 .append(true)
6265 .open(&log_path)
6266 .with_context(|| format!("opening daemon log {log_path:?}"))?;
6267 let log_err = log_file.try_clone()?;
6268 std::process::Command::new(&bin)
6269 .env("WIRE_HOME", session_home)
6270 .env_remove("RUST_LOG")
6271 .args(["daemon", "--interval", "5"])
6272 .stdout(log_file)
6273 .stderr(log_err)
6274 .stdin(std::process::Stdio::null())
6275 .spawn()
6276 .with_context(|| "spawning session-local `wire daemon`")?;
6277 Ok(())
6278}
6279
6280fn cmd_session_list(as_json: bool) -> Result<()> {
6281 let items = crate::session::list_sessions()?;
6282 if as_json {
6283 println!("{}", serde_json::to_string(&items)?);
6284 return Ok(());
6285 }
6286 if items.is_empty() {
6287 println!("no sessions on this machine. `wire session new` to create one.");
6288 return Ok(());
6289 }
6290 println!("{:<24} {:<24} {:<10} CWD", "NAME", "HANDLE", "DAEMON");
6291 for s in items {
6292 println!(
6293 "{:<24} {:<24} {:<10} {}",
6294 s.name,
6295 s.handle.as_deref().unwrap_or("?"),
6296 if s.daemon_running { "running" } else { "down" },
6297 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
6298 );
6299 }
6300 Ok(())
6301}
6302
6303fn cmd_session_list_local(as_json: bool) -> Result<()> {
6315 let listing = crate::session::list_local_sessions()?;
6316 if as_json {
6317 println!("{}", serde_json::to_string(&listing)?);
6318 return Ok(());
6319 }
6320
6321 if listing.local.is_empty() && listing.federation_only.is_empty() {
6322 println!(
6323 "no sessions on this machine. `wire session new --with-local` to create one \
6324 with a local-relay endpoint (start the relay first: \
6325 `wire relay-server --bind 127.0.0.1:8771 --local-only`)."
6326 );
6327 return Ok(());
6328 }
6329
6330 if listing.local.is_empty() {
6331 println!(
6332 "no sister sessions reachable via a local relay. \
6333 Re-run `wire session new --with-local` to add a Local endpoint, or \
6334 start a local relay with `wire relay-server --bind 127.0.0.1:8771 --local-only`."
6335 );
6336 } else {
6337 let mut keys: Vec<&String> = listing.local.keys().collect();
6339 keys.sort();
6340 for relay_url in keys {
6341 let group = &listing.local[relay_url];
6342 println!("LOCAL RELAY: {relay_url}");
6343 println!(" {:<24} {:<32} {:<10} CWD", "NAME", "HANDLE", "DAEMON");
6344 for s in group {
6345 println!(
6346 " {:<24} {:<32} {:<10} {}",
6347 s.name,
6348 s.handle.as_deref().unwrap_or("?"),
6349 if s.daemon_running { "running" } else { "down" },
6350 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
6351 );
6352 }
6353 println!();
6354 }
6355 }
6356
6357 if !listing.federation_only.is_empty() {
6358 println!("federation-only (no local endpoint):");
6359 for s in &listing.federation_only {
6360 println!(
6361 " {:<24} {:<32} {}",
6362 s.name,
6363 s.handle.as_deref().unwrap_or("?"),
6364 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
6365 );
6366 }
6367 }
6368 Ok(())
6369}
6370
6371fn cmd_session_pair_all_local(
6390 settle_secs: u64,
6391 federation_relay: &str,
6392 as_json: bool,
6393) -> Result<()> {
6394 use std::collections::BTreeSet;
6395 use std::time::Duration;
6396
6397 let listing = crate::session::list_local_sessions()?;
6398 let mut by_name: std::collections::BTreeMap<String, crate::session::LocalSessionView> =
6402 Default::default();
6403 for group in listing.local.into_values() {
6404 for s in group {
6405 by_name.entry(s.name.clone()).or_insert(s);
6406 }
6407 }
6408 let sessions: Vec<crate::session::LocalSessionView> = by_name.into_values().collect();
6409
6410 if sessions.len() < 2 {
6411 let msg = format!(
6412 "{} sister session(s) with a local endpoint — need at least 2 to pair.",
6413 sessions.len()
6414 );
6415 if as_json {
6416 println!(
6417 "{}",
6418 serde_json::to_string(&json!({
6419 "sessions": sessions.iter().map(|s| &s.name).collect::<Vec<_>>(),
6420 "pairs_attempted": 0,
6421 "pairs_succeeded": 0,
6422 "pairs_skipped_already_paired": 0,
6423 "pairs_failed": 0,
6424 "note": msg,
6425 }))?
6426 );
6427 } else {
6428 println!("{msg}");
6429 if let Some(s) = sessions.first() {
6430 println!(" - {} ({})", s.name, s.cwd.as_deref().unwrap_or("?"));
6431 }
6432 println!("Use `wire session new --with-local` to add more.");
6433 }
6434 return Ok(());
6435 }
6436
6437 let fed_host = host_of_url(federation_relay);
6438 if fed_host.is_empty() {
6439 bail!(
6440 "federation_relay `{federation_relay}` has no parseable host — \
6441 pass a full URL like `https://wireup.net`."
6442 );
6443 }
6444
6445 let mut attempted = 0u32;
6447 let mut succeeded = 0u32;
6448 let mut skipped_already = 0u32;
6449 let mut failed = 0u32;
6450 let mut per_pair: Vec<Value> = Vec::new();
6451
6452 for i in 0..sessions.len() {
6453 for j in (i + 1)..sessions.len() {
6454 let a = &sessions[i];
6455 let b = &sessions[j];
6456 attempted += 1;
6457
6458 let a_pinned_b = session_has_peer(&a.home_dir, &b.name);
6461 let b_pinned_a = session_has_peer(&b.home_dir, &a.name);
6462 if a_pinned_b && b_pinned_a {
6463 skipped_already += 1;
6464 per_pair.push(json!({
6465 "from": a.name,
6466 "to": b.name,
6467 "status": "already_paired",
6468 }));
6469 continue;
6470 }
6471
6472 let pair_result = drive_bilateral_pair(
6473 &a.home_dir,
6474 &a.name,
6475 &b.home_dir,
6476 &b.name,
6477 &fed_host,
6478 federation_relay,
6479 settle_secs,
6480 );
6481
6482 match pair_result {
6483 Ok(()) => {
6484 succeeded += 1;
6485 per_pair.push(json!({
6486 "from": a.name,
6487 "to": b.name,
6488 "status": "paired",
6489 }));
6490 }
6491 Err(e) => {
6492 failed += 1;
6493 let detail = format!("{e:#}");
6494 per_pair.push(json!({
6495 "from": a.name,
6496 "to": b.name,
6497 "status": "failed",
6498 "error": detail,
6499 }));
6500 }
6501 }
6502
6503 std::thread::sleep(Duration::from_millis(200));
6506 }
6507 }
6508
6509 let _ = BTreeSet::<String>::new(); let summary = json!({
6511 "sessions": sessions.iter().map(|s| s.name.clone()).collect::<Vec<_>>(),
6512 "pairs_attempted": attempted,
6513 "pairs_succeeded": succeeded,
6514 "pairs_skipped_already_paired": skipped_already,
6515 "pairs_failed": failed,
6516 "results": per_pair,
6517 });
6518 if as_json {
6519 println!("{}", serde_json::to_string(&summary)?);
6520 } else {
6521 println!(
6522 "wire session pair-all-local: {} session(s), {} pair(s) attempted",
6523 sessions.len(),
6524 attempted
6525 );
6526 println!(" paired: {succeeded}");
6527 println!(" skipped (already pinned): {skipped_already}");
6528 println!(" failed: {failed}");
6529 for entry in summary["results"].as_array().unwrap_or(&vec![]) {
6530 let from = entry["from"].as_str().unwrap_or("?");
6531 let to = entry["to"].as_str().unwrap_or("?");
6532 let status = entry["status"].as_str().unwrap_or("?");
6533 let err = entry.get("error").and_then(Value::as_str).unwrap_or("");
6534 if err.is_empty() {
6535 println!(" {from:<24} ↔ {to:<24} {status}");
6536 } else {
6537 println!(" {from:<24} ↔ {to:<24} {status} — {err}");
6538 }
6539 }
6540 }
6541 Ok(())
6542}
6543
6544fn session_has_peer(session_home: &std::path::Path, peer_name: &str) -> bool {
6547 val_session_relay_state(session_home)
6548 .and_then(|v| v.get("peers").cloned())
6549 .and_then(|p| p.get(peer_name).cloned())
6550 .is_some()
6551}
6552
6553fn val_session_relay_state(session_home: &std::path::Path) -> Option<Value> {
6558 let path = session_home.join("config").join("wire").join("relay.json");
6559 let bytes = std::fs::read(&path).ok()?;
6560 serde_json::from_slice(&bytes).ok()
6561}
6562
6563fn cmd_session_mesh_status(stale_secs: u64, as_json: bool) -> Result<()> {
6567 use std::collections::BTreeMap;
6568
6569 let listing = crate::session::list_local_sessions()?;
6572 let mut by_name: BTreeMap<String, crate::session::LocalSessionView> = BTreeMap::new();
6573 for group in listing.local.into_values() {
6574 for s in group {
6575 by_name.entry(s.name.clone()).or_insert(s);
6576 }
6577 }
6578 let sessions: Vec<crate::session::LocalSessionView> = by_name.into_values().collect();
6579 let federation_only = listing.federation_only;
6580
6581 if sessions.is_empty() {
6582 let msg = "no sister sessions with a local endpoint on this machine.".to_string();
6583 if as_json {
6584 println!(
6585 "{}",
6586 serde_json::to_string(&json!({
6587 "sessions": [],
6588 "edges": [],
6589 "local_relay": null,
6590 "federation_only": federation_only.iter().map(|f| &f.name).collect::<Vec<_>>(),
6591 "summary": {
6592 "session_count": 0,
6593 "edge_count": 0,
6594 "healthy": 0,
6595 "stale": 0,
6596 "asymmetric": 0,
6597 },
6598 "note": msg,
6599 }))?
6600 );
6601 } else {
6602 println!("{msg}");
6603 println!("Use `wire session new --with-local` to create one.");
6604 }
6605 return Ok(());
6606 }
6607
6608 struct SessionState {
6610 view: crate::session::LocalSessionView,
6611 relay_state: Value,
6612 local_relay_url: Option<String>,
6613 }
6614 let mut sstates: Vec<SessionState> = Vec::with_capacity(sessions.len());
6615 for s in sessions {
6616 let relay_state = val_session_relay_state(&s.home_dir)
6617 .unwrap_or_else(|| json!({"self": Value::Null, "peers": {}}));
6618 let local_relay_url = s.local_endpoints.first().map(|e| e.relay_url.clone());
6619 sstates.push(SessionState {
6620 view: s,
6621 relay_state,
6622 local_relay_url,
6623 });
6624 }
6625
6626 let mut local_relays: BTreeMap<String, bool> = BTreeMap::new();
6629 for s in &sstates {
6630 if let Some(url) = &s.local_relay_url
6631 && !local_relays.contains_key(url)
6632 {
6633 let healthy = probe_relay_healthz(url);
6634 local_relays.insert(url.clone(), healthy);
6635 }
6636 }
6637
6638 let now = std::time::SystemTime::now()
6639 .duration_since(std::time::UNIX_EPOCH)
6640 .map(|d| d.as_secs())
6641 .unwrap_or(0);
6642
6643 let mut edges: Vec<Value> = Vec::new();
6647 let mut healthy_count = 0u32;
6648 let mut stale_count = 0u32;
6649 let mut asymmetric_count = 0u32;
6650
6651 for i in 0..sstates.len() {
6652 for j in (i + 1)..sstates.len() {
6653 let a = &sstates[i];
6654 let b = &sstates[j];
6655 let a_to_b = probe_directed_edge(&a.relay_state, &b.view.name, now);
6656 let b_to_a = probe_directed_edge(&b.relay_state, &a.view.name, now);
6657
6658 let bilateral = a_to_b.pinned && b_to_a.pinned;
6659 let scope = match (a_to_b.scope.as_deref(), b_to_a.scope.as_deref()) {
6663 (Some("local"), _) | (_, Some("local")) => "local",
6664 (Some("federation"), _) | (_, Some("federation")) => "federation",
6665 _ => "unknown",
6666 };
6667
6668 let mut status = if bilateral { "healthy" } else { "asymmetric" };
6671 if bilateral {
6672 let either_stale = [&a_to_b, &b_to_a].iter().any(|d| match d.silent_secs {
6673 Some(s) => s > stale_secs,
6674 None => d.probed,
6675 });
6676 if either_stale {
6677 status = "stale";
6678 }
6679 }
6680
6681 match status {
6682 "healthy" => healthy_count += 1,
6683 "stale" => stale_count += 1,
6684 "asymmetric" => asymmetric_count += 1,
6685 _ => {}
6686 }
6687
6688 edges.push(json!({
6689 "from": a.view.name,
6690 "to": b.view.name,
6691 "bilateral": bilateral,
6692 "scope": scope,
6693 "status": status,
6694 "directions": {
6695 a.view.name.clone(): direction_summary(&a_to_b),
6696 b.view.name.clone(): direction_summary(&b_to_a),
6697 },
6698 }));
6699 }
6700 }
6701
6702 let summary = json!({
6703 "sessions": sstates.iter().map(|s| json!({
6704 "name": s.view.name,
6705 "handle": s.view.handle,
6706 "cwd": s.view.cwd,
6707 "daemon_running": s.view.daemon_running,
6708 "local_relay": s.local_relay_url,
6709 })).collect::<Vec<_>>(),
6710 "edges": edges,
6711 "local_relays": local_relays.iter().map(|(url, healthy)| json!({
6712 "url": url,
6713 "healthy": healthy,
6714 })).collect::<Vec<_>>(),
6715 "federation_only": federation_only.iter().map(|f| &f.name).collect::<Vec<_>>(),
6716 "summary": {
6717 "session_count": sstates.len(),
6718 "edge_count": edges.len(),
6719 "healthy": healthy_count,
6720 "stale": stale_count,
6721 "asymmetric": asymmetric_count,
6722 "stale_threshold_secs": stale_secs,
6723 },
6724 });
6725
6726 if as_json {
6727 println!("{}", serde_json::to_string(&summary)?);
6728 return Ok(());
6729 }
6730
6731 println!(
6732 "wire mesh: {} session(s), {} edge(s)",
6733 sstates.len(),
6734 edges.len()
6735 );
6736 for (url, healthy) in &local_relays {
6737 let tick = if *healthy { "✓" } else { "✗" };
6738 println!(" local-relay {url} {tick}");
6739 }
6740 if !federation_only.is_empty() {
6741 print!(" federation-only sessions:");
6742 for f in &federation_only {
6743 print!(" {}", f.name);
6744 }
6745 println!();
6746 }
6747
6748 let names: Vec<&str> = sstates.iter().map(|s| s.view.name.as_str()).collect();
6750 let col_w = names.iter().map(|n| n.len()).max().unwrap_or(8).max(7) + 1;
6751 print!("\n{:>col_w$}", "", col_w = col_w);
6752 for n in &names {
6753 print!("{:>col_w$}", n, col_w = col_w);
6754 }
6755 println!();
6756 for (i, row) in names.iter().enumerate() {
6757 print!("{:>col_w$}", row, col_w = col_w);
6758 for (j, col) in names.iter().enumerate() {
6759 let cell = if i == j {
6760 "self".to_string()
6761 } else {
6762 let d = probe_directed_edge(&sstates[i].relay_state, col, now);
6763 match d.scope.as_deref() {
6764 Some("local") => "local".to_string(),
6765 Some("federation") => "fed".to_string(),
6766 _ => "—".to_string(),
6767 }
6768 };
6769 print!("{:>col_w$}", cell, col_w = col_w);
6770 }
6771 println!();
6772 }
6773
6774 println!("\nHealth (stale threshold: {stale_secs}s):");
6775 for e in &edges {
6776 let from = e["from"].as_str().unwrap_or("?");
6777 let to = e["to"].as_str().unwrap_or("?");
6778 let scope = e["scope"].as_str().unwrap_or("?");
6779 let status = e["status"].as_str().unwrap_or("?");
6780 let mark = match status {
6781 "healthy" => "✓",
6782 "stale" => "⚠",
6783 "asymmetric" => "!",
6784 _ => "?",
6785 };
6786 let dirs = e["directions"].as_object().cloned().unwrap_or_default();
6787 let mut details: Vec<String> = Vec::new();
6788 for (who, d) in &dirs {
6789 let silent = d.get("silent_secs").and_then(Value::as_u64);
6790 let pinned = d.get("pinned").and_then(Value::as_bool).unwrap_or(false);
6791 let probed = d.get("probed").and_then(Value::as_bool).unwrap_or(false);
6792 let label = match (pinned, probed, silent) {
6793 (false, _, _) => format!("{who} has not pinned"),
6794 (true, false, _) => format!("{who} pinned but no endpoint to probe"),
6795 (true, true, Some(s)) if s <= stale_secs => format!("{who} fresh ({s}s)"),
6796 (true, true, Some(s)) => format!("{who} silent {s}s"),
6797 (true, true, None) => format!("{who} never pulled"),
6798 };
6799 details.push(label);
6800 }
6801 println!(
6802 " {mark} {from} ↔ {to} scope={scope} {status:>10} [{}]",
6803 details.join(" | ")
6804 );
6805 }
6806 Ok(())
6807}
6808
/// Outcome of inspecting one direction of a pairing: what one session's relay
/// state records about a named peer. Built by `probe_directed_edge`; the
/// `Default` value represents "peer not pinned, nothing probed".
#[derive(Default)]
struct DirectedEdge {
    /// True when the peer's name is a key under `peers` in the inspected state.
    pinned: bool,
    /// `"local"` or `"federation"`, taken from the endpoint that was probed;
    /// `None` when no endpoint was probed.
    scope: Option<String>,
    /// Timestamp component returned by the relay client's `slot_state` call.
    /// NOTE(review): presumably Unix seconds of the last pull — confirm
    /// against `RelayClient::slot_state`.
    last_pull_at_unix: Option<u64>,
    /// `now - last_pull_at_unix` (saturating) when a timestamp was available.
    silent_secs: Option<u64>,
    /// True when an endpoint was found and its slot state was queried (even if
    /// that query failed).
    probed: bool,
    /// Count component returned by `slot_state`; 0 when the call failed or
    /// nothing was probed.
    event_count: usize,
}
6818
6819fn probe_directed_edge(from_state: &Value, to_name: &str, now: u64) -> DirectedEdge {
6825 let pinned = from_state
6826 .get("peers")
6827 .and_then(|p| p.get(to_name))
6828 .is_some();
6829 if !pinned {
6830 return DirectedEdge::default();
6831 }
6832 let endpoints = crate::endpoints::peer_endpoints_in_priority_order(from_state, to_name);
6833 let ep = match endpoints.into_iter().next() {
6834 Some(e) => e,
6835 None => {
6836 return DirectedEdge {
6837 pinned: true,
6838 ..Default::default()
6839 };
6840 }
6841 };
6842 let scope = Some(
6843 match ep.scope {
6844 crate::endpoints::EndpointScope::Local => "local",
6845 crate::endpoints::EndpointScope::Federation => "federation",
6846 }
6847 .to_string(),
6848 );
6849 let client = crate::relay_client::RelayClient::new(&ep.relay_url);
6850 let (count, last) = client
6851 .slot_state(&ep.slot_id, &ep.slot_token)
6852 .unwrap_or((0, None));
6853 let silent = last.map(|t| now.saturating_sub(t));
6854 DirectedEdge {
6855 pinned: true,
6856 scope,
6857 last_pull_at_unix: last,
6858 silent_secs: silent,
6859 probed: true,
6860 event_count: count,
6861 }
6862}
6863
6864fn direction_summary(d: &DirectedEdge) -> Value {
6865 json!({
6866 "pinned": d.pinned,
6867 "scope": d.scope,
6868 "probed": d.probed,
6869 "last_pull_at_unix": d.last_pull_at_unix,
6870 "silent_secs": d.silent_secs,
6871 "event_count": d.event_count,
6872 })
6873}
6874
6875fn probe_relay_healthz(url: &str) -> bool {
6877 let probe_url = format!("{}/healthz", url.trim_end_matches('/'));
6878 let client = match reqwest::blocking::Client::builder()
6879 .timeout(std::time::Duration::from_millis(500))
6880 .build()
6881 {
6882 Ok(c) => c,
6883 Err(_) => return false,
6884 };
6885 match client.get(&probe_url).send() {
6886 Ok(r) => r.status().is_success(),
6887 Err(_) => false,
6888 }
6889}
6890
6891fn drive_bilateral_pair(
6906 a_home: &std::path::Path,
6907 a_name: &str,
6908 b_home: &std::path::Path,
6909 b_name: &str,
6910 _fed_host: &str,
6911 _federation_relay: &str,
6912 settle_secs: u64,
6913) -> Result<()> {
6914 use std::time::Duration;
6915 let bin = std::env::current_exe().context("locating self exe")?;
6916
6917 let run = |home: &std::path::Path, args: &[&str]| -> Result<()> {
6918 let out = std::process::Command::new(&bin)
6919 .env("WIRE_HOME", home)
6920 .env_remove("RUST_LOG")
6921 .args(args)
6922 .output()
6923 .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
6924 if !out.status.success() {
6925 bail!(
6926 "`wire {}` failed: stderr={}",
6927 args.join(" "),
6928 String::from_utf8_lossy(&out.stderr).trim()
6929 );
6930 }
6931 Ok(())
6932 };
6933
6934 run(a_home, &["add", b_name, "--local-sister", "--json"])
6940 .with_context(|| format!("step 1/8: {a_name} `wire add {b_name} --local-sister`"))?;
6941
6942 std::thread::sleep(Duration::from_secs(settle_secs));
6944
6945 run(b_home, &["pull", "--json"]).with_context(|| format!("step 4/8: {b_name} `wire pull`"))?;
6947 run(b_home, &["pair-accept", a_name, "--json"])
6948 .with_context(|| format!("step 5/8: {b_name} `wire pair-accept {a_name}`"))?;
6949 run(b_home, &["push", "--json"]).with_context(|| format!("step 6/8: {b_name} `wire push`"))?;
6950
6951 std::thread::sleep(Duration::from_secs(settle_secs));
6953
6954 run(a_home, &["pull", "--json"]).with_context(|| format!("step 8/8: {a_name} `wire pull`"))?;
6956
6957 Ok(())
6958}
6959
6960fn cmd_session_env(name_arg: Option<&str>, as_json: bool) -> Result<()> {
6961 let name = resolve_session_name(name_arg)?;
6962 let session_home = crate::session::session_dir(&name)?;
6963 if !session_home.exists() {
6964 bail!(
6965 "no session named {name:?} on this machine. `wire session list` to enumerate, \
6966 `wire session new {name}` to create."
6967 );
6968 }
6969 if as_json {
6970 println!(
6971 "{}",
6972 serde_json::to_string(&json!({
6973 "name": name,
6974 "home_dir": session_home.to_string_lossy(),
6975 "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
6976 }))?
6977 );
6978 } else {
6979 println!("export WIRE_HOME={}", session_home.to_string_lossy());
6980 }
6981 Ok(())
6982}
6983
6984fn cmd_session_current(as_json: bool) -> Result<()> {
6985 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
6986 let registry = crate::session::read_registry().unwrap_or_default();
6987 let cwd_key = cwd.to_string_lossy().into_owned();
6988 let name = registry.by_cwd.get(&cwd_key).cloned();
6989 if as_json {
6990 println!(
6991 "{}",
6992 serde_json::to_string(&json!({
6993 "cwd": cwd_key,
6994 "session": name,
6995 }))?
6996 );
6997 } else if let Some(n) = name {
6998 println!("{n}");
6999 } else {
7000 println!("(no session registered for this cwd)");
7001 }
7002 Ok(())
7003}
7004
7005fn cmd_session_destroy(name_arg: &str, force: bool, as_json: bool) -> Result<()> {
7006 let name = crate::session::sanitize_name(name_arg);
7007 let session_home = crate::session::session_dir(&name)?;
7008 if !session_home.exists() {
7009 if as_json {
7010 println!(
7011 "{}",
7012 serde_json::to_string(&json!({
7013 "name": name,
7014 "destroyed": false,
7015 "reason": "no such session",
7016 }))?
7017 );
7018 } else {
7019 println!("no session named {name:?} — nothing to destroy.");
7020 }
7021 return Ok(());
7022 }
7023 if !force {
7024 bail!(
7025 "destroying session {name:?} would delete its keypair + state irrecoverably. \
7026 Pass --force to confirm."
7027 );
7028 }
7029
7030 let pidfile = session_home.join("state").join("wire").join("daemon.pid");
7032 if let Ok(bytes) = std::fs::read(&pidfile) {
7033 let pid: Option<u32> = if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
7034 v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
7035 } else {
7036 String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
7037 };
7038 if let Some(p) = pid {
7039 let _ = std::process::Command::new("kill")
7040 .args(["-TERM", &p.to_string()])
7041 .output();
7042 }
7043 }
7044
7045 std::fs::remove_dir_all(&session_home)
7046 .with_context(|| format!("removing session dir {session_home:?}"))?;
7047
7048 let mut registry = crate::session::read_registry().unwrap_or_default();
7050 registry.by_cwd.retain(|_, v| v != &name);
7051 crate::session::write_registry(®istry)?;
7052
7053 if as_json {
7054 println!(
7055 "{}",
7056 serde_json::to_string(&json!({
7057 "name": name,
7058 "destroyed": true,
7059 }))?
7060 );
7061 } else {
7062 println!("destroyed session {name:?}.");
7063 }
7064 Ok(())
7065}
7066
7067fn cmd_diag(action: DiagAction) -> Result<()> {
7070 let state = config::state_dir()?;
7071 let knob = state.join("diag.enabled");
7072 let log_path = state.join("diag.jsonl");
7073 match action {
7074 DiagAction::Tail { limit, json } => {
7075 let entries = crate::diag::tail(limit);
7076 if json {
7077 for e in entries {
7078 println!("{}", serde_json::to_string(&e)?);
7079 }
7080 } else if entries.is_empty() {
7081 println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
7082 } else {
7083 for e in entries {
7084 let ts = e["ts"].as_u64().unwrap_or(0);
7085 let ty = e["type"].as_str().unwrap_or("?");
7086 let pid = e["pid"].as_u64().unwrap_or(0);
7087 let payload = e["payload"].to_string();
7088 println!("[{ts}] pid={pid} {ty} {payload}");
7089 }
7090 }
7091 }
7092 DiagAction::Enable => {
7093 config::ensure_dirs()?;
7094 std::fs::write(&knob, "1")?;
7095 println!("wire diag: enabled at {knob:?}");
7096 }
7097 DiagAction::Disable => {
7098 if knob.exists() {
7099 std::fs::remove_file(&knob)?;
7100 }
7101 println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
7102 }
7103 DiagAction::Status { json } => {
7104 let enabled = crate::diag::is_enabled();
7105 let size = std::fs::metadata(&log_path).map(|m| m.len()).unwrap_or(0);
7106 if json {
7107 println!(
7108 "{}",
7109 serde_json::to_string(&serde_json::json!({
7110 "enabled": enabled,
7111 "log_path": log_path,
7112 "log_size_bytes": size,
7113 }))?
7114 );
7115 } else {
7116 println!("wire diag status");
7117 println!(" enabled: {enabled}");
7118 println!(" log: {log_path:?}");
7119 println!(" log size: {size} bytes");
7120 }
7121 }
7122 }
7123 Ok(())
7124}
7125
7126fn cmd_service(action: ServiceAction) -> Result<()> {
7129 let kind = |local_relay: bool| {
7130 if local_relay {
7131 crate::service::ServiceKind::LocalRelay
7132 } else {
7133 crate::service::ServiceKind::Daemon
7134 }
7135 };
7136 let (report, as_json) = match action {
7137 ServiceAction::Install { local_relay, json } => {
7138 (crate::service::install_kind(kind(local_relay))?, json)
7139 }
7140 ServiceAction::Uninstall { local_relay, json } => {
7141 (crate::service::uninstall_kind(kind(local_relay))?, json)
7142 }
7143 ServiceAction::Status { local_relay, json } => {
7144 (crate::service::status_kind(kind(local_relay))?, json)
7145 }
7146 };
7147 if as_json {
7148 println!("{}", serde_json::to_string(&report)?);
7149 } else {
7150 println!("wire service {}", report.action);
7151 println!(" platform: {}", report.platform);
7152 println!(" unit: {}", report.unit_path);
7153 println!(" status: {}", report.status);
7154 println!(" detail: {}", report.detail);
7155 }
7156 Ok(())
7157}
7158
7159fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
7174 let pgrep_out = std::process::Command::new("pgrep")
7176 .args(["-f", "wire daemon"])
7177 .output();
7178 let running_pids: Vec<u32> = match pgrep_out {
7179 Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
7180 .split_whitespace()
7181 .filter_map(|s| s.parse::<u32>().ok())
7182 .collect(),
7183 _ => Vec::new(),
7184 };
7185
7186 let record = crate::ensure_up::read_pid_record("daemon");
7188 let recorded_version: Option<String> = match &record {
7189 crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
7190 crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
7191 _ => None,
7192 };
7193 let cli_version = env!("CARGO_PKG_VERSION").to_string();
7194
7195 let sessions_to_respawn_after_kill: Vec<std::path::PathBuf> =
7202 crate::session::list_sessions()
7203 .unwrap_or_default()
7204 .into_iter()
7205 .filter(|s| s.daemon_running)
7206 .map(|s| s.home_dir)
7207 .collect();
7208
7209 if check_only {
7210 let sessions_with_daemons: Vec<String> = crate::session::list_sessions()
7212 .unwrap_or_default()
7213 .iter()
7214 .filter(|s| s.daemon_running)
7215 .map(|s| s.name.clone())
7216 .collect();
7217 let mut path_dupes: Vec<String> = Vec::new();
7218 if let Ok(path) = std::env::var("PATH") {
7219 let mut seen: std::collections::HashSet<std::path::PathBuf> =
7220 std::collections::HashSet::new();
7221 for dir in path.split(':') {
7222 let candidate = std::path::PathBuf::from(dir).join("wire");
7223 if candidate.exists() {
7224 let canon = candidate.canonicalize().unwrap_or(candidate);
7225 if seen.insert(canon.clone()) {
7226 path_dupes.push(canon.to_string_lossy().into_owned());
7227 }
7228 }
7229 }
7230 }
7231 let report = json!({
7232 "running_pids": running_pids,
7233 "pidfile_version": recorded_version,
7234 "cli_version": cli_version,
7235 "would_kill": running_pids,
7236 "session_daemons_running": sessions_with_daemons,
7237 "path_binaries": path_dupes,
7238 "path_duplicate_warning": path_dupes.len() > 1,
7239 });
7240 if as_json {
7241 println!("{}", serde_json::to_string(&report)?);
7242 } else {
7243 println!("wire upgrade --check");
7244 println!(" cli version: {cli_version}");
7245 println!(
7246 " pidfile version: {}",
7247 recorded_version.as_deref().unwrap_or("(missing)")
7248 );
7249 if running_pids.is_empty() {
7250 println!(" running daemons: none");
7251 } else {
7252 let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
7253 println!(" running daemons: pids {}", pids.join(", "));
7254 println!(" would kill all + spawn fresh");
7255 }
7256 if !sessions_with_daemons.is_empty() {
7257 println!(
7258 " session daemons: {} (would respawn under new binary)",
7259 sessions_with_daemons.join(", ")
7260 );
7261 }
7262 if path_dupes.len() > 1 {
7263 println!(
7264 " PATH warning: {} distinct `wire` binaries on PATH:",
7265 path_dupes.len()
7266 );
7267 for b in &path_dupes {
7268 println!(" {b}");
7269 }
7270 println!(" operators should remove the stale ones");
7271 }
7272 }
7273 return Ok(());
7274 }
7275
7276 let mut killed: Vec<u32> = Vec::new();
7279 for pid in &running_pids {
7280 let _ = std::process::Command::new("kill")
7282 .args(["-15", &pid.to_string()])
7283 .status();
7284 killed.push(*pid);
7285 }
7286 if !killed.is_empty() {
7288 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
7289 loop {
7290 let still_alive: Vec<u32> = killed
7291 .iter()
7292 .copied()
7293 .filter(|p| process_alive_pid(*p))
7294 .collect();
7295 if still_alive.is_empty() {
7296 break;
7297 }
7298 if std::time::Instant::now() >= deadline {
7299 for pid in still_alive {
7301 let _ = std::process::Command::new("kill")
7302 .args(["-9", &pid.to_string()])
7303 .status();
7304 }
7305 break;
7306 }
7307 std::thread::sleep(std::time::Duration::from_millis(50));
7308 }
7309 }
7310
7311 let pidfile = config::state_dir()?.join("daemon.pid");
7314 if pidfile.exists() {
7315 let _ = std::fs::remove_file(&pidfile);
7316 }
7317
7318 if let Ok(sessions) = crate::session::list_sessions() {
7325 for s in &sessions {
7326 let session_pidfile = s.home_dir.join("state").join("wire").join("daemon.pid");
7327 if session_pidfile.exists() {
7328 let _ = std::fs::remove_file(&session_pidfile);
7329 }
7330 }
7331 }
7332 let session_daemons_to_respawn = sessions_to_respawn_after_kill;
7333
7334 let mut path_dupes: Vec<String> = Vec::new();
7339 if let Ok(path) = std::env::var("PATH") {
7340 let mut seen: std::collections::HashSet<std::path::PathBuf> =
7341 std::collections::HashSet::new();
7342 for dir in path.split(':') {
7343 let candidate = std::path::PathBuf::from(dir).join("wire");
7344 if candidate.exists() {
7345 let canon = candidate.canonicalize().unwrap_or(candidate);
7346 if seen.insert(canon.clone()) {
7347 path_dupes.push(canon.to_string_lossy().into_owned());
7348 }
7349 }
7350 }
7351 }
7352 let path_warning = if path_dupes.len() > 1 {
7353 Some(format!(
7354 "WARN: {} distinct `wire` binaries on PATH — old versions can shadow the fresh install:\n {}",
7355 path_dupes.len(),
7356 path_dupes.join("\n ")
7357 ))
7358 } else {
7359 None
7360 };
7361
7362 let spawned = crate::ensure_up::ensure_daemon_running()?;
7365
7366 let mut session_respawns: Vec<Value> = Vec::new();
7372 for home in &session_daemons_to_respawn {
7373 match ensure_session_daemon(home) {
7374 Ok(()) => session_respawns.push(json!({
7375 "session_home": home.to_string_lossy(),
7376 "status": "respawned",
7377 })),
7378 Err(e) => session_respawns.push(json!({
7379 "session_home": home.to_string_lossy(),
7380 "status": "failed",
7381 "error": format!("{e:#}"),
7382 })),
7383 }
7384 }
7385
7386 let new_record = crate::ensure_up::read_pid_record("daemon");
7387 let new_pid = new_record.pid();
7388 let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
7389 Some(d.version.clone())
7390 } else {
7391 None
7392 };
7393
7394 if as_json {
7395 println!(
7396 "{}",
7397 serde_json::to_string(&json!({
7398 "killed": killed,
7399 "spawned_fresh_daemon": spawned,
7400 "new_pid": new_pid,
7401 "new_version": new_version,
7402 "cli_version": cli_version,
7403 "session_respawns": session_respawns,
7404 "path_binaries": path_dupes,
7405 "path_warning": path_warning,
7406 }))?
7407 );
7408 } else {
7409 if killed.is_empty() {
7410 println!("wire upgrade: no stale daemons running");
7411 } else {
7412 println!(
7413 "wire upgrade: killed {} daemon(s) (pids {})",
7414 killed.len(),
7415 killed
7416 .iter()
7417 .map(|p| p.to_string())
7418 .collect::<Vec<_>>()
7419 .join(", ")
7420 );
7421 }
7422 if spawned {
7423 println!(
7424 "wire upgrade: spawned fresh daemon (pid {} v{})",
7425 new_pid
7426 .map(|p| p.to_string())
7427 .unwrap_or_else(|| "?".to_string()),
7428 new_version.as_deref().unwrap_or(&cli_version),
7429 );
7430 } else {
7431 println!("wire upgrade: daemon was already running on current binary");
7432 }
7433 if !session_respawns.is_empty() {
7434 println!(
7435 "wire upgrade: refreshed {} session daemon(s):",
7436 session_respawns.len()
7437 );
7438 for r in &session_respawns {
7439 let h = r["session_home"].as_str().unwrap_or("?");
7440 let s = r["status"].as_str().unwrap_or("?");
7441 let label = std::path::Path::new(h)
7442 .file_name()
7443 .map(|f| f.to_string_lossy().into_owned())
7444 .unwrap_or_else(|| h.to_string());
7445 println!(" {label:<24} {s}");
7446 }
7447 }
7448 if let Some(msg) = &path_warning {
7449 eprintln!("wire upgrade: {msg}");
7450 }
7451 }
7452 Ok(())
7453}
7454
/// Reports whether a process with the given pid currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry; elsewhere it runs
/// `kill -0 <pid>` with all standard streams detached and treats a failure to
/// run the command as "not alive".
fn process_alive_pid(pid: u32) -> bool {
    #[cfg(target_os = "linux")]
    {
        let proc_entry = format!("/proc/{pid}");
        std::path::Path::new(&proc_entry).exists()
    }
    #[cfg(not(target_os = "linux"))]
    {
        use std::process::{Command, Stdio};

        let probe = Command::new("kill")
            .arg("-0")
            .arg(pid.to_string())
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        matches!(probe, Ok(exit) if exit.success())
    }
}
7472
/// One result line of `wire doctor`, serializable as part of the JSON report.
#[derive(Clone, Debug, serde::Serialize)]
pub struct DoctorCheck {
    /// Short identifier of the check (for example `"daemon"` or `"relay"`).
    pub id: String,
    /// Outcome label; the constructors on this type produce `"PASS"`,
    /// `"WARN"` or `"FAIL"`.
    pub status: String,
    /// Human-readable description of what was observed.
    pub detail: String,
    /// Suggested remediation; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fix: Option<String>,
}
7489
7490impl DoctorCheck {
7491 fn pass(id: &str, detail: impl Into<String>) -> Self {
7492 Self {
7493 id: id.into(),
7494 status: "PASS".into(),
7495 detail: detail.into(),
7496 fix: None,
7497 }
7498 }
7499 fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
7500 Self {
7501 id: id.into(),
7502 status: "WARN".into(),
7503 detail: detail.into(),
7504 fix: Some(fix.into()),
7505 }
7506 }
7507 fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
7508 Self {
7509 id: id.into(),
7510 status: "FAIL".into(),
7511 detail: detail.into(),
7512 fix: Some(fix.into()),
7513 }
7514 }
7515}
7516
7517fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
7522 let checks: Vec<DoctorCheck> = vec![
7523 check_daemon_health(),
7524 check_daemon_pid_consistency(),
7525 check_relay_reachable(),
7526 check_pair_rejections(recent_rejections),
7527 check_cursor_progress(),
7528 ];
7529
7530 let fails = checks.iter().filter(|c| c.status == "FAIL").count();
7531 let warns = checks.iter().filter(|c| c.status == "WARN").count();
7532
7533 if as_json {
7534 println!(
7535 "{}",
7536 serde_json::to_string(&json!({
7537 "checks": checks,
7538 "fail_count": fails,
7539 "warn_count": warns,
7540 "ok": fails == 0,
7541 }))?
7542 );
7543 } else {
7544 println!("wire doctor — {} checks", checks.len());
7545 for c in &checks {
7546 let bullet = match c.status.as_str() {
7547 "PASS" => "✓",
7548 "WARN" => "!",
7549 "FAIL" => "✗",
7550 _ => "?",
7551 };
7552 println!(" {bullet} [{}] {}: {}", c.status, c.id, c.detail);
7553 if let Some(fix) = &c.fix {
7554 println!(" fix: {fix}");
7555 }
7556 }
7557 println!();
7558 if fails == 0 && warns == 0 {
7559 println!("ALL GREEN");
7560 } else {
7561 println!("{fails} FAIL, {warns} WARN");
7562 }
7563 }
7564
7565 if fails > 0 {
7566 std::process::exit(1);
7567 }
7568 Ok(())
7569}
7570
7571fn check_daemon_health() -> DoctorCheck {
7578 let snap = crate::ensure_up::daemon_liveness();
7584 let pgrep_pids = &snap.pgrep_pids;
7585 let pidfile_pid = snap.pidfile_pid;
7586 let pidfile_alive = snap.pidfile_alive;
7587 let orphan_pids = &snap.orphan_pids;
7588
7589 let fmt_pids = |xs: &[u32]| -> String {
7590 xs.iter()
7591 .map(|p| p.to_string())
7592 .collect::<Vec<_>>()
7593 .join(", ")
7594 };
7595
7596 match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
7597 (0, _, _) => DoctorCheck::fail(
7598 "daemon",
7599 "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
7600 "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
7601 ),
7602 (1, true, true) => DoctorCheck::pass(
7604 "daemon",
7605 format!(
7606 "one daemon running (pid {}, matches pidfile)",
7607 pgrep_pids[0]
7608 ),
7609 ),
7610 (n, true, false) => DoctorCheck::fail(
7612 "daemon",
7613 format!(
7614 "{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
7615 The orphans race the relay cursor — they advance past events your current binary can't process. \
7616 (Issue #2 exact class.)",
7617 fmt_pids(pgrep_pids),
7618 pidfile_pid.unwrap(),
7619 fmt_pids(orphan_pids),
7620 ),
7621 "`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
7622 ),
7623 (n, false, _) => DoctorCheck::fail(
7625 "daemon",
7626 format!(
7627 "{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
7628 every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
7629 (Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
7630 fmt_pids(pgrep_pids),
7631 match pidfile_pid {
7632 Some(p) => format!("claims pid {p} which is dead"),
7633 None => "is missing".to_string(),
7634 },
7635 ),
7636 "`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
7637 ),
7638 (n, true, true) => DoctorCheck::warn(
7640 "daemon",
7641 format!(
7642 "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
7643 fmt_pids(pgrep_pids)
7644 ),
7645 "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
7646 ),
7647 }
7648}
7649
7650fn check_daemon_pid_consistency() -> DoctorCheck {
7662 let snap = crate::ensure_up::daemon_liveness();
7663 match &snap.record {
7664 crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
7665 "daemon_pid_consistency",
7666 "no daemon.pid yet — fresh box or daemon never started",
7667 ),
7668 crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
7669 "daemon_pid_consistency",
7670 format!("daemon.pid is corrupt: {reason}"),
7671 "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
7672 ),
7673 crate::ensure_up::PidRecord::LegacyInt(pid) => {
7674 let pid = *pid;
7677 if !crate::ensure_up::pid_is_alive(pid) {
7678 return DoctorCheck::warn(
7679 "daemon_pid_consistency",
7680 format!(
7681 "daemon.pid (legacy-int) points at pid {pid} which is not running. \
7682 Stale pidfile from a crashed pre-0.5.11 daemon. \
7683 (Issue #2: this surface used to PASS while `wire status` said DOWN.)"
7684 ),
7685 "`wire upgrade` (kills any orphan + spawns a fresh daemon with JSON pidfile)",
7686 );
7687 }
7688 DoctorCheck::warn(
7689 "daemon_pid_consistency",
7690 format!(
7691 "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
7692 Daemon was started by a pre-0.5.11 binary."
7693 ),
7694 "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
7695 )
7696 }
7697 crate::ensure_up::PidRecord::Json(d) => {
7698 if !snap.pidfile_alive {
7702 return DoctorCheck::warn(
7703 "daemon_pid_consistency",
7704 format!(
7705 "daemon.pid records pid {pid} (v{version}) but that process is not running — \
7706 pidfile is stale. `wire status` will report DOWN, but pre-v0.5.19 doctor \
7707 silently PASSed this state and ignored any live orphan daemons (#2 root cause).",
7708 pid = d.pid,
7709 version = d.version,
7710 ),
7711 "`wire upgrade` to clean up the stale pidfile + spawn a fresh daemon \
7712 (kills any orphan daemon advancing the cursor without coordination)",
7713 );
7714 }
7715 let mut issues: Vec<String> = Vec::new();
7716 if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
7717 issues.push(format!(
7718 "schema={} (expected {})",
7719 d.schema,
7720 crate::ensure_up::DAEMON_PID_SCHEMA
7721 ));
7722 }
7723 let cli_version = env!("CARGO_PKG_VERSION");
7724 if d.version != cli_version {
7725 issues.push(format!("version daemon={} cli={cli_version}", d.version));
7726 }
7727 if !std::path::Path::new(&d.bin_path).exists() {
7728 issues.push(format!("bin_path {} missing on disk", d.bin_path));
7729 }
7730 if let Ok(card) = config::read_agent_card()
7732 && let Some(current_did) = card.get("did").and_then(Value::as_str)
7733 && let Some(recorded_did) = &d.did
7734 && recorded_did != current_did
7735 {
7736 issues.push(format!(
7737 "did daemon={recorded_did} config={current_did} — identity drift"
7738 ));
7739 }
7740 if let Ok(state) = config::read_relay_state()
7741 && let Some(current_relay) = state
7742 .get("self")
7743 .and_then(|s| s.get("relay_url"))
7744 .and_then(Value::as_str)
7745 && let Some(recorded_relay) = &d.relay_url
7746 && recorded_relay != current_relay
7747 {
7748 issues.push(format!(
7749 "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
7750 ));
7751 }
7752 if issues.is_empty() {
7753 DoctorCheck::pass(
7754 "daemon_pid_consistency",
7755 format!(
7756 "daemon v{} bound to {} as {}",
7757 d.version,
7758 d.relay_url.as_deref().unwrap_or("?"),
7759 d.did.as_deref().unwrap_or("?")
7760 ),
7761 )
7762 } else {
7763 DoctorCheck::warn(
7764 "daemon_pid_consistency",
7765 format!("daemon pidfile drift: {}", issues.join("; ")),
7766 "`wire upgrade` to atomically restart daemon with current config".to_string(),
7767 )
7768 }
7769 }
7770 }
7771}
7772
7773fn check_relay_reachable() -> DoctorCheck {
7775 let state = match config::read_relay_state() {
7776 Ok(s) => s,
7777 Err(e) => {
7778 return DoctorCheck::fail(
7779 "relay",
7780 format!("could not read relay state: {e}"),
7781 "run `wire up <handle>@<relay>` to bootstrap",
7782 );
7783 }
7784 };
7785 let url = state
7786 .get("self")
7787 .and_then(|s| s.get("relay_url"))
7788 .and_then(Value::as_str)
7789 .unwrap_or("");
7790 if url.is_empty() {
7791 return DoctorCheck::warn(
7792 "relay",
7793 "no relay bound — wire send/pull will not work",
7794 "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
7795 );
7796 }
7797 let client = crate::relay_client::RelayClient::new(url);
7798 match client.check_healthz() {
7799 Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
7800 Err(e) => DoctorCheck::fail(
7801 "relay",
7802 format!("{url} unreachable: {e}"),
7803 format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
7804 ),
7805 }
7806}
7807
7808fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
7812 let path = match config::state_dir() {
7813 Ok(d) => d.join("pair-rejected.jsonl"),
7814 Err(e) => {
7815 return DoctorCheck::warn(
7816 "pair_rejections",
7817 format!("could not resolve state dir: {e}"),
7818 "set WIRE_HOME or fix XDG_STATE_HOME",
7819 );
7820 }
7821 };
7822 if !path.exists() {
7823 return DoctorCheck::pass(
7824 "pair_rejections",
7825 "no pair-rejected.jsonl — no recorded pair failures",
7826 );
7827 }
7828 let body = match std::fs::read_to_string(&path) {
7829 Ok(b) => b,
7830 Err(e) => {
7831 return DoctorCheck::warn(
7832 "pair_rejections",
7833 format!("could not read {path:?}: {e}"),
7834 "check file permissions",
7835 );
7836 }
7837 };
7838 let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
7839 if lines.is_empty() {
7840 return DoctorCheck::pass("pair_rejections", "pair-rejected.jsonl present but empty");
7841 }
7842 let total = lines.len();
7843 let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
7844 let mut summary: Vec<String> = Vec::new();
7845 for line in &recent {
7846 if let Ok(rec) = serde_json::from_str::<Value>(line) {
7847 let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
7848 let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
7849 summary.push(format!("{peer}/{code}"));
7850 }
7851 }
7852 DoctorCheck::warn(
7853 "pair_rejections",
7854 format!(
7855 "{total} pair failures recorded. recent: [{}]",
7856 summary.join(", ")
7857 ),
7858 format!(
7859 "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
7860 ),
7861 )
7862}
7863
7864fn check_cursor_progress() -> DoctorCheck {
7869 let state = match config::read_relay_state() {
7870 Ok(s) => s,
7871 Err(e) => {
7872 return DoctorCheck::warn(
7873 "cursor",
7874 format!("could not read relay state: {e}"),
7875 "check ~/Library/Application Support/wire/relay.json",
7876 );
7877 }
7878 };
7879 let cursor = state
7880 .get("self")
7881 .and_then(|s| s.get("last_pulled_event_id"))
7882 .and_then(Value::as_str)
7883 .map(|s| s.chars().take(16).collect::<String>())
7884 .unwrap_or_else(|| "<none>".to_string());
7885 DoctorCheck::pass(
7886 "cursor",
7887 format!(
7888 "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
7889 ),
7890 )
7891}
7892
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// Each constructor must stamp its own status string; only WARN and FAIL
    /// carry a fix hint.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passing = DoctorCheck::pass("x", "ok");
        assert_eq!(passing.status, "PASS");
        assert_eq!(passing.fix, None);

        let warning = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warning.status, "WARN");
        assert_eq!(warning.fix, Some(String::from("do this")));

        let failing = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failing.status, "FAIL");
        assert_eq!(failing.fix, Some(String::from("fix it")));
    }

    /// With a fresh home and no rejection log, the check passes.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let c = check_pair_rejections(5);
            assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
        });
    }

    /// One recorded rejection turns the check into a WARN that names the
    /// peer and the failure code.
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let outcome = check_pair_rejections(5);
            assert_eq!(outcome.status, "WARN");
            assert!(outcome.detail.contains("1 pair failures"));
            assert!(outcome.detail.contains("willard/pair_drop_ack_send_failed"));
        });
    }
}
7946
7947fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
7959 let (nick, relay_url) = match handle_arg.split_once('@') {
7960 Some((n, host)) => {
7961 let url = if host.starts_with("http://") || host.starts_with("https://") {
7962 host.to_string()
7963 } else {
7964 format!("https://{host}")
7965 };
7966 (n.to_string(), url)
7967 }
7968 None => (
7969 handle_arg.to_string(),
7970 crate::pair_invite::DEFAULT_RELAY.to_string(),
7971 ),
7972 };
7973
7974 let mut report: Vec<(String, String)> = Vec::new();
7975 let mut step = |stage: &str, detail: String| {
7976 report.push((stage.to_string(), detail.clone()));
7977 if !as_json {
7978 eprintln!("wire up: {stage} — {detail}");
7979 }
7980 };
7981
7982 if config::is_initialized()? {
7984 let card = config::read_agent_card()?;
7985 let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
7986 let existing_handle = crate::agent_card::display_handle_from_did(existing_did).to_string();
7987 if existing_handle != nick {
7988 bail!(
7989 "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
7990 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
7991 delete `{:?}` to start fresh.",
7992 config::config_dir()?
7993 );
7994 }
7995 step("init", format!("already initialized as {existing_handle}"));
7996 } else {
7997 cmd_init(&nick, name, Some(&relay_url), false)?;
7998 step(
7999 "init",
8000 format!("created identity {nick} bound to {relay_url}"),
8001 );
8002 }
8003
8004 let relay_state = config::read_relay_state()?;
8008 let bound_relay = relay_state
8009 .get("self")
8010 .and_then(|s| s.get("relay_url"))
8011 .and_then(Value::as_str)
8012 .unwrap_or("")
8013 .to_string();
8014 if bound_relay.is_empty() {
8015 cmd_bind_relay(
8019 &relay_url, false, false,
8020 )?;
8021 step("bind-relay", format!("bound to {relay_url}"));
8022 } else if bound_relay != relay_url {
8023 step(
8024 "bind-relay",
8025 format!(
8026 "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
8027 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
8028 ),
8029 );
8030 } else {
8031 step("bind-relay", format!("already bound to {bound_relay}"));
8032 }
8033
8034 match cmd_claim(
8037 &nick,
8038 Some(&relay_url),
8039 None,
8040 false,
8041 false,
8042 ) {
8043 Ok(()) => step(
8044 "claim",
8045 format!("{nick}@{} claimed", strip_proto(&relay_url)),
8046 ),
8047 Err(e) => step(
8048 "claim",
8049 format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
8050 ),
8051 }
8052
8053 match crate::ensure_up::ensure_daemon_running() {
8055 Ok(true) => step("daemon", "started fresh background daemon".to_string()),
8056 Ok(false) => step("daemon", "already running".to_string()),
8057 Err(e) => step(
8058 "daemon",
8059 format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
8060 ),
8061 }
8062
8063 let summary =
8065 "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
8066 `wire monitor` to watch incoming events."
8067 .to_string();
8068 step("ready", summary.clone());
8069
8070 if as_json {
8071 let steps_json: Vec<_> = report
8072 .iter()
8073 .map(|(k, v)| json!({"stage": k, "detail": v}))
8074 .collect();
8075 println!(
8076 "{}",
8077 serde_json::to_string(&json!({
8078 "nick": nick,
8079 "relay": relay_url,
8080 "steps": steps_json,
8081 }))?
8082 );
8083 }
8084 Ok(())
8085}
8086
/// Returns `url` with any leading `https://` prefixes removed, followed by
/// any leading `http://` prefixes. Repeated prefixes are all stripped, in
/// that order; text without a scheme is returned unchanged.
fn strip_proto(url: &str) -> String {
    let without_tls_scheme = url.trim_start_matches("https://");
    let bare = without_tls_scheme.trim_start_matches("http://");
    String::from(bare)
}
8093
8094fn cmd_pair_megacommand(
8108 handle_arg: &str,
8109 relay_override: Option<&str>,
8110 timeout_secs: u64,
8111 _as_json: bool,
8112) -> Result<()> {
8113 let parsed = crate::pair_profile::parse_handle(handle_arg)?;
8114 let peer_handle = parsed.nick.clone();
8115
8116 eprintln!("wire pair: resolving {handle_arg}...");
8117 cmd_add(
8118 handle_arg,
8119 relay_override,
8120 false,
8121 false,
8122 )?;
8123
8124 eprintln!(
8125 "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
8126 to ack (their daemon must be running + pulling)..."
8127 );
8128
8129 let _ = run_sync_pull();
8133
8134 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
8135 let poll_interval = std::time::Duration::from_millis(500);
8136
8137 loop {
8138 let _ = run_sync_pull();
8140 let relay_state = config::read_relay_state()?;
8141 let peer_entry = relay_state
8142 .get("peers")
8143 .and_then(|p| p.get(&peer_handle))
8144 .cloned();
8145 let token = peer_entry
8146 .as_ref()
8147 .and_then(|e| e.get("slot_token"))
8148 .and_then(Value::as_str)
8149 .unwrap_or("");
8150
8151 if !token.is_empty() {
8152 let trust = config::read_trust()?;
8154 let pinned_in_trust = trust
8155 .get("agents")
8156 .and_then(|a| a.get(&peer_handle))
8157 .is_some();
8158 println!(
8159 "wire pair: paired with {peer_handle}.\n trust: {} bilateral: yes (slot_token recorded)\n next: `wire send {peer_handle} \"<msg>\"`",
8160 if pinned_in_trust {
8161 "VERIFIED"
8162 } else {
8163 "MISSING (bug)"
8164 }
8165 );
8166 return Ok(());
8167 }
8168
8169 if std::time::Instant::now() >= deadline {
8170 bail!(
8177 "wire pair: timed out after {timeout_secs}s. \
8178 peer {peer_handle} never sent pair_drop_ack. \
8179 likely causes: (a) their daemon is down — ask them to run \
8180 `wire status` and `wire daemon &`; (b) their binary is older \
8181 than 0.5.x and doesn't understand pair_drop events — ask \
8182 them to `wire upgrade`; (c) network / relay blip — re-run \
8183 `wire pair {handle_arg}` to retry."
8184 );
8185 }
8186
8187 std::thread::sleep(poll_interval);
8188 }
8189}
8190
8191fn cmd_claim(
8192 nick: &str,
8193 relay_override: Option<&str>,
8194 public_url: Option<&str>,
8195 hidden: bool,
8196 as_json: bool,
8197) -> Result<()> {
8198 if !crate::pair_profile::is_valid_nick(nick) {
8199 bail!(
8200 "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
8201 );
8202 }
8203 let (_did, relay_url, slot_id, slot_token) =
8206 crate::pair_invite::ensure_self_with_relay(relay_override)?;
8207 let card = config::read_agent_card()?;
8208
8209 let client = crate::relay_client::RelayClient::new(&relay_url);
8210 let discoverable = if hidden { Some(false) } else { None };
8214 let resp =
8215 client.handle_claim_v2(nick, &slot_id, &slot_token, public_url, &card, discoverable)?;
8216
8217 if as_json {
8218 println!(
8219 "{}",
8220 serde_json::to_string(&json!({
8221 "nick": nick,
8222 "relay": relay_url,
8223 "response": resp,
8224 }))?
8225 );
8226 } else {
8227 let domain = public_url
8231 .unwrap_or(&relay_url)
8232 .trim_start_matches("https://")
8233 .trim_start_matches("http://")
8234 .trim_end_matches('/')
8235 .split('/')
8236 .next()
8237 .unwrap_or("<this-relay-domain>")
8238 .to_string();
8239 println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
8240 println!("verify with: wire whois {nick}@{domain}");
8241 }
8242 Ok(())
8243}
8244
8245fn cmd_profile(action: ProfileAction) -> Result<()> {
8246 match action {
8247 ProfileAction::Set { field, value, json } => {
8248 let parsed: Value =
8252 serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
8253 let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
8254 if json {
8255 println!(
8256 "{}",
8257 serde_json::to_string(&json!({
8258 "field": field,
8259 "profile": new_profile,
8260 }))?
8261 );
8262 } else {
8263 println!("profile.{field} set");
8264 }
8265 }
8266 ProfileAction::Get { json } => return cmd_whois(None, json, None),
8267 ProfileAction::Clear { field, json } => {
8268 let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
8269 if json {
8270 println!(
8271 "{}",
8272 serde_json::to_string(&json!({
8273 "field": field,
8274 "cleared": true,
8275 "profile": new_profile,
8276 }))?
8277 );
8278 } else {
8279 println!("profile.{field} cleared");
8280 }
8281 }
8282 }
8283 Ok(())
8284}
8285
8286fn cmd_setup(apply: bool) -> Result<()> {
8289 use std::path::PathBuf;
8290
8291 let entry = json!({"command": "wire", "args": ["mcp"]});
8292 let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
8293
8294 let mut targets: Vec<(&str, PathBuf)> = Vec::new();
8297 if let Some(home) = dirs::home_dir() {
8298 targets.push(("Claude Code", home.join(".claude.json")));
8301 targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
8303 #[cfg(target_os = "macos")]
8305 targets.push((
8306 "Claude Desktop (macOS)",
8307 home.join("Library/Application Support/Claude/claude_desktop_config.json"),
8308 ));
8309 #[cfg(target_os = "windows")]
8311 if let Ok(appdata) = std::env::var("APPDATA") {
8312 targets.push((
8313 "Claude Desktop (Windows)",
8314 PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
8315 ));
8316 }
8317 targets.push(("Cursor", home.join(".cursor/mcp.json")));
8319 }
8320 targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
8322
8323 println!("wire setup\n");
8324 println!("MCP server snippet (add this to your client's mcpServers):");
8325 println!();
8326 println!("{entry_pretty}");
8327 println!();
8328
8329 if !apply {
8330 println!("Probable MCP host config locations on this machine:");
8331 for (name, path) in &targets {
8332 let marker = if path.exists() {
8333 "✓ found"
8334 } else {
8335 " (would create)"
8336 };
8337 println!(" {marker:14} {name}: {}", path.display());
8338 }
8339 println!();
8340 println!("Run `wire setup --apply` to merge wire into each config above.");
8341 println!(
8342 "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
8343 );
8344 return Ok(());
8345 }
8346
8347 let mut modified: Vec<String> = Vec::new();
8348 let mut skipped: Vec<String> = Vec::new();
8349 for (name, path) in &targets {
8350 match upsert_mcp_entry(path, "wire", &entry) {
8351 Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
8352 Ok(false) => skipped.push(format!(" {name} ({}): already configured", path.display())),
8353 Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
8354 }
8355 }
8356 if !modified.is_empty() {
8357 println!("Modified:");
8358 for line in &modified {
8359 println!(" {line}");
8360 }
8361 println!();
8362 println!("Restart the app(s) above to load wire MCP.");
8363 }
8364 if !skipped.is_empty() {
8365 println!();
8366 println!("Skipped:");
8367 for line in &skipped {
8368 println!(" {line}");
8369 }
8370 }
8371 Ok(())
8372}
8373
8374fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
8377 let mut cfg: Value = if path.exists() {
8378 let body = std::fs::read_to_string(path).context("reading config")?;
8379 serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
8380 } else {
8381 json!({})
8382 };
8383 if !cfg.is_object() {
8384 cfg = json!({});
8385 }
8386 let root = cfg.as_object_mut().unwrap();
8387 let servers = root
8388 .entry("mcpServers".to_string())
8389 .or_insert_with(|| json!({}));
8390 if !servers.is_object() {
8391 *servers = json!({});
8392 }
8393 let map = servers.as_object_mut().unwrap();
8394 if map.get(server_name) == Some(entry) {
8395 return Ok(false);
8396 }
8397 map.insert(server_name.to_string(), entry.clone());
8398 if let Some(parent) = path.parent()
8399 && !parent.as_os_str().is_empty()
8400 {
8401 std::fs::create_dir_all(parent).context("creating parent dir")?;
8402 }
8403 let out = serde_json::to_string_pretty(&cfg)? + "\n";
8404 std::fs::write(path, out).context("writing config")?;
8405 Ok(true)
8406}
8407
8408#[allow(clippy::too_many_arguments)]
8411fn cmd_reactor(
8412 on_event: &str,
8413 peer_filter: Option<&str>,
8414 kind_filter: Option<&str>,
8415 verified_only: bool,
8416 interval_secs: u64,
8417 once: bool,
8418 dry_run: bool,
8419 max_per_minute: u32,
8420 max_chain_depth: u32,
8421) -> Result<()> {
8422 use crate::inbox_watch::{InboxEvent, InboxWatcher};
8423 use std::collections::{HashMap, HashSet, VecDeque};
8424 use std::io::Write;
8425 use std::process::{Command, Stdio};
8426 use std::time::{Duration, Instant};
8427
8428 let cursor_path = config::state_dir()?.join("reactor.cursor");
8429 let emitted_path = config::state_dir()?.join("reactor-emitted.log");
8438 let mut emitted_ids: HashSet<String> = HashSet::new();
8439 if emitted_path.exists()
8440 && let Ok(body) = std::fs::read_to_string(&emitted_path)
8441 {
8442 for line in body.lines() {
8443 let t = line.trim();
8444 if !t.is_empty() {
8445 emitted_ids.insert(t.to_string());
8446 }
8447 }
8448 }
8449 let outbox_dir = config::outbox_dir()?;
8451 let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
8454
8455 let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
8456
8457 let kind_num: Option<u32> = match kind_filter {
8458 Some(k) => Some(parse_kind(k)?),
8459 None => None,
8460 };
8461
8462 let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
8464
8465 let dispatch = |ev: &InboxEvent,
8466 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
8467 emitted_ids: &HashSet<String>|
8468 -> Result<bool> {
8469 if let Some(p) = peer_filter
8470 && ev.peer != p
8471 {
8472 return Ok(false);
8473 }
8474 if verified_only && !ev.verified {
8475 return Ok(false);
8476 }
8477 if let Some(want) = kind_num {
8478 let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
8479 if ev_kind != Some(want) {
8480 return Ok(false);
8481 }
8482 }
8483
8484 if max_chain_depth > 0 {
8488 let body_str = match &ev.raw["body"] {
8489 Value::String(s) => s.clone(),
8490 other => serde_json::to_string(other).unwrap_or_default(),
8491 };
8492 if let Some(referenced) = parse_re_marker(&body_str) {
8493 let matched = emitted_ids.contains(&referenced)
8496 || emitted_ids.iter().any(|full| full.starts_with(&referenced));
8497 if matched {
8498 eprintln!(
8499 "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
8500 ev.event_id, ev.peer, referenced
8501 );
8502 return Ok(false);
8503 }
8504 }
8505 }
8506
8507 if max_per_minute > 0 {
8509 let now = Instant::now();
8510 let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
8511 while let Some(&front) = win.front() {
8512 if now.duration_since(front) > Duration::from_secs(60) {
8513 win.pop_front();
8514 } else {
8515 break;
8516 }
8517 }
8518 if win.len() as u32 >= max_per_minute {
8519 eprintln!(
8520 "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
8521 ev.event_id, ev.peer, max_per_minute
8522 );
8523 return Ok(false);
8524 }
8525 win.push_back(now);
8526 }
8527
8528 if dry_run {
8529 println!("{}", serde_json::to_string(&ev.raw)?);
8530 return Ok(true);
8531 }
8532
8533 let mut child = Command::new("sh")
8534 .arg("-c")
8535 .arg(on_event)
8536 .stdin(Stdio::piped())
8537 .stdout(Stdio::inherit())
8538 .stderr(Stdio::inherit())
8539 .env("WIRE_EVENT_PEER", &ev.peer)
8540 .env("WIRE_EVENT_ID", &ev.event_id)
8541 .env("WIRE_EVENT_KIND", &ev.kind)
8542 .spawn()
8543 .with_context(|| format!("spawning reactor handler: {on_event}"))?;
8544 if let Some(mut stdin) = child.stdin.take() {
8545 let body = serde_json::to_vec(&ev.raw)?;
8546 let _ = stdin.write_all(&body);
8547 let _ = stdin.write_all(b"\n");
8548 }
8549 std::mem::drop(child);
8550 Ok(true)
8551 };
8552
8553 let scan_outbox = |emitted_ids: &mut HashSet<String>,
8555 outbox_cursors: &mut HashMap<String, u64>|
8556 -> Result<usize> {
8557 if !outbox_dir.exists() {
8558 return Ok(0);
8559 }
8560 let mut added = 0;
8561 let mut new_ids: Vec<String> = Vec::new();
8562 for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
8563 let path = entry.path();
8564 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
8565 continue;
8566 }
8567 let peer = match path.file_stem().and_then(|s| s.to_str()) {
8568 Some(s) => s.to_string(),
8569 None => continue,
8570 };
8571 let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
8572 let start = *outbox_cursors.get(&peer).unwrap_or(&0);
8573 if cur_len <= start {
8574 outbox_cursors.insert(peer, start);
8575 continue;
8576 }
8577 let body = std::fs::read_to_string(&path).unwrap_or_default();
8578 let tail = &body[start as usize..];
8579 for line in tail.lines() {
8580 if let Ok(v) = serde_json::from_str::<Value>(line)
8581 && let Some(eid) = v.get("event_id").and_then(Value::as_str)
8582 && emitted_ids.insert(eid.to_string())
8583 {
8584 new_ids.push(eid.to_string());
8585 added += 1;
8586 }
8587 }
8588 outbox_cursors.insert(peer, cur_len);
8589 }
8590 if !new_ids.is_empty() {
8591 let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
8593 if all.len() > 500 {
8594 all.sort();
8595 let drop_n = all.len() - 500;
8596 let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
8597 emitted_ids.retain(|x| !dropped.contains(x));
8598 all = emitted_ids.iter().cloned().collect();
8599 }
8600 let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
8601 }
8602 Ok(added)
8603 };
8604
8605 let sweep = |watcher: &mut InboxWatcher,
8606 emitted_ids: &mut HashSet<String>,
8607 outbox_cursors: &mut HashMap<String, u64>,
8608 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
8609 -> Result<usize> {
8610 let _ = scan_outbox(emitted_ids, outbox_cursors);
8612
8613 let events = watcher.poll()?;
8614 let mut fired = 0usize;
8615 for ev in &events {
8616 match dispatch(ev, peer_dispatch_log, emitted_ids) {
8617 Ok(true) => fired += 1,
8618 Ok(false) => {}
8619 Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
8620 }
8621 }
8622 watcher.save_cursors(&cursor_path)?;
8623 Ok(fired)
8624 };
8625
8626 if once {
8627 sweep(
8628 &mut watcher,
8629 &mut emitted_ids,
8630 &mut outbox_cursors,
8631 &mut peer_dispatch_log,
8632 )?;
8633 return Ok(());
8634 }
8635 let interval = std::time::Duration::from_secs(interval_secs.max(1));
8636 loop {
8637 if let Err(e) = sweep(
8638 &mut watcher,
8639 &mut emitted_ids,
8640 &mut outbox_cursors,
8641 &mut peer_dispatch_log,
8642 ) {
8643 eprintln!("wire reactor: sweep error: {e}");
8644 }
8645 std::thread::sleep(interval);
8646 }
8647}
8648
/// Extracts the id from the first `(re:<id>)` marker in `body`.
///
/// The id is everything between the first `(re:` and the next `)`, with
/// surrounding whitespace trimmed. Returns `None` when there is no marker,
/// no closing parenthesis after it, or the id is empty. Only the first
/// marker is considered.
fn parse_re_marker(body: &str) -> Option<String> {
    let (_, after_marker) = body.split_once("(re:")?;
    let (raw_id, _) = after_marker.split_once(')')?;
    let id = raw_id.trim();
    (!id.is_empty()).then(|| id.to_string())
}
8662
8663fn cmd_notify(
8666 interval_secs: u64,
8667 peer_filter: Option<&str>,
8668 once: bool,
8669 as_json: bool,
8670) -> Result<()> {
8671 use crate::inbox_watch::InboxWatcher;
8672 let cursor_path = config::state_dir()?.join("notify.cursor");
8673 let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
8674
8675 let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
8676 let events = watcher.poll()?;
8677 for ev in events {
8678 if let Some(p) = peer_filter
8679 && ev.peer != p
8680 {
8681 continue;
8682 }
8683 if as_json {
8684 println!("{}", serde_json::to_string(&ev)?);
8685 } else {
8686 os_notify_inbox_event(&ev);
8687 }
8688 }
8689 watcher.save_cursors(&cursor_path)?;
8690 Ok(())
8691 };
8692
8693 if once {
8694 return sweep(&mut watcher);
8695 }
8696
8697 let interval = std::time::Duration::from_secs(interval_secs.max(1));
8698 loop {
8699 if let Err(e) = sweep(&mut watcher) {
8700 eprintln!("wire notify: sweep error: {e}");
8701 }
8702 std::thread::sleep(interval);
8703 }
8704}
8705
8706fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
8707 let title = if ev.verified {
8708 format!("wire ← {}", ev.peer)
8709 } else {
8710 format!("wire ← {} (UNVERIFIED)", ev.peer)
8711 };
8712 let body = format!("{}: {}", ev.kind, ev.body_preview);
8713 crate::os_notify::toast(&title, &body);
8714}
8715
/// Fallback for targets other than Linux, macOS and Windows: writes the
/// notification title and body to stderr.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    eprintln!("[wire notify] {}\n {}", title, body);
}
8720
8721