1use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21 agent_card::{build_agent_card, sign_agent_card},
22 config,
23 signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24 trust::{add_self_to_trust, empty_trust},
25};
26
// Top-level command-line interface of the `wire` binary, parsed with clap's
// derive API. `run` calls `Cli::parse()` and dispatches on `command`.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///`
// doc comments on this type and its fields into user-visible help text.
#[derive(Parser, Debug)]
#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
pub struct Cli {
    // The subcommand selected on the command line.
    #[command(subcommand)]
    pub command: Command,
}
34
35#[derive(Subcommand, Debug)]
36pub enum Command {
37 Init {
39 handle: String,
41 #[arg(long)]
43 name: Option<String>,
44 #[arg(long)]
47 relay: Option<String>,
48 #[arg(long)]
50 json: bool,
51 },
52 Whoami {
56 #[arg(long)]
57 json: bool,
58 },
59 Peers {
61 #[arg(long)]
62 json: bool,
63 },
64 Send {
72 peer: String,
74 kind_or_body: String,
79 body: Option<String>,
83 #[arg(long)]
85 deadline: Option<String>,
86 #[arg(long)]
88 json: bool,
89 },
90 Tail {
92 peer: Option<String>,
94 #[arg(long)]
96 json: bool,
97 #[arg(long, default_value_t = 0)]
99 limit: usize,
100 },
101 Monitor {
112 #[arg(long)]
114 peer: Option<String>,
115 #[arg(long)]
117 json: bool,
118 #[arg(long)]
121 include_handshake: bool,
122 #[arg(long, default_value_t = 500)]
124 interval_ms: u64,
125 #[arg(long, default_value_t = 0)]
127 replay: usize,
128 },
129 Verify {
131 path: String,
133 #[arg(long)]
135 json: bool,
136 },
137 Mcp,
141 RelayServer {
143 #[arg(long, default_value = "127.0.0.1:8770")]
145 bind: String,
146 #[arg(long)]
154 local_only: bool,
155 },
156 BindRelay {
165 url: String,
167 #[arg(long)]
172 migrate_pinned: bool,
173 #[arg(long)]
174 json: bool,
175 },
176 AddPeerSlot {
179 handle: String,
181 url: String,
183 slot_id: String,
185 slot_token: String,
187 #[arg(long)]
188 json: bool,
189 },
190 Push {
192 peer: Option<String>,
194 #[arg(long)]
195 json: bool,
196 },
197 Pull {
199 #[arg(long)]
200 json: bool,
201 },
202 Status {
205 #[arg(long)]
207 peer: Option<String>,
208 #[arg(long)]
209 json: bool,
210 },
211 Responder {
213 #[command(subcommand)]
214 command: ResponderCommand,
215 },
216 Pin {
219 card_file: String,
221 #[arg(long)]
222 json: bool,
223 },
224 RotateSlot {
235 #[arg(long)]
238 no_announce: bool,
239 #[arg(long)]
240 json: bool,
241 },
242 ForgetPeer {
246 handle: String,
248 #[arg(long)]
250 purge: bool,
251 #[arg(long)]
252 json: bool,
253 },
254 Daemon {
258 #[arg(long, default_value_t = 5)]
260 interval: u64,
261 #[arg(long)]
263 once: bool,
264 #[arg(long)]
265 json: bool,
266 },
267 PairHost {
272 #[arg(long)]
274 relay: String,
275 #[arg(long)]
279 yes: bool,
280 #[arg(long, default_value_t = 300)]
282 timeout: u64,
283 #[arg(long)]
289 detach: bool,
290 #[arg(long)]
292 json: bool,
293 },
294 #[command(alias = "join")]
298 PairJoin {
299 code_phrase: String,
301 #[arg(long)]
303 relay: String,
304 #[arg(long)]
305 yes: bool,
306 #[arg(long, default_value_t = 300)]
307 timeout: u64,
308 #[arg(long)]
310 detach: bool,
311 #[arg(long)]
313 json: bool,
314 },
315 PairConfirm {
319 code_phrase: String,
321 digits: String,
323 #[arg(long)]
325 json: bool,
326 },
327 PairList {
329 #[arg(long)]
331 json: bool,
332 #[arg(long)]
336 watch: bool,
337 #[arg(long, default_value_t = 1)]
339 watch_interval: u64,
340 },
341 PairCancel {
343 code_phrase: String,
344 #[arg(long)]
345 json: bool,
346 },
347 PairWatch {
357 code_phrase: String,
358 #[arg(long, default_value = "sas_ready")]
360 status: String,
361 #[arg(long, default_value_t = 300)]
363 timeout: u64,
364 #[arg(long)]
366 json: bool,
367 },
368 Pair {
377 handle: String,
380 #[arg(long)]
383 code: Option<String>,
384 #[arg(long, default_value = "https://wireup.net")]
386 relay: String,
387 #[arg(long)]
389 yes: bool,
390 #[arg(long, default_value_t = 300)]
392 timeout: u64,
393 #[arg(long)]
396 no_setup: bool,
397 #[arg(long)]
402 detach: bool,
403 },
404 PairAbandon {
410 code_phrase: String,
412 #[arg(long, default_value = "https://wireup.net")]
414 relay: String,
415 },
416 PairAccept {
422 peer: String,
424 #[arg(long)]
426 json: bool,
427 },
428 PairReject {
435 peer: String,
437 #[arg(long)]
439 json: bool,
440 },
441 PairListInbound {
447 #[arg(long)]
449 json: bool,
450 },
451 #[command(subcommand)]
461 Session(SessionCommand),
462 Setup {
467 #[arg(long)]
469 apply: bool,
470 },
471 Whois {
475 handle: Option<String>,
477 #[arg(long)]
478 json: bool,
479 #[arg(long)]
482 relay: Option<String>,
483 },
484 Add {
490 handle: String,
492 #[arg(long)]
494 relay: Option<String>,
495 #[arg(long)]
496 json: bool,
497 },
498 Up {
508 handle: String,
511 #[arg(long)]
513 name: Option<String>,
514 #[arg(long)]
515 json: bool,
516 },
517 Doctor {
524 #[arg(long)]
526 json: bool,
527 #[arg(long, default_value_t = 5)]
529 recent_rejections: usize,
530 },
531 Upgrade {
536 #[arg(long)]
539 check: bool,
540 #[arg(long)]
541 json: bool,
542 },
543 Service {
548 #[command(subcommand)]
549 action: ServiceAction,
550 },
551 Diag {
556 #[command(subcommand)]
557 action: DiagAction,
558 },
559 Claim {
563 nick: String,
564 #[arg(long)]
566 relay: Option<String>,
567 #[arg(long)]
569 public_url: Option<String>,
570 #[arg(long)]
578 hidden: bool,
579 #[arg(long)]
580 json: bool,
581 },
582 Profile {
592 #[command(subcommand)]
593 action: ProfileAction,
594 },
595 Invite {
599 #[arg(long, default_value = "https://wireup.net")]
601 relay: String,
602 #[arg(long, default_value_t = 86_400)]
604 ttl: u64,
605 #[arg(long, default_value_t = 1)]
608 uses: u32,
609 #[arg(long)]
613 share: bool,
614 #[arg(long)]
616 json: bool,
617 },
618 Accept {
621 url: String,
623 #[arg(long)]
625 json: bool,
626 },
627 Reactor {
633 #[arg(long)]
635 on_event: String,
636 #[arg(long)]
638 peer: Option<String>,
639 #[arg(long)]
641 kind: Option<String>,
642 #[arg(long, default_value_t = true)]
644 verified_only: bool,
645 #[arg(long, default_value_t = 2)]
647 interval: u64,
648 #[arg(long)]
650 once: bool,
651 #[arg(long)]
653 dry_run: bool,
654 #[arg(long, default_value_t = 6)]
658 max_per_minute: u32,
659 #[arg(long, default_value_t = 1)]
663 max_chain_depth: u32,
664 },
665 Notify {
670 #[arg(long, default_value_t = 2)]
672 interval: u64,
673 #[arg(long)]
675 peer: Option<String>,
676 #[arg(long)]
678 once: bool,
679 #[arg(long)]
683 json: bool,
684 },
685}
686
// Sub-actions of the `diag` subcommand; `run` passes the chosen action to
// `cmd_diag`. (`//` rather than `///`: clap would publish doc comments as help.)
#[derive(Subcommand, Debug)]
pub enum DiagAction {
    Tail {
        // Defaults to 20 when `--limit` is not given.
        #[arg(long, default_value_t = 20)]
        limit: usize,
        #[arg(long)]
        json: bool,
    },
    Enable,
    Disable,
    Status {
        #[arg(long)]
        json: bool,
    },
}
707
// Sub-actions of the `session` subcommand; `run` passes the chosen action to
// `cmd_session`. (`//` rather than `///`: clap would publish doc comments as help.)
#[derive(Subcommand, Debug)]
pub enum SessionCommand {
    New {
        // Optional positional session name.
        name: Option<String>,
        #[arg(long, default_value = "https://wireup.net")]
        relay: String,
        #[arg(long)]
        with_local: bool,
        #[arg(long, default_value = "http://127.0.0.1:8771")]
        local_relay: String,
        #[arg(long)]
        no_daemon: bool,
        #[arg(long)]
        json: bool,
    },
    List {
        #[arg(long)]
        json: bool,
    },
    ListLocal {
        #[arg(long)]
        json: bool,
    },
    Env {
        // Optional positional session name.
        name: Option<String>,
        #[arg(long)]
        json: bool,
    },
    Current {
        #[arg(long)]
        json: bool,
    },
    Destroy {
        // Required positional session name.
        name: String,
        #[arg(long)]
        force: bool,
        #[arg(long)]
        json: bool,
    },
}
787
// Sub-actions of the `service` subcommand; `run` passes the chosen action to
// `cmd_service`. All three actions take the same two flags.
// (`//` rather than `///`: clap would publish doc comments as help.)
#[derive(Subcommand, Debug)]
pub enum ServiceAction {
    Install {
        #[arg(long)]
        local_relay: bool,
        #[arg(long)]
        json: bool,
    },
    Uninstall {
        #[arg(long)]
        local_relay: bool,
        #[arg(long)]
        json: bool,
    },
    Status {
        #[arg(long)]
        local_relay: bool,
        #[arg(long)]
        json: bool,
    },
}
825
// Sub-actions of the `responder` subcommand, dispatched by `run`.
// (`//` rather than `///`: clap would publish doc comments as help.)
#[derive(Subcommand, Debug)]
pub enum ResponderCommand {
    // Handled by `cmd_responder_set`, which publishes a health record for our
    // own relay slot.
    Set {
        // Must be a value accepted by `responder_status_allowed`.
        status: String,
        #[arg(long)]
        reason: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_responder_get`.
    Get {
        // Peer whose record to fetch; when omitted our own slot is queried.
        peer: Option<String>,
        #[arg(long)]
        json: bool,
    },
}
848
// Sub-actions of the `profile` subcommand; `run` passes the chosen action to
// `cmd_profile`. (`//` rather than `///`: clap would publish doc comments as help.)
#[derive(Subcommand, Debug)]
pub enum ProfileAction {
    Set {
        // Two positionals: the field name, then its new value.
        field: String,
        value: String,
        #[arg(long)]
        json: bool,
    },
    Get {
        #[arg(long)]
        json: bool,
    },
    Clear {
        // Positional name of the field to clear.
        field: String,
        #[arg(long)]
        json: bool,
    },
}
872
/// Entry point of the CLI: parse the process arguments and run the selected
/// subcommand.
///
/// Each arm destructures one [`Command`] variant and forwards its fields to
/// the matching `cmd_*` handler, borrowing owned `String`s as `&str` and
/// `Option<String>` as `Option<&str>`. The handler's `Result` is returned
/// unchanged.
pub fn run() -> Result<()> {
    let cli = Cli::parse();
    match cli.command {
        Command::Init {
            handle,
            name,
            relay,
            json,
        } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
        // `--peer` switches from the local summary to a per-peer report.
        Command::Status { peer, json } => {
            if let Some(peer) = peer {
                cmd_status_peer(&peer, json)
            } else {
                cmd_status(json)
            }
        }
        Command::Whoami { json } => cmd_whoami(json),
        Command::Peers { json } => cmd_peers(json),
        Command::Send {
            peer,
            kind_or_body,
            body,
            deadline,
            json,
        } => {
            // Two positionals after the peer: `<kind> <body>`.
            // One positional: it is the body, and the kind defaults to "claim".
            let (kind, body) = match body {
                Some(real_body) => (kind_or_body, real_body),
                None => ("claim".to_string(), kind_or_body),
            };
            cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
        }
        Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
        Command::Monitor {
            peer,
            json,
            include_handshake,
            interval_ms,
            replay,
        } => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
        Command::Verify { path, json } => cmd_verify(&path, json),
        Command::Responder { command } => match command {
            ResponderCommand::Set {
                status,
                reason,
                json,
            } => cmd_responder_set(&status, reason.as_deref(), json),
            ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
        },
        Command::Mcp => cmd_mcp(),
        Command::RelayServer { bind, local_only } => cmd_relay_server(&bind, local_only),
        Command::BindRelay {
            url,
            migrate_pinned,
            json,
        } => cmd_bind_relay(&url, migrate_pinned, json),
        Command::AddPeerSlot {
            handle,
            url,
            slot_id,
            slot_token,
            json,
        } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
        Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
        Command::Pull { json } => cmd_pull(json),
        Command::Pin { card_file, json } => cmd_pin(&card_file, json),
        Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
        Command::ForgetPeer {
            handle,
            purge,
            json,
        } => cmd_forget_peer(&handle, purge, json),
        Command::Daemon {
            interval,
            once,
            json,
        } => cmd_daemon(interval, once, json),
        Command::PairHost {
            relay,
            yes,
            timeout,
            detach,
            json,
        } => {
            // `json` is only forwarded on the detached path; `yes` and
            // `timeout` only on the non-detached path.
            if detach {
                cmd_pair_host_detach(&relay, json)
            } else {
                cmd_pair_host(&relay, yes, timeout)
            }
        }
        Command::PairJoin {
            code_phrase,
            relay,
            yes,
            timeout,
            detach,
            json,
        } => {
            // Same split as `PairHost`: `json` is used only when detached.
            if detach {
                cmd_pair_join_detach(&code_phrase, &relay, json)
            } else {
                cmd_pair_join(&code_phrase, &relay, yes, timeout)
            }
        }
        Command::PairConfirm {
            code_phrase,
            digits,
            json,
        } => cmd_pair_confirm(&code_phrase, &digits, json),
        Command::PairList {
            json,
            watch,
            watch_interval,
        } => cmd_pair_list(json, watch, watch_interval),
        Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
        Command::PairWatch {
            code_phrase,
            status,
            timeout,
            json,
        } => cmd_pair_watch(&code_phrase, &status, timeout, json),
        Command::Pair {
            handle,
            code,
            relay,
            yes,
            timeout,
            no_setup,
            detach,
        } => {
            // A handle containing '@' with no `--code` takes the
            // `cmd_pair_megacommand` path. This check comes first, so
            // `--detach`, `--yes` and `--no-setup` are not consulted there.
            if handle.contains('@') && code.is_none() {
                cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
            } else if detach {
                cmd_pair_detach(&handle, code.as_deref(), &relay)
            } else {
                cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
            }
        }
        Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
        Command::PairAccept { peer, json } => cmd_pair_accept(&peer, json),
        Command::PairReject { peer, json } => cmd_pair_reject(&peer, json),
        Command::PairListInbound { json } => cmd_pair_list_inbound(json),
        Command::Session(cmd) => cmd_session(cmd),
        Command::Invite {
            relay,
            ttl,
            uses,
            share,
            json,
        } => cmd_invite(&relay, ttl, uses, share, json),
        Command::Accept { url, json } => cmd_accept(&url, json),
        Command::Whois {
            handle,
            json,
            relay,
        } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
        Command::Add {
            handle,
            relay,
            json,
        } => cmd_add(&handle, relay.as_deref(), json),
        Command::Up {
            handle,
            name,
            json,
        } => cmd_up(&handle, name.as_deref(), json),
        Command::Doctor {
            json,
            recent_rejections,
        } => cmd_doctor(json, recent_rejections),
        Command::Upgrade { check, json } => cmd_upgrade(check, json),
        Command::Service { action } => cmd_service(action),
        Command::Diag { action } => cmd_diag(action),
        Command::Claim {
            nick,
            relay,
            public_url,
            hidden,
            json,
        } => cmd_claim(
            &nick,
            relay.as_deref(),
            public_url.as_deref(),
            hidden,
            json,
        ),
        Command::Profile { action } => cmd_profile(action),
        Command::Setup { apply } => cmd_setup(apply),
        Command::Reactor {
            on_event,
            peer,
            kind,
            verified_only,
            interval,
            once,
            dry_run,
            max_per_minute,
            max_chain_depth,
        } => cmd_reactor(
            &on_event,
            peer.as_deref(),
            kind.as_deref(),
            verified_only,
            interval,
            once,
            dry_run,
            max_per_minute,
            max_chain_depth,
        ),
        Command::Notify {
            interval,
            peer,
            once,
            json,
        } => cmd_notify(interval, peer.as_deref(), once, json),
    }
}
1098
1099fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
1102 if !handle
1103 .chars()
1104 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1105 {
1106 bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
1107 }
1108 if config::is_initialized()? {
1109 bail!(
1110 "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
1111 config::config_dir()?
1112 );
1113 }
1114
1115 config::ensure_dirs()?;
1116 let (sk_seed, pk_bytes) = generate_keypair();
1117 config::write_private_key(&sk_seed)?;
1118
1119 let card = build_agent_card(handle, &pk_bytes, name, None, None);
1120 let signed = sign_agent_card(&card, &sk_seed);
1121 config::write_agent_card(&signed)?;
1122
1123 let mut trust = empty_trust();
1124 add_self_to_trust(&mut trust, handle, &pk_bytes);
1125 config::write_trust(&trust)?;
1126
1127 let fp = fingerprint(&pk_bytes);
1128 let key_id = make_key_id(handle, &pk_bytes);
1129
1130 let mut relay_info: Option<(String, String)> = None;
1132 if let Some(url) = relay {
1133 let normalized = url.trim_end_matches('/');
1134 let client = crate::relay_client::RelayClient::new(normalized);
1135 client.check_healthz()?;
1136 let alloc = client.allocate_slot(Some(handle))?;
1137 let mut state = config::read_relay_state()?;
1138 state["self"] = json!({
1139 "relay_url": normalized,
1140 "slot_id": alloc.slot_id.clone(),
1141 "slot_token": alloc.slot_token,
1142 });
1143 config::write_relay_state(&state)?;
1144 relay_info = Some((normalized.to_string(), alloc.slot_id));
1145 }
1146
1147 let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
1148 if as_json {
1149 let mut out = json!({
1150 "did": did_str.clone(),
1151 "fingerprint": fp,
1152 "key_id": key_id,
1153 "config_dir": config::config_dir()?.to_string_lossy(),
1154 });
1155 if let Some((url, slot_id)) = &relay_info {
1156 out["relay_url"] = json!(url);
1157 out["slot_id"] = json!(slot_id);
1158 }
1159 println!("{}", serde_json::to_string(&out)?);
1160 } else {
1161 println!("generated {did_str} (ed25519:{key_id})");
1162 println!(
1163 "config written to {}",
1164 config::config_dir()?.to_string_lossy()
1165 );
1166 if let Some((url, slot_id)) = &relay_info {
1167 println!("bound to relay {url} (slot {slot_id})");
1168 println!();
1169 println!(
1170 "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
1171 );
1172 } else {
1173 println!();
1174 println!(
1175 "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
1176 );
1177 }
1178 }
1179 Ok(())
1180}
1181
1182fn cmd_status(as_json: bool) -> Result<()> {
1185 let initialized = config::is_initialized()?;
1186
1187 let mut summary = json!({
1188 "initialized": initialized,
1189 });
1190
1191 if initialized {
1192 let card = config::read_agent_card()?;
1193 let did = card
1194 .get("did")
1195 .and_then(Value::as_str)
1196 .unwrap_or("")
1197 .to_string();
1198 let handle = card
1202 .get("handle")
1203 .and_then(Value::as_str)
1204 .map(str::to_string)
1205 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1206 let pk_b64 = card
1207 .get("verify_keys")
1208 .and_then(Value::as_object)
1209 .and_then(|m| m.values().next())
1210 .and_then(|v| v.get("key"))
1211 .and_then(Value::as_str)
1212 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1213 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1214 summary["did"] = json!(did);
1215 summary["handle"] = json!(handle);
1216 summary["fingerprint"] = json!(fingerprint(&pk_bytes));
1217 summary["capabilities"] = card
1218 .get("capabilities")
1219 .cloned()
1220 .unwrap_or_else(|| json!([]));
1221
1222 let trust = config::read_trust()?;
1223 let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1224 let mut peers = Vec::new();
1225 if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
1226 for (peer_handle, _agent) in agents {
1227 if peer_handle == &handle {
1228 continue; }
1230 peers.push(json!({
1235 "handle": peer_handle,
1236 "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
1237 }));
1238 }
1239 }
1240 summary["peers"] = json!(peers);
1241
1242 let relay_state = config::read_relay_state()?;
1243 summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
1244 if !summary["self_relay"].is_null() {
1245 if let Some(obj) = summary["self_relay"].as_object_mut() {
1247 obj.remove("slot_token");
1248 }
1249 }
1250 summary["peer_slots_count"] = json!(
1251 relay_state
1252 .get("peers")
1253 .and_then(Value::as_object)
1254 .map(|m| m.len())
1255 .unwrap_or(0)
1256 );
1257
1258 let outbox = config::outbox_dir()?;
1260 let inbox = config::inbox_dir()?;
1261 summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
1262 summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
1263
1264 let snap = crate::ensure_up::daemon_liveness();
1270 let mut daemon = json!({
1271 "running": snap.pidfile_alive,
1272 "pid": snap.pidfile_pid,
1273 "all_running_pids": snap.pgrep_pids,
1274 "orphans": snap.orphan_pids,
1275 });
1276 if let crate::ensure_up::PidRecord::Json(d) = &snap.record {
1277 daemon["version"] = json!(d.version);
1278 daemon["bin_path"] = json!(d.bin_path);
1279 daemon["did"] = json!(d.did);
1280 daemon["relay_url"] = json!(d.relay_url);
1281 daemon["started_at"] = json!(d.started_at);
1282 daemon["schema"] = json!(d.schema);
1283 if d.version != env!("CARGO_PKG_VERSION") {
1284 daemon["version_mismatch"] = json!({
1285 "daemon": d.version.clone(),
1286 "cli": env!("CARGO_PKG_VERSION"),
1287 });
1288 }
1289 } else if matches!(snap.record, crate::ensure_up::PidRecord::LegacyInt(_)) {
1290 daemon["pidfile_form"] = json!("legacy-int");
1291 daemon["version_mismatch"] = json!({
1292 "daemon": "<pre-0.5.11>",
1293 "cli": env!("CARGO_PKG_VERSION"),
1294 });
1295 }
1296 summary["daemon"] = daemon;
1297
1298 let pending = crate::pending_pair::list_pending().unwrap_or_default();
1300 let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
1301 for p in &pending {
1302 *counts.entry(p.status.clone()).or_default() += 1;
1303 }
1304 let pending_inbound =
1306 crate::pending_inbound_pair::list_pending_inbound().unwrap_or_default();
1307 let inbound_handles: Vec<&str> = pending_inbound
1308 .iter()
1309 .map(|p| p.peer_handle.as_str())
1310 .collect();
1311 summary["pending_pairs"] = json!({
1312 "total": pending.len(),
1313 "by_status": counts,
1314 "inbound_count": pending_inbound.len(),
1315 "inbound_handles": inbound_handles,
1316 });
1317 }
1318
1319 if as_json {
1320 println!("{}", serde_json::to_string(&summary)?);
1321 } else if !initialized {
1322 println!("not initialized — run `wire init <handle>` first");
1323 } else {
1324 println!("did: {}", summary["did"].as_str().unwrap_or("?"));
1325 println!(
1326 "fingerprint: {}",
1327 summary["fingerprint"].as_str().unwrap_or("?")
1328 );
1329 println!("capabilities: {}", summary["capabilities"]);
1330 if !summary["self_relay"].is_null() {
1331 println!(
1332 "self relay: {} (slot {})",
1333 summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
1334 summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
1335 );
1336 } else {
1337 println!("self relay: (not bound — run `wire pair-host --relay <url>` to bind)");
1338 }
1339 println!(
1340 "peers: {}",
1341 summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
1342 );
1343 for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
1344 println!(
1345 " - {:<20} tier={}",
1346 p["handle"].as_str().unwrap_or(""),
1347 p["tier"].as_str().unwrap_or("?")
1348 );
1349 }
1350 println!(
1351 "outbox: {} file(s), {} event(s) queued",
1352 summary["outbox"]["files"].as_u64().unwrap_or(0),
1353 summary["outbox"]["events"].as_u64().unwrap_or(0)
1354 );
1355 println!(
1356 "inbox: {} file(s), {} event(s) received",
1357 summary["inbox"]["files"].as_u64().unwrap_or(0),
1358 summary["inbox"]["events"].as_u64().unwrap_or(0)
1359 );
1360 let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
1361 let daemon_pid = summary["daemon"]["pid"]
1362 .as_u64()
1363 .map(|p| p.to_string())
1364 .unwrap_or_else(|| "—".to_string());
1365 let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
1366 let version_suffix = if !daemon_version.is_empty() {
1367 format!(" v{daemon_version}")
1368 } else {
1369 String::new()
1370 };
1371 println!(
1372 "daemon: {} (pid {}{})",
1373 if daemon_running { "running" } else { "DOWN" },
1374 daemon_pid,
1375 version_suffix,
1376 );
1377 if let Some(mm) = summary["daemon"].get("version_mismatch") {
1379 println!(
1380 " !! version mismatch: daemon={} CLI={}. \
1381 run `wire upgrade` to swap atomically.",
1382 mm["daemon"].as_str().unwrap_or("?"),
1383 mm["cli"].as_str().unwrap_or("?"),
1384 );
1385 }
1386 if let Some(orphans) = summary["daemon"]["orphans"].as_array()
1387 && !orphans.is_empty()
1388 {
1389 let pids: Vec<String> = orphans
1390 .iter()
1391 .filter_map(|v| v.as_u64().map(|p| p.to_string()))
1392 .collect();
1393 println!(
1394 " !! orphan daemon process(es): pids {}. \
1395 pgrep saw them but pidfile didn't — likely stale process from \
1396 prior install. Multiple daemons race the relay cursor.",
1397 pids.join(", ")
1398 );
1399 }
1400 let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
1401 let inbound_count = summary["pending_pairs"]["inbound_count"]
1402 .as_u64()
1403 .unwrap_or(0);
1404 if pending_total > 0 {
1405 print!("pending pairs: {pending_total}");
1406 if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
1407 let parts: Vec<String> = obj
1408 .iter()
1409 .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
1410 .collect();
1411 if !parts.is_empty() {
1412 print!(" ({})", parts.join(", "));
1413 }
1414 }
1415 println!();
1416 } else if inbound_count == 0 {
1417 println!("pending pairs: none");
1418 }
1419 if inbound_count > 0 {
1423 let handles: Vec<String> = summary["pending_pairs"]["inbound_handles"]
1424 .as_array()
1425 .map(|a| {
1426 a.iter()
1427 .filter_map(|v| v.as_str().map(str::to_string))
1428 .collect()
1429 })
1430 .unwrap_or_default();
1431 println!(
1432 "inbound pair requests ({inbound_count}): {} — `wire pair-list` to inspect, `wire pair-accept <peer>` to accept, `wire pair-reject <peer>` to refuse",
1433 handles.join(", "),
1434 );
1435 }
1436 }
1437 Ok(())
1438}
1439
1440fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1441 if !dir.exists() {
1442 return Ok(json!({"files": 0, "events": 0}));
1443 }
1444 let mut files = 0usize;
1445 let mut events = 0usize;
1446 for entry in std::fs::read_dir(dir)? {
1447 let path = entry?.path();
1448 if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1449 files += 1;
1450 if let Ok(body) = std::fs::read_to_string(&path) {
1451 events += body.lines().filter(|l| !l.trim().is_empty()).count();
1452 }
1453 }
1454 }
1455 Ok(json!({"files": files, "events": events}))
1456}
1457
/// Returns `true` when `status` is one of the responder health states the
/// CLI accepts: `online`, `offline`, `oauth_locked`, `rate_limited` or
/// `degraded`. The comparison is exact and case-sensitive.
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.iter().any(|allowed| *allowed == status)
}
1466
1467fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1468 let state = config::read_relay_state()?;
1469 let (label, slot_info) = match peer {
1470 Some(peer) => (
1471 peer.to_string(),
1472 state
1473 .get("peers")
1474 .and_then(|p| p.get(peer))
1475 .ok_or_else(|| {
1476 anyhow!(
1477 "unknown peer {peer:?} in relay state — pair with them first:\n \
1478 wire add {peer}@wireup.net (or {peer}@<their-relay>)\n\
1479 (`wire peers` lists who you've already paired with.)"
1480 )
1481 })?,
1482 ),
1483 None => (
1484 "self".to_string(),
1485 state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1486 anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1487 })?,
1488 ),
1489 };
1490 let relay_url = slot_info["relay_url"]
1491 .as_str()
1492 .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1493 .to_string();
1494 let slot_id = slot_info["slot_id"]
1495 .as_str()
1496 .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1497 .to_string();
1498 let slot_token = slot_info["slot_token"]
1499 .as_str()
1500 .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1501 .to_string();
1502 Ok((label, relay_url, slot_id, slot_token))
1503}
1504
1505fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1506 if !responder_status_allowed(status) {
1507 bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1508 }
1509 let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1510 let now = time::OffsetDateTime::now_utc()
1511 .format(&time::format_description::well_known::Rfc3339)
1512 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1513 let mut record = json!({
1514 "status": status,
1515 "set_at": now,
1516 });
1517 if let Some(reason) = reason {
1518 record["reason"] = json!(reason);
1519 }
1520 if status == "online" {
1521 record["last_success_at"] = json!(now);
1522 }
1523 let client = crate::relay_client::RelayClient::new(&relay_url);
1524 let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1525 if as_json {
1526 println!("{}", serde_json::to_string(&saved)?);
1527 } else {
1528 let reason = saved
1529 .get("reason")
1530 .and_then(Value::as_str)
1531 .map(|r| format!(" — {r}"))
1532 .unwrap_or_default();
1533 println!(
1534 "responder {}{}",
1535 saved
1536 .get("status")
1537 .and_then(Value::as_str)
1538 .unwrap_or(status),
1539 reason
1540 );
1541 }
1542 Ok(())
1543}
1544
1545fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1546 let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1547 let client = crate::relay_client::RelayClient::new(&relay_url);
1548 let health = client.responder_health_get(&slot_id, &slot_token)?;
1549 if as_json {
1550 println!(
1551 "{}",
1552 serde_json::to_string(&json!({
1553 "target": label,
1554 "responder_health": health,
1555 }))?
1556 );
1557 } else if health.is_null() {
1558 println!("{label}: responder health not reported");
1559 } else {
1560 let status = health
1561 .get("status")
1562 .and_then(Value::as_str)
1563 .unwrap_or("unknown");
1564 let reason = health
1565 .get("reason")
1566 .and_then(Value::as_str)
1567 .map(|r| format!(" — {r}"))
1568 .unwrap_or_default();
1569 let last_success = health
1570 .get("last_success_at")
1571 .and_then(Value::as_str)
1572 .map(|t| format!(" (last_success: {t})"))
1573 .unwrap_or_default();
1574 println!("{label}: {status}{reason}{last_success}");
1575 }
1576 Ok(())
1577}
1578
1579fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1580 let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1581 let client = crate::relay_client::RelayClient::new(&relay_url);
1582
1583 let started = std::time::Instant::now();
1584 let transport_ok = client.healthz().unwrap_or(false);
1585 let latency_ms = started.elapsed().as_millis() as u64;
1586
1587 let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1588 let now = std::time::SystemTime::now()
1589 .duration_since(std::time::UNIX_EPOCH)
1590 .map(|d| d.as_secs())
1591 .unwrap_or(0);
1592 let attention = match last_pull_at_unix {
1593 Some(last) if now.saturating_sub(last) <= 300 => json!({
1594 "status": "ok",
1595 "last_pull_at_unix": last,
1596 "age_seconds": now.saturating_sub(last),
1597 "event_count": event_count,
1598 }),
1599 Some(last) => json!({
1600 "status": "stale",
1601 "last_pull_at_unix": last,
1602 "age_seconds": now.saturating_sub(last),
1603 "event_count": event_count,
1604 }),
1605 None => json!({
1606 "status": "never_pulled",
1607 "last_pull_at_unix": Value::Null,
1608 "event_count": event_count,
1609 }),
1610 };
1611
1612 let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1613 let responder = if responder_health.is_null() {
1614 json!({"status": "not_reported", "record": Value::Null})
1615 } else {
1616 json!({
1617 "status": responder_health
1618 .get("status")
1619 .and_then(Value::as_str)
1620 .unwrap_or("unknown"),
1621 "record": responder_health,
1622 })
1623 };
1624
1625 let report = json!({
1626 "peer": peer,
1627 "transport": {
1628 "status": if transport_ok { "ok" } else { "error" },
1629 "relay_url": relay_url,
1630 "latency_ms": latency_ms,
1631 },
1632 "attention": attention,
1633 "responder": responder,
1634 });
1635
1636 if as_json {
1637 println!("{}", serde_json::to_string(&report)?);
1638 } else {
1639 let transport_line = if transport_ok {
1640 format!("ok relay reachable ({latency_ms}ms)")
1641 } else {
1642 "error relay unreachable".to_string()
1643 };
1644 println!("transport {transport_line}");
1645 match report["attention"]["status"].as_str().unwrap_or("unknown") {
1646 "ok" => println!(
1647 "attention ok last pull {}s ago",
1648 report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1649 ),
1650 "stale" => println!(
1651 "attention stale last pull {}m ago",
1652 report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1653 ),
1654 "never_pulled" => println!("attention never pulled since relay reset"),
1655 other => println!("attention {other}"),
1656 }
1657 if report["responder"]["status"] == "not_reported" {
1658 println!("auto-responder not reported");
1659 } else {
1660 let record = &report["responder"]["record"];
1661 let status = record
1662 .get("status")
1663 .and_then(Value::as_str)
1664 .unwrap_or("unknown");
1665 let reason = record
1666 .get("reason")
1667 .and_then(Value::as_str)
1668 .map(|r| format!(" — {r}"))
1669 .unwrap_or_default();
1670 println!("auto-responder {status}{reason}");
1671 }
1672 }
1673 Ok(())
1674}
1675
1676fn cmd_whoami(as_json: bool) -> Result<()> {
1681 if !config::is_initialized()? {
1682 bail!("not initialized — run `wire init <handle>` first");
1683 }
1684 let card = config::read_agent_card()?;
1685 let did = card
1686 .get("did")
1687 .and_then(Value::as_str)
1688 .unwrap_or("")
1689 .to_string();
1690 let handle = card
1691 .get("handle")
1692 .and_then(Value::as_str)
1693 .map(str::to_string)
1694 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1695 let pk_b64 = card
1696 .get("verify_keys")
1697 .and_then(Value::as_object)
1698 .and_then(|m| m.values().next())
1699 .and_then(|v| v.get("key"))
1700 .and_then(Value::as_str)
1701 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1702 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1703 let fp = fingerprint(&pk_bytes);
1704 let key_id = make_key_id(&handle, &pk_bytes);
1705 let capabilities = card
1706 .get("capabilities")
1707 .cloned()
1708 .unwrap_or_else(|| json!(["wire/v3.1"]));
1709
1710 if as_json {
1711 println!(
1712 "{}",
1713 serde_json::to_string(&json!({
1714 "did": did,
1715 "handle": handle,
1716 "fingerprint": fp,
1717 "key_id": key_id,
1718 "public_key_b64": pk_b64,
1719 "capabilities": capabilities,
1720 "config_dir": config::config_dir()?.to_string_lossy(),
1721 }))?
1722 );
1723 } else {
1724 println!("{did} (ed25519:{key_id})");
1725 println!("fingerprint: {fp}");
1726 println!("capabilities: {capabilities}");
1727 }
1728 Ok(())
1729}
1730
1731fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1746 let raw = crate::trust::get_tier(trust, handle);
1747 if raw != "VERIFIED" {
1748 return raw.to_string();
1749 }
1750 let token = relay_state
1751 .get("peers")
1752 .and_then(|p| p.get(handle))
1753 .and_then(|p| p.get("slot_token"))
1754 .and_then(Value::as_str)
1755 .unwrap_or("");
1756 if token.is_empty() {
1757 "PENDING_ACK".to_string()
1758 } else {
1759 raw.to_string()
1760 }
1761}
1762
1763fn cmd_peers(as_json: bool) -> Result<()> {
1764 let trust = config::read_trust()?;
1765 let agents = trust
1766 .get("agents")
1767 .and_then(Value::as_object)
1768 .cloned()
1769 .unwrap_or_default();
1770 let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1771
1772 let mut self_did: Option<String> = None;
1773 if let Ok(card) = config::read_agent_card() {
1774 self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1775 }
1776
1777 let mut peers = Vec::new();
1778 for (handle, agent) in agents.iter() {
1779 let did = agent
1780 .get("did")
1781 .and_then(Value::as_str)
1782 .unwrap_or("")
1783 .to_string();
1784 if Some(did.as_str()) == self_did.as_deref() {
1785 continue; }
1787 let tier = effective_peer_tier(&trust, &relay_state, handle);
1788 let capabilities = agent
1789 .get("card")
1790 .and_then(|c| c.get("capabilities"))
1791 .cloned()
1792 .unwrap_or_else(|| json!([]));
1793 peers.push(json!({
1794 "handle": handle,
1795 "did": did,
1796 "tier": tier,
1797 "capabilities": capabilities,
1798 }));
1799 }
1800
1801 if as_json {
1802 println!("{}", serde_json::to_string(&peers)?);
1803 } else if peers.is_empty() {
1804 println!("no peers pinned (run `wire join <code>` to pair)");
1805 } else {
1806 for p in &peers {
1807 println!(
1808 "{:<20} {:<10} {}",
1809 p["handle"].as_str().unwrap_or(""),
1810 p["tier"].as_str().unwrap_or(""),
1811 p["did"].as_str().unwrap_or(""),
1812 );
1813 }
1814 }
1815 Ok(())
1816}
1817
1818fn maybe_warn_peer_attentiveness(peer: &str) {
1828 let state = match config::read_relay_state() {
1829 Ok(s) => s,
1830 Err(_) => return,
1831 };
1832 let p = state.get("peers").and_then(|p| p.get(peer));
1833 let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
1834 Some(s) if !s.is_empty() => s,
1835 _ => return,
1836 };
1837 let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
1838 Some(s) if !s.is_empty() => s,
1839 _ => return,
1840 };
1841 let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
1842 Some(s) if !s.is_empty() => s.to_string(),
1843 _ => match state
1844 .get("self")
1845 .and_then(|s| s.get("relay_url"))
1846 .and_then(Value::as_str)
1847 {
1848 Some(s) if !s.is_empty() => s.to_string(),
1849 _ => return,
1850 },
1851 };
1852 let client = crate::relay_client::RelayClient::new(&relay_url);
1853 let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
1854 Ok(t) => t,
1855 Err(_) => return,
1856 };
1857 let now = std::time::SystemTime::now()
1858 .duration_since(std::time::UNIX_EPOCH)
1859 .map(|d| d.as_secs())
1860 .unwrap_or(0);
1861 match last_pull {
1862 None => {
1863 eprintln!(
1864 "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
1865 );
1866 }
1867 Some(t) if now.saturating_sub(t) > 300 => {
1868 let mins = now.saturating_sub(t) / 60;
1869 eprintln!(
1870 "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
1871 );
1872 }
1873 _ => {}
1874 }
1875}
1876
1877pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
1878 let trimmed = input.trim();
1879 if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
1880 {
1881 return Ok(trimmed.to_string());
1882 }
1883 let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
1884 let n: i64 = amount
1885 .parse()
1886 .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
1887 if n <= 0 {
1888 bail!("deadline duration must be positive: {input:?}");
1889 }
1890 let duration = match unit {
1891 "m" => time::Duration::minutes(n),
1892 "h" => time::Duration::hours(n),
1893 "d" => time::Duration::days(n),
1894 _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
1895 };
1896 Ok((time::OffsetDateTime::now_utc() + duration)
1897 .format(&time::format_description::well_known::Rfc3339)
1898 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
1899}
1900
1901fn cmd_send(
1902 peer: &str,
1903 kind: &str,
1904 body_arg: &str,
1905 deadline: Option<&str>,
1906 as_json: bool,
1907) -> Result<()> {
1908 if !config::is_initialized()? {
1909 bail!("not initialized — run `wire init <handle>` first");
1910 }
1911 let peer = crate::agent_card::bare_handle(peer);
1912 let sk_seed = config::read_private_key()?;
1913 let card = config::read_agent_card()?;
1914 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
1915 let handle = crate::agent_card::display_handle_from_did(did).to_string();
1916 let pk_b64 = card
1917 .get("verify_keys")
1918 .and_then(Value::as_object)
1919 .and_then(|m| m.values().next())
1920 .and_then(|v| v.get("key"))
1921 .and_then(Value::as_str)
1922 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1923 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1924
1925 let body_value: Value = if body_arg == "-" {
1930 use std::io::Read;
1931 let mut raw = String::new();
1932 std::io::stdin()
1933 .read_to_string(&mut raw)
1934 .with_context(|| "reading body from stdin")?;
1935 serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
1938 } else if let Some(path) = body_arg.strip_prefix('@') {
1939 let raw =
1940 std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
1941 serde_json::from_str(&raw).unwrap_or(Value::String(raw))
1942 } else {
1943 Value::String(body_arg.to_string())
1944 };
1945
1946 let kind_id = parse_kind(kind)?;
1947
1948 let now = time::OffsetDateTime::now_utc()
1949 .format(&time::format_description::well_known::Rfc3339)
1950 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1951
1952 let mut event = json!({
1953 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
1954 "timestamp": now,
1955 "from": did,
1956 "to": format!("did:wire:{peer}"),
1957 "type": kind,
1958 "kind": kind_id,
1959 "body": body_value,
1960 });
1961 if let Some(deadline) = deadline {
1962 event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
1963 }
1964 let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
1965 let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
1966
1967 maybe_warn_peer_attentiveness(peer);
1972
1973 let line = serde_json::to_vec(&signed)?;
1978 let outbox = config::append_outbox_record(peer, &line)?;
1979
1980 if as_json {
1981 println!(
1982 "{}",
1983 serde_json::to_string(&json!({
1984 "event_id": event_id,
1985 "status": "queued",
1986 "peer": peer,
1987 "outbox": outbox.to_string_lossy(),
1988 }))?
1989 );
1990 } else {
1991 println!(
1992 "queued event {event_id} → {peer} (outbox: {})",
1993 outbox.display()
1994 );
1995 }
1996 Ok(())
1997}
1998
1999fn parse_kind(s: &str) -> Result<u32> {
2000 if let Ok(n) = s.parse::<u32>() {
2001 return Ok(n);
2002 }
2003 for (id, name) in crate::signing::kinds() {
2004 if *name == s {
2005 return Ok(*id);
2006 }
2007 }
2008 Ok(1)
2010}
2011
2012fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
2015 let inbox = config::inbox_dir()?;
2016 if !inbox.exists() {
2017 if !as_json {
2018 eprintln!("no inbox yet — daemon hasn't run, or no events received");
2019 }
2020 return Ok(());
2021 }
2022 let trust = config::read_trust()?;
2023 let mut count = 0usize;
2024
2025 let entries: Vec<_> = std::fs::read_dir(&inbox)?
2026 .filter_map(|e| e.ok())
2027 .map(|e| e.path())
2028 .filter(|p| {
2029 p.extension().map(|x| x == "jsonl").unwrap_or(false)
2030 && match peer {
2031 Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
2032 None => true,
2033 }
2034 })
2035 .collect();
2036
2037 for path in entries {
2038 let body = std::fs::read_to_string(&path)?;
2039 for line in body.lines() {
2040 let event: Value = match serde_json::from_str(line) {
2041 Ok(v) => v,
2042 Err(_) => continue,
2043 };
2044 let verified = verify_message_v31(&event, &trust).is_ok();
2045 if as_json {
2046 let mut event_with_meta = event.clone();
2047 if let Some(obj) = event_with_meta.as_object_mut() {
2048 obj.insert("verified".into(), json!(verified));
2049 }
2050 println!("{}", serde_json::to_string(&event_with_meta)?);
2051 } else {
2052 let ts = event
2053 .get("timestamp")
2054 .and_then(Value::as_str)
2055 .unwrap_or("?");
2056 let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
2057 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
2058 let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
2059 let summary = event
2060 .get("body")
2061 .map(|b| match b {
2062 Value::String(s) => s.clone(),
2063 _ => b.to_string(),
2064 })
2065 .unwrap_or_default();
2066 let mark = if verified { "✓" } else { "✗" };
2067 let deadline = event
2068 .get("time_sensitive_until")
2069 .and_then(Value::as_str)
2070 .map(|d| format!(" deadline: {d}"))
2071 .unwrap_or_default();
2072 println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
2073 }
2074 count += 1;
2075 if limit > 0 && count >= limit {
2076 return Ok(());
2077 }
2078 }
2079 }
2080 Ok(())
2081}
2082
/// Whether `kind` is one of the event kinds `wire monitor` hides unless
/// `--include-handshake` is passed: `pair_drop`, `pair_drop_ack` and
/// `heartbeat`. Matching is exact and case-sensitive.
fn monitor_is_noise_kind(kind: &str) -> bool {
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.iter().any(|noise| *noise == kind)
}
2091
2092fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
2096 if as_json {
2097 Ok(serde_json::to_string(e)?)
2098 } else {
2099 let eid_short: String = e.event_id.chars().take(12).collect();
2100 let body = e.body_preview.replace('\n', " ");
2101 let ts: String = e.timestamp.chars().take(19).collect();
2102 Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
2103 }
2104}
2105
2106fn cmd_monitor(
2122 peer_filter: Option<&str>,
2123 as_json: bool,
2124 include_handshake: bool,
2125 interval_ms: u64,
2126 replay: usize,
2127) -> Result<()> {
2128 let inbox_dir = config::inbox_dir()?;
2129 if !inbox_dir.exists() {
2130 if !as_json {
2131 eprintln!(
2132 "wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
2133 );
2134 }
2135 }
2137
2138 if replay > 0 && inbox_dir.exists() {
2142 let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
2143 for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
2144 let path = entry.path();
2145 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2146 continue;
2147 }
2148 let peer = match path.file_stem().and_then(|s| s.to_str()) {
2149 Some(s) => s.to_string(),
2150 None => continue,
2151 };
2152 if let Some(filter) = peer_filter {
2153 if peer != filter {
2154 continue;
2155 }
2156 }
2157 let body = std::fs::read_to_string(&path).unwrap_or_default();
2158 for line in body.lines() {
2159 let line = line.trim();
2160 if line.is_empty() {
2161 continue;
2162 }
2163 let signed: Value = match serde_json::from_str(line) {
2164 Ok(v) => v,
2165 Err(_) => continue,
2166 };
2167 let ev = crate::inbox_watch::InboxEvent::from_signed(
2168 &peer,
2169 signed,
2170 true,
2171 );
2172 if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2173 continue;
2174 }
2175 all.push(ev);
2176 }
2177 }
2178 all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2181 let start = all.len().saturating_sub(replay);
2182 for ev in &all[start..] {
2183 println!("{}", monitor_render(ev, as_json)?);
2184 }
2185 use std::io::Write;
2186 std::io::stdout().flush().ok();
2187 }
2188
2189 let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2192 let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2193
2194 loop {
2195 let events = w.poll()?;
2196 let mut wrote = false;
2197 for ev in events {
2198 if let Some(filter) = peer_filter {
2199 if ev.peer != filter {
2200 continue;
2201 }
2202 }
2203 if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2204 continue;
2205 }
2206 println!("{}", monitor_render(&ev, as_json)?);
2207 wrote = true;
2208 }
2209 if wrote {
2210 use std::io::Write;
2211 std::io::stdout().flush().ok();
2212 }
2213 std::thread::sleep(sleep_dur);
2214 }
2215}
2216
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Trust document containing a single agent `handle` pinned at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        let agent = json!({
            "tier": tier,
            "did": format!("did:wire:{handle}"),
            "card": {"capabilities": ["wire/v3.1"]}
        });
        json!({
            "version": 1,
            "agents": { handle: agent }
        })
    }

    /// Relay state pinning `willard` with full coordinates and `token`.
    fn willard_pinned_with(token: &str) -> Value {
        json!({
            "peers": {
                "willard": {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": token,
                }
            }
        })
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // VERIFIED in trust, but the pinned slot carries an empty token.
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = willard_pinned_with("");
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = willard_pinned_with("tok123");
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // Tiers other than VERIFIED are reported as-is, token or not.
        let trust = trust_with("willard", "UNTRUSTED");
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // No relay entry at all counts the same as an empty token.
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = json!({"peers": {}});
        let tier = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(tier, "PENDING_ACK");
    }
}
2301
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Verified event fixture with a fixed id and timestamp.
    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: peer.to_string(),
            event_id: "abcd1234567890ef".to_string(),
            kind: kind.to_string(),
            body_preview: body.to_string(),
            verified: true,
            timestamp: "2026-05-15T23:14:07.123456Z".to_string(),
            raw: Value::Null,
        }
    }

    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        for noise in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(monitor_is_noise_kind(noise));
        }
        // Real traffic, and kinds this build has never heard of, must pass.
        let signal_kinds = [
            "claim",
            "decision",
            "ack",
            "request",
            "note",
            "future_kind_we_dont_know",
        ];
        for signal in signal_kinds {
            assert!(!monitor_is_noise_kind(signal));
        }
    }

    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let event = ev("willard", "claim", "real v8 train shipped 1350 steps");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "render must be one line: {line}");
        for expected in ["willard", "claim", "real v8 train", "abcd12345678"] {
            assert!(line.contains(expected));
        }
        assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
        assert!(line.contains("2026-05-15T23:14:07"));
    }

    #[test]
    fn monitor_render_strips_newlines_from_body() {
        let event = ev("spark", "claim", "line one\nline two\nline three");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let event = ev("spark", "claim", "hi");
        let line = monitor_render(&event, true).unwrap();
        assert!(!line.contains('\n'));
        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        assert_eq!(parsed["peer"], "spark");
        assert_eq!(parsed["kind"], "claim");
        assert_eq!(parsed["body_preview"], "hi");
    }

    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        // An unverified event must still render and must not count as noise.
        let mut event = ev("spark", "claim", "from disk with verified=null");
        event.verified = false;
        let line = monitor_render(&event, false).unwrap();
        assert!(line.contains("from disk with verified=null"));
        assert!(!monitor_is_noise_kind("claim"));
    }
}
2402
2403fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2406 let body = if path == "-" {
2407 let mut buf = String::new();
2408 use std::io::Read;
2409 std::io::stdin().read_to_string(&mut buf)?;
2410 buf
2411 } else {
2412 std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2413 };
2414 let event: Value = serde_json::from_str(&body)?;
2415 let trust = config::read_trust()?;
2416 match verify_message_v31(&event, &trust) {
2417 Ok(()) => {
2418 if as_json {
2419 println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2420 } else {
2421 println!("verified ✓");
2422 }
2423 Ok(())
2424 }
2425 Err(e) => {
2426 let reason = e.to_string();
2427 if as_json {
2428 println!(
2429 "{}",
2430 serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2431 );
2432 } else {
2433 eprintln!("FAILED: {reason}");
2434 }
2435 std::process::exit(1);
2436 }
2437 }
2438}
2439
/// Thin wrapper that delegates to `crate::mcp::run` and returns its result
/// unchanged.
fn cmd_mcp() -> Result<()> {
    crate::mcp::run()
}
2445
2446fn cmd_relay_server(bind: &str, local_only: bool) -> Result<()> {
2447 if local_only {
2451 validate_loopback_bind(bind)?;
2452 }
2453 let base = if let Ok(home) = std::env::var("WIRE_HOME") {
2459 std::path::PathBuf::from(home)
2460 .join("state")
2461 .join("wire-relay")
2462 } else {
2463 dirs::state_dir()
2464 .or_else(dirs::data_local_dir)
2465 .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2466 .join("wire-relay")
2467 };
2468 let state_dir = if local_only { base.join("local") } else { base };
2469 let runtime = tokio::runtime::Builder::new_multi_thread()
2470 .enable_all()
2471 .build()?;
2472 runtime.block_on(crate::relay_server::serve_with_mode(
2473 bind,
2474 state_dir,
2475 crate::relay_server::ServerMode { local_only },
2476 ))
2477}
2478
2479fn validate_loopback_bind(bind: &str) -> Result<()> {
2485 let host = if let Some(stripped) = bind.strip_prefix('[') {
2487 let close = stripped
2488 .find(']')
2489 .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
2490 stripped[..close].to_string()
2491 } else {
2492 bind.rsplit_once(':')
2493 .map(|(h, _)| h.to_string())
2494 .unwrap_or_else(|| bind.to_string())
2495 };
2496 use std::net::ToSocketAddrs;
2497 let probe = format!("{host}:0");
2498 let resolved: Vec<_> = probe
2499 .to_socket_addrs()
2500 .with_context(|| format!("resolving bind host {host:?}"))?
2501 .collect();
2502 if resolved.is_empty() {
2503 bail!("--local-only: bind host {host:?} resolved to no addresses");
2504 }
2505 for addr in &resolved {
2506 if !addr.ip().is_loopback() {
2507 bail!(
2508 "--local-only refuses non-loopback bind: {host:?} resolves to {} \
2509 which is not in 127.0.0.0/8 or [::1]. Remove --local-only to bind \
2510 publicly, or use 127.0.0.1 / [::1] / localhost.",
2511 addr.ip()
2512 );
2513 }
2514 }
2515 Ok(())
2516}
2517
2518fn cmd_bind_relay(url: &str, migrate_pinned: bool, as_json: bool) -> Result<()> {
2521 if !config::is_initialized()? {
2522 bail!("not initialized — run `wire init <handle>` first");
2523 }
2524 let card = config::read_agent_card()?;
2525 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2526 let handle = crate::agent_card::display_handle_from_did(did).to_string();
2527
2528 let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
2535 let pinned: Vec<String> = existing
2536 .get("peers")
2537 .and_then(|p| p.as_object())
2538 .map(|o| o.keys().cloned().collect())
2539 .unwrap_or_default();
2540 if !pinned.is_empty() && !migrate_pinned {
2541 let list = pinned.join(", ");
2542 bail!(
2543 "bind-relay would silently black-hole {n} pinned peer(s): {list}. \
2544 They are pinned to your CURRENT slot; without coordination they will keep \
2545 pushing to a slot you no longer read.\n\n\
2546 SAFE PATHS:\n\
2547 • `wire rotate-slot` — rotates slot on the SAME relay and emits a \
2548 wire_close event to every pinned peer so their daemons drop the stale \
2549 coords cleanly. This is the supported migration path.\n\
2550 • `wire bind-relay {url} --migrate-pinned` — acknowledges that pinned \
2551 peers will need to re-pin manually (you must notify them out-of-band, \
2552 via a fresh `wire add` from each peer or a re-shared invite). Use this \
2553 only when the current slot is unreachable so rotate-slot can't ack.\n\n\
2554 Issue #7 (silent black-hole on relay change) caught this — proceed only \
2555 if you understand the consequences.",
2556 n = pinned.len(),
2557 );
2558 }
2559
2560 let normalized = url.trim_end_matches('/');
2561 let client = crate::relay_client::RelayClient::new(normalized);
2562 client.check_healthz()?;
2563 let alloc = client.allocate_slot(Some(&handle))?;
2564 let mut state = existing;
2565 if !pinned.is_empty() {
2566 eprintln!(
2570 "wire bind-relay: migrating with {n} pinned peer(s) — they will black-hole \
2571 until they re-pin: {peers}",
2572 n = pinned.len(),
2573 peers = pinned.join(", "),
2574 );
2575 }
2576 state["self"] = json!({
2577 "relay_url": url,
2578 "slot_id": alloc.slot_id,
2579 "slot_token": alloc.slot_token,
2580 });
2581 config::write_relay_state(&state)?;
2582
2583 if as_json {
2584 println!(
2585 "{}",
2586 serde_json::to_string(&json!({
2587 "relay_url": url,
2588 "slot_id": alloc.slot_id,
2589 "slot_token_present": true,
2590 }))?
2591 );
2592 } else {
2593 println!("bound to relay {url}");
2594 println!("slot_id: {}", alloc.slot_id);
2595 println!(
2596 "(slot_token written to {} mode 0600)",
2597 config::relay_state_path()?.display()
2598 );
2599 }
2600 Ok(())
2601}
2602
2603fn cmd_add_peer_slot(
2606 handle: &str,
2607 url: &str,
2608 slot_id: &str,
2609 slot_token: &str,
2610 as_json: bool,
2611) -> Result<()> {
2612 let mut state = config::read_relay_state()?;
2613 let peers = state["peers"]
2614 .as_object_mut()
2615 .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2616 peers.insert(
2617 handle.to_string(),
2618 json!({
2619 "relay_url": url,
2620 "slot_id": slot_id,
2621 "slot_token": slot_token,
2622 }),
2623 );
2624 config::write_relay_state(&state)?;
2625 if as_json {
2626 println!(
2627 "{}",
2628 serde_json::to_string(&json!({
2629 "handle": handle,
2630 "relay_url": url,
2631 "slot_id": slot_id,
2632 "added": true,
2633 }))?
2634 );
2635 } else {
2636 println!("pinned peer slot for {handle} at {url} ({slot_id})");
2637 }
2638 Ok(())
2639}
2640
2641fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
2644 let state = config::read_relay_state()?;
2645 let peers = state["peers"].as_object().cloned().unwrap_or_default();
2646 if peers.is_empty() {
2647 bail!(
2648 "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
2649 );
2650 }
2651 let outbox_dir = config::outbox_dir()?;
2652 if outbox_dir.exists() {
2657 let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
2658 for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
2659 let path = entry.path();
2660 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2661 continue;
2662 }
2663 let stem = match path.file_stem().and_then(|s| s.to_str()) {
2664 Some(s) => s.to_string(),
2665 None => continue,
2666 };
2667 if pinned.contains(&stem) {
2668 continue;
2669 }
2670 let bare = crate::agent_card::bare_handle(&stem);
2673 if pinned.contains(bare) {
2674 eprintln!(
2675 "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
2676 Merge with: `cat {} >> {}` then delete the FQDN file.",
2677 stem,
2678 path.display(),
2679 outbox_dir.join(format!("{bare}.jsonl")).display(),
2680 );
2681 }
2682 }
2683 }
2684 if !outbox_dir.exists() {
2685 if as_json {
2686 println!(
2687 "{}",
2688 serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
2689 );
2690 } else {
2691 println!("phyllis: nothing to dial out — write a message first with `wire send`");
2692 }
2693 return Ok(());
2694 }
2695
2696 let mut pushed = Vec::new();
2697 let mut skipped = Vec::new();
2698
2699 for (peer_handle, _) in peers.iter() {
2705 if let Some(want) = peer_filter
2706 && peer_handle != want
2707 {
2708 continue;
2709 }
2710 let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2711 if !outbox.exists() {
2712 continue;
2713 }
2714 let ordered_endpoints =
2715 crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
2716 if ordered_endpoints.is_empty() {
2717 for line in std::fs::read_to_string(&outbox)
2721 .unwrap_or_default()
2722 .lines()
2723 {
2724 let event: Value = match serde_json::from_str(line) {
2725 Ok(v) => v,
2726 Err(_) => continue,
2727 };
2728 let event_id = event
2729 .get("event_id")
2730 .and_then(Value::as_str)
2731 .unwrap_or("")
2732 .to_string();
2733 skipped.push(json!({
2734 "peer": peer_handle,
2735 "event_id": event_id,
2736 "reason": "no reachable endpoint pinned for peer",
2737 }));
2738 }
2739 continue;
2740 }
2741 let body = std::fs::read_to_string(&outbox)?;
2742 for line in body.lines() {
2743 let event: Value = match serde_json::from_str(line) {
2744 Ok(v) => v,
2745 Err(_) => continue,
2746 };
2747 let event_id = event
2748 .get("event_id")
2749 .and_then(Value::as_str)
2750 .unwrap_or("")
2751 .to_string();
2752
2753 let mut delivered = false;
2754 let mut last_err_reason: Option<String> = None;
2755 for endpoint in &ordered_endpoints {
2756 let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2757 match client.post_event(&endpoint.slot_id, &endpoint.slot_token, &event) {
2758 Ok(resp) => {
2759 if resp.status == "duplicate" {
2760 skipped.push(json!({
2761 "peer": peer_handle,
2762 "event_id": event_id,
2763 "reason": "duplicate",
2764 "endpoint": endpoint.relay_url,
2765 "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2766 }));
2767 } else {
2768 pushed.push(json!({
2769 "peer": peer_handle,
2770 "event_id": event_id,
2771 "endpoint": endpoint.relay_url,
2772 "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2773 }));
2774 }
2775 delivered = true;
2776 break;
2777 }
2778 Err(e) => {
2779 last_err_reason =
2784 Some(crate::relay_client::format_transport_error(&e));
2785 }
2786 }
2787 }
2788 if !delivered {
2789 skipped.push(json!({
2790 "peer": peer_handle,
2791 "event_id": event_id,
2792 "reason": last_err_reason.unwrap_or_else(|| "all endpoints failed".to_string()),
2793 }));
2794 }
2795 }
2796 }
2797
2798 if as_json {
2799 println!(
2800 "{}",
2801 serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
2802 );
2803 } else {
2804 println!(
2805 "pushed {} event(s); skipped {} ({})",
2806 pushed.len(),
2807 skipped.len(),
2808 if skipped.is_empty() {
2809 "none"
2810 } else {
2811 "see --json for detail"
2812 }
2813 );
2814 }
2815 Ok(())
2816}
2817
2818fn cmd_pull(as_json: bool) -> Result<()> {
2821 let state = config::read_relay_state()?;
2822 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2823 if self_state.is_null() {
2824 bail!("self slot not bound — run `wire bind-relay <url>` first");
2825 }
2826
2827 let endpoints = crate::endpoints::self_endpoints(&state);
2836 if endpoints.is_empty() {
2837 bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
2838 }
2839
2840 let inbox_dir = config::inbox_dir()?;
2841 config::ensure_dirs()?;
2842
2843 let mut total_seen = 0usize;
2844 let mut all_written: Vec<Value> = Vec::new();
2845 let mut all_rejected: Vec<Value> = Vec::new();
2846 let mut all_blocked = false;
2847 let mut all_advance_cursor_to: Option<String> = None;
2848
2849 for endpoint in &endpoints {
2850 let cursor_key = endpoint_cursor_key(endpoint.scope);
2851 let last_event_id = self_state
2852 .get(&cursor_key)
2853 .and_then(Value::as_str)
2854 .map(str::to_string);
2855 let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2856 let events = match client.list_events(
2857 &endpoint.slot_id,
2858 &endpoint.slot_token,
2859 last_event_id.as_deref(),
2860 Some(1000),
2861 ) {
2862 Ok(ev) => ev,
2863 Err(e) => {
2864 eprintln!(
2868 "wire pull: endpoint {} ({:?}) errored: {}; continuing",
2869 endpoint.relay_url,
2870 endpoint.scope,
2871 crate::relay_client::format_transport_error(&e),
2872 );
2873 continue;
2874 }
2875 };
2876 total_seen += events.len();
2877 let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
2878 all_written.extend(result.written.iter().cloned());
2879 all_rejected.extend(result.rejected.iter().cloned());
2880 if result.blocked {
2881 all_blocked = true;
2882 }
2883 if let Some(eid) = result.advance_cursor_to.clone() {
2886 if endpoint.scope == crate::endpoints::EndpointScope::Federation {
2887 all_advance_cursor_to = Some(eid.clone());
2888 }
2889 let key = cursor_key.clone();
2890 config::update_relay_state(|state| {
2891 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
2892 self_obj.insert(key, Value::String(eid));
2893 }
2894 Ok(())
2895 })?;
2896 }
2897 }
2898
2899 let result = crate::pull::PullResult {
2904 written: all_written,
2905 rejected: all_rejected,
2906 blocked: all_blocked,
2907 advance_cursor_to: all_advance_cursor_to,
2908 };
2909 let events_len = total_seen;
2910
2911 if as_json {
2915 println!(
2916 "{}",
2917 serde_json::to_string(&json!({
2918 "written": result.written,
2919 "rejected": result.rejected,
2920 "total_seen": events_len,
2921 "cursor_blocked": result.blocked,
2922 "cursor_advanced_to": result.advance_cursor_to,
2923 }))?
2924 );
2925 } else {
2926 let blocking = result
2927 .rejected
2928 .iter()
2929 .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
2930 .count();
2931 if blocking > 0 {
2932 println!(
2933 "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
2934 events_len,
2935 result.written.len(),
2936 result.rejected.len(),
2937 blocking,
2938 );
2939 } else {
2940 println!(
2941 "pulled {} event(s); wrote {}; rejected {}",
2942 events_len,
2943 result.written.len(),
2944 result.rejected.len(),
2945 );
2946 }
2947 }
2948 Ok(())
2949}
2950
2951fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
2956 match scope {
2957 crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
2958 crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
2959 }
2960}
2961
2962fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
2965 if !config::is_initialized()? {
2966 bail!("not initialized — run `wire init <handle>` first");
2967 }
2968 let mut state = config::read_relay_state()?;
2969 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2970 if self_state.is_null() {
2971 bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
2972 }
2973 let url = self_state["relay_url"]
2974 .as_str()
2975 .ok_or_else(|| anyhow!("self.relay_url missing"))?
2976 .to_string();
2977 let old_slot_id = self_state["slot_id"]
2978 .as_str()
2979 .ok_or_else(|| anyhow!("self.slot_id missing"))?
2980 .to_string();
2981 let old_slot_token = self_state["slot_token"]
2982 .as_str()
2983 .ok_or_else(|| anyhow!("self.slot_token missing"))?
2984 .to_string();
2985
2986 let card = config::read_agent_card()?;
2988 let did = card
2989 .get("did")
2990 .and_then(Value::as_str)
2991 .unwrap_or("")
2992 .to_string();
2993 let handle = crate::agent_card::display_handle_from_did(&did).to_string();
2994 let pk_b64 = card
2995 .get("verify_keys")
2996 .and_then(Value::as_object)
2997 .and_then(|m| m.values().next())
2998 .and_then(|v| v.get("key"))
2999 .and_then(Value::as_str)
3000 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
3001 .to_string();
3002 let pk_bytes = crate::signing::b64decode(&pk_b64)?;
3003 let sk_seed = config::read_private_key()?;
3004
3005 let normalized = url.trim_end_matches('/').to_string();
3007 let client = crate::relay_client::RelayClient::new(&normalized);
3008 client
3009 .check_healthz()
3010 .context("aborting rotation; old slot still valid")?;
3011 let alloc = client.allocate_slot(Some(&handle))?;
3012 let new_slot_id = alloc.slot_id.clone();
3013 let new_slot_token = alloc.slot_token.clone();
3014
3015 let mut announced: Vec<String> = Vec::new();
3022 if !no_announce {
3023 let now = time::OffsetDateTime::now_utc()
3024 .format(&time::format_description::well_known::Rfc3339)
3025 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
3026 let body = json!({
3027 "reason": "operator-initiated slot rotation",
3028 "new_relay_url": url,
3029 "new_slot_id": new_slot_id,
3030 });
3034 let peers = state["peers"].as_object().cloned().unwrap_or_default();
3035 for (peer_handle, _peer_info) in peers.iter() {
3036 let event = json!({
3037 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
3038 "timestamp": now.clone(),
3039 "from": did,
3040 "to": format!("did:wire:{peer_handle}"),
3041 "type": "wire_close",
3042 "kind": 1201,
3043 "body": body.clone(),
3044 });
3045 let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
3046 Ok(s) => s,
3047 Err(e) => {
3048 eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
3049 continue;
3050 }
3051 };
3052 let peer_info = match state["peers"].get(peer_handle) {
3057 Some(p) => p.clone(),
3058 None => continue,
3059 };
3060 let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
3061 let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
3062 let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
3063 if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
3064 continue;
3065 }
3066 let peer_client = if peer_url == url {
3067 client.clone()
3068 } else {
3069 crate::relay_client::RelayClient::new(peer_url)
3070 };
3071 match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
3072 Ok(_) => announced.push(peer_handle.clone()),
3073 Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
3074 }
3075 }
3076 }
3077
3078 state["self"] = json!({
3080 "relay_url": url,
3081 "slot_id": new_slot_id,
3082 "slot_token": new_slot_token,
3083 });
3084 config::write_relay_state(&state)?;
3085
3086 if as_json {
3087 println!(
3088 "{}",
3089 serde_json::to_string(&json!({
3090 "rotated": true,
3091 "old_slot_id": old_slot_id,
3092 "new_slot_id": new_slot_id,
3093 "relay_url": url,
3094 "announced_to": announced,
3095 }))?
3096 );
3097 } else {
3098 println!("rotated slot on {url}");
3099 println!(
3100 " old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
3101 );
3102 println!(" new slot_id: {new_slot_id}");
3103 if !announced.is_empty() {
3104 println!(
3105 " announced wire_close (kind=1201) to: {}",
3106 announced.join(", ")
3107 );
3108 }
3109 println!();
3110 println!("next steps:");
3111 println!(" - peers see the wire_close event in their next `wire pull`");
3112 println!(
3113 " - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
3114 );
3115 println!(" (or full re-pair via `wire pair-host`/`wire join`)");
3116 println!(" - until they do, you'll receive but they won't be able to reach you");
3117 let _ = old_slot_token;
3119 }
3120 Ok(())
3121}
3122
3123fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
3126 let mut trust = config::read_trust()?;
3127 let mut removed_from_trust = false;
3128 if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
3129 && agents.remove(handle).is_some()
3130 {
3131 removed_from_trust = true;
3132 }
3133 config::write_trust(&trust)?;
3134
3135 let mut state = config::read_relay_state()?;
3136 let mut removed_from_relay = false;
3137 if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
3138 && peers.remove(handle).is_some()
3139 {
3140 removed_from_relay = true;
3141 }
3142 config::write_relay_state(&state)?;
3143
3144 let mut purged: Vec<String> = Vec::new();
3145 if purge {
3146 for dir in [config::inbox_dir()?, config::outbox_dir()?] {
3147 let path = dir.join(format!("{handle}.jsonl"));
3148 if path.exists() {
3149 std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
3150 purged.push(path.to_string_lossy().into());
3151 }
3152 }
3153 }
3154
3155 if !removed_from_trust && !removed_from_relay {
3156 if as_json {
3157 println!(
3158 "{}",
3159 serde_json::to_string(&json!({
3160 "removed": false,
3161 "reason": format!("peer {handle:?} not pinned"),
3162 }))?
3163 );
3164 } else {
3165 eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
3166 }
3167 return Ok(());
3168 }
3169
3170 if as_json {
3171 println!(
3172 "{}",
3173 serde_json::to_string(&json!({
3174 "handle": handle,
3175 "removed_from_trust": removed_from_trust,
3176 "removed_from_relay_state": removed_from_relay,
3177 "purged_files": purged,
3178 }))?
3179 );
3180 } else {
3181 println!("forgot peer {handle:?}");
3182 if removed_from_trust {
3183 println!(" - removed from trust.json");
3184 }
3185 if removed_from_relay {
3186 println!(" - removed from relay.json");
3187 }
3188 if !purged.is_empty() {
3189 for p in &purged {
3190 println!(" - deleted {p}");
3191 }
3192 } else if !purge {
3193 println!(" (inbox/outbox files preserved; pass --purge to delete them)");
3194 }
3195 }
3196 Ok(())
3197}
3198
3199fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
3202 if !config::is_initialized()? {
3203 bail!("not initialized — run `wire init <handle>` first");
3204 }
3205 let interval = std::time::Duration::from_secs(interval_secs.max(1));
3206
3207 if !as_json {
3208 if once {
3209 eprintln!("wire daemon: single sync cycle, then exit");
3210 } else {
3211 eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
3212 }
3213 }
3214
3215 if let Err(e) = crate::pending_pair::cleanup_on_startup() {
3219 eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
3220 }
3221
3222 let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
3228 if !once {
3229 crate::daemon_stream::spawn_stream_subscriber(wake_tx);
3230 }
3231
3232 loop {
3233 let pushed = run_sync_push().unwrap_or_else(|e| {
3234 eprintln!("daemon: push error: {e:#}");
3235 json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
3236 });
3237 let pulled = run_sync_pull().unwrap_or_else(|e| {
3238 eprintln!("daemon: pull error: {e:#}");
3239 json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
3240 });
3241 let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
3242 eprintln!("daemon: pending-pair tick error: {e:#}");
3243 json!({"transitions": []})
3244 });
3245
3246 if as_json {
3247 println!(
3248 "{}",
3249 serde_json::to_string(&json!({
3250 "ts": time::OffsetDateTime::now_utc()
3251 .format(&time::format_description::well_known::Rfc3339)
3252 .unwrap_or_default(),
3253 "push": pushed,
3254 "pull": pulled,
3255 "pairs": pairs,
3256 }))?
3257 );
3258 } else {
3259 let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
3260 let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
3261 let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
3262 let pair_transitions = pairs["transitions"]
3263 .as_array()
3264 .map(|a| a.len())
3265 .unwrap_or(0);
3266 if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
3267 eprintln!(
3268 "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
3269 );
3270 }
3271 if let Some(arr) = pairs["transitions"].as_array() {
3273 for t in arr {
3274 eprintln!(
3275 " pair {} : {} → {}",
3276 t.get("code").and_then(Value::as_str).unwrap_or("?"),
3277 t.get("from").and_then(Value::as_str).unwrap_or("?"),
3278 t.get("to").and_then(Value::as_str).unwrap_or("?")
3279 );
3280 if let Some(sas) = t.get("sas").and_then(Value::as_str)
3281 && t.get("to").and_then(Value::as_str) == Some("sas_ready")
3282 {
3283 eprintln!(" SAS digits: {}-{}", &sas[..3], &sas[3..]);
3284 eprintln!(
3285 " Run: wire pair-confirm {} {}",
3286 t.get("code").and_then(Value::as_str).unwrap_or("?"),
3287 sas
3288 );
3289 }
3290 }
3291 }
3292 }
3293
3294 if once {
3295 return Ok(());
3296 }
3297 let _ = wake_rx.recv_timeout(interval);
3302 while wake_rx.try_recv().is_ok() {}
3303 }
3304}
3305
3306fn run_sync_push() -> Result<Value> {
3309 let state = config::read_relay_state()?;
3310 let peers = state["peers"].as_object().cloned().unwrap_or_default();
3311 if peers.is_empty() {
3312 return Ok(json!({"pushed": [], "skipped": []}));
3313 }
3314 let outbox_dir = config::outbox_dir()?;
3315 if !outbox_dir.exists() {
3316 return Ok(json!({"pushed": [], "skipped": []}));
3317 }
3318 let mut pushed = Vec::new();
3319 let mut skipped = Vec::new();
3320 for (peer_handle, slot_info) in peers.iter() {
3321 let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
3322 if !outbox.exists() {
3323 continue;
3324 }
3325 let url = slot_info["relay_url"].as_str().unwrap_or("");
3326 let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
3327 let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
3328 if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
3329 continue;
3330 }
3331 let client = crate::relay_client::RelayClient::new(url);
3332 let body = std::fs::read_to_string(&outbox)?;
3333 for line in body.lines() {
3334 let event: Value = match serde_json::from_str(line) {
3335 Ok(v) => v,
3336 Err(_) => continue,
3337 };
3338 let event_id = event
3339 .get("event_id")
3340 .and_then(Value::as_str)
3341 .unwrap_or("")
3342 .to_string();
3343 match client.post_event(slot_id, slot_token, &event) {
3344 Ok(resp) => {
3345 if resp.status == "duplicate" {
3346 skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
3347 } else {
3348 pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
3349 }
3350 }
3351 Err(e) => {
3352 let reason = crate::relay_client::format_transport_error(&e);
3356 skipped.push(
3357 json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
3358 );
3359 }
3360 }
3361 }
3362 }
3363 Ok(json!({"pushed": pushed, "skipped": skipped}))
3364}
3365
3366fn run_sync_pull() -> Result<Value> {
3368 let state = config::read_relay_state()?;
3369 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3370 if self_state.is_null() {
3371 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3372 }
3373 let url = self_state["relay_url"].as_str().unwrap_or("");
3374 let slot_id = self_state["slot_id"].as_str().unwrap_or("");
3375 let slot_token = self_state["slot_token"].as_str().unwrap_or("");
3376 let last_event_id = self_state
3377 .get("last_pulled_event_id")
3378 .and_then(Value::as_str)
3379 .map(str::to_string);
3380 if url.is_empty() {
3381 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3382 }
3383 let client = crate::relay_client::RelayClient::new(url);
3384 let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
3385 let inbox_dir = config::inbox_dir()?;
3386 config::ensure_dirs()?;
3387
3388 let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
3392
3393 if let Some(eid) = &result.advance_cursor_to {
3395 let eid = eid.clone();
3396 config::update_relay_state(|state| {
3397 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3398 self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
3399 }
3400 Ok(())
3401 })?;
3402 }
3403
3404 Ok(json!({
3405 "written": result.written,
3406 "rejected": result.rejected,
3407 "total_seen": events.len(),
3408 "cursor_blocked": result.blocked,
3409 "cursor_advanced_to": result.advance_cursor_to,
3410 }))
3411}
3412
3413fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
3416 let body =
3417 std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
3418 let card: Value =
3419 serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
3420 crate::agent_card::verify_agent_card(&card)
3421 .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3422
3423 let mut trust = config::read_trust()?;
3424 crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3425
3426 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3427 let handle = crate::agent_card::display_handle_from_did(did).to_string();
3428 config::write_trust(&trust)?;
3429
3430 if as_json {
3431 println!(
3432 "{}",
3433 serde_json::to_string(&json!({
3434 "handle": handle,
3435 "did": did,
3436 "tier": "VERIFIED",
3437 "pinned": true,
3438 }))?
3439 );
3440 } else {
3441 println!("pinned {handle} ({did}) at tier VERIFIED");
3442 }
3443 Ok(())
3444}
3445
3446fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3449 pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3450}
3451
3452fn cmd_pair_join(
3453 code_phrase: &str,
3454 relay_url: &str,
3455 auto_yes: bool,
3456 timeout_secs: u64,
3457) -> Result<()> {
3458 pair_orchestrate(
3459 relay_url,
3460 Some(code_phrase),
3461 "guest",
3462 auto_yes,
3463 timeout_secs,
3464 )
3465}
3466
3467fn pair_orchestrate(
3473 relay_url: &str,
3474 code_in: Option<&str>,
3475 role: &str,
3476 auto_yes: bool,
3477 timeout_secs: u64,
3478) -> Result<()> {
3479 use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3480
3481 let mut s = pair_session_open(role, relay_url, code_in)?;
3482
3483 if role == "host" {
3484 eprintln!();
3485 eprintln!("share this code phrase with your peer:");
3486 eprintln!();
3487 eprintln!(" {}", s.code);
3488 eprintln!();
3489 eprintln!(
3490 "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3491 s.code
3492 );
3493 } else {
3494 eprintln!();
3495 eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3496 }
3497
3498 const HEARTBEAT_SECS: u64 = 10;
3503 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3504 let started = std::time::Instant::now();
3505 let mut last_heartbeat = started;
3506 let formatted = loop {
3507 if let Some(sas) = pair_session_try_sas(&mut s)? {
3508 break sas;
3509 }
3510 let now = std::time::Instant::now();
3511 if now >= deadline {
3512 return Err(anyhow!(
3513 "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3514 ));
3515 }
3516 if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3517 let elapsed = now.duration_since(started).as_secs();
3518 eprintln!(" ... still waiting ({elapsed}s / {timeout_secs}s)");
3519 last_heartbeat = now;
3520 }
3521 std::thread::sleep(std::time::Duration::from_millis(250));
3522 };
3523
3524 eprintln!();
3525 eprintln!("SAS digits (must match peer's terminal):");
3526 eprintln!();
3527 eprintln!(" {formatted}");
3528 eprintln!();
3529
3530 if !auto_yes {
3533 eprint!("does this match your peer's terminal? [y/N]: ");
3534 use std::io::Write;
3535 std::io::stderr().flush().ok();
3536 let mut input = String::new();
3537 std::io::stdin().read_line(&mut input)?;
3538 let trimmed = input.trim().to_lowercase();
3539 if trimmed != "y" && trimmed != "yes" {
3540 bail!("SAS confirmation declined — aborting pairing");
3541 }
3542 }
3543 s.sas_confirmed = true;
3544
3545 let result = pair_session_finalize(&mut s, timeout_secs)?;
3547
3548 let peer_did = result["paired_with"].as_str().unwrap_or("");
3549 let peer_role = if role == "host" { "guest" } else { "host" };
3550 eprintln!("paired with {peer_did} (peer role: {peer_role})");
3551 eprintln!("peer card pinned at tier VERIFIED");
3552 eprintln!(
3553 "peer relay slot saved to {}",
3554 config::relay_state_path()?.display()
3555 );
3556
3557 println!("{}", serde_json::to_string(&result)?);
3558 Ok(())
3559}
3560
3561fn cmd_pair(
3567 handle: &str,
3568 code: Option<&str>,
3569 relay: &str,
3570 auto_yes: bool,
3571 timeout_secs: u64,
3572 no_setup: bool,
3573) -> Result<()> {
3574 let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3577 let did = init_result
3578 .get("did")
3579 .and_then(|v| v.as_str())
3580 .unwrap_or("(unknown)")
3581 .to_string();
3582 let already = init_result
3583 .get("already_initialized")
3584 .and_then(|v| v.as_bool())
3585 .unwrap_or(false);
3586 if already {
3587 println!("(identity {did} already initialized — reusing)");
3588 } else {
3589 println!("initialized {did}");
3590 }
3591 println!();
3592
3593 match code {
3595 None => {
3596 println!("hosting pair on {relay} (no code = host) ...");
3597 cmd_pair_host(relay, auto_yes, timeout_secs)?;
3598 }
3599 Some(c) => {
3600 println!("joining pair with code {c} on {relay} ...");
3601 cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3602 }
3603 }
3604
3605 if !no_setup {
3607 println!();
3608 println!("registering wire as MCP server in detected client configs ...");
3609 if let Err(e) = cmd_setup(true) {
3610 eprintln!("warn: setup --apply failed: {e}");
3612 eprintln!(" pair succeeded; you can re-run `wire setup --apply` manually.");
3613 }
3614 }
3615
3616 println!();
3617 println!("pair complete. Next steps:");
3618 println!(" wire daemon start # background sync of inbox/outbox vs relay");
3619 println!(" wire send <peer> claim <msg> # send your peer something");
3620 println!(" wire tail # watch incoming events");
3621 Ok(())
3622}
3623
3624fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3630 let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3631 let did = init_result
3632 .get("did")
3633 .and_then(|v| v.as_str())
3634 .unwrap_or("(unknown)")
3635 .to_string();
3636 let already = init_result
3637 .get("already_initialized")
3638 .and_then(|v| v.as_bool())
3639 .unwrap_or(false);
3640 if already {
3641 println!("(identity {did} already initialized — reusing)");
3642 } else {
3643 println!("initialized {did}");
3644 }
3645 println!();
3646 match code {
3647 None => cmd_pair_host_detach(relay, false),
3648 Some(c) => cmd_pair_join_detach(c, relay, false),
3649 }
3650}
3651
3652fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3653 if !config::is_initialized()? {
3654 bail!("not initialized — run `wire init <handle>` first");
3655 }
3656 let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3657 Ok(b) => b,
3658 Err(e) => {
3659 if !as_json {
3660 eprintln!(
3661 "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3662 );
3663 }
3664 false
3665 }
3666 };
3667 let code = crate::sas::generate_code_phrase();
3668 let code_hash = crate::pair_session::derive_code_hash(&code);
3669 let now = time::OffsetDateTime::now_utc()
3670 .format(&time::format_description::well_known::Rfc3339)
3671 .unwrap_or_default();
3672 let p = crate::pending_pair::PendingPair {
3673 code: code.clone(),
3674 code_hash,
3675 role: "host".to_string(),
3676 relay_url: relay_url.to_string(),
3677 status: "request_host".to_string(),
3678 sas: None,
3679 peer_did: None,
3680 created_at: now,
3681 last_error: None,
3682 pair_id: None,
3683 our_slot_id: None,
3684 our_slot_token: None,
3685 spake2_seed_b64: None,
3686 };
3687 crate::pending_pair::write_pending(&p)?;
3688 if as_json {
3689 println!(
3690 "{}",
3691 serde_json::to_string(&json!({
3692 "state": "queued",
3693 "code_phrase": code,
3694 "relay_url": relay_url,
3695 "role": "host",
3696 "daemon_spawned": daemon_spawned,
3697 }))?
3698 );
3699 } else {
3700 if daemon_spawned {
3701 println!("(started wire daemon in background)");
3702 }
3703 println!("detached pair-host queued. Share this code with your peer:\n");
3704 println!(" {code}\n");
3705 println!("Next steps:");
3706 println!(" wire pair-list # check status");
3707 println!(" wire pair-confirm {code} <digits> # when SAS shows up");
3708 println!(" wire pair-cancel {code} # to abort");
3709 }
3710 Ok(())
3711}
3712
3713fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3714 if !config::is_initialized()? {
3715 bail!("not initialized — run `wire init <handle>` first");
3716 }
3717 let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3718 Ok(b) => b,
3719 Err(e) => {
3720 if !as_json {
3721 eprintln!(
3722 "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3723 );
3724 }
3725 false
3726 }
3727 };
3728 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3729 let code_hash = crate::pair_session::derive_code_hash(&code);
3730 let now = time::OffsetDateTime::now_utc()
3731 .format(&time::format_description::well_known::Rfc3339)
3732 .unwrap_or_default();
3733 let p = crate::pending_pair::PendingPair {
3734 code: code.clone(),
3735 code_hash,
3736 role: "guest".to_string(),
3737 relay_url: relay_url.to_string(),
3738 status: "request_guest".to_string(),
3739 sas: None,
3740 peer_did: None,
3741 created_at: now,
3742 last_error: None,
3743 pair_id: None,
3744 our_slot_id: None,
3745 our_slot_token: None,
3746 spake2_seed_b64: None,
3747 };
3748 crate::pending_pair::write_pending(&p)?;
3749 if as_json {
3750 println!(
3751 "{}",
3752 serde_json::to_string(&json!({
3753 "state": "queued",
3754 "code_phrase": code,
3755 "relay_url": relay_url,
3756 "role": "guest",
3757 "daemon_spawned": daemon_spawned,
3758 }))?
3759 );
3760 } else {
3761 if daemon_spawned {
3762 println!("(started wire daemon in background)");
3763 }
3764 println!("detached pair-join queued for code {code}.");
3765 println!(
3766 "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3767 );
3768 }
3769 Ok(())
3770}
3771
3772fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3773 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3774 let typed: String = typed_digits
3775 .chars()
3776 .filter(|c| c.is_ascii_digit())
3777 .collect();
3778 if typed.len() != 6 {
3779 bail!(
3780 "expected 6 digits (got {} after stripping non-digits)",
3781 typed.len()
3782 );
3783 }
3784 let mut p = crate::pending_pair::read_pending(&code)?
3785 .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
3786 if p.status != "sas_ready" {
3787 bail!(
3788 "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
3789 p.status
3790 );
3791 }
3792 let stored = p
3793 .sas
3794 .as_ref()
3795 .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
3796 .clone();
3797 if stored == typed {
3798 p.status = "confirmed".to_string();
3799 crate::pending_pair::write_pending(&p)?;
3800 if as_json {
3801 println!(
3802 "{}",
3803 serde_json::to_string(&json!({
3804 "state": "confirmed",
3805 "code_phrase": code,
3806 }))?
3807 );
3808 } else {
3809 println!("digits match. Daemon will finalize the handshake on its next tick.");
3810 println!("Run `wire peers` after a few seconds to confirm.");
3811 }
3812 } else {
3813 p.status = "aborted".to_string();
3814 p.last_error = Some(format!(
3815 "SAS digit mismatch (typed {typed}, expected {stored})"
3816 ));
3817 let client = crate::relay_client::RelayClient::new(&p.relay_url);
3818 let _ = client.pair_abandon(&p.code_hash);
3819 crate::pending_pair::write_pending(&p)?;
3820 crate::os_notify::toast(
3821 &format!("wire — pair aborted ({})", p.code),
3822 p.last_error.as_deref().unwrap_or("digits mismatch"),
3823 );
3824 if as_json {
3825 println!(
3826 "{}",
3827 serde_json::to_string(&json!({
3828 "state": "aborted",
3829 "code_phrase": code,
3830 "error": "digits mismatch",
3831 }))?
3832 );
3833 }
3834 bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
3835 }
3836 Ok(())
3837}
3838
3839fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
3840 if watch {
3841 return cmd_pair_list_watch(watch_interval_secs);
3842 }
3843 let spake2_items = crate::pending_pair::list_pending()?;
3844 let inbound_items = crate::pending_inbound_pair::list_pending_inbound()?;
3845 if as_json {
3846 println!("{}", serde_json::to_string(&spake2_items)?);
3851 return Ok(());
3852 }
3853 if spake2_items.is_empty() && inbound_items.is_empty() {
3854 println!("no pending pair sessions.");
3855 return Ok(());
3856 }
3857 if !inbound_items.is_empty() {
3860 println!("PENDING INBOUND (v0.5.14 zero-paste pair_drop awaiting your accept)");
3861 println!(
3862 "{:<20} {:<35} {:<25} NEXT STEP",
3863 "PEER", "RELAY", "RECEIVED"
3864 );
3865 for p in &inbound_items {
3866 println!(
3867 "{:<20} {:<35} {:<25} `wire pair-accept {peer}` to accept; `wire pair-reject {peer}` to refuse",
3868 p.peer_handle,
3869 p.peer_relay_url,
3870 p.received_at,
3871 peer = p.peer_handle,
3872 );
3873 }
3874 println!();
3875 }
3876 if !spake2_items.is_empty() {
3877 println!("SPAKE2 SESSIONS");
3878 println!(
3879 "{:<15} {:<8} {:<18} {:<10} NOTE",
3880 "CODE", "ROLE", "STATUS", "SAS"
3881 );
3882 for p in spake2_items {
3883 let sas = p
3884 .sas
3885 .as_ref()
3886 .map(|d| format!("{}-{}", &d[..3], &d[3..]))
3887 .unwrap_or_else(|| "—".to_string());
3888 let note = p
3889 .last_error
3890 .as_deref()
3891 .or(p.peer_did.as_deref())
3892 .unwrap_or("");
3893 println!(
3894 "{:<15} {:<8} {:<18} {:<10} {}",
3895 p.code, p.role, p.status, sas, note
3896 );
3897 }
3898 }
3899 Ok(())
3900}
3901
3902fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
3914 use std::collections::HashMap;
3915 use std::io::Write;
3916 let interval = std::time::Duration::from_secs(interval_secs.max(1));
3917 let mut prev: HashMap<String, String> = HashMap::new();
3920 {
3921 let items = crate::pending_pair::list_pending()?;
3922 for p in &items {
3923 println!("{}", serde_json::to_string(&p)?);
3924 prev.insert(p.code.clone(), p.status.clone());
3925 }
3926 let _ = std::io::stdout().flush();
3928 }
3929 loop {
3930 std::thread::sleep(interval);
3931 let items = match crate::pending_pair::list_pending() {
3932 Ok(v) => v,
3933 Err(_) => continue,
3934 };
3935 let mut cur: HashMap<String, String> = HashMap::new();
3936 for p in &items {
3937 cur.insert(p.code.clone(), p.status.clone());
3938 match prev.get(&p.code) {
3939 None => {
3940 println!("{}", serde_json::to_string(&p)?);
3942 }
3943 Some(prev_status) if prev_status != &p.status => {
3944 println!("{}", serde_json::to_string(&p)?);
3946 }
3947 _ => {}
3948 }
3949 }
3950 for code in prev.keys() {
3951 if !cur.contains_key(code) {
3952 println!(
3955 "{}",
3956 serde_json::to_string(&json!({
3957 "code": code,
3958 "status": "removed",
3959 "_synthetic": true,
3960 }))?
3961 );
3962 }
3963 }
3964 let _ = std::io::stdout().flush();
3965 prev = cur;
3966 }
3967}
3968
3969fn cmd_pair_watch(
3973 code_phrase: &str,
3974 target_status: &str,
3975 timeout_secs: u64,
3976 as_json: bool,
3977) -> Result<()> {
3978 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3979 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3980 let mut last_seen_status: Option<String> = None;
3981 loop {
3982 let p_opt = crate::pending_pair::read_pending(&code)?;
3983 let now = std::time::Instant::now();
3984 match p_opt {
3985 None => {
3986 if last_seen_status.is_some() {
3990 if as_json {
3991 println!(
3992 "{}",
3993 serde_json::to_string(&json!({"state": "finalized", "code": code}))?
3994 );
3995 } else {
3996 println!("pair {code} finalized (file removed)");
3997 }
3998 return Ok(());
3999 } else {
4000 if as_json {
4001 println!(
4002 "{}",
4003 serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
4004 );
4005 }
4006 std::process::exit(1);
4007 }
4008 }
4009 Some(p) => {
4010 let cur = p.status.clone();
4011 if Some(cur.clone()) != last_seen_status {
4012 if as_json {
4013 println!("{}", serde_json::to_string(&p)?);
4015 }
4016 last_seen_status = Some(cur.clone());
4017 }
4018 if cur == target_status {
4019 if !as_json {
4020 let sas_str = p
4021 .sas
4022 .as_ref()
4023 .map(|s| format!("{}-{}", &s[..3], &s[3..]))
4024 .unwrap_or_else(|| "—".to_string());
4025 println!("pair {code} reached {target_status} (SAS: {sas_str})");
4026 }
4027 return Ok(());
4028 }
4029 if cur == "aborted" || cur == "aborted_restart" {
4030 if !as_json {
4031 let err = p.last_error.as_deref().unwrap_or("(no detail)");
4032 eprintln!("pair {code} {cur}: {err}");
4033 }
4034 std::process::exit(1);
4035 }
4036 }
4037 }
4038 if now >= deadline {
4039 if !as_json {
4040 eprintln!(
4041 "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
4042 );
4043 }
4044 std::process::exit(2);
4045 }
4046 std::thread::sleep(std::time::Duration::from_millis(250));
4047 }
4048}
4049
4050fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
4051 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
4052 let p = crate::pending_pair::read_pending(&code)?
4053 .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
4054 let client = crate::relay_client::RelayClient::new(&p.relay_url);
4055 let _ = client.pair_abandon(&p.code_hash);
4056 crate::pending_pair::delete_pending(&code)?;
4057 if as_json {
4058 println!(
4059 "{}",
4060 serde_json::to_string(&json!({
4061 "state": "cancelled",
4062 "code_phrase": code,
4063 }))?
4064 );
4065 } else {
4066 println!("cancelled pending pair {code} (relay slot released, file removed).");
4067 }
4068 Ok(())
4069}
4070
4071fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
4074 let code = crate::sas::parse_code_phrase(code_phrase)?;
4077 let code_hash = crate::pair_session::derive_code_hash(code);
4078 let client = crate::relay_client::RelayClient::new(relay_url);
4079 client.pair_abandon(&code_hash)?;
4080 println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
4081 println!("host can now issue a fresh code; guest can re-join.");
4082 Ok(())
4083}
4084
4085fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
4088 let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
4089
4090 let share_payload: Option<Value> = if share {
4093 let client = reqwest::blocking::Client::new();
4094 let single_use = if uses == 1 { Some(1u32) } else { None };
4095 let body = json!({
4096 "invite_url": url,
4097 "ttl_seconds": ttl,
4098 "uses": single_use,
4099 });
4100 let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
4101 let resp = client.post(&endpoint).json(&body).send()?;
4102 if !resp.status().is_success() {
4103 let code = resp.status();
4104 let txt = resp.text().unwrap_or_default();
4105 bail!("relay {code} on /v1/invite/register: {txt}");
4106 }
4107 let parsed: Value = resp.json()?;
4108 let token = parsed
4109 .get("token")
4110 .and_then(Value::as_str)
4111 .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
4112 .to_string();
4113 let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
4114 let curl_line = format!("curl -fsSL {share_url} | sh");
4115 Some(json!({
4116 "token": token,
4117 "share_url": share_url,
4118 "curl": curl_line,
4119 "expires_unix": parsed.get("expires_unix"),
4120 }))
4121 } else {
4122 None
4123 };
4124
4125 if as_json {
4126 let mut out = json!({
4127 "invite_url": url,
4128 "ttl_secs": ttl,
4129 "uses": uses,
4130 "relay": relay,
4131 });
4132 if let Some(s) = &share_payload {
4133 out["share"] = s.clone();
4134 }
4135 println!("{}", serde_json::to_string(&out)?);
4136 } else if let Some(s) = share_payload {
4137 let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
4138 eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
4139 eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
4140 println!("{curl}");
4141 } else {
4142 eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
4143 eprintln!("# TTL: {ttl}s. Uses: {uses}.");
4144 println!("{url}");
4145 }
4146 Ok(())
4147}
4148
4149fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
4150 let resolved = if url.starts_with("http://") || url.starts_with("https://") {
4154 let sep = if url.contains('?') { '&' } else { '?' };
4155 let resolve_url = format!("{url}{sep}format=url");
4156 let client = reqwest::blocking::Client::new();
4157 let resp = client
4158 .get(&resolve_url)
4159 .send()
4160 .with_context(|| format!("GET {resolve_url}"))?;
4161 if !resp.status().is_success() {
4162 bail!("could not resolve short URL {url} (HTTP {})", resp.status());
4163 }
4164 let body = resp.text().unwrap_or_default().trim().to_string();
4165 if !body.starts_with("wire://pair?") {
4166 bail!(
4167 "short URL {url} did not resolve to a wire:// invite. \
4168 (got: {}{})",
4169 body.chars().take(80).collect::<String>(),
4170 if body.chars().count() > 80 { "…" } else { "" }
4171 );
4172 }
4173 body
4174 } else {
4175 url.to_string()
4176 };
4177
4178 let result = crate::pair_invite::accept_invite(&resolved)?;
4179 if as_json {
4180 println!("{}", serde_json::to_string(&result)?);
4181 } else {
4182 let did = result
4183 .get("paired_with")
4184 .and_then(Value::as_str)
4185 .unwrap_or("?");
4186 println!("paired with {did}");
4187 println!(
4188 "you can now: wire send {} <kind> <body>",
4189 crate::agent_card::display_handle_from_did(did)
4190 );
4191 }
4192 Ok(())
4193}
4194
4195fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
4198 if let Some(h) = handle {
4199 let parsed = crate::pair_profile::parse_handle(h)?;
4200 if config::is_initialized()? {
4203 let card = config::read_agent_card()?;
4204 let local_handle = card
4205 .get("profile")
4206 .and_then(|p| p.get("handle"))
4207 .and_then(Value::as_str)
4208 .map(str::to_string);
4209 if local_handle.as_deref() == Some(h) {
4210 return cmd_whois(None, as_json, None);
4211 }
4212 }
4213 let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4215 if as_json {
4216 println!("{}", serde_json::to_string(&resolved)?);
4217 } else {
4218 print_resolved_profile(&resolved);
4219 }
4220 return Ok(());
4221 }
4222 let card = config::read_agent_card()?;
4223 if as_json {
4224 let profile = card.get("profile").cloned().unwrap_or(Value::Null);
4225 println!(
4226 "{}",
4227 serde_json::to_string(&json!({
4228 "did": card.get("did").cloned().unwrap_or(Value::Null),
4229 "profile": profile,
4230 }))?
4231 );
4232 } else {
4233 print!("{}", crate::pair_profile::render_self_summary()?);
4234 }
4235 Ok(())
4236}
4237
4238fn print_resolved_profile(resolved: &Value) {
4239 let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
4240 let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
4241 let relay = resolved
4242 .get("relay_url")
4243 .and_then(Value::as_str)
4244 .unwrap_or("");
4245 let slot = resolved
4246 .get("slot_id")
4247 .and_then(Value::as_str)
4248 .unwrap_or("");
4249 let profile = resolved
4250 .get("card")
4251 .and_then(|c| c.get("profile"))
4252 .cloned()
4253 .unwrap_or(Value::Null);
4254 println!("{did}");
4255 println!(" nick: {nick}");
4256 if !relay.is_empty() {
4257 println!(" relay_url: {relay}");
4258 }
4259 if !slot.is_empty() {
4260 println!(" slot_id: {slot}");
4261 }
4262 let pick =
4263 |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
4264 if let Some(s) = pick("display_name") {
4265 println!(" display_name: {s}");
4266 }
4267 if let Some(s) = pick("emoji") {
4268 println!(" emoji: {s}");
4269 }
4270 if let Some(s) = pick("motto") {
4271 println!(" motto: {s}");
4272 }
4273 if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
4274 let joined: Vec<String> = arr
4275 .iter()
4276 .filter_map(|v| v.as_str().map(str::to_string))
4277 .collect();
4278 println!(" vibe: {}", joined.join(", "));
4279 }
4280 if let Some(s) = pick("pronouns") {
4281 println!(" pronouns: {s}");
4282 }
4283}
4284
/// Extract the host component from a URL-ish string.
///
/// Strips, in order: surrounding whitespace, a leading `scheme://` (any
/// scheme, any case), everything from the first `/`, `?` or `#`, any
/// `userinfo@` prefix, and a trailing `:port`. A bracketed IPv6 literal is
/// returned with its brackets (`[::1]`). Input without a scheme is treated
/// as starting at the authority, so `host:8770` yields `host`.
///
/// The host is returned with its original letter case; an empty string is
/// returned when nothing is left.
fn host_of_url(url: &str) -> String {
    let url = url.trim();

    // Only treat the text before "://" as a scheme when it looks like one;
    // otherwise a "://" buried in a path or query would be misread.
    let looks_like_scheme = |s: &str| {
        s.chars().next().map_or(false, |c| c.is_ascii_alphabetic())
            && s
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '+' || c == '-' || c == '.')
    };
    let rest = match url.split_once("://") {
        Some((scheme, rest)) if looks_like_scheme(scheme) => rest,
        _ => url,
    };

    // Authority ends at the first path, query or fragment delimiter.
    let authority = rest
        .split(|c: char| c == '/' || c == '?' || c == '#')
        .next()
        .unwrap_or("");

    // Drop `user:password@` — the host is whatever follows the last '@'.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host_port)| host_port);

    // IPv6 literals contain ':' themselves; keep the bracketed part whole.
    if host_port.starts_with('[') {
        if let Some(end) = host_port.find(']') {
            return host_port[..=end].to_string();
        }
    }

    host_port.split(':').next().unwrap_or("").to_string()
}
4305
4306fn is_known_relay_domain(peer_domain: &str, our_relay_url: &str) -> bool {
4310 const KNOWN_GOOD: &[&str] = &["wireup.net", "wire.laulpogan.com"];
4312 let peer_domain = peer_domain.trim().to_ascii_lowercase();
4313 if KNOWN_GOOD.iter().any(|k| *k == peer_domain) {
4314 return true;
4315 }
4316 let our_host = host_of_url(our_relay_url).to_ascii_lowercase();
4319 if !our_host.is_empty() && our_host == peer_domain {
4320 return true;
4321 }
4322 false
4323}
4324
4325fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
4326 let parsed = crate::pair_profile::parse_handle(handle_arg)?;
4327
4328 let (our_did, our_relay, our_slot_id, our_slot_token) =
4330 crate::pair_invite::ensure_self_with_relay(relay_override)?;
4331 if our_did == format!("did:wire:{}", parsed.nick) {
4332 bail!("refusing to add self (handle matches own DID)");
4334 }
4335
4336 if let Some(pending) = crate::pending_inbound_pair::read_pending_inbound(&parsed.nick)? {
4346 return cmd_add_accept_pending(
4347 handle_arg,
4348 &parsed.nick,
4349 &pending,
4350 &our_relay,
4351 &our_slot_id,
4352 &our_slot_token,
4353 as_json,
4354 );
4355 }
4356
4357 if !is_known_relay_domain(&parsed.domain, &our_relay) {
4374 eprintln!(
4375 "wire add: WARN unfamiliar relay domain `{}`.",
4376 parsed.domain
4377 );
4378 eprintln!(
4379 " This is NOT `wireup.net` (the default), NOT your own relay (`{}`), "
4380 ,
4381 host_of_url(&our_relay)
4382 );
4383 eprintln!(
4384 " and not on the known-good list. If you meant `{}@wireup.net`, "
4385 ,
4386 parsed.nick
4387 );
4388 eprintln!(
4389 " run `wire add {}@wireup.net` instead. Otherwise verify with your",
4390 parsed.nick
4391 );
4392 eprintln!(" peer out-of-band that they actually run a relay at this domain");
4393 eprintln!(" before relying on the pair. (See issue #9.4.)");
4394 }
4395
4396 let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4398 let peer_card = resolved
4399 .get("card")
4400 .cloned()
4401 .ok_or_else(|| anyhow!("resolved missing card"))?;
4402 let peer_did = resolved
4403 .get("did")
4404 .and_then(Value::as_str)
4405 .ok_or_else(|| anyhow!("resolved missing did"))?
4406 .to_string();
4407 let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
4408 let peer_slot_id = resolved
4409 .get("slot_id")
4410 .and_then(Value::as_str)
4411 .ok_or_else(|| anyhow!("resolved missing slot_id"))?
4412 .to_string();
4413 let peer_relay = resolved
4414 .get("relay_url")
4415 .and_then(Value::as_str)
4416 .map(str::to_string)
4417 .or_else(|| relay_override.map(str::to_string))
4418 .unwrap_or_else(|| format!("https://{}", parsed.domain));
4419
4420 let mut trust = config::read_trust()?;
4422 crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
4423 config::write_trust(&trust)?;
4424 let mut relay_state = config::read_relay_state()?;
4425 let existing_token = relay_state
4426 .get("peers")
4427 .and_then(|p| p.get(&peer_handle))
4428 .and_then(|p| p.get("slot_token"))
4429 .and_then(Value::as_str)
4430 .map(str::to_string)
4431 .unwrap_or_default();
4432 relay_state["peers"][&peer_handle] = json!({
4433 "relay_url": peer_relay,
4434 "slot_id": peer_slot_id,
4435 "slot_token": existing_token, });
4437 config::write_relay_state(&relay_state)?;
4438
4439 let our_card = config::read_agent_card()?;
4442 let sk_seed = config::read_private_key()?;
4443 let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
4444 let pk_b64 = our_card
4445 .get("verify_keys")
4446 .and_then(Value::as_object)
4447 .and_then(|m| m.values().next())
4448 .and_then(|v| v.get("key"))
4449 .and_then(Value::as_str)
4450 .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
4451 let pk_bytes = crate::signing::b64decode(pk_b64)?;
4452 let now = time::OffsetDateTime::now_utc()
4453 .format(&time::format_description::well_known::Rfc3339)
4454 .unwrap_or_default();
4455 let our_relay_state = config::read_relay_state().unwrap_or_else(|_| json!({}));
4460 let our_endpoints = crate::endpoints::self_endpoints(&our_relay_state);
4461 let mut body = json!({
4462 "card": our_card,
4463 "relay_url": our_relay,
4464 "slot_id": our_slot_id,
4465 "slot_token": our_slot_token,
4466 });
4467 if !our_endpoints.is_empty() {
4468 body["endpoints"] = serde_json::to_value(&our_endpoints).unwrap_or(json!([]));
4469 }
4470 let event = json!({
4471 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
4472 "timestamp": now,
4473 "from": our_did,
4474 "to": peer_did,
4475 "type": "pair_drop",
4476 "kind": 1100u32,
4477 "body": body,
4478 });
4479 let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
4480
4481 let client = crate::relay_client::RelayClient::new(&peer_relay);
4483 let resp = client.handle_intro(&parsed.nick, &signed)?;
4484 let event_id = signed
4485 .get("event_id")
4486 .and_then(Value::as_str)
4487 .unwrap_or("")
4488 .to_string();
4489
4490 if as_json {
4491 println!(
4492 "{}",
4493 serde_json::to_string(&json!({
4494 "handle": handle_arg,
4495 "paired_with": peer_did,
4496 "peer_handle": peer_handle,
4497 "event_id": event_id,
4498 "drop_response": resp,
4499 "status": "drop_sent",
4500 }))?
4501 );
4502 } else {
4503 println!(
4504 "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
4505 );
4506 }
4507 Ok(())
4508}
4509
4510fn cmd_add_accept_pending(
4517 handle_arg: &str,
4518 peer_nick: &str,
4519 pending: &crate::pending_inbound_pair::PendingInboundPair,
4520 _our_relay: &str,
4521 _our_slot_id: &str,
4522 _our_slot_token: &str,
4523 as_json: bool,
4524) -> Result<()> {
4525 let mut trust = config::read_trust()?;
4528 crate::trust::add_agent_card_pin(&mut trust, &pending.peer_card, Some("VERIFIED"));
4529 config::write_trust(&trust)?;
4530
4531 let mut relay_state = config::read_relay_state()?;
4537 let endpoints_to_pin = if pending.peer_endpoints.is_empty() {
4538 vec![crate::endpoints::Endpoint::federation(
4539 pending.peer_relay_url.clone(),
4540 pending.peer_slot_id.clone(),
4541 pending.peer_slot_token.clone(),
4542 )]
4543 } else {
4544 pending.peer_endpoints.clone()
4545 };
4546 crate::endpoints::pin_peer_endpoints(
4547 &mut relay_state,
4548 &pending.peer_handle,
4549 &endpoints_to_pin,
4550 )?;
4551 config::write_relay_state(&relay_state)?;
4552
4553 crate::pair_invite::send_pair_drop_ack(
4555 &pending.peer_handle,
4556 &pending.peer_relay_url,
4557 &pending.peer_slot_id,
4558 &pending.peer_slot_token,
4559 )
4560 .with_context(|| {
4561 format!(
4562 "pair_drop_ack send to {} @ {} slot {} failed",
4563 pending.peer_handle, pending.peer_relay_url, pending.peer_slot_id
4564 )
4565 })?;
4566
4567 crate::pending_inbound_pair::consume_pending_inbound(peer_nick)?;
4569
4570 if as_json {
4571 println!(
4572 "{}",
4573 serde_json::to_string(&json!({
4574 "handle": handle_arg,
4575 "paired_with": pending.peer_did,
4576 "peer_handle": pending.peer_handle,
4577 "status": "bilateral_accepted",
4578 "via": "pending_inbound",
4579 }))?
4580 );
4581 } else {
4582 println!(
4583 "→ accepted pending pair from {peer}\n→ pinned VERIFIED, slot_token recorded\n→ shipped our slot_token back via pair_drop_ack\nbilateral pair complete. Send with `wire send {peer} \"...\"`.",
4584 peer = pending.peer_handle,
4585 );
4586 }
4587 Ok(())
4588}
4589
4590fn cmd_pair_accept(peer_nick: &str, as_json: bool) -> Result<()> {
4597 let nick = crate::agent_card::bare_handle(peer_nick);
4598 let pending = crate::pending_inbound_pair::read_pending_inbound(nick)?.ok_or_else(|| {
4599 anyhow!(
4600 "no pending pair request from {nick}. Run `wire pair-list-inbound` to see who is waiting, \
4601 or use `wire add <peer>@<relay>` to send a fresh outbound pair request."
4602 )
4603 })?;
4604 let (_our_did, our_relay, our_slot_id, our_slot_token) =
4605 crate::pair_invite::ensure_self_with_relay(None)?;
4606 let handle_arg = format!("{}@{}", pending.peer_handle, pending.peer_relay_url);
4607 cmd_add_accept_pending(
4608 &handle_arg,
4609 nick,
4610 &pending,
4611 &our_relay,
4612 &our_slot_id,
4613 &our_slot_token,
4614 as_json,
4615 )
4616}
4617
4618fn cmd_pair_list_inbound(as_json: bool) -> Result<()> {
4621 let items = crate::pending_inbound_pair::list_pending_inbound()?;
4622 if as_json {
4623 println!("{}", serde_json::to_string(&items)?);
4624 return Ok(());
4625 }
4626 if items.is_empty() {
4627 println!("no pending inbound pair requests.");
4628 return Ok(());
4629 }
4630 println!("{:<20} {:<35} {:<25} DID", "PEER", "RELAY", "RECEIVED");
4631 for p in items {
4632 println!(
4633 "{:<20} {:<35} {:<25} {}",
4634 p.peer_handle, p.peer_relay_url, p.received_at, p.peer_did,
4635 );
4636 }
4637 println!(
4638 "→ accept with `wire pair-accept <peer>`; refuse with `wire pair-reject <peer>`."
4639 );
4640 Ok(())
4641}
4642
4643fn cmd_pair_reject(peer_nick: &str, as_json: bool) -> Result<()> {
4647 let nick = crate::agent_card::bare_handle(peer_nick);
4648 let existed = crate::pending_inbound_pair::read_pending_inbound(nick)?;
4649 crate::pending_inbound_pair::consume_pending_inbound(nick)?;
4650
4651 if as_json {
4652 println!(
4653 "{}",
4654 serde_json::to_string(&json!({
4655 "peer": nick,
4656 "rejected": existed.is_some(),
4657 "had_pending": existed.is_some(),
4658 }))?
4659 );
4660 } else if existed.is_some() {
4661 println!("→ rejected pending pair from {nick}\n→ pending-inbound record deleted; no ack sent.");
4662 } else {
4663 println!("no pending pair from {nick} — nothing to reject");
4664 }
4665 Ok(())
4666}
4667
4668fn cmd_session(cmd: SessionCommand) -> Result<()> {
4677 match cmd {
4678 SessionCommand::New {
4679 name,
4680 relay,
4681 with_local,
4682 local_relay,
4683 no_daemon,
4684 json,
4685 } => cmd_session_new(
4686 name.as_deref(),
4687 &relay,
4688 with_local,
4689 &local_relay,
4690 no_daemon,
4691 json,
4692 ),
4693 SessionCommand::List { json } => cmd_session_list(json),
4694 SessionCommand::ListLocal { json } => cmd_session_list_local(json),
4695 SessionCommand::Env { name, json } => cmd_session_env(name.as_deref(), json),
4696 SessionCommand::Current { json } => cmd_session_current(json),
4697 SessionCommand::Destroy { name, force, json } => cmd_session_destroy(&name, force, json),
4698 }
4699}
4700
4701fn resolve_session_name(name: Option<&str>) -> Result<String> {
4702 if let Some(n) = name {
4703 return Ok(crate::session::sanitize_name(n));
4704 }
4705 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4706 let registry = crate::session::read_registry().unwrap_or_default();
4707 Ok(crate::session::derive_name_from_cwd(&cwd, ®istry))
4708}
4709
4710fn cmd_session_new(
4711 name_arg: Option<&str>,
4712 relay: &str,
4713 with_local: bool,
4714 local_relay: &str,
4715 no_daemon: bool,
4716 as_json: bool,
4717) -> Result<()> {
4718 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4719 let mut registry = crate::session::read_registry().unwrap_or_default();
4720 let name = match name_arg {
4721 Some(n) => crate::session::sanitize_name(n),
4722 None => crate::session::derive_name_from_cwd(&cwd, ®istry),
4723 };
4724 let session_home = crate::session::session_dir(&name)?;
4725
4726 let already_exists = session_home.exists()
4727 && session_home
4728 .join("config")
4729 .join("wire")
4730 .join("agent-card.json")
4731 .exists();
4732 if already_exists {
4733 registry
4737 .by_cwd
4738 .insert(cwd.to_string_lossy().into_owned(), name.clone());
4739 crate::session::write_registry(®istry)?;
4740 let info = render_session_info(&name, &session_home, &cwd)?;
4741 emit_session_new_result(&info, "already_exists", as_json)?;
4742 if !no_daemon {
4743 ensure_session_daemon(&session_home)?;
4744 }
4745 return Ok(());
4746 }
4747
4748 std::fs::create_dir_all(&session_home)
4749 .with_context(|| format!("creating session dir {session_home:?}"))?;
4750
4751 let init_status = run_wire_with_home(
4753 &session_home,
4754 &["init", &name, "--relay", relay],
4755 )?;
4756 if !init_status.success() {
4757 bail!(
4758 "`wire init {name} --relay {relay}` failed inside session dir {session_home:?}"
4759 );
4760 }
4761
4762 let mut claim_attempt = 0u32;
4767 let mut effective_handle = name.clone();
4768 loop {
4769 claim_attempt += 1;
4770 let status = run_wire_with_home(
4771 &session_home,
4772 &["claim", &effective_handle, "--relay", relay],
4773 )?;
4774 if status.success() {
4775 break;
4776 }
4777 if claim_attempt >= 5 {
4778 bail!(
4779 "5 failed attempts to claim a handle on {relay} for session {name}. \
4780 Try `wire session destroy {name} --force` and re-run with a different name."
4781 );
4782 }
4783 let attempt_path = cwd.join(format!("__attempt_{claim_attempt}"));
4787 let suffix = crate::session::derive_name_from_cwd(&attempt_path, ®istry);
4788 let token = suffix
4792 .rsplit('-')
4793 .next()
4794 .filter(|t| t.len() == 4)
4795 .map(str::to_string)
4796 .unwrap_or_else(|| format!("{claim_attempt}"));
4797 effective_handle = format!("{name}-{token}");
4798 }
4799
4800 registry
4803 .by_cwd
4804 .insert(cwd.to_string_lossy().into_owned(), name.clone());
4805 crate::session::write_registry(®istry)?;
4806
4807 if with_local {
4813 try_allocate_local_slot(&session_home, &effective_handle, relay, local_relay);
4814 }
4815
4816 if !no_daemon {
4817 ensure_session_daemon(&session_home)?;
4818 }
4819
4820 let info = render_session_info(&name, &session_home, &cwd)?;
4821 emit_session_new_result(&info, "created", as_json)
4822}
4823
4824fn try_allocate_local_slot(
4832 session_home: &std::path::Path,
4833 handle: &str,
4834 federation_relay: &str,
4835 local_relay: &str,
4836) {
4837 let probe = match crate::relay_client::build_blocking_client(Some(
4840 std::time::Duration::from_millis(500),
4841 )) {
4842 Ok(c) => c,
4843 Err(e) => {
4844 eprintln!("wire session new: cannot build probe client for {local_relay}: {e:#}");
4845 return;
4846 }
4847 };
4848 let healthz_url = format!("{}/healthz", local_relay.trim_end_matches('/'));
4849 match probe.get(&healthz_url).send() {
4850 Ok(resp) if resp.status().is_success() => {}
4851 Ok(resp) => {
4852 eprintln!(
4853 "wire session new: local relay probe at {healthz_url} returned {} — staying federation-only",
4854 resp.status()
4855 );
4856 return;
4857 }
4858 Err(e) => {
4859 eprintln!(
4860 "wire session new: local relay at {local_relay} unreachable ({}) — staying federation-only. \
4861 Start one with `wire relay-server --bind 127.0.0.1:8771 --local-only`.",
4862 crate::relay_client::format_transport_error(&anyhow::Error::new(e))
4863 );
4864 return;
4865 }
4866 };
4867
4868 let local_client = crate::relay_client::RelayClient::new(local_relay);
4870 let alloc = match local_client.allocate_slot(Some(handle)) {
4871 Ok(a) => a,
4872 Err(e) => {
4873 eprintln!(
4874 "wire session new: local relay slot allocation failed: {e:#} — staying federation-only"
4875 );
4876 return;
4877 }
4878 };
4879
4880 let state_path = session_home
4895 .join("config")
4896 .join("wire")
4897 .join("relay.json");
4898 let mut state: serde_json::Value = std::fs::read(&state_path)
4899 .ok()
4900 .and_then(|b| serde_json::from_slice(&b).ok())
4901 .unwrap_or_else(|| serde_json::json!({}));
4902 let fed_endpoint = state
4905 .get("self")
4906 .and_then(|s| {
4907 let url = s.get("relay_url").and_then(serde_json::Value::as_str)?;
4908 let slot_id = s.get("slot_id").and_then(serde_json::Value::as_str)?;
4909 let slot_token = s.get("slot_token").and_then(serde_json::Value::as_str)?;
4910 Some(crate::endpoints::Endpoint::federation(
4911 url.to_string(),
4912 slot_id.to_string(),
4913 slot_token.to_string(),
4914 ))
4915 });
4916
4917 let local_endpoint = crate::endpoints::Endpoint::local(
4918 local_relay.trim_end_matches('/').to_string(),
4919 alloc.slot_id.clone(),
4920 alloc.slot_token.clone(),
4921 );
4922
4923 let mut endpoints: Vec<crate::endpoints::Endpoint> = Vec::new();
4924 if let Some(f) = fed_endpoint.clone() {
4925 endpoints.push(f);
4926 }
4927 endpoints.push(local_endpoint);
4928
4929 let self_obj = state
4930 .as_object_mut()
4931 .expect("relay_state root is an object")
4932 .entry("self")
4933 .or_insert_with(|| {
4934 serde_json::json!({
4935 "relay_url": federation_relay,
4936 })
4937 });
4938 if let Some(obj) = self_obj.as_object_mut() {
4939 obj.insert(
4940 "endpoints".into(),
4941 serde_json::to_value(&endpoints).unwrap_or(serde_json::Value::Null),
4942 );
4943 }
4944
4945 if let Err(e) = std::fs::write(
4946 &state_path,
4947 serde_json::to_vec_pretty(&state).unwrap_or_default(),
4948 ) {
4949 eprintln!(
4950 "wire session new: persisting dual-slot relay_state at {state_path:?} failed: {e}"
4951 );
4952 return;
4953 }
4954 eprintln!(
4955 "wire session new: local slot allocated on {local_relay} (slot_id={})",
4956 alloc.slot_id
4957 );
4958}
4959
4960fn render_session_info(
4961 name: &str,
4962 session_home: &std::path::Path,
4963 cwd: &std::path::Path,
4964) -> Result<serde_json::Value> {
4965 let card_path = session_home.join("config").join("wire").join("agent-card.json");
4966 let (did, handle) = if card_path.exists() {
4967 let card: Value = serde_json::from_slice(&std::fs::read(&card_path)?)?;
4968 let did = card
4969 .get("did")
4970 .and_then(Value::as_str)
4971 .unwrap_or("")
4972 .to_string();
4973 let handle = card
4974 .get("handle")
4975 .and_then(Value::as_str)
4976 .map(str::to_string)
4977 .unwrap_or_else(|| {
4978 crate::agent_card::display_handle_from_did(&did).to_string()
4979 });
4980 (did, handle)
4981 } else {
4982 (String::new(), String::new())
4983 };
4984 Ok(json!({
4985 "name": name,
4986 "home_dir": session_home.to_string_lossy(),
4987 "cwd": cwd.to_string_lossy(),
4988 "did": did,
4989 "handle": handle,
4990 "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
4991 }))
4992}
4993
4994fn emit_session_new_result(
4995 info: &serde_json::Value,
4996 status: &str,
4997 as_json: bool,
4998) -> Result<()> {
4999 if as_json {
5000 let mut obj = info.clone();
5001 obj["status"] = json!(status);
5002 println!("{}", serde_json::to_string(&obj)?);
5003 } else {
5004 let name = info["name"].as_str().unwrap_or("?");
5005 let handle = info["handle"].as_str().unwrap_or("?");
5006 let home = info["home_dir"].as_str().unwrap_or("?");
5007 let did = info["did"].as_str().unwrap_or("?");
5008 let export = info["export"].as_str().unwrap_or("?");
5009 let prefix = if status == "already_exists" {
5010 "session already exists (re-registered cwd)"
5011 } else {
5012 "session created"
5013 };
5014 println!(
5015 "{prefix}\n name: {name}\n handle: {handle}\n did: {did}\n home: {home}\n\nactivate with:\n {export}"
5016 );
5017 }
5018 Ok(())
5019}
5020
5021fn run_wire_with_home(
5022 session_home: &std::path::Path,
5023 args: &[&str],
5024) -> Result<std::process::ExitStatus> {
5025 let bin = std::env::current_exe().with_context(|| "locating self exe")?;
5026 let status = std::process::Command::new(&bin)
5027 .env("WIRE_HOME", session_home)
5028 .env_remove("RUST_LOG")
5029 .args(args)
5030 .status()
5031 .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
5032 Ok(status)
5033}
5034
5035fn ensure_session_daemon(session_home: &std::path::Path) -> Result<()> {
5036 let pidfile = session_home
5039 .join("state")
5040 .join("wire")
5041 .join("daemon.pid");
5042 if pidfile.exists() {
5043 let bytes = std::fs::read(&pidfile).unwrap_or_default();
5044 let pid: Option<u32> =
5045 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5046 v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5047 } else {
5048 String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5049 };
5050 if let Some(p) = pid {
5051 let alive = {
5052 #[cfg(target_os = "linux")]
5053 {
5054 std::path::Path::new(&format!("/proc/{p}")).exists()
5055 }
5056 #[cfg(not(target_os = "linux"))]
5057 {
5058 std::process::Command::new("kill")
5059 .args(["-0", &p.to_string()])
5060 .output()
5061 .map(|o| o.status.success())
5062 .unwrap_or(false)
5063 }
5064 };
5065 if alive {
5066 return Ok(());
5067 }
5068 }
5069 }
5070
5071 let bin = std::env::current_exe().with_context(|| "locating self exe")?;
5074 let log_path = session_home.join("state").join("wire").join("daemon.log");
5075 if let Some(parent) = log_path.parent() {
5076 std::fs::create_dir_all(parent).ok();
5077 }
5078 let log_file = std::fs::OpenOptions::new()
5079 .create(true)
5080 .append(true)
5081 .open(&log_path)
5082 .with_context(|| format!("opening daemon log {log_path:?}"))?;
5083 let log_err = log_file.try_clone()?;
5084 std::process::Command::new(&bin)
5085 .env("WIRE_HOME", session_home)
5086 .env_remove("RUST_LOG")
5087 .args(["daemon", "--interval", "5"])
5088 .stdout(log_file)
5089 .stderr(log_err)
5090 .stdin(std::process::Stdio::null())
5091 .spawn()
5092 .with_context(|| "spawning session-local `wire daemon`")?;
5093 Ok(())
5094}
5095
5096fn cmd_session_list(as_json: bool) -> Result<()> {
5097 let items = crate::session::list_sessions()?;
5098 if as_json {
5099 println!("{}", serde_json::to_string(&items)?);
5100 return Ok(());
5101 }
5102 if items.is_empty() {
5103 println!("no sessions on this machine. `wire session new` to create one.");
5104 return Ok(());
5105 }
5106 println!(
5107 "{:<24} {:<24} {:<10} CWD",
5108 "NAME", "HANDLE", "DAEMON"
5109 );
5110 for s in items {
5111 println!(
5112 "{:<24} {:<24} {:<10} {}",
5113 s.name,
5114 s.handle.as_deref().unwrap_or("?"),
5115 if s.daemon_running { "running" } else { "down" },
5116 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5117 );
5118 }
5119 Ok(())
5120}
5121
5122fn cmd_session_list_local(as_json: bool) -> Result<()> {
5134 let listing = crate::session::list_local_sessions()?;
5135 if as_json {
5136 println!("{}", serde_json::to_string(&listing)?);
5137 return Ok(());
5138 }
5139
5140 if listing.local.is_empty() && listing.federation_only.is_empty() {
5141 println!(
5142 "no sessions on this machine. `wire session new --with-local` to create one \
5143 with a local-relay endpoint (start the relay first: \
5144 `wire relay-server --bind 127.0.0.1:8771 --local-only`)."
5145 );
5146 return Ok(());
5147 }
5148
5149 if listing.local.is_empty() {
5150 println!(
5151 "no sister sessions reachable via a local relay. \
5152 Re-run `wire session new --with-local` to add a Local endpoint, or \
5153 start a local relay with `wire relay-server --bind 127.0.0.1:8771 --local-only`."
5154 );
5155 } else {
5156 let mut keys: Vec<&String> = listing.local.keys().collect();
5158 keys.sort();
5159 for relay_url in keys {
5160 let group = &listing.local[relay_url];
5161 println!("LOCAL RELAY: {relay_url}");
5162 println!(
5163 " {:<24} {:<32} {:<10} CWD",
5164 "NAME", "HANDLE", "DAEMON"
5165 );
5166 for s in group {
5167 println!(
5168 " {:<24} {:<32} {:<10} {}",
5169 s.name,
5170 s.handle.as_deref().unwrap_or("?"),
5171 if s.daemon_running { "running" } else { "down" },
5172 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5173 );
5174 }
5175 println!();
5176 }
5177 }
5178
5179 if !listing.federation_only.is_empty() {
5180 println!("federation-only (no local endpoint):");
5181 for s in &listing.federation_only {
5182 println!(
5183 " {:<24} {:<32} {}",
5184 s.name,
5185 s.handle.as_deref().unwrap_or("?"),
5186 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5187 );
5188 }
5189 }
5190 Ok(())
5191}
5192
5193fn cmd_session_env(name_arg: Option<&str>, as_json: bool) -> Result<()> {
5194 let name = resolve_session_name(name_arg)?;
5195 let session_home = crate::session::session_dir(&name)?;
5196 if !session_home.exists() {
5197 bail!(
5198 "no session named {name:?} on this machine. `wire session list` to enumerate, \
5199 `wire session new {name}` to create."
5200 );
5201 }
5202 if as_json {
5203 println!(
5204 "{}",
5205 serde_json::to_string(&json!({
5206 "name": name,
5207 "home_dir": session_home.to_string_lossy(),
5208 "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
5209 }))?
5210 );
5211 } else {
5212 println!("export WIRE_HOME={}", session_home.to_string_lossy());
5213 }
5214 Ok(())
5215}
5216
5217fn cmd_session_current(as_json: bool) -> Result<()> {
5218 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
5219 let registry = crate::session::read_registry().unwrap_or_default();
5220 let cwd_key = cwd.to_string_lossy().into_owned();
5221 let name = registry.by_cwd.get(&cwd_key).cloned();
5222 if as_json {
5223 println!(
5224 "{}",
5225 serde_json::to_string(&json!({
5226 "cwd": cwd_key,
5227 "session": name,
5228 }))?
5229 );
5230 } else if let Some(n) = name {
5231 println!("{n}");
5232 } else {
5233 println!("(no session registered for this cwd)");
5234 }
5235 Ok(())
5236}
5237
5238fn cmd_session_destroy(name_arg: &str, force: bool, as_json: bool) -> Result<()> {
5239 let name = crate::session::sanitize_name(name_arg);
5240 let session_home = crate::session::session_dir(&name)?;
5241 if !session_home.exists() {
5242 if as_json {
5243 println!(
5244 "{}",
5245 serde_json::to_string(&json!({
5246 "name": name,
5247 "destroyed": false,
5248 "reason": "no such session",
5249 }))?
5250 );
5251 } else {
5252 println!("no session named {name:?} — nothing to destroy.");
5253 }
5254 return Ok(());
5255 }
5256 if !force {
5257 bail!(
5258 "destroying session {name:?} would delete its keypair + state irrecoverably. \
5259 Pass --force to confirm."
5260 );
5261 }
5262
5263 let pidfile = session_home
5265 .join("state")
5266 .join("wire")
5267 .join("daemon.pid");
5268 if let Ok(bytes) = std::fs::read(&pidfile) {
5269 let pid: Option<u32> =
5270 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5271 v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5272 } else {
5273 String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5274 };
5275 if let Some(p) = pid {
5276 let _ = std::process::Command::new("kill")
5277 .args(["-TERM", &p.to_string()])
5278 .output();
5279 }
5280 }
5281
5282 std::fs::remove_dir_all(&session_home)
5283 .with_context(|| format!("removing session dir {session_home:?}"))?;
5284
5285 let mut registry = crate::session::read_registry().unwrap_or_default();
5287 registry.by_cwd.retain(|_, v| v != &name);
5288 crate::session::write_registry(®istry)?;
5289
5290 if as_json {
5291 println!(
5292 "{}",
5293 serde_json::to_string(&json!({
5294 "name": name,
5295 "destroyed": true,
5296 }))?
5297 );
5298 } else {
5299 println!("destroyed session {name:?}.");
5300 }
5301 Ok(())
5302}
5303
5304fn cmd_diag(action: DiagAction) -> Result<()> {
5307 let state = config::state_dir()?;
5308 let knob = state.join("diag.enabled");
5309 let log_path = state.join("diag.jsonl");
5310 match action {
5311 DiagAction::Tail { limit, json } => {
5312 let entries = crate::diag::tail(limit);
5313 if json {
5314 for e in entries {
5315 println!("{}", serde_json::to_string(&e)?);
5316 }
5317 } else if entries.is_empty() {
5318 println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
5319 } else {
5320 for e in entries {
5321 let ts = e["ts"].as_u64().unwrap_or(0);
5322 let ty = e["type"].as_str().unwrap_or("?");
5323 let pid = e["pid"].as_u64().unwrap_or(0);
5324 let payload = e["payload"].to_string();
5325 println!("[{ts}] pid={pid} {ty} {payload}");
5326 }
5327 }
5328 }
5329 DiagAction::Enable => {
5330 config::ensure_dirs()?;
5331 std::fs::write(&knob, "1")?;
5332 println!("wire diag: enabled at {knob:?}");
5333 }
5334 DiagAction::Disable => {
5335 if knob.exists() {
5336 std::fs::remove_file(&knob)?;
5337 }
5338 println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
5339 }
5340 DiagAction::Status { json } => {
5341 let enabled = crate::diag::is_enabled();
5342 let size = std::fs::metadata(&log_path)
5343 .map(|m| m.len())
5344 .unwrap_or(0);
5345 if json {
5346 println!(
5347 "{}",
5348 serde_json::to_string(&serde_json::json!({
5349 "enabled": enabled,
5350 "log_path": log_path,
5351 "log_size_bytes": size,
5352 }))?
5353 );
5354 } else {
5355 println!("wire diag status");
5356 println!(" enabled: {enabled}");
5357 println!(" log: {log_path:?}");
5358 println!(" log size: {size} bytes");
5359 }
5360 }
5361 }
5362 Ok(())
5363}
5364
5365fn cmd_service(action: ServiceAction) -> Result<()> {
5368 let kind = |local_relay: bool| {
5369 if local_relay {
5370 crate::service::ServiceKind::LocalRelay
5371 } else {
5372 crate::service::ServiceKind::Daemon
5373 }
5374 };
5375 let (report, as_json) = match action {
5376 ServiceAction::Install { local_relay, json } => {
5377 (crate::service::install_kind(kind(local_relay))?, json)
5378 }
5379 ServiceAction::Uninstall { local_relay, json } => {
5380 (crate::service::uninstall_kind(kind(local_relay))?, json)
5381 }
5382 ServiceAction::Status { local_relay, json } => {
5383 (crate::service::status_kind(kind(local_relay))?, json)
5384 }
5385 };
5386 if as_json {
5387 println!("{}", serde_json::to_string(&report)?);
5388 } else {
5389 println!("wire service {}", report.action);
5390 println!(" platform: {}", report.platform);
5391 println!(" unit: {}", report.unit_path);
5392 println!(" status: {}", report.status);
5393 println!(" detail: {}", report.detail);
5394 }
5395 Ok(())
5396}
5397
5398fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
5413 let pgrep_out = std::process::Command::new("pgrep")
5415 .args(["-f", "wire daemon"])
5416 .output();
5417 let running_pids: Vec<u32> = match pgrep_out {
5418 Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
5419 .split_whitespace()
5420 .filter_map(|s| s.parse::<u32>().ok())
5421 .collect(),
5422 _ => Vec::new(),
5423 };
5424
5425 let record = crate::ensure_up::read_pid_record("daemon");
5427 let recorded_version: Option<String> = match &record {
5428 crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
5429 crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
5430 _ => None,
5431 };
5432 let cli_version = env!("CARGO_PKG_VERSION").to_string();
5433
5434 if check_only {
5435 let report = json!({
5436 "running_pids": running_pids,
5437 "pidfile_version": recorded_version,
5438 "cli_version": cli_version,
5439 "would_kill": running_pids,
5440 });
5441 if as_json {
5442 println!("{}", serde_json::to_string(&report)?);
5443 } else {
5444 println!("wire upgrade --check");
5445 println!(" cli version: {cli_version}");
5446 println!(" pidfile version: {}", recorded_version.as_deref().unwrap_or("(missing)"));
5447 if running_pids.is_empty() {
5448 println!(" running daemons: none");
5449 } else {
5450 let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
5451 println!(" running daemons: pids {}", pids.join(", "));
5452 println!(" would kill all + spawn fresh");
5453 }
5454 }
5455 return Ok(());
5456 }
5457
5458 let mut killed: Vec<u32> = Vec::new();
5461 for pid in &running_pids {
5462 let _ = std::process::Command::new("kill")
5464 .args(["-15", &pid.to_string()])
5465 .status();
5466 killed.push(*pid);
5467 }
5468 if !killed.is_empty() {
5470 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
5471 loop {
5472 let still_alive: Vec<u32> = killed
5473 .iter()
5474 .copied()
5475 .filter(|p| process_alive_pid(*p))
5476 .collect();
5477 if still_alive.is_empty() {
5478 break;
5479 }
5480 if std::time::Instant::now() >= deadline {
5481 for pid in still_alive {
5483 let _ = std::process::Command::new("kill")
5484 .args(["-9", &pid.to_string()])
5485 .status();
5486 }
5487 break;
5488 }
5489 std::thread::sleep(std::time::Duration::from_millis(50));
5490 }
5491 }
5492
5493 let pidfile = config::state_dir()?.join("daemon.pid");
5496 if pidfile.exists() {
5497 let _ = std::fs::remove_file(&pidfile);
5498 }
5499
5500 let spawned = crate::ensure_up::ensure_daemon_running()?;
5503
5504 let new_record = crate::ensure_up::read_pid_record("daemon");
5505 let new_pid = new_record.pid();
5506 let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
5507 Some(d.version.clone())
5508 } else {
5509 None
5510 };
5511
5512 if as_json {
5513 println!(
5514 "{}",
5515 serde_json::to_string(&json!({
5516 "killed": killed,
5517 "spawned_fresh_daemon": spawned,
5518 "new_pid": new_pid,
5519 "new_version": new_version,
5520 "cli_version": cli_version,
5521 }))?
5522 );
5523 } else {
5524 if killed.is_empty() {
5525 println!("wire upgrade: no stale daemons running");
5526 } else {
5527 println!("wire upgrade: killed {} daemon(s) (pids {})",
5528 killed.len(),
5529 killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
5530 }
5531 if spawned {
5532 println!(
5533 "wire upgrade: spawned fresh daemon (pid {} v{})",
5534 new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
5535 new_version.as_deref().unwrap_or(&cli_version),
5536 );
5537 } else {
5538 println!("wire upgrade: daemon was already running on current binary");
5539 }
5540 }
5541 Ok(())
5542}
5543
/// Report whether a process with the given pid currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry. Elsewhere it runs
/// `kill -0 <pid>` with all standard streams detached and treats a
/// successful exit as "alive"; failure to run `kill` counts as "not alive".
fn process_alive_pid(pid: u32) -> bool {
    #[cfg(target_os = "linux")]
    {
        let proc_entry = format!("/proc/{pid}");
        std::fs::metadata(proc_entry).is_ok()
    }
    #[cfg(not(target_os = "linux"))]
    {
        use std::process::{Command, Stdio};

        let probe = Command::new("kill")
            .arg("-0")
            .arg(pid.to_string())
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        matches!(probe, Ok(code) if code.success())
    }
}
5561
/// Result of a single `wire doctor` diagnostic check.
///
/// Serialized as-is into the `checks` array of the doctor JSON report.
#[derive(Clone, Debug, serde::Serialize)]
pub struct DoctorCheck {
    /// Short identifier of the check (for example `"daemon"` or `"relay"`).
    pub id: String,
    /// Outcome label; the constructors set it to `"PASS"`, `"WARN"` or `"FAIL"`.
    pub status: String,
    /// Human-readable description of what the check observed.
    pub detail: String,
    /// Suggested remediation. Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fix: Option<String>,
}
5578
5579impl DoctorCheck {
5580 fn pass(id: &str, detail: impl Into<String>) -> Self {
5581 Self {
5582 id: id.into(),
5583 status: "PASS".into(),
5584 detail: detail.into(),
5585 fix: None,
5586 }
5587 }
5588 fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5589 Self {
5590 id: id.into(),
5591 status: "WARN".into(),
5592 detail: detail.into(),
5593 fix: Some(fix.into()),
5594 }
5595 }
5596 fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5597 Self {
5598 id: id.into(),
5599 status: "FAIL".into(),
5600 detail: detail.into(),
5601 fix: Some(fix.into()),
5602 }
5603 }
5604}
5605
5606fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
5611 let mut checks: Vec<DoctorCheck> = Vec::new();
5612
5613 checks.push(check_daemon_health());
5614 checks.push(check_daemon_pid_consistency());
5615 checks.push(check_relay_reachable());
5616 checks.push(check_pair_rejections(recent_rejections));
5617 checks.push(check_cursor_progress());
5618
5619 let fails = checks.iter().filter(|c| c.status == "FAIL").count();
5620 let warns = checks.iter().filter(|c| c.status == "WARN").count();
5621
5622 if as_json {
5623 println!(
5624 "{}",
5625 serde_json::to_string(&json!({
5626 "checks": checks,
5627 "fail_count": fails,
5628 "warn_count": warns,
5629 "ok": fails == 0,
5630 }))?
5631 );
5632 } else {
5633 println!("wire doctor — {} checks", checks.len());
5634 for c in &checks {
5635 let bullet = match c.status.as_str() {
5636 "PASS" => "✓",
5637 "WARN" => "!",
5638 "FAIL" => "✗",
5639 _ => "?",
5640 };
5641 println!(" {bullet} [{}] {}: {}", c.status, c.id, c.detail);
5642 if let Some(fix) = &c.fix {
5643 println!(" fix: {fix}");
5644 }
5645 }
5646 println!();
5647 if fails == 0 && warns == 0 {
5648 println!("ALL GREEN");
5649 } else {
5650 println!("{fails} FAIL, {warns} WARN");
5651 }
5652 }
5653
5654 if fails > 0 {
5655 std::process::exit(1);
5656 }
5657 Ok(())
5658}
5659
5660fn check_daemon_health() -> DoctorCheck {
5667 let snap = crate::ensure_up::daemon_liveness();
5673 let pgrep_pids = &snap.pgrep_pids;
5674 let pidfile_pid = snap.pidfile_pid;
5675 let pidfile_alive = snap.pidfile_alive;
5676 let orphan_pids = &snap.orphan_pids;
5677
5678 let fmt_pids = |xs: &[u32]| -> String {
5679 xs.iter()
5680 .map(|p| p.to_string())
5681 .collect::<Vec<_>>()
5682 .join(", ")
5683 };
5684
5685 match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
5686 (0, _, _) => DoctorCheck::fail(
5687 "daemon",
5688 "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
5689 "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
5690 ),
5691 (1, true, true) => DoctorCheck::pass(
5693 "daemon",
5694 format!(
5695 "one daemon running (pid {}, matches pidfile)",
5696 pgrep_pids[0]
5697 ),
5698 ),
5699 (n, true, false) => DoctorCheck::fail(
5701 "daemon",
5702 format!(
5703 "{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
5704 The orphans race the relay cursor — they advance past events your current binary can't process. \
5705 (Issue #2 exact class.)",
5706 fmt_pids(&pgrep_pids),
5707 pidfile_pid.unwrap(),
5708 fmt_pids(&orphan_pids),
5709 ),
5710 "`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
5711 ),
5712 (n, false, _) => DoctorCheck::fail(
5714 "daemon",
5715 format!(
5716 "{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
5717 every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
5718 (Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
5719 fmt_pids(&pgrep_pids),
5720 match pidfile_pid {
5721 Some(p) => format!("claims pid {p} which is dead"),
5722 None => "is missing".to_string(),
5723 },
5724 ),
5725 "`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
5726 ),
5727 (n, true, true) => DoctorCheck::warn(
5729 "daemon",
5730 format!(
5731 "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
5732 fmt_pids(&pgrep_pids)
5733 ),
5734 "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
5735 ),
5736 }
5737}
5738
5739fn check_daemon_pid_consistency() -> DoctorCheck {
5751 let snap = crate::ensure_up::daemon_liveness();
5752 match &snap.record {
5753 crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
5754 "daemon_pid_consistency",
5755 "no daemon.pid yet — fresh box or daemon never started",
5756 ),
5757 crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
5758 "daemon_pid_consistency",
5759 format!("daemon.pid is corrupt: {reason}"),
5760 "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
5761 ),
5762 crate::ensure_up::PidRecord::LegacyInt(pid) => {
5763 let pid = *pid;
5766 if !crate::ensure_up::pid_is_alive(pid) {
5767 return DoctorCheck::warn(
5768 "daemon_pid_consistency",
5769 format!(
5770 "daemon.pid (legacy-int) points at pid {pid} which is not running. \
5771 Stale pidfile from a crashed pre-0.5.11 daemon. \
5772 (Issue #2: this surface used to PASS while `wire status` said DOWN.)"
5773 ),
5774 "`wire upgrade` (kills any orphan + spawns a fresh daemon with JSON pidfile)",
5775 );
5776 }
5777 DoctorCheck::warn(
5778 "daemon_pid_consistency",
5779 format!(
5780 "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
5781 Daemon was started by a pre-0.5.11 binary."
5782 ),
5783 "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
5784 )
5785 }
5786 crate::ensure_up::PidRecord::Json(d) => {
5787 if !snap.pidfile_alive {
5791 return DoctorCheck::warn(
5792 "daemon_pid_consistency",
5793 format!(
5794 "daemon.pid records pid {pid} (v{version}) but that process is not running — \
5795 pidfile is stale. `wire status` will report DOWN, but pre-v0.5.19 doctor \
5796 silently PASSed this state and ignored any live orphan daemons (#2 root cause).",
5797 pid = d.pid,
5798 version = d.version,
5799 ),
5800 "`wire upgrade` to clean up the stale pidfile + spawn a fresh daemon \
5801 (kills any orphan daemon advancing the cursor without coordination)",
5802 );
5803 }
5804 let mut issues: Vec<String> = Vec::new();
5805 if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
5806 issues.push(format!(
5807 "schema={} (expected {})",
5808 d.schema,
5809 crate::ensure_up::DAEMON_PID_SCHEMA
5810 ));
5811 }
5812 let cli_version = env!("CARGO_PKG_VERSION");
5813 if d.version != cli_version {
5814 issues.push(format!(
5815 "version daemon={} cli={cli_version}",
5816 d.version
5817 ));
5818 }
5819 if !std::path::Path::new(&d.bin_path).exists() {
5820 issues.push(format!("bin_path {} missing on disk", d.bin_path));
5821 }
5822 if let Ok(card) = config::read_agent_card()
5824 && let Some(current_did) = card.get("did").and_then(Value::as_str)
5825 && let Some(recorded_did) = &d.did
5826 && recorded_did != current_did
5827 {
5828 issues.push(format!(
5829 "did daemon={recorded_did} config={current_did} — identity drift"
5830 ));
5831 }
5832 if let Ok(state) = config::read_relay_state()
5833 && let Some(current_relay) = state
5834 .get("self")
5835 .and_then(|s| s.get("relay_url"))
5836 .and_then(Value::as_str)
5837 && let Some(recorded_relay) = &d.relay_url
5838 && recorded_relay != current_relay
5839 {
5840 issues.push(format!(
5841 "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
5842 ));
5843 }
5844 if issues.is_empty() {
5845 DoctorCheck::pass(
5846 "daemon_pid_consistency",
5847 format!(
5848 "daemon v{} bound to {} as {}",
5849 d.version,
5850 d.relay_url.as_deref().unwrap_or("?"),
5851 d.did.as_deref().unwrap_or("?")
5852 ),
5853 )
5854 } else {
5855 DoctorCheck::warn(
5856 "daemon_pid_consistency",
5857 format!("daemon pidfile drift: {}", issues.join("; ")),
5858 "`wire upgrade` to atomically restart daemon with current config".to_string(),
5859 )
5860 }
5861 }
5862 }
5863}
5864
5865fn check_relay_reachable() -> DoctorCheck {
5867 let state = match config::read_relay_state() {
5868 Ok(s) => s,
5869 Err(e) => return DoctorCheck::fail(
5870 "relay",
5871 format!("could not read relay state: {e}"),
5872 "run `wire up <handle>@<relay>` to bootstrap",
5873 ),
5874 };
5875 let url = state
5876 .get("self")
5877 .and_then(|s| s.get("relay_url"))
5878 .and_then(Value::as_str)
5879 .unwrap_or("");
5880 if url.is_empty() {
5881 return DoctorCheck::warn(
5882 "relay",
5883 "no relay bound — wire send/pull will not work",
5884 "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
5885 );
5886 }
5887 let client = crate::relay_client::RelayClient::new(url);
5888 match client.check_healthz() {
5889 Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
5890 Err(e) => DoctorCheck::fail(
5891 "relay",
5892 format!("{url} unreachable: {e}"),
5893 format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
5894 ),
5895 }
5896}
5897
5898fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
5902 let path = match config::state_dir() {
5903 Ok(d) => d.join("pair-rejected.jsonl"),
5904 Err(e) => return DoctorCheck::warn(
5905 "pair_rejections",
5906 format!("could not resolve state dir: {e}"),
5907 "set WIRE_HOME or fix XDG_STATE_HOME",
5908 ),
5909 };
5910 if !path.exists() {
5911 return DoctorCheck::pass(
5912 "pair_rejections",
5913 "no pair-rejected.jsonl — no recorded pair failures",
5914 );
5915 }
5916 let body = match std::fs::read_to_string(&path) {
5917 Ok(b) => b,
5918 Err(e) => return DoctorCheck::warn(
5919 "pair_rejections",
5920 format!("could not read {path:?}: {e}"),
5921 "check file permissions",
5922 ),
5923 };
5924 let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
5925 if lines.is_empty() {
5926 return DoctorCheck::pass(
5927 "pair_rejections",
5928 "pair-rejected.jsonl present but empty",
5929 );
5930 }
5931 let total = lines.len();
5932 let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
5933 let mut summary: Vec<String> = Vec::new();
5934 for line in &recent {
5935 if let Ok(rec) = serde_json::from_str::<Value>(line) {
5936 let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
5937 let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
5938 summary.push(format!("{peer}/{code}"));
5939 }
5940 }
5941 DoctorCheck::warn(
5942 "pair_rejections",
5943 format!(
5944 "{total} pair failures recorded. recent: [{}]",
5945 summary.join(", ")
5946 ),
5947 format!(
5948 "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
5949 ),
5950 )
5951}
5952
5953fn check_cursor_progress() -> DoctorCheck {
5958 let state = match config::read_relay_state() {
5959 Ok(s) => s,
5960 Err(e) => return DoctorCheck::warn(
5961 "cursor",
5962 format!("could not read relay state: {e}"),
5963 "check ~/Library/Application Support/wire/relay.json",
5964 ),
5965 };
5966 let cursor = state
5967 .get("self")
5968 .and_then(|s| s.get("last_pulled_event_id"))
5969 .and_then(Value::as_str)
5970 .map(|s| s.chars().take(16).collect::<String>())
5971 .unwrap_or_else(|| "<none>".to_string());
5972 DoctorCheck::pass(
5973 "cursor",
5974 format!(
5975 "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
5976 ),
5977 )
5978}
5979
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// Each constructor stamps its own status label, and only `pass` omits the fix.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passed = DoctorCheck::pass("x", "ok");
        assert_eq!(passed.status, "PASS");
        assert!(passed.fix.is_none());

        let warned = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warned.status, "WARN");
        assert_eq!(warned.fix.as_deref(), Some("do this"));

        let failed = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failed.status, "FAIL");
        assert_eq!(failed.fix.as_deref(), Some("fix it"));
    }

    /// With no rejection log in a fresh temp home the check passes.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let c = check_pair_rejections(5);
            assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
        });
    }

    /// One recorded rejection turns the check into a WARN that names the peer and code.
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let outcome = check_pair_rejections(5);
            assert_eq!(outcome.status, "WARN");
            assert!(outcome.detail.contains("1 pair failures"));
            assert!(outcome.detail.contains("willard/pair_drop_ack_send_failed"));
        });
    }
}
6033
6034fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
6046 let (nick, relay_url) = match handle_arg.split_once('@') {
6047 Some((n, host)) => {
6048 let url = if host.starts_with("http://") || host.starts_with("https://") {
6049 host.to_string()
6050 } else {
6051 format!("https://{host}")
6052 };
6053 (n.to_string(), url)
6054 }
6055 None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
6056 };
6057
6058 let mut report: Vec<(String, String)> = Vec::new();
6059 let mut step = |stage: &str, detail: String| {
6060 report.push((stage.to_string(), detail.clone()));
6061 if !as_json {
6062 eprintln!("wire up: {stage} — {detail}");
6063 }
6064 };
6065
6066 if config::is_initialized()? {
6068 let card = config::read_agent_card()?;
6069 let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
6070 let existing_handle =
6071 crate::agent_card::display_handle_from_did(existing_did).to_string();
6072 if existing_handle != nick {
6073 bail!(
6074 "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
6075 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
6076 delete `{:?}` to start fresh.",
6077 config::config_dir()?
6078 );
6079 }
6080 step("init", format!("already initialized as {existing_handle}"));
6081 } else {
6082 cmd_init(&nick, name, Some(&relay_url), false)?;
6083 step("init", format!("created identity {nick} bound to {relay_url}"));
6084 }
6085
6086 let relay_state = config::read_relay_state()?;
6090 let bound_relay = relay_state
6091 .get("self")
6092 .and_then(|s| s.get("relay_url"))
6093 .and_then(Value::as_str)
6094 .unwrap_or("")
6095 .to_string();
6096 if bound_relay.is_empty() {
6097 cmd_bind_relay(&relay_url, false, false)?;
6101 step("bind-relay", format!("bound to {relay_url}"));
6102 } else if bound_relay != relay_url {
6103 step(
6104 "bind-relay",
6105 format!(
6106 "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
6107 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
6108 ),
6109 );
6110 } else {
6111 step("bind-relay", format!("already bound to {bound_relay}"));
6112 }
6113
6114 match cmd_claim(&nick, Some(&relay_url), None, false, false) {
6117 Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
6118 Err(e) => step(
6119 "claim",
6120 format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
6121 ),
6122 }
6123
6124 match crate::ensure_up::ensure_daemon_running() {
6126 Ok(true) => step("daemon", "started fresh background daemon".to_string()),
6127 Ok(false) => step("daemon", "already running".to_string()),
6128 Err(e) => step(
6129 "daemon",
6130 format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
6131 ),
6132 }
6133
6134 let summary = format!(
6136 "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
6137 `wire monitor` to watch incoming events."
6138 );
6139 step("ready", summary.clone());
6140
6141 if as_json {
6142 let steps_json: Vec<_> = report
6143 .iter()
6144 .map(|(k, v)| json!({"stage": k, "detail": v}))
6145 .collect();
6146 println!(
6147 "{}",
6148 serde_json::to_string(&json!({
6149 "nick": nick,
6150 "relay": relay_url,
6151 "steps": steps_json,
6152 }))?
6153 );
6154 }
6155 Ok(())
6156}
6157
/// Remove leading `https://` prefixes, then leading `http://` prefixes, from
/// `url` and return what is left as an owned string.
fn strip_proto(url: &str) -> String {
    let without_https = url.trim_start_matches("https://");
    let bare = without_https.trim_start_matches("http://");
    bare.to_owned()
}
6164
6165fn cmd_pair_megacommand(
6179 handle_arg: &str,
6180 relay_override: Option<&str>,
6181 timeout_secs: u64,
6182 _as_json: bool,
6183) -> Result<()> {
6184 let parsed = crate::pair_profile::parse_handle(handle_arg)?;
6185 let peer_handle = parsed.nick.clone();
6186
6187 eprintln!("wire pair: resolving {handle_arg}...");
6188 cmd_add(handle_arg, relay_override, false)?;
6189
6190 eprintln!(
6191 "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
6192 to ack (their daemon must be running + pulling)..."
6193 );
6194
6195 let _ = run_sync_pull();
6199
6200 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
6201 let poll_interval = std::time::Duration::from_millis(500);
6202
6203 loop {
6204 let _ = run_sync_pull();
6206 let relay_state = config::read_relay_state()?;
6207 let peer_entry = relay_state
6208 .get("peers")
6209 .and_then(|p| p.get(&peer_handle))
6210 .cloned();
6211 let token = peer_entry
6212 .as_ref()
6213 .and_then(|e| e.get("slot_token"))
6214 .and_then(Value::as_str)
6215 .unwrap_or("");
6216
6217 if !token.is_empty() {
6218 let trust = config::read_trust()?;
6220 let pinned_in_trust = trust
6221 .get("agents")
6222 .and_then(|a| a.get(&peer_handle))
6223 .is_some();
6224 println!(
6225 "wire pair: paired with {peer_handle}.\n trust: {} bilateral: yes (slot_token recorded)\n next: `wire send {peer_handle} \"<msg>\"`",
6226 if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
6227 );
6228 return Ok(());
6229 }
6230
6231 if std::time::Instant::now() >= deadline {
6232 bail!(
6239 "wire pair: timed out after {timeout_secs}s. \
6240 peer {peer_handle} never sent pair_drop_ack. \
6241 likely causes: (a) their daemon is down — ask them to run \
6242 `wire status` and `wire daemon &`; (b) their binary is older \
6243 than 0.5.x and doesn't understand pair_drop events — ask \
6244 them to `wire upgrade`; (c) network / relay blip — re-run \
6245 `wire pair {handle_arg}` to retry."
6246 );
6247 }
6248
6249 std::thread::sleep(poll_interval);
6250 }
6251}
6252
6253fn cmd_claim(
6254 nick: &str,
6255 relay_override: Option<&str>,
6256 public_url: Option<&str>,
6257 hidden: bool,
6258 as_json: bool,
6259) -> Result<()> {
6260 if !crate::pair_profile::is_valid_nick(nick) {
6261 bail!(
6262 "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
6263 );
6264 }
6265 let (_did, relay_url, slot_id, slot_token) =
6268 crate::pair_invite::ensure_self_with_relay(relay_override)?;
6269 let card = config::read_agent_card()?;
6270
6271 let client = crate::relay_client::RelayClient::new(&relay_url);
6272 let discoverable = if hidden { Some(false) } else { None };
6276 let resp = client.handle_claim_v2(
6277 nick,
6278 &slot_id,
6279 &slot_token,
6280 public_url,
6281 &card,
6282 discoverable,
6283 )?;
6284
6285 if as_json {
6286 println!(
6287 "{}",
6288 serde_json::to_string(&json!({
6289 "nick": nick,
6290 "relay": relay_url,
6291 "response": resp,
6292 }))?
6293 );
6294 } else {
6295 let domain = public_url
6299 .unwrap_or(&relay_url)
6300 .trim_start_matches("https://")
6301 .trim_start_matches("http://")
6302 .trim_end_matches('/')
6303 .split('/')
6304 .next()
6305 .unwrap_or("<this-relay-domain>")
6306 .to_string();
6307 println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
6308 println!("verify with: wire whois {nick}@{domain}");
6309 }
6310 Ok(())
6311}
6312
6313fn cmd_profile(action: ProfileAction) -> Result<()> {
6314 match action {
6315 ProfileAction::Set { field, value, json } => {
6316 let parsed: Value =
6320 serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
6321 let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
6322 if json {
6323 println!(
6324 "{}",
6325 serde_json::to_string(&json!({
6326 "field": field,
6327 "profile": new_profile,
6328 }))?
6329 );
6330 } else {
6331 println!("profile.{field} set");
6332 }
6333 }
6334 ProfileAction::Get { json } => return cmd_whois(None, json, None),
6335 ProfileAction::Clear { field, json } => {
6336 let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
6337 if json {
6338 println!(
6339 "{}",
6340 serde_json::to_string(&json!({
6341 "field": field,
6342 "cleared": true,
6343 "profile": new_profile,
6344 }))?
6345 );
6346 } else {
6347 println!("profile.{field} cleared");
6348 }
6349 }
6350 }
6351 Ok(())
6352}
6353
6354fn cmd_setup(apply: bool) -> Result<()> {
6357 use std::path::PathBuf;
6358
6359 let entry = json!({"command": "wire", "args": ["mcp"]});
6360 let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
6361
6362 let mut targets: Vec<(&str, PathBuf)> = Vec::new();
6365 if let Some(home) = dirs::home_dir() {
6366 targets.push(("Claude Code", home.join(".claude.json")));
6369 targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
6371 #[cfg(target_os = "macos")]
6373 targets.push((
6374 "Claude Desktop (macOS)",
6375 home.join("Library/Application Support/Claude/claude_desktop_config.json"),
6376 ));
6377 #[cfg(target_os = "windows")]
6379 if let Ok(appdata) = std::env::var("APPDATA") {
6380 targets.push((
6381 "Claude Desktop (Windows)",
6382 PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
6383 ));
6384 }
6385 targets.push(("Cursor", home.join(".cursor/mcp.json")));
6387 }
6388 targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
6390
6391 println!("wire setup\n");
6392 println!("MCP server snippet (add this to your client's mcpServers):");
6393 println!();
6394 println!("{entry_pretty}");
6395 println!();
6396
6397 if !apply {
6398 println!("Probable MCP host config locations on this machine:");
6399 for (name, path) in &targets {
6400 let marker = if path.exists() {
6401 "✓ found"
6402 } else {
6403 " (would create)"
6404 };
6405 println!(" {marker:14} {name}: {}", path.display());
6406 }
6407 println!();
6408 println!("Run `wire setup --apply` to merge wire into each config above.");
6409 println!(
6410 "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
6411 );
6412 return Ok(());
6413 }
6414
6415 let mut modified: Vec<String> = Vec::new();
6416 let mut skipped: Vec<String> = Vec::new();
6417 for (name, path) in &targets {
6418 match upsert_mcp_entry(path, "wire", &entry) {
6419 Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
6420 Ok(false) => skipped.push(format!(" {name} ({}): already configured", path.display())),
6421 Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
6422 }
6423 }
6424 if !modified.is_empty() {
6425 println!("Modified:");
6426 for line in &modified {
6427 println!(" {line}");
6428 }
6429 println!();
6430 println!("Restart the app(s) above to load wire MCP.");
6431 }
6432 if !skipped.is_empty() {
6433 println!();
6434 println!("Skipped:");
6435 for line in &skipped {
6436 println!(" {line}");
6437 }
6438 }
6439 Ok(())
6440}
6441
6442fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
6445 let mut cfg: Value = if path.exists() {
6446 let body = std::fs::read_to_string(path).context("reading config")?;
6447 serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
6448 } else {
6449 json!({})
6450 };
6451 if !cfg.is_object() {
6452 cfg = json!({});
6453 }
6454 let root = cfg.as_object_mut().unwrap();
6455 let servers = root
6456 .entry("mcpServers".to_string())
6457 .or_insert_with(|| json!({}));
6458 if !servers.is_object() {
6459 *servers = json!({});
6460 }
6461 let map = servers.as_object_mut().unwrap();
6462 if map.get(server_name) == Some(entry) {
6463 return Ok(false);
6464 }
6465 map.insert(server_name.to_string(), entry.clone());
6466 if let Some(parent) = path.parent()
6467 && !parent.as_os_str().is_empty()
6468 {
6469 std::fs::create_dir_all(parent).context("creating parent dir")?;
6470 }
6471 let out = serde_json::to_string_pretty(&cfg)? + "\n";
6472 std::fs::write(path, out).context("writing config")?;
6473 Ok(true)
6474}
6475
6476#[allow(clippy::too_many_arguments)]
6479fn cmd_reactor(
6480 on_event: &str,
6481 peer_filter: Option<&str>,
6482 kind_filter: Option<&str>,
6483 verified_only: bool,
6484 interval_secs: u64,
6485 once: bool,
6486 dry_run: bool,
6487 max_per_minute: u32,
6488 max_chain_depth: u32,
6489) -> Result<()> {
6490 use crate::inbox_watch::{InboxEvent, InboxWatcher};
6491 use std::collections::{HashMap, HashSet, VecDeque};
6492 use std::io::Write;
6493 use std::process::{Command, Stdio};
6494 use std::time::{Duration, Instant};
6495
6496 let cursor_path = config::state_dir()?.join("reactor.cursor");
6497 let emitted_path = config::state_dir()?.join("reactor-emitted.log");
6506 let mut emitted_ids: HashSet<String> = HashSet::new();
6507 if emitted_path.exists()
6508 && let Ok(body) = std::fs::read_to_string(&emitted_path)
6509 {
6510 for line in body.lines() {
6511 let t = line.trim();
6512 if !t.is_empty() {
6513 emitted_ids.insert(t.to_string());
6514 }
6515 }
6516 }
6517 let outbox_dir = config::outbox_dir()?;
6519 let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
6522
6523 let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6524
6525 let kind_num: Option<u32> = match kind_filter {
6526 Some(k) => Some(parse_kind(k)?),
6527 None => None,
6528 };
6529
6530 let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
6532
6533 let dispatch = |ev: &InboxEvent,
6534 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
6535 emitted_ids: &HashSet<String>|
6536 -> Result<bool> {
6537 if let Some(p) = peer_filter
6538 && ev.peer != p
6539 {
6540 return Ok(false);
6541 }
6542 if verified_only && !ev.verified {
6543 return Ok(false);
6544 }
6545 if let Some(want) = kind_num {
6546 let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
6547 if ev_kind != Some(want) {
6548 return Ok(false);
6549 }
6550 }
6551
6552 if max_chain_depth > 0 {
6556 let body_str = match &ev.raw["body"] {
6557 Value::String(s) => s.clone(),
6558 other => serde_json::to_string(other).unwrap_or_default(),
6559 };
6560 if let Some(referenced) = parse_re_marker(&body_str) {
6561 let matched = emitted_ids.contains(&referenced)
6564 || emitted_ids.iter().any(|full| full.starts_with(&referenced));
6565 if matched {
6566 eprintln!(
6567 "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
6568 ev.event_id, ev.peer, referenced
6569 );
6570 return Ok(false);
6571 }
6572 }
6573 }
6574
6575 if max_per_minute > 0 {
6577 let now = Instant::now();
6578 let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
6579 while let Some(&front) = win.front() {
6580 if now.duration_since(front) > Duration::from_secs(60) {
6581 win.pop_front();
6582 } else {
6583 break;
6584 }
6585 }
6586 if win.len() as u32 >= max_per_minute {
6587 eprintln!(
6588 "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
6589 ev.event_id, ev.peer, max_per_minute
6590 );
6591 return Ok(false);
6592 }
6593 win.push_back(now);
6594 }
6595
6596 if dry_run {
6597 println!("{}", serde_json::to_string(&ev.raw)?);
6598 return Ok(true);
6599 }
6600
6601 let mut child = Command::new("sh")
6602 .arg("-c")
6603 .arg(on_event)
6604 .stdin(Stdio::piped())
6605 .stdout(Stdio::inherit())
6606 .stderr(Stdio::inherit())
6607 .env("WIRE_EVENT_PEER", &ev.peer)
6608 .env("WIRE_EVENT_ID", &ev.event_id)
6609 .env("WIRE_EVENT_KIND", &ev.kind)
6610 .spawn()
6611 .with_context(|| format!("spawning reactor handler: {on_event}"))?;
6612 if let Some(mut stdin) = child.stdin.take() {
6613 let body = serde_json::to_vec(&ev.raw)?;
6614 let _ = stdin.write_all(&body);
6615 let _ = stdin.write_all(b"\n");
6616 }
6617 std::mem::drop(child);
6618 Ok(true)
6619 };
6620
6621 let scan_outbox = |emitted_ids: &mut HashSet<String>,
6623 outbox_cursors: &mut HashMap<String, u64>|
6624 -> Result<usize> {
6625 if !outbox_dir.exists() {
6626 return Ok(0);
6627 }
6628 let mut added = 0;
6629 let mut new_ids: Vec<String> = Vec::new();
6630 for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
6631 let path = entry.path();
6632 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
6633 continue;
6634 }
6635 let peer = match path.file_stem().and_then(|s| s.to_str()) {
6636 Some(s) => s.to_string(),
6637 None => continue,
6638 };
6639 let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
6640 let start = *outbox_cursors.get(&peer).unwrap_or(&0);
6641 if cur_len <= start {
6642 outbox_cursors.insert(peer, start);
6643 continue;
6644 }
6645 let body = std::fs::read_to_string(&path).unwrap_or_default();
6646 let tail = &body[start as usize..];
6647 for line in tail.lines() {
6648 if let Ok(v) = serde_json::from_str::<Value>(line)
6649 && let Some(eid) = v.get("event_id").and_then(Value::as_str)
6650 && emitted_ids.insert(eid.to_string())
6651 {
6652 new_ids.push(eid.to_string());
6653 added += 1;
6654 }
6655 }
6656 outbox_cursors.insert(peer, cur_len);
6657 }
6658 if !new_ids.is_empty() {
6659 let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
6661 if all.len() > 500 {
6662 all.sort();
6663 let drop_n = all.len() - 500;
6664 let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
6665 emitted_ids.retain(|x| !dropped.contains(x));
6666 all = emitted_ids.iter().cloned().collect();
6667 }
6668 let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
6669 }
6670 Ok(added)
6671 };
6672
6673 let sweep = |watcher: &mut InboxWatcher,
6674 emitted_ids: &mut HashSet<String>,
6675 outbox_cursors: &mut HashMap<String, u64>,
6676 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
6677 -> Result<usize> {
6678 let _ = scan_outbox(emitted_ids, outbox_cursors);
6680
6681 let events = watcher.poll()?;
6682 let mut fired = 0usize;
6683 for ev in &events {
6684 match dispatch(ev, peer_dispatch_log, emitted_ids) {
6685 Ok(true) => fired += 1,
6686 Ok(false) => {}
6687 Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
6688 }
6689 }
6690 watcher.save_cursors(&cursor_path)?;
6691 Ok(fired)
6692 };
6693
6694 if once {
6695 sweep(
6696 &mut watcher,
6697 &mut emitted_ids,
6698 &mut outbox_cursors,
6699 &mut peer_dispatch_log,
6700 )?;
6701 return Ok(());
6702 }
6703 let interval = std::time::Duration::from_secs(interval_secs.max(1));
6704 loop {
6705 if let Err(e) = sweep(
6706 &mut watcher,
6707 &mut emitted_ids,
6708 &mut outbox_cursors,
6709 &mut peer_dispatch_log,
6710 ) {
6711 eprintln!("wire reactor: sweep error: {e}");
6712 }
6713 std::thread::sleep(interval);
6714 }
6715}
6716
/// Extract the id from the first `(re:<id>)` marker in `body`.
///
/// Only the first `(re:` occurrence is considered. The text up to the next
/// `)` is trimmed and returned; `None` is returned when there is no marker,
/// no closing parenthesis, or the trimmed id is empty.
fn parse_re_marker(body: &str) -> Option<String> {
    let (_, after_marker) = body.split_once("(re:")?;
    let (inner, _) = after_marker.split_once(')')?;
    let id = inner.trim();
    (!id.is_empty()).then(|| id.to_string())
}
6730
6731fn cmd_notify(
6734 interval_secs: u64,
6735 peer_filter: Option<&str>,
6736 once: bool,
6737 as_json: bool,
6738) -> Result<()> {
6739 use crate::inbox_watch::InboxWatcher;
6740 let cursor_path = config::state_dir()?.join("notify.cursor");
6741 let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6742
6743 let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
6744 let events = watcher.poll()?;
6745 for ev in events {
6746 if let Some(p) = peer_filter
6747 && ev.peer != p
6748 {
6749 continue;
6750 }
6751 if as_json {
6752 println!("{}", serde_json::to_string(&ev)?);
6753 } else {
6754 os_notify_inbox_event(&ev);
6755 }
6756 }
6757 watcher.save_cursors(&cursor_path)?;
6758 Ok(())
6759 };
6760
6761 if once {
6762 return sweep(&mut watcher);
6763 }
6764
6765 let interval = std::time::Duration::from_secs(interval_secs.max(1));
6766 loop {
6767 if let Err(e) = sweep(&mut watcher) {
6768 eprintln!("wire notify: sweep error: {e}");
6769 }
6770 std::thread::sleep(interval);
6771 }
6772}
6773
6774fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
6775 let title = if ev.verified {
6776 format!("wire ← {}", ev.peer)
6777 } else {
6778 format!("wire ← {} (UNVERIFIED)", ev.peer)
6779 };
6780 let body = format!("{}: {}", ev.kind, ev.body_preview);
6781 crate::os_notify::toast(&title, &body);
6782}
6783
/// Fallback notifier for targets other than Linux, macOS and Windows: writes
/// the title and body to stderr.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    let message = format!("[wire notify] {title}\n {body}");
    eprintln!("{message}");
}
6788
6789