1use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21 agent_card::{build_agent_card, sign_agent_card},
22 config,
23 signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24 trust::{add_self_to_trust, empty_trust},
25};
26
// Top-level command-line definition for the `wire` binary. `run` parses it
// with `Cli::parse()` and dispatches on `command`.
//
// Plain `//` comments are used on purpose: clap's derive macros turn `///`
// doc comments into help text, so adding them would change `--help` output.
#[derive(Parser, Debug)]
#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
pub struct Cli {
    // The subcommand chosen on the command line.
    #[command(subcommand)]
    pub command: Command,
}
34
// Every top-level `wire` subcommand. Each variant is matched exactly once in
// `run`, which forwards its fields to the handler named in the comment.
//
// Reading guide (clap derive conventions): a field without an `#[arg]`
// attribute is a positional argument, in declaration order; `#[arg(long)]`
// makes it a `--flag` / `--option`. Field order therefore matters.
//
// Plain `//` comments are used on purpose: `///` doc comments would be picked
// up by clap as help text and change `--help` output.
#[derive(Subcommand, Debug)]
pub enum Command {
    // Handled by `cmd_init`.
    Init {
        handle: String,
        #[arg(long)]
        name: Option<String>,
        #[arg(long)]
        relay: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_whoami`.
    Whoami {
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_peers`.
    Peers {
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_send`. When `body` is omitted, `run` treats
    // `kind_or_body` as the body and uses the kind "claim".
    Send {
        peer: String,
        kind_or_body: String,
        body: Option<String>,
        #[arg(long)]
        deadline: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_tail`.
    Tail {
        peer: Option<String>,
        #[arg(long)]
        json: bool,
        #[arg(long, default_value_t = 0)]
        limit: usize,
    },
    // Handled by `cmd_monitor`.
    Monitor {
        #[arg(long)]
        peer: Option<String>,
        #[arg(long)]
        json: bool,
        #[arg(long)]
        include_handshake: bool,
        #[arg(long, default_value_t = 500)]
        interval_ms: u64,
        #[arg(long, default_value_t = 0)]
        replay: usize,
    },
    // Handled by `cmd_verify`.
    Verify {
        path: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_mcp`; takes no arguments.
    Mcp,
    // Handled by `cmd_relay_server`.
    RelayServer {
        #[arg(long, default_value = "127.0.0.1:8770")]
        bind: String,
        #[arg(long)]
        local_only: bool,
    },
    // Handled by `cmd_bind_relay`.
    BindRelay {
        url: String,
        #[arg(long)]
        migrate_pinned: bool,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_add_peer_slot`.
    AddPeerSlot {
        handle: String,
        url: String,
        slot_id: String,
        slot_token: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_push`.
    Push {
        peer: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_pull`.
    Pull {
        #[arg(long)]
        json: bool,
    },
    // With `--peer`, handled by `cmd_status_peer`; otherwise by `cmd_status`.
    Status {
        #[arg(long)]
        peer: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Nested subcommand; see `ResponderCommand`.
    Responder {
        #[command(subcommand)]
        command: ResponderCommand,
    },
    // Handled by `cmd_pin`.
    Pin {
        card_file: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_rotate_slot`.
    RotateSlot {
        #[arg(long)]
        no_announce: bool,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_forget_peer`.
    ForgetPeer {
        handle: String,
        #[arg(long)]
        purge: bool,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_daemon`.
    Daemon {
        #[arg(long, default_value_t = 5)]
        interval: u64,
        #[arg(long)]
        once: bool,
        #[arg(long)]
        json: bool,
    },
    // With `--detach`, handled by `cmd_pair_host_detach` (which receives
    // `json`); otherwise by `cmd_pair_host` (which receives `yes` and
    // `timeout` but not `json`).
    PairHost {
        #[arg(long)]
        relay: String,
        #[arg(long)]
        yes: bool,
        #[arg(long, default_value_t = 300)]
        timeout: u64,
        #[arg(long)]
        detach: bool,
        #[arg(long)]
        json: bool,
    },
    // Also reachable as `join`. With `--detach`, handled by
    // `cmd_pair_join_detach`; otherwise by `cmd_pair_join`.
    #[command(alias = "join")]
    PairJoin {
        code_phrase: String,
        #[arg(long)]
        relay: String,
        #[arg(long)]
        yes: bool,
        #[arg(long, default_value_t = 300)]
        timeout: u64,
        #[arg(long)]
        detach: bool,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_pair_confirm`.
    PairConfirm {
        code_phrase: String,
        digits: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_pair_list`.
    PairList {
        #[arg(long)]
        json: bool,
        #[arg(long)]
        watch: bool,
        #[arg(long, default_value_t = 1)]
        watch_interval: u64,
    },
    // Handled by `cmd_pair_cancel`.
    PairCancel {
        code_phrase: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_pair_watch`.
    PairWatch {
        code_phrase: String,
        #[arg(long, default_value = "sas_ready")]
        status: String,
        #[arg(long, default_value_t = 300)]
        timeout: u64,
        #[arg(long)]
        json: bool,
    },
    // Three-way dispatch in `run`: a `handle` containing '@' with no `--code`
    // goes to `cmd_pair_megacommand`; otherwise `--detach` selects
    // `cmd_pair_detach`, and the default is `cmd_pair`.
    Pair {
        handle: String,
        #[arg(long)]
        code: Option<String>,
        #[arg(long, default_value = "https://wireup.net")]
        relay: String,
        #[arg(long)]
        yes: bool,
        #[arg(long, default_value_t = 300)]
        timeout: u64,
        #[arg(long)]
        no_setup: bool,
        #[arg(long)]
        detach: bool,
    },
    // Handled by `cmd_pair_abandon`.
    PairAbandon {
        code_phrase: String,
        #[arg(long, default_value = "https://wireup.net")]
        relay: String,
    },
    // Handled by `cmd_pair_accept`.
    PairAccept {
        peer: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_pair_reject`.
    PairReject {
        peer: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_pair_list_inbound`.
    PairListInbound {
        #[arg(long)]
        json: bool,
    },
    // Nested subcommand; see `SessionCommand`. Handled by `cmd_session`.
    #[command(subcommand)]
    Session(SessionCommand),
    // Handled by `cmd_setup`.
    Setup {
        #[arg(long)]
        apply: bool,
    },
    // Handled by `cmd_whois`.
    Whois {
        handle: Option<String>,
        #[arg(long)]
        json: bool,
        #[arg(long)]
        relay: Option<String>,
    },
    // Handled by `cmd_add`.
    Add {
        handle: String,
        #[arg(long)]
        relay: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_up`.
    Up {
        handle: String,
        #[arg(long)]
        name: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_doctor`.
    Doctor {
        #[arg(long)]
        json: bool,
        #[arg(long, default_value_t = 5)]
        recent_rejections: usize,
    },
    // Handled by `cmd_upgrade`.
    Upgrade {
        #[arg(long)]
        check: bool,
        #[arg(long)]
        json: bool,
    },
    // Nested subcommand; see `ServiceAction`. Handled by `cmd_service`.
    Service {
        #[command(subcommand)]
        action: ServiceAction,
    },
    // Nested subcommand; see `DiagAction`. Handled by `cmd_diag`.
    Diag {
        #[command(subcommand)]
        action: DiagAction,
    },
    // Handled by `cmd_claim`.
    Claim {
        nick: String,
        #[arg(long)]
        relay: Option<String>,
        #[arg(long)]
        public_url: Option<String>,
        #[arg(long)]
        hidden: bool,
        #[arg(long)]
        json: bool,
    },
    // Nested subcommand; see `ProfileAction`. Handled by `cmd_profile`.
    Profile {
        #[command(subcommand)]
        action: ProfileAction,
    },
    // Handled by `cmd_invite`.
    Invite {
        #[arg(long, default_value = "https://wireup.net")]
        relay: String,
        #[arg(long, default_value_t = 86_400)]
        ttl: u64,
        #[arg(long, default_value_t = 1)]
        uses: u32,
        #[arg(long)]
        share: bool,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_accept`.
    Accept {
        url: String,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_reactor`.
    Reactor {
        #[arg(long)]
        on_event: String,
        #[arg(long)]
        peer: Option<String>,
        #[arg(long)]
        kind: Option<String>,
        // NOTE(review): a `bool` flag that defaults to `true` looks like it can
        // never be switched off from the command line (passing
        // `--verified-only` only sets it to true again) — confirm whether a
        // `--no-verified-only` style opt-out was intended.
        #[arg(long, default_value_t = true)]
        verified_only: bool,
        #[arg(long, default_value_t = 2)]
        interval: u64,
        #[arg(long)]
        once: bool,
        #[arg(long)]
        dry_run: bool,
        #[arg(long, default_value_t = 6)]
        max_per_minute: u32,
        #[arg(long, default_value_t = 1)]
        max_chain_depth: u32,
    },
    // Handled by `cmd_notify`.
    Notify {
        #[arg(long, default_value_t = 2)]
        interval: u64,
        #[arg(long)]
        peer: Option<String>,
        #[arg(long)]
        once: bool,
        #[arg(long)]
        json: bool,
    },
}
686
// Sub-actions of `wire diag`; `run` forwards the chosen action to `cmd_diag`.
// (`//` rather than `///` so clap does not turn these notes into help text.)
#[derive(Subcommand, Debug)]
pub enum DiagAction {
    // `--limit` defaults to 20.
    Tail {
        #[arg(long, default_value_t = 20)]
        limit: usize,
        #[arg(long)]
        json: bool,
    },
    // Takes no arguments.
    Enable,
    // Takes no arguments.
    Disable,
    Status {
        #[arg(long)]
        json: bool,
    },
}
707
// Sub-actions of `wire session`; `run` forwards the chosen action to
// `cmd_session`. Fields without `#[arg]` are positional.
// (`//` rather than `///` so clap does not turn these notes into help text.)
#[derive(Subcommand, Debug)]
pub enum SessionCommand {
    New {
        // Optional positional session name.
        name: Option<String>,
        #[arg(long, default_value = "https://wireup.net")]
        relay: String,
        #[arg(long)]
        with_local: bool,
        #[arg(long, default_value = "http://127.0.0.1:8771")]
        local_relay: String,
        #[arg(long)]
        no_daemon: bool,
        #[arg(long)]
        json: bool,
    },
    List {
        #[arg(long)]
        json: bool,
    },
    ListLocal {
        #[arg(long)]
        json: bool,
    },
    Env {
        // Optional positional session name.
        name: Option<String>,
        #[arg(long)]
        json: bool,
    },
    Current {
        #[arg(long)]
        json: bool,
    },
    Destroy {
        // Required positional session name.
        name: String,
        #[arg(long)]
        force: bool,
        #[arg(long)]
        json: bool,
    },
}
787
// Sub-actions of `wire service`; `run` forwards the chosen action to
// `cmd_service`. Every action accepts only a `--json` flag.
// (`//` rather than `///` so clap does not turn these notes into help text.)
#[derive(Subcommand, Debug)]
pub enum ServiceAction {
    Install {
        #[arg(long)]
        json: bool,
    },
    Uninstall {
        #[arg(long)]
        json: bool,
    },
    Status {
        #[arg(long)]
        json: bool,
    },
}
809
// Sub-actions of `wire responder`, matched inside `run`.
// (`//` rather than `///` so clap does not turn these notes into help text.)
#[derive(Subcommand, Debug)]
pub enum ResponderCommand {
    // Handled by `cmd_responder_set`, which rejects any `status` not accepted
    // by `responder_status_allowed`.
    Set {
        status: String,
        #[arg(long)]
        reason: Option<String>,
        #[arg(long)]
        json: bool,
    },
    // Handled by `cmd_responder_get`; with no `peer` it queries the local
    // ("self") slot.
    Get {
        peer: Option<String>,
        #[arg(long)]
        json: bool,
    },
}
832
// Sub-actions of `wire profile`; `run` forwards the chosen action to
// `cmd_profile`. `field` and `value` are positional arguments.
// (`//` rather than `///` so clap does not turn these notes into help text.)
#[derive(Subcommand, Debug)]
pub enum ProfileAction {
    Set {
        field: String,
        value: String,
        #[arg(long)]
        json: bool,
    },
    Get {
        #[arg(long)]
        json: bool,
    },
    Clear {
        field: String,
        #[arg(long)]
        json: bool,
    },
}
856
857pub fn run() -> Result<()> {
859 let cli = Cli::parse();
860 match cli.command {
861 Command::Init {
862 handle,
863 name,
864 relay,
865 json,
866 } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
867 Command::Status { peer, json } => {
868 if let Some(peer) = peer {
869 cmd_status_peer(&peer, json)
870 } else {
871 cmd_status(json)
872 }
873 }
874 Command::Whoami { json } => cmd_whoami(json),
875 Command::Peers { json } => cmd_peers(json),
876 Command::Send {
877 peer,
878 kind_or_body,
879 body,
880 deadline,
881 json,
882 } => {
883 let (kind, body) = match body {
886 Some(real_body) => (kind_or_body, real_body),
887 None => ("claim".to_string(), kind_or_body),
888 };
889 cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
890 }
891 Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
892 Command::Monitor {
893 peer,
894 json,
895 include_handshake,
896 interval_ms,
897 replay,
898 } => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
899 Command::Verify { path, json } => cmd_verify(&path, json),
900 Command::Responder { command } => match command {
901 ResponderCommand::Set {
902 status,
903 reason,
904 json,
905 } => cmd_responder_set(&status, reason.as_deref(), json),
906 ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
907 },
908 Command::Mcp => cmd_mcp(),
909 Command::RelayServer { bind, local_only } => cmd_relay_server(&bind, local_only),
910 Command::BindRelay {
911 url,
912 migrate_pinned,
913 json,
914 } => cmd_bind_relay(&url, migrate_pinned, json),
915 Command::AddPeerSlot {
916 handle,
917 url,
918 slot_id,
919 slot_token,
920 json,
921 } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
922 Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
923 Command::Pull { json } => cmd_pull(json),
924 Command::Pin { card_file, json } => cmd_pin(&card_file, json),
925 Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
926 Command::ForgetPeer {
927 handle,
928 purge,
929 json,
930 } => cmd_forget_peer(&handle, purge, json),
931 Command::Daemon {
932 interval,
933 once,
934 json,
935 } => cmd_daemon(interval, once, json),
936 Command::PairHost {
937 relay,
938 yes,
939 timeout,
940 detach,
941 json,
942 } => {
943 if detach {
944 cmd_pair_host_detach(&relay, json)
945 } else {
946 cmd_pair_host(&relay, yes, timeout)
947 }
948 }
949 Command::PairJoin {
950 code_phrase,
951 relay,
952 yes,
953 timeout,
954 detach,
955 json,
956 } => {
957 if detach {
958 cmd_pair_join_detach(&code_phrase, &relay, json)
959 } else {
960 cmd_pair_join(&code_phrase, &relay, yes, timeout)
961 }
962 }
963 Command::PairConfirm {
964 code_phrase,
965 digits,
966 json,
967 } => cmd_pair_confirm(&code_phrase, &digits, json),
968 Command::PairList {
969 json,
970 watch,
971 watch_interval,
972 } => cmd_pair_list(json, watch, watch_interval),
973 Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
974 Command::PairWatch {
975 code_phrase,
976 status,
977 timeout,
978 json,
979 } => cmd_pair_watch(&code_phrase, &status, timeout, json),
980 Command::Pair {
981 handle,
982 code,
983 relay,
984 yes,
985 timeout,
986 no_setup,
987 detach,
988 } => {
989 if handle.contains('@') && code.is_none() {
996 cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
997 } else if detach {
998 cmd_pair_detach(&handle, code.as_deref(), &relay)
999 } else {
1000 cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
1001 }
1002 }
1003 Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
1004 Command::PairAccept { peer, json } => cmd_pair_accept(&peer, json),
1005 Command::PairReject { peer, json } => cmd_pair_reject(&peer, json),
1006 Command::PairListInbound { json } => cmd_pair_list_inbound(json),
1007 Command::Session(cmd) => cmd_session(cmd),
1008 Command::Invite {
1009 relay,
1010 ttl,
1011 uses,
1012 share,
1013 json,
1014 } => cmd_invite(&relay, ttl, uses, share, json),
1015 Command::Accept { url, json } => cmd_accept(&url, json),
1016 Command::Whois {
1017 handle,
1018 json,
1019 relay,
1020 } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
1021 Command::Add {
1022 handle,
1023 relay,
1024 json,
1025 } => cmd_add(&handle, relay.as_deref(), json),
1026 Command::Up {
1027 handle,
1028 name,
1029 json,
1030 } => cmd_up(&handle, name.as_deref(), json),
1031 Command::Doctor {
1032 json,
1033 recent_rejections,
1034 } => cmd_doctor(json, recent_rejections),
1035 Command::Upgrade { check, json } => cmd_upgrade(check, json),
1036 Command::Service { action } => cmd_service(action),
1037 Command::Diag { action } => cmd_diag(action),
1038 Command::Claim {
1039 nick,
1040 relay,
1041 public_url,
1042 hidden,
1043 json,
1044 } => cmd_claim(
1045 &nick,
1046 relay.as_deref(),
1047 public_url.as_deref(),
1048 hidden,
1049 json,
1050 ),
1051 Command::Profile { action } => cmd_profile(action),
1052 Command::Setup { apply } => cmd_setup(apply),
1053 Command::Reactor {
1054 on_event,
1055 peer,
1056 kind,
1057 verified_only,
1058 interval,
1059 once,
1060 dry_run,
1061 max_per_minute,
1062 max_chain_depth,
1063 } => cmd_reactor(
1064 &on_event,
1065 peer.as_deref(),
1066 kind.as_deref(),
1067 verified_only,
1068 interval,
1069 once,
1070 dry_run,
1071 max_per_minute,
1072 max_chain_depth,
1073 ),
1074 Command::Notify {
1075 interval,
1076 peer,
1077 once,
1078 json,
1079 } => cmd_notify(interval, peer.as_deref(), once, json),
1080 }
1081}
1082
1083fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
1086 if !handle
1087 .chars()
1088 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1089 {
1090 bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
1091 }
1092 if config::is_initialized()? {
1093 bail!(
1094 "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
1095 config::config_dir()?
1096 );
1097 }
1098
1099 config::ensure_dirs()?;
1100 let (sk_seed, pk_bytes) = generate_keypair();
1101 config::write_private_key(&sk_seed)?;
1102
1103 let card = build_agent_card(handle, &pk_bytes, name, None, None);
1104 let signed = sign_agent_card(&card, &sk_seed);
1105 config::write_agent_card(&signed)?;
1106
1107 let mut trust = empty_trust();
1108 add_self_to_trust(&mut trust, handle, &pk_bytes);
1109 config::write_trust(&trust)?;
1110
1111 let fp = fingerprint(&pk_bytes);
1112 let key_id = make_key_id(handle, &pk_bytes);
1113
1114 let mut relay_info: Option<(String, String)> = None;
1116 if let Some(url) = relay {
1117 let normalized = url.trim_end_matches('/');
1118 let client = crate::relay_client::RelayClient::new(normalized);
1119 client.check_healthz()?;
1120 let alloc = client.allocate_slot(Some(handle))?;
1121 let mut state = config::read_relay_state()?;
1122 state["self"] = json!({
1123 "relay_url": normalized,
1124 "slot_id": alloc.slot_id.clone(),
1125 "slot_token": alloc.slot_token,
1126 });
1127 config::write_relay_state(&state)?;
1128 relay_info = Some((normalized.to_string(), alloc.slot_id));
1129 }
1130
1131 let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
1132 if as_json {
1133 let mut out = json!({
1134 "did": did_str.clone(),
1135 "fingerprint": fp,
1136 "key_id": key_id,
1137 "config_dir": config::config_dir()?.to_string_lossy(),
1138 });
1139 if let Some((url, slot_id)) = &relay_info {
1140 out["relay_url"] = json!(url);
1141 out["slot_id"] = json!(slot_id);
1142 }
1143 println!("{}", serde_json::to_string(&out)?);
1144 } else {
1145 println!("generated {did_str} (ed25519:{key_id})");
1146 println!(
1147 "config written to {}",
1148 config::config_dir()?.to_string_lossy()
1149 );
1150 if let Some((url, slot_id)) = &relay_info {
1151 println!("bound to relay {url} (slot {slot_id})");
1152 println!();
1153 println!(
1154 "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
1155 );
1156 } else {
1157 println!();
1158 println!(
1159 "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
1160 );
1161 }
1162 }
1163 Ok(())
1164}
1165
1166fn cmd_status(as_json: bool) -> Result<()> {
1169 let initialized = config::is_initialized()?;
1170
1171 let mut summary = json!({
1172 "initialized": initialized,
1173 });
1174
1175 if initialized {
1176 let card = config::read_agent_card()?;
1177 let did = card
1178 .get("did")
1179 .and_then(Value::as_str)
1180 .unwrap_or("")
1181 .to_string();
1182 let handle = card
1186 .get("handle")
1187 .and_then(Value::as_str)
1188 .map(str::to_string)
1189 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1190 let pk_b64 = card
1191 .get("verify_keys")
1192 .and_then(Value::as_object)
1193 .and_then(|m| m.values().next())
1194 .and_then(|v| v.get("key"))
1195 .and_then(Value::as_str)
1196 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1197 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1198 summary["did"] = json!(did);
1199 summary["handle"] = json!(handle);
1200 summary["fingerprint"] = json!(fingerprint(&pk_bytes));
1201 summary["capabilities"] = card
1202 .get("capabilities")
1203 .cloned()
1204 .unwrap_or_else(|| json!([]));
1205
1206 let trust = config::read_trust()?;
1207 let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1208 let mut peers = Vec::new();
1209 if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
1210 for (peer_handle, _agent) in agents {
1211 if peer_handle == &handle {
1212 continue; }
1214 peers.push(json!({
1219 "handle": peer_handle,
1220 "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
1221 }));
1222 }
1223 }
1224 summary["peers"] = json!(peers);
1225
1226 let relay_state = config::read_relay_state()?;
1227 summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
1228 if !summary["self_relay"].is_null() {
1229 if let Some(obj) = summary["self_relay"].as_object_mut() {
1231 obj.remove("slot_token");
1232 }
1233 }
1234 summary["peer_slots_count"] = json!(
1235 relay_state
1236 .get("peers")
1237 .and_then(Value::as_object)
1238 .map(|m| m.len())
1239 .unwrap_or(0)
1240 );
1241
1242 let outbox = config::outbox_dir()?;
1244 let inbox = config::inbox_dir()?;
1245 summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
1246 summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
1247
1248 let snap = crate::ensure_up::daemon_liveness();
1254 let mut daemon = json!({
1255 "running": snap.pidfile_alive,
1256 "pid": snap.pidfile_pid,
1257 "all_running_pids": snap.pgrep_pids,
1258 "orphans": snap.orphan_pids,
1259 });
1260 if let crate::ensure_up::PidRecord::Json(d) = &snap.record {
1261 daemon["version"] = json!(d.version);
1262 daemon["bin_path"] = json!(d.bin_path);
1263 daemon["did"] = json!(d.did);
1264 daemon["relay_url"] = json!(d.relay_url);
1265 daemon["started_at"] = json!(d.started_at);
1266 daemon["schema"] = json!(d.schema);
1267 if d.version != env!("CARGO_PKG_VERSION") {
1268 daemon["version_mismatch"] = json!({
1269 "daemon": d.version.clone(),
1270 "cli": env!("CARGO_PKG_VERSION"),
1271 });
1272 }
1273 } else if matches!(snap.record, crate::ensure_up::PidRecord::LegacyInt(_)) {
1274 daemon["pidfile_form"] = json!("legacy-int");
1275 daemon["version_mismatch"] = json!({
1276 "daemon": "<pre-0.5.11>",
1277 "cli": env!("CARGO_PKG_VERSION"),
1278 });
1279 }
1280 summary["daemon"] = daemon;
1281
1282 let pending = crate::pending_pair::list_pending().unwrap_or_default();
1284 let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
1285 for p in &pending {
1286 *counts.entry(p.status.clone()).or_default() += 1;
1287 }
1288 let pending_inbound =
1290 crate::pending_inbound_pair::list_pending_inbound().unwrap_or_default();
1291 let inbound_handles: Vec<&str> = pending_inbound
1292 .iter()
1293 .map(|p| p.peer_handle.as_str())
1294 .collect();
1295 summary["pending_pairs"] = json!({
1296 "total": pending.len(),
1297 "by_status": counts,
1298 "inbound_count": pending_inbound.len(),
1299 "inbound_handles": inbound_handles,
1300 });
1301 }
1302
1303 if as_json {
1304 println!("{}", serde_json::to_string(&summary)?);
1305 } else if !initialized {
1306 println!("not initialized — run `wire init <handle>` first");
1307 } else {
1308 println!("did: {}", summary["did"].as_str().unwrap_or("?"));
1309 println!(
1310 "fingerprint: {}",
1311 summary["fingerprint"].as_str().unwrap_or("?")
1312 );
1313 println!("capabilities: {}", summary["capabilities"]);
1314 if !summary["self_relay"].is_null() {
1315 println!(
1316 "self relay: {} (slot {})",
1317 summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
1318 summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
1319 );
1320 } else {
1321 println!("self relay: (not bound — run `wire pair-host --relay <url>` to bind)");
1322 }
1323 println!(
1324 "peers: {}",
1325 summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
1326 );
1327 for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
1328 println!(
1329 " - {:<20} tier={}",
1330 p["handle"].as_str().unwrap_or(""),
1331 p["tier"].as_str().unwrap_or("?")
1332 );
1333 }
1334 println!(
1335 "outbox: {} file(s), {} event(s) queued",
1336 summary["outbox"]["files"].as_u64().unwrap_or(0),
1337 summary["outbox"]["events"].as_u64().unwrap_or(0)
1338 );
1339 println!(
1340 "inbox: {} file(s), {} event(s) received",
1341 summary["inbox"]["files"].as_u64().unwrap_or(0),
1342 summary["inbox"]["events"].as_u64().unwrap_or(0)
1343 );
1344 let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
1345 let daemon_pid = summary["daemon"]["pid"]
1346 .as_u64()
1347 .map(|p| p.to_string())
1348 .unwrap_or_else(|| "—".to_string());
1349 let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
1350 let version_suffix = if !daemon_version.is_empty() {
1351 format!(" v{daemon_version}")
1352 } else {
1353 String::new()
1354 };
1355 println!(
1356 "daemon: {} (pid {}{})",
1357 if daemon_running { "running" } else { "DOWN" },
1358 daemon_pid,
1359 version_suffix,
1360 );
1361 if let Some(mm) = summary["daemon"].get("version_mismatch") {
1363 println!(
1364 " !! version mismatch: daemon={} CLI={}. \
1365 run `wire upgrade` to swap atomically.",
1366 mm["daemon"].as_str().unwrap_or("?"),
1367 mm["cli"].as_str().unwrap_or("?"),
1368 );
1369 }
1370 if let Some(orphans) = summary["daemon"]["orphans"].as_array()
1371 && !orphans.is_empty()
1372 {
1373 let pids: Vec<String> = orphans
1374 .iter()
1375 .filter_map(|v| v.as_u64().map(|p| p.to_string()))
1376 .collect();
1377 println!(
1378 " !! orphan daemon process(es): pids {}. \
1379 pgrep saw them but pidfile didn't — likely stale process from \
1380 prior install. Multiple daemons race the relay cursor.",
1381 pids.join(", ")
1382 );
1383 }
1384 let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
1385 let inbound_count = summary["pending_pairs"]["inbound_count"]
1386 .as_u64()
1387 .unwrap_or(0);
1388 if pending_total > 0 {
1389 print!("pending pairs: {pending_total}");
1390 if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
1391 let parts: Vec<String> = obj
1392 .iter()
1393 .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
1394 .collect();
1395 if !parts.is_empty() {
1396 print!(" ({})", parts.join(", "));
1397 }
1398 }
1399 println!();
1400 } else if inbound_count == 0 {
1401 println!("pending pairs: none");
1402 }
1403 if inbound_count > 0 {
1407 let handles: Vec<String> = summary["pending_pairs"]["inbound_handles"]
1408 .as_array()
1409 .map(|a| {
1410 a.iter()
1411 .filter_map(|v| v.as_str().map(str::to_string))
1412 .collect()
1413 })
1414 .unwrap_or_default();
1415 println!(
1416 "inbound pair requests ({inbound_count}): {} — `wire pair-list` to inspect, `wire pair-accept <peer>` to accept, `wire pair-reject <peer>` to refuse",
1417 handles.join(", "),
1418 );
1419 }
1420 }
1421 Ok(())
1422}
1423
1424fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1425 if !dir.exists() {
1426 return Ok(json!({"files": 0, "events": 0}));
1427 }
1428 let mut files = 0usize;
1429 let mut events = 0usize;
1430 for entry in std::fs::read_dir(dir)? {
1431 let path = entry?.path();
1432 if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1433 files += 1;
1434 if let Ok(body) = std::fs::read_to_string(&path) {
1435 events += body.lines().filter(|l| !l.trim().is_empty()).count();
1436 }
1437 }
1438 }
1439 Ok(json!({"files": files, "events": events}))
1440}
1441
/// Return `true` when `status` is one of the responder health states this CLI
/// accepts. The comparison is exact (case-sensitive, no trimming).
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.contains(&status)
}
1450
1451fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1452 let state = config::read_relay_state()?;
1453 let (label, slot_info) = match peer {
1454 Some(peer) => (
1455 peer.to_string(),
1456 state
1457 .get("peers")
1458 .and_then(|p| p.get(peer))
1459 .ok_or_else(|| {
1460 anyhow!(
1461 "unknown peer {peer:?} in relay state — pair with them first:\n \
1462 wire add {peer}@wireup.net (or {peer}@<their-relay>)\n\
1463 (`wire peers` lists who you've already paired with.)"
1464 )
1465 })?,
1466 ),
1467 None => (
1468 "self".to_string(),
1469 state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1470 anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1471 })?,
1472 ),
1473 };
1474 let relay_url = slot_info["relay_url"]
1475 .as_str()
1476 .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1477 .to_string();
1478 let slot_id = slot_info["slot_id"]
1479 .as_str()
1480 .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1481 .to_string();
1482 let slot_token = slot_info["slot_token"]
1483 .as_str()
1484 .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1485 .to_string();
1486 Ok((label, relay_url, slot_id, slot_token))
1487}
1488
1489fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1490 if !responder_status_allowed(status) {
1491 bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1492 }
1493 let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1494 let now = time::OffsetDateTime::now_utc()
1495 .format(&time::format_description::well_known::Rfc3339)
1496 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1497 let mut record = json!({
1498 "status": status,
1499 "set_at": now,
1500 });
1501 if let Some(reason) = reason {
1502 record["reason"] = json!(reason);
1503 }
1504 if status == "online" {
1505 record["last_success_at"] = json!(now);
1506 }
1507 let client = crate::relay_client::RelayClient::new(&relay_url);
1508 let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1509 if as_json {
1510 println!("{}", serde_json::to_string(&saved)?);
1511 } else {
1512 let reason = saved
1513 .get("reason")
1514 .and_then(Value::as_str)
1515 .map(|r| format!(" — {r}"))
1516 .unwrap_or_default();
1517 println!(
1518 "responder {}{}",
1519 saved
1520 .get("status")
1521 .and_then(Value::as_str)
1522 .unwrap_or(status),
1523 reason
1524 );
1525 }
1526 Ok(())
1527}
1528
1529fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1530 let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1531 let client = crate::relay_client::RelayClient::new(&relay_url);
1532 let health = client.responder_health_get(&slot_id, &slot_token)?;
1533 if as_json {
1534 println!(
1535 "{}",
1536 serde_json::to_string(&json!({
1537 "target": label,
1538 "responder_health": health,
1539 }))?
1540 );
1541 } else if health.is_null() {
1542 println!("{label}: responder health not reported");
1543 } else {
1544 let status = health
1545 .get("status")
1546 .and_then(Value::as_str)
1547 .unwrap_or("unknown");
1548 let reason = health
1549 .get("reason")
1550 .and_then(Value::as_str)
1551 .map(|r| format!(" — {r}"))
1552 .unwrap_or_default();
1553 let last_success = health
1554 .get("last_success_at")
1555 .and_then(Value::as_str)
1556 .map(|t| format!(" (last_success: {t})"))
1557 .unwrap_or_default();
1558 println!("{label}: {status}{reason}{last_success}");
1559 }
1560 Ok(())
1561}
1562
1563fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1564 let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1565 let client = crate::relay_client::RelayClient::new(&relay_url);
1566
1567 let started = std::time::Instant::now();
1568 let transport_ok = client.healthz().unwrap_or(false);
1569 let latency_ms = started.elapsed().as_millis() as u64;
1570
1571 let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1572 let now = std::time::SystemTime::now()
1573 .duration_since(std::time::UNIX_EPOCH)
1574 .map(|d| d.as_secs())
1575 .unwrap_or(0);
1576 let attention = match last_pull_at_unix {
1577 Some(last) if now.saturating_sub(last) <= 300 => json!({
1578 "status": "ok",
1579 "last_pull_at_unix": last,
1580 "age_seconds": now.saturating_sub(last),
1581 "event_count": event_count,
1582 }),
1583 Some(last) => json!({
1584 "status": "stale",
1585 "last_pull_at_unix": last,
1586 "age_seconds": now.saturating_sub(last),
1587 "event_count": event_count,
1588 }),
1589 None => json!({
1590 "status": "never_pulled",
1591 "last_pull_at_unix": Value::Null,
1592 "event_count": event_count,
1593 }),
1594 };
1595
1596 let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1597 let responder = if responder_health.is_null() {
1598 json!({"status": "not_reported", "record": Value::Null})
1599 } else {
1600 json!({
1601 "status": responder_health
1602 .get("status")
1603 .and_then(Value::as_str)
1604 .unwrap_or("unknown"),
1605 "record": responder_health,
1606 })
1607 };
1608
1609 let report = json!({
1610 "peer": peer,
1611 "transport": {
1612 "status": if transport_ok { "ok" } else { "error" },
1613 "relay_url": relay_url,
1614 "latency_ms": latency_ms,
1615 },
1616 "attention": attention,
1617 "responder": responder,
1618 });
1619
1620 if as_json {
1621 println!("{}", serde_json::to_string(&report)?);
1622 } else {
1623 let transport_line = if transport_ok {
1624 format!("ok relay reachable ({latency_ms}ms)")
1625 } else {
1626 "error relay unreachable".to_string()
1627 };
1628 println!("transport {transport_line}");
1629 match report["attention"]["status"].as_str().unwrap_or("unknown") {
1630 "ok" => println!(
1631 "attention ok last pull {}s ago",
1632 report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1633 ),
1634 "stale" => println!(
1635 "attention stale last pull {}m ago",
1636 report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1637 ),
1638 "never_pulled" => println!("attention never pulled since relay reset"),
1639 other => println!("attention {other}"),
1640 }
1641 if report["responder"]["status"] == "not_reported" {
1642 println!("auto-responder not reported");
1643 } else {
1644 let record = &report["responder"]["record"];
1645 let status = record
1646 .get("status")
1647 .and_then(Value::as_str)
1648 .unwrap_or("unknown");
1649 let reason = record
1650 .get("reason")
1651 .and_then(Value::as_str)
1652 .map(|r| format!(" — {r}"))
1653 .unwrap_or_default();
1654 println!("auto-responder {status}{reason}");
1655 }
1656 }
1657 Ok(())
1658}
1659
1660fn cmd_whoami(as_json: bool) -> Result<()> {
1665 if !config::is_initialized()? {
1666 bail!("not initialized — run `wire init <handle>` first");
1667 }
1668 let card = config::read_agent_card()?;
1669 let did = card
1670 .get("did")
1671 .and_then(Value::as_str)
1672 .unwrap_or("")
1673 .to_string();
1674 let handle = card
1675 .get("handle")
1676 .and_then(Value::as_str)
1677 .map(str::to_string)
1678 .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1679 let pk_b64 = card
1680 .get("verify_keys")
1681 .and_then(Value::as_object)
1682 .and_then(|m| m.values().next())
1683 .and_then(|v| v.get("key"))
1684 .and_then(Value::as_str)
1685 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1686 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1687 let fp = fingerprint(&pk_bytes);
1688 let key_id = make_key_id(&handle, &pk_bytes);
1689 let capabilities = card
1690 .get("capabilities")
1691 .cloned()
1692 .unwrap_or_else(|| json!(["wire/v3.1"]));
1693
1694 if as_json {
1695 println!(
1696 "{}",
1697 serde_json::to_string(&json!({
1698 "did": did,
1699 "handle": handle,
1700 "fingerprint": fp,
1701 "key_id": key_id,
1702 "public_key_b64": pk_b64,
1703 "capabilities": capabilities,
1704 "config_dir": config::config_dir()?.to_string_lossy(),
1705 }))?
1706 );
1707 } else {
1708 println!("{did} (ed25519:{key_id})");
1709 println!("fingerprint: {fp}");
1710 println!("capabilities: {capabilities}");
1711 }
1712 Ok(())
1713}
1714
1715fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1730 let raw = crate::trust::get_tier(trust, handle);
1731 if raw != "VERIFIED" {
1732 return raw.to_string();
1733 }
1734 let token = relay_state
1735 .get("peers")
1736 .and_then(|p| p.get(handle))
1737 .and_then(|p| p.get("slot_token"))
1738 .and_then(Value::as_str)
1739 .unwrap_or("");
1740 if token.is_empty() {
1741 "PENDING_ACK".to_string()
1742 } else {
1743 raw.to_string()
1744 }
1745}
1746
1747fn cmd_peers(as_json: bool) -> Result<()> {
1748 let trust = config::read_trust()?;
1749 let agents = trust
1750 .get("agents")
1751 .and_then(Value::as_object)
1752 .cloned()
1753 .unwrap_or_default();
1754 let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1755
1756 let mut self_did: Option<String> = None;
1757 if let Ok(card) = config::read_agent_card() {
1758 self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1759 }
1760
1761 let mut peers = Vec::new();
1762 for (handle, agent) in agents.iter() {
1763 let did = agent
1764 .get("did")
1765 .and_then(Value::as_str)
1766 .unwrap_or("")
1767 .to_string();
1768 if Some(did.as_str()) == self_did.as_deref() {
1769 continue; }
1771 let tier = effective_peer_tier(&trust, &relay_state, handle);
1772 let capabilities = agent
1773 .get("card")
1774 .and_then(|c| c.get("capabilities"))
1775 .cloned()
1776 .unwrap_or_else(|| json!([]));
1777 peers.push(json!({
1778 "handle": handle,
1779 "did": did,
1780 "tier": tier,
1781 "capabilities": capabilities,
1782 }));
1783 }
1784
1785 if as_json {
1786 println!("{}", serde_json::to_string(&peers)?);
1787 } else if peers.is_empty() {
1788 println!("no peers pinned (run `wire join <code>` to pair)");
1789 } else {
1790 for p in &peers {
1791 println!(
1792 "{:<20} {:<10} {}",
1793 p["handle"].as_str().unwrap_or(""),
1794 p["tier"].as_str().unwrap_or(""),
1795 p["did"].as_str().unwrap_or(""),
1796 );
1797 }
1798 }
1799 Ok(())
1800}
1801
1802fn maybe_warn_peer_attentiveness(peer: &str) {
1812 let state = match config::read_relay_state() {
1813 Ok(s) => s,
1814 Err(_) => return,
1815 };
1816 let p = state.get("peers").and_then(|p| p.get(peer));
1817 let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
1818 Some(s) if !s.is_empty() => s,
1819 _ => return,
1820 };
1821 let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
1822 Some(s) if !s.is_empty() => s,
1823 _ => return,
1824 };
1825 let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
1826 Some(s) if !s.is_empty() => s.to_string(),
1827 _ => match state
1828 .get("self")
1829 .and_then(|s| s.get("relay_url"))
1830 .and_then(Value::as_str)
1831 {
1832 Some(s) if !s.is_empty() => s.to_string(),
1833 _ => return,
1834 },
1835 };
1836 let client = crate::relay_client::RelayClient::new(&relay_url);
1837 let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
1838 Ok(t) => t,
1839 Err(_) => return,
1840 };
1841 let now = std::time::SystemTime::now()
1842 .duration_since(std::time::UNIX_EPOCH)
1843 .map(|d| d.as_secs())
1844 .unwrap_or(0);
1845 match last_pull {
1846 None => {
1847 eprintln!(
1848 "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
1849 );
1850 }
1851 Some(t) if now.saturating_sub(t) > 300 => {
1852 let mins = now.saturating_sub(t) / 60;
1853 eprintln!(
1854 "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
1855 );
1856 }
1857 _ => {}
1858 }
1859}
1860
1861pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
1862 let trimmed = input.trim();
1863 if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
1864 {
1865 return Ok(trimmed.to_string());
1866 }
1867 let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
1868 let n: i64 = amount
1869 .parse()
1870 .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
1871 if n <= 0 {
1872 bail!("deadline duration must be positive: {input:?}");
1873 }
1874 let duration = match unit {
1875 "m" => time::Duration::minutes(n),
1876 "h" => time::Duration::hours(n),
1877 "d" => time::Duration::days(n),
1878 _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
1879 };
1880 Ok((time::OffsetDateTime::now_utc() + duration)
1881 .format(&time::format_description::well_known::Rfc3339)
1882 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
1883}
1884
1885fn cmd_send(
1886 peer: &str,
1887 kind: &str,
1888 body_arg: &str,
1889 deadline: Option<&str>,
1890 as_json: bool,
1891) -> Result<()> {
1892 if !config::is_initialized()? {
1893 bail!("not initialized — run `wire init <handle>` first");
1894 }
1895 let peer = crate::agent_card::bare_handle(peer);
1896 let sk_seed = config::read_private_key()?;
1897 let card = config::read_agent_card()?;
1898 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
1899 let handle = crate::agent_card::display_handle_from_did(did).to_string();
1900 let pk_b64 = card
1901 .get("verify_keys")
1902 .and_then(Value::as_object)
1903 .and_then(|m| m.values().next())
1904 .and_then(|v| v.get("key"))
1905 .and_then(Value::as_str)
1906 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1907 let pk_bytes = crate::signing::b64decode(pk_b64)?;
1908
1909 let body_value: Value = if body_arg == "-" {
1914 use std::io::Read;
1915 let mut raw = String::new();
1916 std::io::stdin()
1917 .read_to_string(&mut raw)
1918 .with_context(|| "reading body from stdin")?;
1919 serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
1922 } else if let Some(path) = body_arg.strip_prefix('@') {
1923 let raw =
1924 std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
1925 serde_json::from_str(&raw).unwrap_or(Value::String(raw))
1926 } else {
1927 Value::String(body_arg.to_string())
1928 };
1929
1930 let kind_id = parse_kind(kind)?;
1931
1932 let now = time::OffsetDateTime::now_utc()
1933 .format(&time::format_description::well_known::Rfc3339)
1934 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1935
1936 let mut event = json!({
1937 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
1938 "timestamp": now,
1939 "from": did,
1940 "to": format!("did:wire:{peer}"),
1941 "type": kind,
1942 "kind": kind_id,
1943 "body": body_value,
1944 });
1945 if let Some(deadline) = deadline {
1946 event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
1947 }
1948 let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
1949 let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
1950
1951 maybe_warn_peer_attentiveness(peer);
1956
1957 let line = serde_json::to_vec(&signed)?;
1962 let outbox = config::append_outbox_record(peer, &line)?;
1963
1964 if as_json {
1965 println!(
1966 "{}",
1967 serde_json::to_string(&json!({
1968 "event_id": event_id,
1969 "status": "queued",
1970 "peer": peer,
1971 "outbox": outbox.to_string_lossy(),
1972 }))?
1973 );
1974 } else {
1975 println!(
1976 "queued event {event_id} → {peer} (outbox: {})",
1977 outbox.display()
1978 );
1979 }
1980 Ok(())
1981}
1982
1983fn parse_kind(s: &str) -> Result<u32> {
1984 if let Ok(n) = s.parse::<u32>() {
1985 return Ok(n);
1986 }
1987 for (id, name) in crate::signing::kinds() {
1988 if *name == s {
1989 return Ok(*id);
1990 }
1991 }
1992 Ok(1)
1994}
1995
1996fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
1999 let inbox = config::inbox_dir()?;
2000 if !inbox.exists() {
2001 if !as_json {
2002 eprintln!("no inbox yet — daemon hasn't run, or no events received");
2003 }
2004 return Ok(());
2005 }
2006 let trust = config::read_trust()?;
2007 let mut count = 0usize;
2008
2009 let entries: Vec<_> = std::fs::read_dir(&inbox)?
2010 .filter_map(|e| e.ok())
2011 .map(|e| e.path())
2012 .filter(|p| {
2013 p.extension().map(|x| x == "jsonl").unwrap_or(false)
2014 && match peer {
2015 Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
2016 None => true,
2017 }
2018 })
2019 .collect();
2020
2021 for path in entries {
2022 let body = std::fs::read_to_string(&path)?;
2023 for line in body.lines() {
2024 let event: Value = match serde_json::from_str(line) {
2025 Ok(v) => v,
2026 Err(_) => continue,
2027 };
2028 let verified = verify_message_v31(&event, &trust).is_ok();
2029 if as_json {
2030 let mut event_with_meta = event.clone();
2031 if let Some(obj) = event_with_meta.as_object_mut() {
2032 obj.insert("verified".into(), json!(verified));
2033 }
2034 println!("{}", serde_json::to_string(&event_with_meta)?);
2035 } else {
2036 let ts = event
2037 .get("timestamp")
2038 .and_then(Value::as_str)
2039 .unwrap_or("?");
2040 let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
2041 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
2042 let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
2043 let summary = event
2044 .get("body")
2045 .map(|b| match b {
2046 Value::String(s) => s.clone(),
2047 _ => b.to_string(),
2048 })
2049 .unwrap_or_default();
2050 let mark = if verified { "✓" } else { "✗" };
2051 let deadline = event
2052 .get("time_sensitive_until")
2053 .and_then(Value::as_str)
2054 .map(|d| format!(" deadline: {d}"))
2055 .unwrap_or_default();
2056 println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
2057 }
2058 count += 1;
2059 if limit > 0 && count >= limit {
2060 return Ok(());
2061 }
2062 }
2063 }
2064 Ok(())
2065}
2066
/// Whether `kind` is one of the kinds `wire monitor` hides unless
/// `--include-handshake` is given: `pair_drop`, `pair_drop_ack`, `heartbeat`.
/// The comparison is exact (case-sensitive, no trimming).
fn monitor_is_noise_kind(kind: &str) -> bool {
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.contains(&kind)
}
2075
2076fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
2080 if as_json {
2081 Ok(serde_json::to_string(e)?)
2082 } else {
2083 let eid_short: String = e.event_id.chars().take(12).collect();
2084 let body = e.body_preview.replace('\n', " ");
2085 let ts: String = e.timestamp.chars().take(19).collect();
2086 Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
2087 }
2088}
2089
2090fn cmd_monitor(
2106 peer_filter: Option<&str>,
2107 as_json: bool,
2108 include_handshake: bool,
2109 interval_ms: u64,
2110 replay: usize,
2111) -> Result<()> {
2112 let inbox_dir = config::inbox_dir()?;
2113 if !inbox_dir.exists() {
2114 if !as_json {
2115 eprintln!(
2116 "wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
2117 );
2118 }
2119 }
2121
2122 if replay > 0 && inbox_dir.exists() {
2126 let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
2127 for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
2128 let path = entry.path();
2129 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2130 continue;
2131 }
2132 let peer = match path.file_stem().and_then(|s| s.to_str()) {
2133 Some(s) => s.to_string(),
2134 None => continue,
2135 };
2136 if let Some(filter) = peer_filter {
2137 if peer != filter {
2138 continue;
2139 }
2140 }
2141 let body = std::fs::read_to_string(&path).unwrap_or_default();
2142 for line in body.lines() {
2143 let line = line.trim();
2144 if line.is_empty() {
2145 continue;
2146 }
2147 let signed: Value = match serde_json::from_str(line) {
2148 Ok(v) => v,
2149 Err(_) => continue,
2150 };
2151 let ev = crate::inbox_watch::InboxEvent::from_signed(
2152 &peer,
2153 signed,
2154 true,
2155 );
2156 if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2157 continue;
2158 }
2159 all.push(ev);
2160 }
2161 }
2162 all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2165 let start = all.len().saturating_sub(replay);
2166 for ev in &all[start..] {
2167 println!("{}", monitor_render(ev, as_json)?);
2168 }
2169 use std::io::Write;
2170 std::io::stdout().flush().ok();
2171 }
2172
2173 let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2176 let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2177
2178 loop {
2179 let events = w.poll()?;
2180 let mut wrote = false;
2181 for ev in events {
2182 if let Some(filter) = peer_filter {
2183 if ev.peer != filter {
2184 continue;
2185 }
2186 }
2187 if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2188 continue;
2189 }
2190 println!("{}", monitor_render(&ev, as_json)?);
2191 wrote = true;
2192 }
2193 if wrote {
2194 use std::io::Write;
2195 std::io::stdout().flush().ok();
2196 }
2197 std::thread::sleep(sleep_dur);
2198 }
2199}
2200
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Handle used by every case in this module.
    const PEER: &str = "willard";

    /// Minimal trust document pinning `handle` at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        let did = format!("did:wire:{handle}");
        json!({
            "version": 1,
            "agents": {
                handle: {
                    "tier": tier,
                    "did": did,
                    "card": {"capabilities": ["wire/v3.1"]}
                }
            }
        })
    }

    /// Effective tier of `PEER` for the given raw trust tier and relay state.
    fn effective(raw_tier: &str, relay_state: Value) -> String {
        effective_peer_tier(&trust_with(PEER, raw_tier), &relay_state, PEER)
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // Peer is pinned with slot coordinates but an empty token.
        let relay_state = json!({
            "peers": {
                "willard": {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": "",
                }
            }
        });
        assert_eq!(effective("VERIFIED", relay_state), "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let relay_state = json!({
            "peers": {
                "willard": {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": "tok123",
                }
            }
        });
        assert_eq!(effective("VERIFIED", relay_state), "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // The token is only consulted for VERIFIED peers.
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        assert_eq!(effective("UNTRUSTED", relay_state), "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // No relay entry at all counts the same as an empty token.
        let relay_state = json!({"peers": {}});
        assert_eq!(effective("VERIFIED", relay_state), "PENDING_ACK");
    }
}
2285
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Fixture event with a fixed id and timestamp; only peer, kind and body vary.
    fn sample_event(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: peer.to_string(),
            event_id: "abcd1234567890ef".to_string(),
            kind: kind.to_string(),
            body_preview: body.to_string(),
            verified: true,
            timestamp: "2026-05-15T23:14:07.123456Z".to_string(),
            raw: Value::Null,
        }
    }

    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        for noise in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(monitor_is_noise_kind(noise));
        }
        // Everything else, including kinds this build has never heard of, is kept.
        for signal in [
            "claim",
            "decision",
            "ack",
            "request",
            "note",
            "future_kind_we_dont_know",
        ] {
            assert!(!monitor_is_noise_kind(signal));
        }
    }

    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let event = sample_event("willard", "claim", "real v8 train shipped 1350 steps");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "render must be one line: {line}");
        for expected in ["willard", "claim", "real v8 train", "abcd12345678"] {
            assert!(line.contains(expected));
        }
        assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
        assert!(line.contains("2026-05-15T23:14:07"));
    }

    #[test]
    fn monitor_render_strips_newlines_from_body() {
        let event = sample_event("spark", "claim", "line one\nline two\nline three");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let event = sample_event("spark", "claim", "hi");
        let line = monitor_render(&event, true).unwrap();
        assert!(!line.contains('\n'));
        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        assert_eq!(parsed["peer"], "spark");
        assert_eq!(parsed["kind"], "claim");
        assert_eq!(parsed["body_preview"], "hi");
    }

    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        // An unverified event must still render and must not count as noise.
        let event = InboxEvent {
            verified: false,
            ..sample_event("spark", "claim", "from disk with verified=null")
        };
        let line = monitor_render(&event, false).unwrap();
        assert!(line.contains("from disk with verified=null"));
        assert!(!monitor_is_noise_kind("claim"));
    }
}
2386
2387fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2390 let body = if path == "-" {
2391 let mut buf = String::new();
2392 use std::io::Read;
2393 std::io::stdin().read_to_string(&mut buf)?;
2394 buf
2395 } else {
2396 std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2397 };
2398 let event: Value = serde_json::from_str(&body)?;
2399 let trust = config::read_trust()?;
2400 match verify_message_v31(&event, &trust) {
2401 Ok(()) => {
2402 if as_json {
2403 println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2404 } else {
2405 println!("verified ✓");
2406 }
2407 Ok(())
2408 }
2409 Err(e) => {
2410 let reason = e.to_string();
2411 if as_json {
2412 println!(
2413 "{}",
2414 serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2415 );
2416 } else {
2417 eprintln!("FAILED: {reason}");
2418 }
2419 std::process::exit(1);
2420 }
2421 }
2422}
2423
/// Entry point for the `mcp` subcommand: delegates to `crate::mcp::run` and
/// returns its result unchanged.
fn cmd_mcp() -> Result<()> {
    crate::mcp::run()
}
2429
2430fn cmd_relay_server(bind: &str, local_only: bool) -> Result<()> {
2431 if local_only {
2435 validate_loopback_bind(bind)?;
2436 }
2437 let base = if let Ok(home) = std::env::var("WIRE_HOME") {
2443 std::path::PathBuf::from(home)
2444 .join("state")
2445 .join("wire-relay")
2446 } else {
2447 dirs::state_dir()
2448 .or_else(dirs::data_local_dir)
2449 .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2450 .join("wire-relay")
2451 };
2452 let state_dir = if local_only { base.join("local") } else { base };
2453 let runtime = tokio::runtime::Builder::new_multi_thread()
2454 .enable_all()
2455 .build()?;
2456 runtime.block_on(crate::relay_server::serve_with_mode(
2457 bind,
2458 state_dir,
2459 crate::relay_server::ServerMode { local_only },
2460 ))
2461}
2462
2463fn validate_loopback_bind(bind: &str) -> Result<()> {
2469 let host = if let Some(stripped) = bind.strip_prefix('[') {
2471 let close = stripped
2472 .find(']')
2473 .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
2474 stripped[..close].to_string()
2475 } else {
2476 bind.rsplit_once(':')
2477 .map(|(h, _)| h.to_string())
2478 .unwrap_or_else(|| bind.to_string())
2479 };
2480 use std::net::ToSocketAddrs;
2481 let probe = format!("{host}:0");
2482 let resolved: Vec<_> = probe
2483 .to_socket_addrs()
2484 .with_context(|| format!("resolving bind host {host:?}"))?
2485 .collect();
2486 if resolved.is_empty() {
2487 bail!("--local-only: bind host {host:?} resolved to no addresses");
2488 }
2489 for addr in &resolved {
2490 if !addr.ip().is_loopback() {
2491 bail!(
2492 "--local-only refuses non-loopback bind: {host:?} resolves to {} \
2493 which is not in 127.0.0.0/8 or [::1]. Remove --local-only to bind \
2494 publicly, or use 127.0.0.1 / [::1] / localhost.",
2495 addr.ip()
2496 );
2497 }
2498 }
2499 Ok(())
2500}
2501
2502fn cmd_bind_relay(url: &str, migrate_pinned: bool, as_json: bool) -> Result<()> {
2505 if !config::is_initialized()? {
2506 bail!("not initialized — run `wire init <handle>` first");
2507 }
2508 let card = config::read_agent_card()?;
2509 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2510 let handle = crate::agent_card::display_handle_from_did(did).to_string();
2511
2512 let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
2519 let pinned: Vec<String> = existing
2520 .get("peers")
2521 .and_then(|p| p.as_object())
2522 .map(|o| o.keys().cloned().collect())
2523 .unwrap_or_default();
2524 if !pinned.is_empty() && !migrate_pinned {
2525 let list = pinned.join(", ");
2526 bail!(
2527 "bind-relay would silently black-hole {n} pinned peer(s): {list}. \
2528 They are pinned to your CURRENT slot; without coordination they will keep \
2529 pushing to a slot you no longer read.\n\n\
2530 SAFE PATHS:\n\
2531 • `wire rotate-slot` — rotates slot on the SAME relay and emits a \
2532 wire_close event to every pinned peer so their daemons drop the stale \
2533 coords cleanly. This is the supported migration path.\n\
2534 • `wire bind-relay {url} --migrate-pinned` — acknowledges that pinned \
2535 peers will need to re-pin manually (you must notify them out-of-band, \
2536 via a fresh `wire add` from each peer or a re-shared invite). Use this \
2537 only when the current slot is unreachable so rotate-slot can't ack.\n\n\
2538 Issue #7 (silent black-hole on relay change) caught this — proceed only \
2539 if you understand the consequences.",
2540 n = pinned.len(),
2541 );
2542 }
2543
2544 let normalized = url.trim_end_matches('/');
2545 let client = crate::relay_client::RelayClient::new(normalized);
2546 client.check_healthz()?;
2547 let alloc = client.allocate_slot(Some(&handle))?;
2548 let mut state = existing;
2549 if !pinned.is_empty() {
2550 eprintln!(
2554 "wire bind-relay: migrating with {n} pinned peer(s) — they will black-hole \
2555 until they re-pin: {peers}",
2556 n = pinned.len(),
2557 peers = pinned.join(", "),
2558 );
2559 }
2560 state["self"] = json!({
2561 "relay_url": url,
2562 "slot_id": alloc.slot_id,
2563 "slot_token": alloc.slot_token,
2564 });
2565 config::write_relay_state(&state)?;
2566
2567 if as_json {
2568 println!(
2569 "{}",
2570 serde_json::to_string(&json!({
2571 "relay_url": url,
2572 "slot_id": alloc.slot_id,
2573 "slot_token_present": true,
2574 }))?
2575 );
2576 } else {
2577 println!("bound to relay {url}");
2578 println!("slot_id: {}", alloc.slot_id);
2579 println!(
2580 "(slot_token written to {} mode 0600)",
2581 config::relay_state_path()?.display()
2582 );
2583 }
2584 Ok(())
2585}
2586
2587fn cmd_add_peer_slot(
2590 handle: &str,
2591 url: &str,
2592 slot_id: &str,
2593 slot_token: &str,
2594 as_json: bool,
2595) -> Result<()> {
2596 let mut state = config::read_relay_state()?;
2597 let peers = state["peers"]
2598 .as_object_mut()
2599 .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2600 peers.insert(
2601 handle.to_string(),
2602 json!({
2603 "relay_url": url,
2604 "slot_id": slot_id,
2605 "slot_token": slot_token,
2606 }),
2607 );
2608 config::write_relay_state(&state)?;
2609 if as_json {
2610 println!(
2611 "{}",
2612 serde_json::to_string(&json!({
2613 "handle": handle,
2614 "relay_url": url,
2615 "slot_id": slot_id,
2616 "added": true,
2617 }))?
2618 );
2619 } else {
2620 println!("pinned peer slot for {handle} at {url} ({slot_id})");
2621 }
2622 Ok(())
2623}
2624
/// Deliver queued outbox events to pinned peers' relay slots.
///
/// For every peer in relay state (or only `peer_filter` when given) that has an
/// `<peer>.jsonl` outbox file, each parseable line is posted to the peer's
/// endpoints in the order returned by
/// `crate::endpoints::peer_endpoints_in_priority_order`, stopping at the first
/// endpoint that accepts it. Results are collected into `pushed` and `skipped`
/// lists and printed as JSON (`as_json`) or as a one-line summary.
///
/// The outbox files are only read here; this function does not modify them.
///
/// # Errors
/// Fails when relay state cannot be read, no peers are pinned, or the outbox
/// directory or an outbox file cannot be read.
fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
    let state = config::read_relay_state()?;
    let peers = state["peers"].as_object().cloned().unwrap_or_default();
    if peers.is_empty() {
        bail!(
            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
        );
    }
    let outbox_dir = config::outbox_dir()?;
    if outbox_dir.exists() {
        // Diagnostic pass: warn about outbox files whose name is not a pinned
        // handle but whose bare handle is — the loop below only looks at files
        // named exactly after a pinned peer, so such files would never be sent.
        let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
            let path = entry.path();
            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
                continue;
            }
            let stem = match path.file_stem().and_then(|s| s.to_str()) {
                Some(s) => s.to_string(),
                None => continue,
            };
            if pinned.contains(&stem) {
                continue;
            }
            let bare = crate::agent_card::bare_handle(&stem);
            if pinned.contains(bare) {
                eprintln!(
                    "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
                     Merge with: `cat {} >> {}` then delete the FQDN file.",
                    stem,
                    path.display(),
                    outbox_dir.join(format!("{bare}.jsonl")).display(),
                );
            }
        }
    }
    // No outbox directory means nothing was ever queued: report an empty result.
    if !outbox_dir.exists() {
        if as_json {
            println!(
                "{}",
                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
            );
        } else {
            println!("phyllis: nothing to dial out — write a message first with `wire send`");
        }
        return Ok(());
    }

    let mut pushed = Vec::new();
    let mut skipped = Vec::new();

    for (peer_handle, _) in peers.iter() {
        // The filter is compared against the pinned handle exactly as stored.
        if let Some(want) = peer_filter
            && peer_handle != want
        {
            continue;
        }
        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
        if !outbox.exists() {
            continue;
        }
        let ordered_endpoints =
            crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
        if ordered_endpoints.is_empty() {
            // No endpoint to try: record every queued event as skipped. An
            // unreadable outbox file is treated as empty here.
            for line in std::fs::read_to_string(&outbox)
                .unwrap_or_default()
                .lines()
            {
                let event: Value = match serde_json::from_str(line) {
                    Ok(v) => v,
                    Err(_) => continue,
                };
                let event_id = event
                    .get("event_id")
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .to_string();
                skipped.push(json!({
                    "peer": peer_handle,
                    "event_id": event_id,
                    "reason": "no reachable endpoint pinned for peer",
                }));
            }
            continue;
        }
        let body = std::fs::read_to_string(&outbox)?;
        for line in body.lines() {
            // Lines that are not valid JSON are silently ignored.
            let event: Value = match serde_json::from_str(line) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let event_id = event
                .get("event_id")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string();

            // Try endpoints in priority order; the first `Ok` response ends the
            // attempt, whether the relay stored the event or called it a duplicate.
            let mut delivered = false;
            let mut last_err_reason: Option<String> = None;
            for endpoint in &ordered_endpoints {
                let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
                match client.post_event(&endpoint.slot_id, &endpoint.slot_token, &event) {
                    Ok(resp) => {
                        if resp.status == "duplicate" {
                            skipped.push(json!({
                                "peer": peer_handle,
                                "event_id": event_id,
                                "reason": "duplicate",
                                "endpoint": endpoint.relay_url,
                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
                            }));
                        } else {
                            pushed.push(json!({
                                "peer": peer_handle,
                                "event_id": event_id,
                                "endpoint": endpoint.relay_url,
                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
                            }));
                        }
                        delivered = true;
                        break;
                    }
                    Err(e) => {
                        // Only the most recent failure is kept for the report.
                        last_err_reason =
                            Some(crate::relay_client::format_transport_error(&e));
                    }
                }
            }
            if !delivered {
                skipped.push(json!({
                    "peer": peer_handle,
                    "event_id": event_id,
                    "reason": last_err_reason.unwrap_or_else(|| "all endpoints failed".to_string()),
                }));
            }
        }
    }

    if as_json {
        println!(
            "{}",
            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
        );
    } else {
        println!(
            "pushed {} event(s); skipped {} ({})",
            pushed.len(),
            skipped.len(),
            if skipped.is_empty() {
                "none"
            } else {
                "see --json for detail"
            }
        );
    }
    Ok(())
}
2801
/// Fetch new events from every one of our own relay endpoints into the inbox.
///
/// For each endpoint returned by `crate::endpoints::self_endpoints`, events
/// after that endpoint's stored cursor are listed, handed to
/// `crate::pull::process_events`, and the cursor is persisted when processing
/// says it may advance. An endpoint whose listing fails is reported on stderr
/// and skipped; the others are still pulled. The combined outcome is printed as
/// JSON (`as_json`) or as a one-line summary.
///
/// # Errors
/// Fails when relay state cannot be read, `self` is not bound or yields no
/// usable endpoint, the inbox directories cannot be prepared, event processing
/// fails, or the cursor cannot be persisted.
fn cmd_pull(as_json: bool) -> Result<()> {
    let state = config::read_relay_state()?;
    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
    if self_state.is_null() {
        bail!("self slot not bound — run `wire bind-relay <url>` first");
    }

    let endpoints = crate::endpoints::self_endpoints(&state);
    if endpoints.is_empty() {
        bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
    }

    let inbox_dir = config::inbox_dir()?;
    config::ensure_dirs()?;

    // Accumulators for the combined report across all endpoints.
    let mut total_seen = 0usize;
    let mut all_written: Vec<Value> = Vec::new();
    let mut all_rejected: Vec<Value> = Vec::new();
    let mut all_blocked = false;
    let mut all_advance_cursor_to: Option<String> = None;

    for endpoint in &endpoints {
        // Each scope has its own cursor key; the starting cursor comes from the
        // `self_state` snapshot taken at the top of this function.
        let cursor_key = endpoint_cursor_key(endpoint.scope);
        let last_event_id = self_state
            .get(&cursor_key)
            .and_then(Value::as_str)
            .map(str::to_string);
        let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
        // NOTE(review): the trailing `Some(1000)` is presumably a page-size
        // limit — confirm against `RelayClient::list_events`.
        let events = match client.list_events(
            &endpoint.slot_id,
            &endpoint.slot_token,
            last_event_id.as_deref(),
            Some(1000),
        ) {
            Ok(ev) => ev,
            Err(e) => {
                // One failing endpoint must not prevent pulling from the rest.
                eprintln!(
                    "wire pull: endpoint {} ({:?}) errored: {}; continuing",
                    endpoint.relay_url,
                    endpoint.scope,
                    crate::relay_client::format_transport_error(&e),
                );
                continue;
            }
        };
        total_seen += events.len();
        let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
        all_written.extend(result.written.iter().cloned());
        all_rejected.extend(result.rejected.iter().cloned());
        if result.blocked {
            all_blocked = true;
        }
        if let Some(eid) = result.advance_cursor_to.clone() {
            // Only the federation cursor is surfaced as `cursor_advanced_to`.
            if endpoint.scope == crate::endpoints::EndpointScope::Federation {
                all_advance_cursor_to = Some(eid.clone());
            }
            // Persist this endpoint's cursor right away, before the next
            // endpoint is contacted.
            let key = cursor_key.clone();
            config::update_relay_state(|state| {
                if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
                    self_obj.insert(key, Value::String(eid));
                }
                Ok(())
            })?;
        }
    }

    // Fold the per-endpoint outcomes into a single result for reporting.
    let result = crate::pull::PullResult {
        written: all_written,
        rejected: all_rejected,
        blocked: all_blocked,
        advance_cursor_to: all_advance_cursor_to,
    };
    let events_len = total_seen;

    if as_json {
        println!(
            "{}",
            serde_json::to_string(&json!({
                "written": result.written,
                "rejected": result.rejected,
                "total_seen": events_len,
                "cursor_blocked": result.blocked,
                "cursor_advanced_to": result.advance_cursor_to,
            }))?
        );
    } else {
        // Rejections flagged `blocks_cursor: true` get called out explicitly.
        let blocking = result
            .rejected
            .iter()
            .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
            .count();
        if blocking > 0 {
            println!(
                "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
                events_len,
                result.written.len(),
                result.rejected.len(),
                blocking,
            );
        } else {
            println!(
                "pulled {} event(s); wrote {}; rejected {}",
                events_len,
                result.written.len(),
                result.rejected.len(),
            );
        }
    }
    Ok(())
}
2934
2935fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
2940 match scope {
2941 crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
2942 crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
2943 }
2944}
2945
2946fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
2949 if !config::is_initialized()? {
2950 bail!("not initialized — run `wire init <handle>` first");
2951 }
2952 let mut state = config::read_relay_state()?;
2953 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2954 if self_state.is_null() {
2955 bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
2956 }
2957 let url = self_state["relay_url"]
2958 .as_str()
2959 .ok_or_else(|| anyhow!("self.relay_url missing"))?
2960 .to_string();
2961 let old_slot_id = self_state["slot_id"]
2962 .as_str()
2963 .ok_or_else(|| anyhow!("self.slot_id missing"))?
2964 .to_string();
2965 let old_slot_token = self_state["slot_token"]
2966 .as_str()
2967 .ok_or_else(|| anyhow!("self.slot_token missing"))?
2968 .to_string();
2969
2970 let card = config::read_agent_card()?;
2972 let did = card
2973 .get("did")
2974 .and_then(Value::as_str)
2975 .unwrap_or("")
2976 .to_string();
2977 let handle = crate::agent_card::display_handle_from_did(&did).to_string();
2978 let pk_b64 = card
2979 .get("verify_keys")
2980 .and_then(Value::as_object)
2981 .and_then(|m| m.values().next())
2982 .and_then(|v| v.get("key"))
2983 .and_then(Value::as_str)
2984 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
2985 .to_string();
2986 let pk_bytes = crate::signing::b64decode(&pk_b64)?;
2987 let sk_seed = config::read_private_key()?;
2988
2989 let normalized = url.trim_end_matches('/').to_string();
2991 let client = crate::relay_client::RelayClient::new(&normalized);
2992 client
2993 .check_healthz()
2994 .context("aborting rotation; old slot still valid")?;
2995 let alloc = client.allocate_slot(Some(&handle))?;
2996 let new_slot_id = alloc.slot_id.clone();
2997 let new_slot_token = alloc.slot_token.clone();
2998
2999 let mut announced: Vec<String> = Vec::new();
3006 if !no_announce {
3007 let now = time::OffsetDateTime::now_utc()
3008 .format(&time::format_description::well_known::Rfc3339)
3009 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
3010 let body = json!({
3011 "reason": "operator-initiated slot rotation",
3012 "new_relay_url": url,
3013 "new_slot_id": new_slot_id,
3014 });
3018 let peers = state["peers"].as_object().cloned().unwrap_or_default();
3019 for (peer_handle, _peer_info) in peers.iter() {
3020 let event = json!({
3021 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
3022 "timestamp": now.clone(),
3023 "from": did,
3024 "to": format!("did:wire:{peer_handle}"),
3025 "type": "wire_close",
3026 "kind": 1201,
3027 "body": body.clone(),
3028 });
3029 let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
3030 Ok(s) => s,
3031 Err(e) => {
3032 eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
3033 continue;
3034 }
3035 };
3036 let peer_info = match state["peers"].get(peer_handle) {
3041 Some(p) => p.clone(),
3042 None => continue,
3043 };
3044 let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
3045 let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
3046 let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
3047 if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
3048 continue;
3049 }
3050 let peer_client = if peer_url == url {
3051 client.clone()
3052 } else {
3053 crate::relay_client::RelayClient::new(peer_url)
3054 };
3055 match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
3056 Ok(_) => announced.push(peer_handle.clone()),
3057 Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
3058 }
3059 }
3060 }
3061
3062 state["self"] = json!({
3064 "relay_url": url,
3065 "slot_id": new_slot_id,
3066 "slot_token": new_slot_token,
3067 });
3068 config::write_relay_state(&state)?;
3069
3070 if as_json {
3071 println!(
3072 "{}",
3073 serde_json::to_string(&json!({
3074 "rotated": true,
3075 "old_slot_id": old_slot_id,
3076 "new_slot_id": new_slot_id,
3077 "relay_url": url,
3078 "announced_to": announced,
3079 }))?
3080 );
3081 } else {
3082 println!("rotated slot on {url}");
3083 println!(
3084 " old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
3085 );
3086 println!(" new slot_id: {new_slot_id}");
3087 if !announced.is_empty() {
3088 println!(
3089 " announced wire_close (kind=1201) to: {}",
3090 announced.join(", ")
3091 );
3092 }
3093 println!();
3094 println!("next steps:");
3095 println!(" - peers see the wire_close event in their next `wire pull`");
3096 println!(
3097 " - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
3098 );
3099 println!(" (or full re-pair via `wire pair-host`/`wire join`)");
3100 println!(" - until they do, you'll receive but they won't be able to reach you");
3101 let _ = old_slot_token;
3103 }
3104 Ok(())
3105}
3106
3107fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
3110 let mut trust = config::read_trust()?;
3111 let mut removed_from_trust = false;
3112 if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
3113 && agents.remove(handle).is_some()
3114 {
3115 removed_from_trust = true;
3116 }
3117 config::write_trust(&trust)?;
3118
3119 let mut state = config::read_relay_state()?;
3120 let mut removed_from_relay = false;
3121 if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
3122 && peers.remove(handle).is_some()
3123 {
3124 removed_from_relay = true;
3125 }
3126 config::write_relay_state(&state)?;
3127
3128 let mut purged: Vec<String> = Vec::new();
3129 if purge {
3130 for dir in [config::inbox_dir()?, config::outbox_dir()?] {
3131 let path = dir.join(format!("{handle}.jsonl"));
3132 if path.exists() {
3133 std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
3134 purged.push(path.to_string_lossy().into());
3135 }
3136 }
3137 }
3138
3139 if !removed_from_trust && !removed_from_relay {
3140 if as_json {
3141 println!(
3142 "{}",
3143 serde_json::to_string(&json!({
3144 "removed": false,
3145 "reason": format!("peer {handle:?} not pinned"),
3146 }))?
3147 );
3148 } else {
3149 eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
3150 }
3151 return Ok(());
3152 }
3153
3154 if as_json {
3155 println!(
3156 "{}",
3157 serde_json::to_string(&json!({
3158 "handle": handle,
3159 "removed_from_trust": removed_from_trust,
3160 "removed_from_relay_state": removed_from_relay,
3161 "purged_files": purged,
3162 }))?
3163 );
3164 } else {
3165 println!("forgot peer {handle:?}");
3166 if removed_from_trust {
3167 println!(" - removed from trust.json");
3168 }
3169 if removed_from_relay {
3170 println!(" - removed from relay.json");
3171 }
3172 if !purged.is_empty() {
3173 for p in &purged {
3174 println!(" - deleted {p}");
3175 }
3176 } else if !purge {
3177 println!(" (inbox/outbox files preserved; pass --purge to delete them)");
3178 }
3179 }
3180 Ok(())
3181}
3182
3183fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
3186 if !config::is_initialized()? {
3187 bail!("not initialized — run `wire init <handle>` first");
3188 }
3189 let interval = std::time::Duration::from_secs(interval_secs.max(1));
3190
3191 if !as_json {
3192 if once {
3193 eprintln!("wire daemon: single sync cycle, then exit");
3194 } else {
3195 eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
3196 }
3197 }
3198
3199 if let Err(e) = crate::pending_pair::cleanup_on_startup() {
3203 eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
3204 }
3205
3206 let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
3212 if !once {
3213 crate::daemon_stream::spawn_stream_subscriber(wake_tx);
3214 }
3215
3216 loop {
3217 let pushed = run_sync_push().unwrap_or_else(|e| {
3218 eprintln!("daemon: push error: {e:#}");
3219 json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
3220 });
3221 let pulled = run_sync_pull().unwrap_or_else(|e| {
3222 eprintln!("daemon: pull error: {e:#}");
3223 json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
3224 });
3225 let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
3226 eprintln!("daemon: pending-pair tick error: {e:#}");
3227 json!({"transitions": []})
3228 });
3229
3230 if as_json {
3231 println!(
3232 "{}",
3233 serde_json::to_string(&json!({
3234 "ts": time::OffsetDateTime::now_utc()
3235 .format(&time::format_description::well_known::Rfc3339)
3236 .unwrap_or_default(),
3237 "push": pushed,
3238 "pull": pulled,
3239 "pairs": pairs,
3240 }))?
3241 );
3242 } else {
3243 let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
3244 let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
3245 let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
3246 let pair_transitions = pairs["transitions"]
3247 .as_array()
3248 .map(|a| a.len())
3249 .unwrap_or(0);
3250 if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
3251 eprintln!(
3252 "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
3253 );
3254 }
3255 if let Some(arr) = pairs["transitions"].as_array() {
3257 for t in arr {
3258 eprintln!(
3259 " pair {} : {} → {}",
3260 t.get("code").and_then(Value::as_str).unwrap_or("?"),
3261 t.get("from").and_then(Value::as_str).unwrap_or("?"),
3262 t.get("to").and_then(Value::as_str).unwrap_or("?")
3263 );
3264 if let Some(sas) = t.get("sas").and_then(Value::as_str)
3265 && t.get("to").and_then(Value::as_str) == Some("sas_ready")
3266 {
3267 eprintln!(" SAS digits: {}-{}", &sas[..3], &sas[3..]);
3268 eprintln!(
3269 " Run: wire pair-confirm {} {}",
3270 t.get("code").and_then(Value::as_str).unwrap_or("?"),
3271 sas
3272 );
3273 }
3274 }
3275 }
3276 }
3277
3278 if once {
3279 return Ok(());
3280 }
3281 let _ = wake_rx.recv_timeout(interval);
3286 while wake_rx.try_recv().is_ok() {}
3287 }
3288}
3289
3290fn run_sync_push() -> Result<Value> {
3293 let state = config::read_relay_state()?;
3294 let peers = state["peers"].as_object().cloned().unwrap_or_default();
3295 if peers.is_empty() {
3296 return Ok(json!({"pushed": [], "skipped": []}));
3297 }
3298 let outbox_dir = config::outbox_dir()?;
3299 if !outbox_dir.exists() {
3300 return Ok(json!({"pushed": [], "skipped": []}));
3301 }
3302 let mut pushed = Vec::new();
3303 let mut skipped = Vec::new();
3304 for (peer_handle, slot_info) in peers.iter() {
3305 let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
3306 if !outbox.exists() {
3307 continue;
3308 }
3309 let url = slot_info["relay_url"].as_str().unwrap_or("");
3310 let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
3311 let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
3312 if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
3313 continue;
3314 }
3315 let client = crate::relay_client::RelayClient::new(url);
3316 let body = std::fs::read_to_string(&outbox)?;
3317 for line in body.lines() {
3318 let event: Value = match serde_json::from_str(line) {
3319 Ok(v) => v,
3320 Err(_) => continue,
3321 };
3322 let event_id = event
3323 .get("event_id")
3324 .and_then(Value::as_str)
3325 .unwrap_or("")
3326 .to_string();
3327 match client.post_event(slot_id, slot_token, &event) {
3328 Ok(resp) => {
3329 if resp.status == "duplicate" {
3330 skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
3331 } else {
3332 pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
3333 }
3334 }
3335 Err(e) => {
3336 let reason = crate::relay_client::format_transport_error(&e);
3340 skipped.push(
3341 json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
3342 );
3343 }
3344 }
3345 }
3346 }
3347 Ok(json!({"pushed": pushed, "skipped": skipped}))
3348}
3349
3350fn run_sync_pull() -> Result<Value> {
3352 let state = config::read_relay_state()?;
3353 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3354 if self_state.is_null() {
3355 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3356 }
3357 let url = self_state["relay_url"].as_str().unwrap_or("");
3358 let slot_id = self_state["slot_id"].as_str().unwrap_or("");
3359 let slot_token = self_state["slot_token"].as_str().unwrap_or("");
3360 let last_event_id = self_state
3361 .get("last_pulled_event_id")
3362 .and_then(Value::as_str)
3363 .map(str::to_string);
3364 if url.is_empty() {
3365 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3366 }
3367 let client = crate::relay_client::RelayClient::new(url);
3368 let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
3369 let inbox_dir = config::inbox_dir()?;
3370 config::ensure_dirs()?;
3371
3372 let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
3376
3377 if let Some(eid) = &result.advance_cursor_to {
3379 let eid = eid.clone();
3380 config::update_relay_state(|state| {
3381 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3382 self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
3383 }
3384 Ok(())
3385 })?;
3386 }
3387
3388 Ok(json!({
3389 "written": result.written,
3390 "rejected": result.rejected,
3391 "total_seen": events.len(),
3392 "cursor_blocked": result.blocked,
3393 "cursor_advanced_to": result.advance_cursor_to,
3394 }))
3395}
3396
3397fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
3400 let body =
3401 std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
3402 let card: Value =
3403 serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
3404 crate::agent_card::verify_agent_card(&card)
3405 .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3406
3407 let mut trust = config::read_trust()?;
3408 crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3409
3410 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3411 let handle = crate::agent_card::display_handle_from_did(did).to_string();
3412 config::write_trust(&trust)?;
3413
3414 if as_json {
3415 println!(
3416 "{}",
3417 serde_json::to_string(&json!({
3418 "handle": handle,
3419 "did": did,
3420 "tier": "VERIFIED",
3421 "pinned": true,
3422 }))?
3423 );
3424 } else {
3425 println!("pinned {handle} ({did}) at tier VERIFIED");
3426 }
3427 Ok(())
3428}
3429
3430fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3433 pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3434}
3435
3436fn cmd_pair_join(
3437 code_phrase: &str,
3438 relay_url: &str,
3439 auto_yes: bool,
3440 timeout_secs: u64,
3441) -> Result<()> {
3442 pair_orchestrate(
3443 relay_url,
3444 Some(code_phrase),
3445 "guest",
3446 auto_yes,
3447 timeout_secs,
3448 )
3449}
3450
3451fn pair_orchestrate(
3457 relay_url: &str,
3458 code_in: Option<&str>,
3459 role: &str,
3460 auto_yes: bool,
3461 timeout_secs: u64,
3462) -> Result<()> {
3463 use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3464
3465 let mut s = pair_session_open(role, relay_url, code_in)?;
3466
3467 if role == "host" {
3468 eprintln!();
3469 eprintln!("share this code phrase with your peer:");
3470 eprintln!();
3471 eprintln!(" {}", s.code);
3472 eprintln!();
3473 eprintln!(
3474 "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3475 s.code
3476 );
3477 } else {
3478 eprintln!();
3479 eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3480 }
3481
3482 const HEARTBEAT_SECS: u64 = 10;
3487 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3488 let started = std::time::Instant::now();
3489 let mut last_heartbeat = started;
3490 let formatted = loop {
3491 if let Some(sas) = pair_session_try_sas(&mut s)? {
3492 break sas;
3493 }
3494 let now = std::time::Instant::now();
3495 if now >= deadline {
3496 return Err(anyhow!(
3497 "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3498 ));
3499 }
3500 if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3501 let elapsed = now.duration_since(started).as_secs();
3502 eprintln!(" ... still waiting ({elapsed}s / {timeout_secs}s)");
3503 last_heartbeat = now;
3504 }
3505 std::thread::sleep(std::time::Duration::from_millis(250));
3506 };
3507
3508 eprintln!();
3509 eprintln!("SAS digits (must match peer's terminal):");
3510 eprintln!();
3511 eprintln!(" {formatted}");
3512 eprintln!();
3513
3514 if !auto_yes {
3517 eprint!("does this match your peer's terminal? [y/N]: ");
3518 use std::io::Write;
3519 std::io::stderr().flush().ok();
3520 let mut input = String::new();
3521 std::io::stdin().read_line(&mut input)?;
3522 let trimmed = input.trim().to_lowercase();
3523 if trimmed != "y" && trimmed != "yes" {
3524 bail!("SAS confirmation declined — aborting pairing");
3525 }
3526 }
3527 s.sas_confirmed = true;
3528
3529 let result = pair_session_finalize(&mut s, timeout_secs)?;
3531
3532 let peer_did = result["paired_with"].as_str().unwrap_or("");
3533 let peer_role = if role == "host" { "guest" } else { "host" };
3534 eprintln!("paired with {peer_did} (peer role: {peer_role})");
3535 eprintln!("peer card pinned at tier VERIFIED");
3536 eprintln!(
3537 "peer relay slot saved to {}",
3538 config::relay_state_path()?.display()
3539 );
3540
3541 println!("{}", serde_json::to_string(&result)?);
3542 Ok(())
3543}
3544
3545fn cmd_pair(
3551 handle: &str,
3552 code: Option<&str>,
3553 relay: &str,
3554 auto_yes: bool,
3555 timeout_secs: u64,
3556 no_setup: bool,
3557) -> Result<()> {
3558 let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3561 let did = init_result
3562 .get("did")
3563 .and_then(|v| v.as_str())
3564 .unwrap_or("(unknown)")
3565 .to_string();
3566 let already = init_result
3567 .get("already_initialized")
3568 .and_then(|v| v.as_bool())
3569 .unwrap_or(false);
3570 if already {
3571 println!("(identity {did} already initialized — reusing)");
3572 } else {
3573 println!("initialized {did}");
3574 }
3575 println!();
3576
3577 match code {
3579 None => {
3580 println!("hosting pair on {relay} (no code = host) ...");
3581 cmd_pair_host(relay, auto_yes, timeout_secs)?;
3582 }
3583 Some(c) => {
3584 println!("joining pair with code {c} on {relay} ...");
3585 cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3586 }
3587 }
3588
3589 if !no_setup {
3591 println!();
3592 println!("registering wire as MCP server in detected client configs ...");
3593 if let Err(e) = cmd_setup(true) {
3594 eprintln!("warn: setup --apply failed: {e}");
3596 eprintln!(" pair succeeded; you can re-run `wire setup --apply` manually.");
3597 }
3598 }
3599
3600 println!();
3601 println!("pair complete. Next steps:");
3602 println!(" wire daemon start # background sync of inbox/outbox vs relay");
3603 println!(" wire send <peer> claim <msg> # send your peer something");
3604 println!(" wire tail # watch incoming events");
3605 Ok(())
3606}
3607
3608fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3614 let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3615 let did = init_result
3616 .get("did")
3617 .and_then(|v| v.as_str())
3618 .unwrap_or("(unknown)")
3619 .to_string();
3620 let already = init_result
3621 .get("already_initialized")
3622 .and_then(|v| v.as_bool())
3623 .unwrap_or(false);
3624 if already {
3625 println!("(identity {did} already initialized — reusing)");
3626 } else {
3627 println!("initialized {did}");
3628 }
3629 println!();
3630 match code {
3631 None => cmd_pair_host_detach(relay, false),
3632 Some(c) => cmd_pair_join_detach(c, relay, false),
3633 }
3634}
3635
3636fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3637 if !config::is_initialized()? {
3638 bail!("not initialized — run `wire init <handle>` first");
3639 }
3640 let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3641 Ok(b) => b,
3642 Err(e) => {
3643 if !as_json {
3644 eprintln!(
3645 "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3646 );
3647 }
3648 false
3649 }
3650 };
3651 let code = crate::sas::generate_code_phrase();
3652 let code_hash = crate::pair_session::derive_code_hash(&code);
3653 let now = time::OffsetDateTime::now_utc()
3654 .format(&time::format_description::well_known::Rfc3339)
3655 .unwrap_or_default();
3656 let p = crate::pending_pair::PendingPair {
3657 code: code.clone(),
3658 code_hash,
3659 role: "host".to_string(),
3660 relay_url: relay_url.to_string(),
3661 status: "request_host".to_string(),
3662 sas: None,
3663 peer_did: None,
3664 created_at: now,
3665 last_error: None,
3666 pair_id: None,
3667 our_slot_id: None,
3668 our_slot_token: None,
3669 spake2_seed_b64: None,
3670 };
3671 crate::pending_pair::write_pending(&p)?;
3672 if as_json {
3673 println!(
3674 "{}",
3675 serde_json::to_string(&json!({
3676 "state": "queued",
3677 "code_phrase": code,
3678 "relay_url": relay_url,
3679 "role": "host",
3680 "daemon_spawned": daemon_spawned,
3681 }))?
3682 );
3683 } else {
3684 if daemon_spawned {
3685 println!("(started wire daemon in background)");
3686 }
3687 println!("detached pair-host queued. Share this code with your peer:\n");
3688 println!(" {code}\n");
3689 println!("Next steps:");
3690 println!(" wire pair-list # check status");
3691 println!(" wire pair-confirm {code} <digits> # when SAS shows up");
3692 println!(" wire pair-cancel {code} # to abort");
3693 }
3694 Ok(())
3695}
3696
3697fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3698 if !config::is_initialized()? {
3699 bail!("not initialized — run `wire init <handle>` first");
3700 }
3701 let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3702 Ok(b) => b,
3703 Err(e) => {
3704 if !as_json {
3705 eprintln!(
3706 "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3707 );
3708 }
3709 false
3710 }
3711 };
3712 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3713 let code_hash = crate::pair_session::derive_code_hash(&code);
3714 let now = time::OffsetDateTime::now_utc()
3715 .format(&time::format_description::well_known::Rfc3339)
3716 .unwrap_or_default();
3717 let p = crate::pending_pair::PendingPair {
3718 code: code.clone(),
3719 code_hash,
3720 role: "guest".to_string(),
3721 relay_url: relay_url.to_string(),
3722 status: "request_guest".to_string(),
3723 sas: None,
3724 peer_did: None,
3725 created_at: now,
3726 last_error: None,
3727 pair_id: None,
3728 our_slot_id: None,
3729 our_slot_token: None,
3730 spake2_seed_b64: None,
3731 };
3732 crate::pending_pair::write_pending(&p)?;
3733 if as_json {
3734 println!(
3735 "{}",
3736 serde_json::to_string(&json!({
3737 "state": "queued",
3738 "code_phrase": code,
3739 "relay_url": relay_url,
3740 "role": "guest",
3741 "daemon_spawned": daemon_spawned,
3742 }))?
3743 );
3744 } else {
3745 if daemon_spawned {
3746 println!("(started wire daemon in background)");
3747 }
3748 println!("detached pair-join queued for code {code}.");
3749 println!(
3750 "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3751 );
3752 }
3753 Ok(())
3754}
3755
3756fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3757 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3758 let typed: String = typed_digits
3759 .chars()
3760 .filter(|c| c.is_ascii_digit())
3761 .collect();
3762 if typed.len() != 6 {
3763 bail!(
3764 "expected 6 digits (got {} after stripping non-digits)",
3765 typed.len()
3766 );
3767 }
3768 let mut p = crate::pending_pair::read_pending(&code)?
3769 .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
3770 if p.status != "sas_ready" {
3771 bail!(
3772 "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
3773 p.status
3774 );
3775 }
3776 let stored = p
3777 .sas
3778 .as_ref()
3779 .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
3780 .clone();
3781 if stored == typed {
3782 p.status = "confirmed".to_string();
3783 crate::pending_pair::write_pending(&p)?;
3784 if as_json {
3785 println!(
3786 "{}",
3787 serde_json::to_string(&json!({
3788 "state": "confirmed",
3789 "code_phrase": code,
3790 }))?
3791 );
3792 } else {
3793 println!("digits match. Daemon will finalize the handshake on its next tick.");
3794 println!("Run `wire peers` after a few seconds to confirm.");
3795 }
3796 } else {
3797 p.status = "aborted".to_string();
3798 p.last_error = Some(format!(
3799 "SAS digit mismatch (typed {typed}, expected {stored})"
3800 ));
3801 let client = crate::relay_client::RelayClient::new(&p.relay_url);
3802 let _ = client.pair_abandon(&p.code_hash);
3803 crate::pending_pair::write_pending(&p)?;
3804 crate::os_notify::toast(
3805 &format!("wire — pair aborted ({})", p.code),
3806 p.last_error.as_deref().unwrap_or("digits mismatch"),
3807 );
3808 if as_json {
3809 println!(
3810 "{}",
3811 serde_json::to_string(&json!({
3812 "state": "aborted",
3813 "code_phrase": code,
3814 "error": "digits mismatch",
3815 }))?
3816 );
3817 }
3818 bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
3819 }
3820 Ok(())
3821}
3822
3823fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
3824 if watch {
3825 return cmd_pair_list_watch(watch_interval_secs);
3826 }
3827 let spake2_items = crate::pending_pair::list_pending()?;
3828 let inbound_items = crate::pending_inbound_pair::list_pending_inbound()?;
3829 if as_json {
3830 println!("{}", serde_json::to_string(&spake2_items)?);
3835 return Ok(());
3836 }
3837 if spake2_items.is_empty() && inbound_items.is_empty() {
3838 println!("no pending pair sessions.");
3839 return Ok(());
3840 }
3841 if !inbound_items.is_empty() {
3844 println!("PENDING INBOUND (v0.5.14 zero-paste pair_drop awaiting your accept)");
3845 println!(
3846 "{:<20} {:<35} {:<25} NEXT STEP",
3847 "PEER", "RELAY", "RECEIVED"
3848 );
3849 for p in &inbound_items {
3850 println!(
3851 "{:<20} {:<35} {:<25} `wire pair-accept {peer}` to accept; `wire pair-reject {peer}` to refuse",
3852 p.peer_handle,
3853 p.peer_relay_url,
3854 p.received_at,
3855 peer = p.peer_handle,
3856 );
3857 }
3858 println!();
3859 }
3860 if !spake2_items.is_empty() {
3861 println!("SPAKE2 SESSIONS");
3862 println!(
3863 "{:<15} {:<8} {:<18} {:<10} NOTE",
3864 "CODE", "ROLE", "STATUS", "SAS"
3865 );
3866 for p in spake2_items {
3867 let sas = p
3868 .sas
3869 .as_ref()
3870 .map(|d| format!("{}-{}", &d[..3], &d[3..]))
3871 .unwrap_or_else(|| "—".to_string());
3872 let note = p
3873 .last_error
3874 .as_deref()
3875 .or(p.peer_did.as_deref())
3876 .unwrap_or("");
3877 println!(
3878 "{:<15} {:<8} {:<18} {:<10} {}",
3879 p.code, p.role, p.status, sas, note
3880 );
3881 }
3882 }
3883 Ok(())
3884}
3885
3886fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
3898 use std::collections::HashMap;
3899 use std::io::Write;
3900 let interval = std::time::Duration::from_secs(interval_secs.max(1));
3901 let mut prev: HashMap<String, String> = HashMap::new();
3904 {
3905 let items = crate::pending_pair::list_pending()?;
3906 for p in &items {
3907 println!("{}", serde_json::to_string(&p)?);
3908 prev.insert(p.code.clone(), p.status.clone());
3909 }
3910 let _ = std::io::stdout().flush();
3912 }
3913 loop {
3914 std::thread::sleep(interval);
3915 let items = match crate::pending_pair::list_pending() {
3916 Ok(v) => v,
3917 Err(_) => continue,
3918 };
3919 let mut cur: HashMap<String, String> = HashMap::new();
3920 for p in &items {
3921 cur.insert(p.code.clone(), p.status.clone());
3922 match prev.get(&p.code) {
3923 None => {
3924 println!("{}", serde_json::to_string(&p)?);
3926 }
3927 Some(prev_status) if prev_status != &p.status => {
3928 println!("{}", serde_json::to_string(&p)?);
3930 }
3931 _ => {}
3932 }
3933 }
3934 for code in prev.keys() {
3935 if !cur.contains_key(code) {
3936 println!(
3939 "{}",
3940 serde_json::to_string(&json!({
3941 "code": code,
3942 "status": "removed",
3943 "_synthetic": true,
3944 }))?
3945 );
3946 }
3947 }
3948 let _ = std::io::stdout().flush();
3949 prev = cur;
3950 }
3951}
3952
3953fn cmd_pair_watch(
3957 code_phrase: &str,
3958 target_status: &str,
3959 timeout_secs: u64,
3960 as_json: bool,
3961) -> Result<()> {
3962 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3963 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3964 let mut last_seen_status: Option<String> = None;
3965 loop {
3966 let p_opt = crate::pending_pair::read_pending(&code)?;
3967 let now = std::time::Instant::now();
3968 match p_opt {
3969 None => {
3970 if last_seen_status.is_some() {
3974 if as_json {
3975 println!(
3976 "{}",
3977 serde_json::to_string(&json!({"state": "finalized", "code": code}))?
3978 );
3979 } else {
3980 println!("pair {code} finalized (file removed)");
3981 }
3982 return Ok(());
3983 } else {
3984 if as_json {
3985 println!(
3986 "{}",
3987 serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
3988 );
3989 }
3990 std::process::exit(1);
3991 }
3992 }
3993 Some(p) => {
3994 let cur = p.status.clone();
3995 if Some(cur.clone()) != last_seen_status {
3996 if as_json {
3997 println!("{}", serde_json::to_string(&p)?);
3999 }
4000 last_seen_status = Some(cur.clone());
4001 }
4002 if cur == target_status {
4003 if !as_json {
4004 let sas_str = p
4005 .sas
4006 .as_ref()
4007 .map(|s| format!("{}-{}", &s[..3], &s[3..]))
4008 .unwrap_or_else(|| "—".to_string());
4009 println!("pair {code} reached {target_status} (SAS: {sas_str})");
4010 }
4011 return Ok(());
4012 }
4013 if cur == "aborted" || cur == "aborted_restart" {
4014 if !as_json {
4015 let err = p.last_error.as_deref().unwrap_or("(no detail)");
4016 eprintln!("pair {code} {cur}: {err}");
4017 }
4018 std::process::exit(1);
4019 }
4020 }
4021 }
4022 if now >= deadline {
4023 if !as_json {
4024 eprintln!(
4025 "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
4026 );
4027 }
4028 std::process::exit(2);
4029 }
4030 std::thread::sleep(std::time::Duration::from_millis(250));
4031 }
4032}
4033
4034fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
4035 let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
4036 let p = crate::pending_pair::read_pending(&code)?
4037 .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
4038 let client = crate::relay_client::RelayClient::new(&p.relay_url);
4039 let _ = client.pair_abandon(&p.code_hash);
4040 crate::pending_pair::delete_pending(&code)?;
4041 if as_json {
4042 println!(
4043 "{}",
4044 serde_json::to_string(&json!({
4045 "state": "cancelled",
4046 "code_phrase": code,
4047 }))?
4048 );
4049 } else {
4050 println!("cancelled pending pair {code} (relay slot released, file removed).");
4051 }
4052 Ok(())
4053}
4054
4055fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
4058 let code = crate::sas::parse_code_phrase(code_phrase)?;
4061 let code_hash = crate::pair_session::derive_code_hash(code);
4062 let client = crate::relay_client::RelayClient::new(relay_url);
4063 client.pair_abandon(&code_hash)?;
4064 println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
4065 println!("host can now issue a fresh code; guest can re-join.");
4066 Ok(())
4067}
4068
4069fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
4072 let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
4073
4074 let share_payload: Option<Value> = if share {
4077 let client = reqwest::blocking::Client::new();
4078 let single_use = if uses == 1 { Some(1u32) } else { None };
4079 let body = json!({
4080 "invite_url": url,
4081 "ttl_seconds": ttl,
4082 "uses": single_use,
4083 });
4084 let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
4085 let resp = client.post(&endpoint).json(&body).send()?;
4086 if !resp.status().is_success() {
4087 let code = resp.status();
4088 let txt = resp.text().unwrap_or_default();
4089 bail!("relay {code} on /v1/invite/register: {txt}");
4090 }
4091 let parsed: Value = resp.json()?;
4092 let token = parsed
4093 .get("token")
4094 .and_then(Value::as_str)
4095 .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
4096 .to_string();
4097 let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
4098 let curl_line = format!("curl -fsSL {share_url} | sh");
4099 Some(json!({
4100 "token": token,
4101 "share_url": share_url,
4102 "curl": curl_line,
4103 "expires_unix": parsed.get("expires_unix"),
4104 }))
4105 } else {
4106 None
4107 };
4108
4109 if as_json {
4110 let mut out = json!({
4111 "invite_url": url,
4112 "ttl_secs": ttl,
4113 "uses": uses,
4114 "relay": relay,
4115 });
4116 if let Some(s) = &share_payload {
4117 out["share"] = s.clone();
4118 }
4119 println!("{}", serde_json::to_string(&out)?);
4120 } else if let Some(s) = share_payload {
4121 let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
4122 eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
4123 eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
4124 println!("{curl}");
4125 } else {
4126 eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
4127 eprintln!("# TTL: {ttl}s. Uses: {uses}.");
4128 println!("{url}");
4129 }
4130 Ok(())
4131}
4132
4133fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
4134 let resolved = if url.starts_with("http://") || url.starts_with("https://") {
4138 let sep = if url.contains('?') { '&' } else { '?' };
4139 let resolve_url = format!("{url}{sep}format=url");
4140 let client = reqwest::blocking::Client::new();
4141 let resp = client
4142 .get(&resolve_url)
4143 .send()
4144 .with_context(|| format!("GET {resolve_url}"))?;
4145 if !resp.status().is_success() {
4146 bail!("could not resolve short URL {url} (HTTP {})", resp.status());
4147 }
4148 let body = resp.text().unwrap_or_default().trim().to_string();
4149 if !body.starts_with("wire://pair?") {
4150 bail!(
4151 "short URL {url} did not resolve to a wire:// invite. \
4152 (got: {}{})",
4153 body.chars().take(80).collect::<String>(),
4154 if body.chars().count() > 80 { "…" } else { "" }
4155 );
4156 }
4157 body
4158 } else {
4159 url.to_string()
4160 };
4161
4162 let result = crate::pair_invite::accept_invite(&resolved)?;
4163 if as_json {
4164 println!("{}", serde_json::to_string(&result)?);
4165 } else {
4166 let did = result
4167 .get("paired_with")
4168 .and_then(Value::as_str)
4169 .unwrap_or("?");
4170 println!("paired with {did}");
4171 println!(
4172 "you can now: wire send {} <kind> <body>",
4173 crate::agent_card::display_handle_from_did(did)
4174 );
4175 }
4176 Ok(())
4177}
4178
4179fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
4182 if let Some(h) = handle {
4183 let parsed = crate::pair_profile::parse_handle(h)?;
4184 if config::is_initialized()? {
4187 let card = config::read_agent_card()?;
4188 let local_handle = card
4189 .get("profile")
4190 .and_then(|p| p.get("handle"))
4191 .and_then(Value::as_str)
4192 .map(str::to_string);
4193 if local_handle.as_deref() == Some(h) {
4194 return cmd_whois(None, as_json, None);
4195 }
4196 }
4197 let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4199 if as_json {
4200 println!("{}", serde_json::to_string(&resolved)?);
4201 } else {
4202 print_resolved_profile(&resolved);
4203 }
4204 return Ok(());
4205 }
4206 let card = config::read_agent_card()?;
4207 if as_json {
4208 let profile = card.get("profile").cloned().unwrap_or(Value::Null);
4209 println!(
4210 "{}",
4211 serde_json::to_string(&json!({
4212 "did": card.get("did").cloned().unwrap_or(Value::Null),
4213 "profile": profile,
4214 }))?
4215 );
4216 } else {
4217 print!("{}", crate::pair_profile::render_self_summary()?);
4218 }
4219 Ok(())
4220}
4221
4222fn print_resolved_profile(resolved: &Value) {
4223 let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
4224 let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
4225 let relay = resolved
4226 .get("relay_url")
4227 .and_then(Value::as_str)
4228 .unwrap_or("");
4229 let slot = resolved
4230 .get("slot_id")
4231 .and_then(Value::as_str)
4232 .unwrap_or("");
4233 let profile = resolved
4234 .get("card")
4235 .and_then(|c| c.get("profile"))
4236 .cloned()
4237 .unwrap_or(Value::Null);
4238 println!("{did}");
4239 println!(" nick: {nick}");
4240 if !relay.is_empty() {
4241 println!(" relay_url: {relay}");
4242 }
4243 if !slot.is_empty() {
4244 println!(" slot_id: {slot}");
4245 }
4246 let pick =
4247 |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
4248 if let Some(s) = pick("display_name") {
4249 println!(" display_name: {s}");
4250 }
4251 if let Some(s) = pick("emoji") {
4252 println!(" emoji: {s}");
4253 }
4254 if let Some(s) = pick("motto") {
4255 println!(" motto: {s}");
4256 }
4257 if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
4258 let joined: Vec<String> = arr
4259 .iter()
4260 .filter_map(|v| v.as_str().map(str::to_string))
4261 .collect();
4262 println!(" vibe: {}", joined.join(", "));
4263 }
4264 if let Some(s) = pick("pronouns") {
4265 println!(" pronouns: {s}");
4266 }
4267}
4268
/// Extract the host component from a URL-ish string.
///
/// Accepts input with or without a `scheme://` prefix and returns the host
/// without port, path, query, fragment, or userinfo. The case of the host is
/// preserved. A bracketed IPv6 literal is returned with its brackets
/// (`http://[::1]:8770` → `[::1]`). Returns an empty string when no host is
/// present.
fn host_of_url(url: &str) -> String {
    // Strip a leading `scheme://` only when the prefix really looks like a
    // scheme, so a "://" buried in a path or query is not mistaken for one.
    let rest = match url.split_once("://") {
        Some((scheme, rest))
            if !scheme.is_empty()
                && scheme
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.')) =>
        {
            rest
        }
        _ => url,
    };
    // The authority ends at the first path, query, or fragment delimiter.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");
    // Drop `user:pass@` so credentials are never mistaken for the host.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host_port)| host_port);
    // IPv6 literals contain ':' themselves; keep everything through the `]`.
    if host_port.starts_with('[') {
        if let Some(end) = host_port.find(']') {
            return host_port[..=end].to_string();
        }
    }
    host_port.split(':').next().unwrap_or("").to_string()
}
4289
4290fn is_known_relay_domain(peer_domain: &str, our_relay_url: &str) -> bool {
4294 const KNOWN_GOOD: &[&str] = &["wireup.net", "wire.laulpogan.com"];
4296 let peer_domain = peer_domain.trim().to_ascii_lowercase();
4297 if KNOWN_GOOD.iter().any(|k| *k == peer_domain) {
4298 return true;
4299 }
4300 let our_host = host_of_url(our_relay_url).to_ascii_lowercase();
4303 if !our_host.is_empty() && our_host == peer_domain {
4304 return true;
4305 }
4306 false
4307}
4308
/// Start (or complete) a pair with the peer named by `handle_arg`.
///
/// Flow, in order:
/// 1. Parse the handle and make sure we have an identity with a relay slot
///    (`ensure_self_with_relay`, honouring `relay_override`).
/// 2. Refuse when our DID equals `did:wire:<nick>` of the parsed handle.
/// 3. If a pending inbound pair request from that nick exists, hand over to
///    `cmd_add_accept_pending` and return its result.
/// 4. Warn on stderr (without aborting) when the handle's domain is not
///    recognised by `is_known_relay_domain`.
/// 5. Resolve the handle, pin the peer card in the trust store as "VERIFIED",
///    and record the peer's relay coordinates in relay state.
/// 6. Build and sign a `pair_drop` event (kind 1100) carrying our card and
///    relay coordinates, and deliver it via `handle_intro` on the peer relay.
/// 7. Report the outcome as JSON (`as_json`) or text.
///
/// # Errors
/// Fails on any parse / config I/O / resolution / signing / relay error, and
/// when the resolved record lacks `card`, `did`, or `slot_id`.
fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
    let parsed = crate::pair_profile::parse_handle(handle_arg)?;

    let (our_did, our_relay, our_slot_id, our_slot_token) =
        crate::pair_invite::ensure_self_with_relay(relay_override)?;
    if our_did == format!("did:wire:{}", parsed.nick) {
        bail!("refusing to add self (handle matches own DID)");
    }

    // An inbound request from this nick is already waiting: accept it instead
    // of sending a fresh outbound intro.
    if let Some(pending) = crate::pending_inbound_pair::read_pending_inbound(&parsed.nick)? {
        return cmd_add_accept_pending(
            handle_arg,
            &parsed.nick,
            &pending,
            &our_relay,
            &our_slot_id,
            &our_slot_token,
            as_json,
        );
    }

    // Advisory only: an unfamiliar domain prints a warning and then proceeds.
    if !is_known_relay_domain(&parsed.domain, &our_relay) {
        eprintln!(
            "wire add: WARN unfamiliar relay domain `{}`.",
            parsed.domain
        );
        eprintln!(
            " This is NOT `wireup.net` (the default), NOT your own relay (`{}`), "
            ,
            host_of_url(&our_relay)
        );
        eprintln!(
            " and not on the known-good list. If you meant `{}@wireup.net`, "
            ,
            parsed.nick
        );
        eprintln!(
            " run `wire add {}@wireup.net` instead. Otherwise verify with your",
            parsed.nick
        );
        eprintln!(" peer out-of-band that they actually run a relay at this domain");
        eprintln!(" before relying on the pair. (See issue #9.4.)");
    }

    let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
    let peer_card = resolved
        .get("card")
        .cloned()
        .ok_or_else(|| anyhow!("resolved missing card"))?;
    let peer_did = resolved
        .get("did")
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("resolved missing did"))?
        .to_string();
    let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
    let peer_slot_id = resolved
        .get("slot_id")
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("resolved missing slot_id"))?
        .to_string();
    // Relay URL preference: resolved record, then `relay_override`, then
    // `https://<handle domain>`.
    let peer_relay = resolved
        .get("relay_url")
        .and_then(Value::as_str)
        .map(str::to_string)
        .or_else(|| relay_override.map(str::to_string))
        .unwrap_or_else(|| format!("https://{}", parsed.domain));

    let mut trust = config::read_trust()?;
    crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
    config::write_trust(&trust)?;
    let mut relay_state = config::read_relay_state()?;
    // Keep a slot_token already stored for this peer; otherwise store "".
    let existing_token = relay_state
        .get("peers")
        .and_then(|p| p.get(&peer_handle))
        .and_then(|p| p.get("slot_token"))
        .and_then(Value::as_str)
        .map(str::to_string)
        .unwrap_or_default();
    relay_state["peers"][&peer_handle] = json!({
        "relay_url": peer_relay,
        "slot_id": peer_slot_id,
        "slot_token": existing_token, });
    config::write_relay_state(&relay_state)?;

    let our_card = config::read_agent_card()?;
    let sk_seed = config::read_private_key()?;
    let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
    // Uses the first entry of `verify_keys` as our public key.
    let pk_b64 = our_card
        .get("verify_keys")
        .and_then(Value::as_object)
        .and_then(|m| m.values().next())
        .and_then(|v| v.get("key"))
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
    let pk_bytes = crate::signing::b64decode(pk_b64)?;
    // A formatting failure yields an empty timestamp rather than an error.
    let now = time::OffsetDateTime::now_utc()
        .format(&time::format_description::well_known::Rfc3339)
        .unwrap_or_default();
    // Relay state is re-read here; a read failure falls back to `{}`.
    let our_relay_state = config::read_relay_state().unwrap_or_else(|_| json!({}));
    let our_endpoints = crate::endpoints::self_endpoints(&our_relay_state);
    let mut body = json!({
        "card": our_card,
        "relay_url": our_relay,
        "slot_id": our_slot_id,
        "slot_token": our_slot_token,
    });
    // `endpoints` is only included when we have at least one.
    if !our_endpoints.is_empty() {
        body["endpoints"] = serde_json::to_value(&our_endpoints).unwrap_or(json!([]));
    }
    let event = json!({
        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
        "timestamp": now,
        "from": our_did,
        "to": peer_did,
        "type": "pair_drop",
        "kind": 1100u32,
        "body": body,
    });
    let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;

    let client = crate::relay_client::RelayClient::new(&peer_relay);
    let resp = client.handle_intro(&parsed.nick, &signed)?;
    let event_id = signed
        .get("event_id")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();

    if as_json {
        println!(
            "{}",
            serde_json::to_string(&json!({
                "handle": handle_arg,
                "paired_with": peer_did,
                "peer_handle": peer_handle,
                "event_id": event_id,
                "drop_response": resp,
                "status": "drop_sent",
            }))?
        );
    } else {
        println!(
            "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
        );
    }
    Ok(())
}
4493
4494fn cmd_add_accept_pending(
4501 handle_arg: &str,
4502 peer_nick: &str,
4503 pending: &crate::pending_inbound_pair::PendingInboundPair,
4504 _our_relay: &str,
4505 _our_slot_id: &str,
4506 _our_slot_token: &str,
4507 as_json: bool,
4508) -> Result<()> {
4509 let mut trust = config::read_trust()?;
4512 crate::trust::add_agent_card_pin(&mut trust, &pending.peer_card, Some("VERIFIED"));
4513 config::write_trust(&trust)?;
4514
4515 let mut relay_state = config::read_relay_state()?;
4521 let endpoints_to_pin = if pending.peer_endpoints.is_empty() {
4522 vec![crate::endpoints::Endpoint::federation(
4523 pending.peer_relay_url.clone(),
4524 pending.peer_slot_id.clone(),
4525 pending.peer_slot_token.clone(),
4526 )]
4527 } else {
4528 pending.peer_endpoints.clone()
4529 };
4530 crate::endpoints::pin_peer_endpoints(
4531 &mut relay_state,
4532 &pending.peer_handle,
4533 &endpoints_to_pin,
4534 )?;
4535 config::write_relay_state(&relay_state)?;
4536
4537 crate::pair_invite::send_pair_drop_ack(
4539 &pending.peer_handle,
4540 &pending.peer_relay_url,
4541 &pending.peer_slot_id,
4542 &pending.peer_slot_token,
4543 )
4544 .with_context(|| {
4545 format!(
4546 "pair_drop_ack send to {} @ {} slot {} failed",
4547 pending.peer_handle, pending.peer_relay_url, pending.peer_slot_id
4548 )
4549 })?;
4550
4551 crate::pending_inbound_pair::consume_pending_inbound(peer_nick)?;
4553
4554 if as_json {
4555 println!(
4556 "{}",
4557 serde_json::to_string(&json!({
4558 "handle": handle_arg,
4559 "paired_with": pending.peer_did,
4560 "peer_handle": pending.peer_handle,
4561 "status": "bilateral_accepted",
4562 "via": "pending_inbound",
4563 }))?
4564 );
4565 } else {
4566 println!(
4567 "→ accepted pending pair from {peer}\n→ pinned VERIFIED, slot_token recorded\n→ shipped our slot_token back via pair_drop_ack\nbilateral pair complete. Send with `wire send {peer} \"...\"`.",
4568 peer = pending.peer_handle,
4569 );
4570 }
4571 Ok(())
4572}
4573
4574fn cmd_pair_accept(peer_nick: &str, as_json: bool) -> Result<()> {
4581 let nick = crate::agent_card::bare_handle(peer_nick);
4582 let pending = crate::pending_inbound_pair::read_pending_inbound(nick)?.ok_or_else(|| {
4583 anyhow!(
4584 "no pending pair request from {nick}. Run `wire pair-list-inbound` to see who is waiting, \
4585 or use `wire add <peer>@<relay>` to send a fresh outbound pair request."
4586 )
4587 })?;
4588 let (_our_did, our_relay, our_slot_id, our_slot_token) =
4589 crate::pair_invite::ensure_self_with_relay(None)?;
4590 let handle_arg = format!("{}@{}", pending.peer_handle, pending.peer_relay_url);
4591 cmd_add_accept_pending(
4592 &handle_arg,
4593 nick,
4594 &pending,
4595 &our_relay,
4596 &our_slot_id,
4597 &our_slot_token,
4598 as_json,
4599 )
4600}
4601
4602fn cmd_pair_list_inbound(as_json: bool) -> Result<()> {
4605 let items = crate::pending_inbound_pair::list_pending_inbound()?;
4606 if as_json {
4607 println!("{}", serde_json::to_string(&items)?);
4608 return Ok(());
4609 }
4610 if items.is_empty() {
4611 println!("no pending inbound pair requests.");
4612 return Ok(());
4613 }
4614 println!("{:<20} {:<35} {:<25} DID", "PEER", "RELAY", "RECEIVED");
4615 for p in items {
4616 println!(
4617 "{:<20} {:<35} {:<25} {}",
4618 p.peer_handle, p.peer_relay_url, p.received_at, p.peer_did,
4619 );
4620 }
4621 println!(
4622 "→ accept with `wire pair-accept <peer>`; refuse with `wire pair-reject <peer>`."
4623 );
4624 Ok(())
4625}
4626
4627fn cmd_pair_reject(peer_nick: &str, as_json: bool) -> Result<()> {
4631 let nick = crate::agent_card::bare_handle(peer_nick);
4632 let existed = crate::pending_inbound_pair::read_pending_inbound(nick)?;
4633 crate::pending_inbound_pair::consume_pending_inbound(nick)?;
4634
4635 if as_json {
4636 println!(
4637 "{}",
4638 serde_json::to_string(&json!({
4639 "peer": nick,
4640 "rejected": existed.is_some(),
4641 "had_pending": existed.is_some(),
4642 }))?
4643 );
4644 } else if existed.is_some() {
4645 println!("→ rejected pending pair from {nick}\n→ pending-inbound record deleted; no ack sent.");
4646 } else {
4647 println!("no pending pair from {nick} — nothing to reject");
4648 }
4649 Ok(())
4650}
4651
4652fn cmd_session(cmd: SessionCommand) -> Result<()> {
4661 match cmd {
4662 SessionCommand::New {
4663 name,
4664 relay,
4665 with_local,
4666 local_relay,
4667 no_daemon,
4668 json,
4669 } => cmd_session_new(
4670 name.as_deref(),
4671 &relay,
4672 with_local,
4673 &local_relay,
4674 no_daemon,
4675 json,
4676 ),
4677 SessionCommand::List { json } => cmd_session_list(json),
4678 SessionCommand::ListLocal { json } => cmd_session_list_local(json),
4679 SessionCommand::Env { name, json } => cmd_session_env(name.as_deref(), json),
4680 SessionCommand::Current { json } => cmd_session_current(json),
4681 SessionCommand::Destroy { name, force, json } => cmd_session_destroy(&name, force, json),
4682 }
4683}
4684
4685fn resolve_session_name(name: Option<&str>) -> Result<String> {
4686 if let Some(n) = name {
4687 return Ok(crate::session::sanitize_name(n));
4688 }
4689 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4690 let registry = crate::session::read_registry().unwrap_or_default();
4691 Ok(crate::session::derive_name_from_cwd(&cwd, ®istry))
4692}
4693
4694fn cmd_session_new(
4695 name_arg: Option<&str>,
4696 relay: &str,
4697 with_local: bool,
4698 local_relay: &str,
4699 no_daemon: bool,
4700 as_json: bool,
4701) -> Result<()> {
4702 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4703 let mut registry = crate::session::read_registry().unwrap_or_default();
4704 let name = match name_arg {
4705 Some(n) => crate::session::sanitize_name(n),
4706 None => crate::session::derive_name_from_cwd(&cwd, ®istry),
4707 };
4708 let session_home = crate::session::session_dir(&name)?;
4709
4710 let already_exists = session_home.exists()
4711 && session_home
4712 .join("config")
4713 .join("wire")
4714 .join("agent-card.json")
4715 .exists();
4716 if already_exists {
4717 registry
4721 .by_cwd
4722 .insert(cwd.to_string_lossy().into_owned(), name.clone());
4723 crate::session::write_registry(®istry)?;
4724 let info = render_session_info(&name, &session_home, &cwd)?;
4725 emit_session_new_result(&info, "already_exists", as_json)?;
4726 if !no_daemon {
4727 ensure_session_daemon(&session_home)?;
4728 }
4729 return Ok(());
4730 }
4731
4732 std::fs::create_dir_all(&session_home)
4733 .with_context(|| format!("creating session dir {session_home:?}"))?;
4734
4735 let init_status = run_wire_with_home(
4737 &session_home,
4738 &["init", &name, "--relay", relay],
4739 )?;
4740 if !init_status.success() {
4741 bail!(
4742 "`wire init {name} --relay {relay}` failed inside session dir {session_home:?}"
4743 );
4744 }
4745
4746 let mut claim_attempt = 0u32;
4751 let mut effective_handle = name.clone();
4752 loop {
4753 claim_attempt += 1;
4754 let status = run_wire_with_home(
4755 &session_home,
4756 &["claim", &effective_handle, "--relay", relay],
4757 )?;
4758 if status.success() {
4759 break;
4760 }
4761 if claim_attempt >= 5 {
4762 bail!(
4763 "5 failed attempts to claim a handle on {relay} for session {name}. \
4764 Try `wire session destroy {name} --force` and re-run with a different name."
4765 );
4766 }
4767 let attempt_path = cwd.join(format!("__attempt_{claim_attempt}"));
4771 let suffix = crate::session::derive_name_from_cwd(&attempt_path, ®istry);
4772 let token = suffix
4776 .rsplit('-')
4777 .next()
4778 .filter(|t| t.len() == 4)
4779 .map(str::to_string)
4780 .unwrap_or_else(|| format!("{claim_attempt}"));
4781 effective_handle = format!("{name}-{token}");
4782 }
4783
4784 registry
4787 .by_cwd
4788 .insert(cwd.to_string_lossy().into_owned(), name.clone());
4789 crate::session::write_registry(®istry)?;
4790
4791 if with_local {
4797 try_allocate_local_slot(&session_home, &effective_handle, relay, local_relay);
4798 }
4799
4800 if !no_daemon {
4801 ensure_session_daemon(&session_home)?;
4802 }
4803
4804 let info = render_session_info(&name, &session_home, &cwd)?;
4805 emit_session_new_result(&info, "created", as_json)
4806}
4807
4808fn try_allocate_local_slot(
4816 session_home: &std::path::Path,
4817 handle: &str,
4818 federation_relay: &str,
4819 local_relay: &str,
4820) {
4821 let probe = match crate::relay_client::build_blocking_client(Some(
4824 std::time::Duration::from_millis(500),
4825 )) {
4826 Ok(c) => c,
4827 Err(e) => {
4828 eprintln!("wire session new: cannot build probe client for {local_relay}: {e:#}");
4829 return;
4830 }
4831 };
4832 let healthz_url = format!("{}/healthz", local_relay.trim_end_matches('/'));
4833 match probe.get(&healthz_url).send() {
4834 Ok(resp) if resp.status().is_success() => {}
4835 Ok(resp) => {
4836 eprintln!(
4837 "wire session new: local relay probe at {healthz_url} returned {} — staying federation-only",
4838 resp.status()
4839 );
4840 return;
4841 }
4842 Err(e) => {
4843 eprintln!(
4844 "wire session new: local relay at {local_relay} unreachable ({}) — staying federation-only. \
4845 Start one with `wire relay-server --bind 127.0.0.1:8771 --local-only`.",
4846 crate::relay_client::format_transport_error(&anyhow::Error::new(e))
4847 );
4848 return;
4849 }
4850 };
4851
4852 let local_client = crate::relay_client::RelayClient::new(local_relay);
4854 let alloc = match local_client.allocate_slot(Some(handle)) {
4855 Ok(a) => a,
4856 Err(e) => {
4857 eprintln!(
4858 "wire session new: local relay slot allocation failed: {e:#} — staying federation-only"
4859 );
4860 return;
4861 }
4862 };
4863
4864 let state_path = session_home
4870 .join("config")
4871 .join("wire")
4872 .join("relay-state.json");
4873 let mut state: serde_json::Value = std::fs::read(&state_path)
4874 .ok()
4875 .and_then(|b| serde_json::from_slice(&b).ok())
4876 .unwrap_or_else(|| serde_json::json!({}));
4877 let fed_endpoint = state
4880 .get("self")
4881 .and_then(|s| {
4882 let url = s.get("relay_url").and_then(serde_json::Value::as_str)?;
4883 let slot_id = s.get("slot_id").and_then(serde_json::Value::as_str)?;
4884 let slot_token = s.get("slot_token").and_then(serde_json::Value::as_str)?;
4885 Some(crate::endpoints::Endpoint::federation(
4886 url.to_string(),
4887 slot_id.to_string(),
4888 slot_token.to_string(),
4889 ))
4890 });
4891
4892 let local_endpoint = crate::endpoints::Endpoint::local(
4893 local_relay.trim_end_matches('/').to_string(),
4894 alloc.slot_id.clone(),
4895 alloc.slot_token.clone(),
4896 );
4897
4898 let mut endpoints: Vec<crate::endpoints::Endpoint> = Vec::new();
4899 if let Some(f) = fed_endpoint.clone() {
4900 endpoints.push(f);
4901 }
4902 endpoints.push(local_endpoint);
4903
4904 let self_obj = state
4905 .as_object_mut()
4906 .expect("relay_state root is an object")
4907 .entry("self")
4908 .or_insert_with(|| {
4909 serde_json::json!({
4910 "relay_url": federation_relay,
4911 })
4912 });
4913 if let Some(obj) = self_obj.as_object_mut() {
4914 obj.insert(
4915 "endpoints".into(),
4916 serde_json::to_value(&endpoints).unwrap_or(serde_json::Value::Null),
4917 );
4918 }
4919
4920 if let Err(e) = std::fs::write(
4921 &state_path,
4922 serde_json::to_vec_pretty(&state).unwrap_or_default(),
4923 ) {
4924 eprintln!(
4925 "wire session new: persisting dual-slot relay_state at {state_path:?} failed: {e}"
4926 );
4927 return;
4928 }
4929 eprintln!(
4930 "wire session new: local slot allocated on {local_relay} (slot_id={})",
4931 alloc.slot_id
4932 );
4933}
4934
4935fn render_session_info(
4936 name: &str,
4937 session_home: &std::path::Path,
4938 cwd: &std::path::Path,
4939) -> Result<serde_json::Value> {
4940 let card_path = session_home.join("config").join("wire").join("agent-card.json");
4941 let (did, handle) = if card_path.exists() {
4942 let card: Value = serde_json::from_slice(&std::fs::read(&card_path)?)?;
4943 let did = card
4944 .get("did")
4945 .and_then(Value::as_str)
4946 .unwrap_or("")
4947 .to_string();
4948 let handle = card
4949 .get("handle")
4950 .and_then(Value::as_str)
4951 .map(str::to_string)
4952 .unwrap_or_else(|| {
4953 crate::agent_card::display_handle_from_did(&did).to_string()
4954 });
4955 (did, handle)
4956 } else {
4957 (String::new(), String::new())
4958 };
4959 Ok(json!({
4960 "name": name,
4961 "home_dir": session_home.to_string_lossy(),
4962 "cwd": cwd.to_string_lossy(),
4963 "did": did,
4964 "handle": handle,
4965 "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
4966 }))
4967}
4968
4969fn emit_session_new_result(
4970 info: &serde_json::Value,
4971 status: &str,
4972 as_json: bool,
4973) -> Result<()> {
4974 if as_json {
4975 let mut obj = info.clone();
4976 obj["status"] = json!(status);
4977 println!("{}", serde_json::to_string(&obj)?);
4978 } else {
4979 let name = info["name"].as_str().unwrap_or("?");
4980 let handle = info["handle"].as_str().unwrap_or("?");
4981 let home = info["home_dir"].as_str().unwrap_or("?");
4982 let did = info["did"].as_str().unwrap_or("?");
4983 let export = info["export"].as_str().unwrap_or("?");
4984 let prefix = if status == "already_exists" {
4985 "session already exists (re-registered cwd)"
4986 } else {
4987 "session created"
4988 };
4989 println!(
4990 "{prefix}\n name: {name}\n handle: {handle}\n did: {did}\n home: {home}\n\nactivate with:\n {export}"
4991 );
4992 }
4993 Ok(())
4994}
4995
4996fn run_wire_with_home(
4997 session_home: &std::path::Path,
4998 args: &[&str],
4999) -> Result<std::process::ExitStatus> {
5000 let bin = std::env::current_exe().with_context(|| "locating self exe")?;
5001 let status = std::process::Command::new(&bin)
5002 .env("WIRE_HOME", session_home)
5003 .env_remove("RUST_LOG")
5004 .args(args)
5005 .status()
5006 .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
5007 Ok(status)
5008}
5009
5010fn ensure_session_daemon(session_home: &std::path::Path) -> Result<()> {
5011 let pidfile = session_home
5014 .join("state")
5015 .join("wire")
5016 .join("daemon.pid");
5017 if pidfile.exists() {
5018 let bytes = std::fs::read(&pidfile).unwrap_or_default();
5019 let pid: Option<u32> =
5020 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5021 v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5022 } else {
5023 String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5024 };
5025 if let Some(p) = pid {
5026 let alive = {
5027 #[cfg(target_os = "linux")]
5028 {
5029 std::path::Path::new(&format!("/proc/{p}")).exists()
5030 }
5031 #[cfg(not(target_os = "linux"))]
5032 {
5033 std::process::Command::new("kill")
5034 .args(["-0", &p.to_string()])
5035 .output()
5036 .map(|o| o.status.success())
5037 .unwrap_or(false)
5038 }
5039 };
5040 if alive {
5041 return Ok(());
5042 }
5043 }
5044 }
5045
5046 let bin = std::env::current_exe().with_context(|| "locating self exe")?;
5049 let log_path = session_home.join("state").join("wire").join("daemon.log");
5050 if let Some(parent) = log_path.parent() {
5051 std::fs::create_dir_all(parent).ok();
5052 }
5053 let log_file = std::fs::OpenOptions::new()
5054 .create(true)
5055 .append(true)
5056 .open(&log_path)
5057 .with_context(|| format!("opening daemon log {log_path:?}"))?;
5058 let log_err = log_file.try_clone()?;
5059 std::process::Command::new(&bin)
5060 .env("WIRE_HOME", session_home)
5061 .env_remove("RUST_LOG")
5062 .args(["daemon", "--interval", "5"])
5063 .stdout(log_file)
5064 .stderr(log_err)
5065 .stdin(std::process::Stdio::null())
5066 .spawn()
5067 .with_context(|| "spawning session-local `wire daemon`")?;
5068 Ok(())
5069}
5070
5071fn cmd_session_list(as_json: bool) -> Result<()> {
5072 let items = crate::session::list_sessions()?;
5073 if as_json {
5074 println!("{}", serde_json::to_string(&items)?);
5075 return Ok(());
5076 }
5077 if items.is_empty() {
5078 println!("no sessions on this machine. `wire session new` to create one.");
5079 return Ok(());
5080 }
5081 println!(
5082 "{:<24} {:<24} {:<10} CWD",
5083 "NAME", "HANDLE", "DAEMON"
5084 );
5085 for s in items {
5086 println!(
5087 "{:<24} {:<24} {:<10} {}",
5088 s.name,
5089 s.handle.as_deref().unwrap_or("?"),
5090 if s.daemon_running { "running" } else { "down" },
5091 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5092 );
5093 }
5094 Ok(())
5095}
5096
5097fn cmd_session_list_local(as_json: bool) -> Result<()> {
5109 let listing = crate::session::list_local_sessions()?;
5110 if as_json {
5111 println!("{}", serde_json::to_string(&listing)?);
5112 return Ok(());
5113 }
5114
5115 if listing.local.is_empty() && listing.federation_only.is_empty() {
5116 println!(
5117 "no sessions on this machine. `wire session new --with-local` to create one \
5118 with a local-relay endpoint (start the relay first: \
5119 `wire relay-server --bind 127.0.0.1:8771 --local-only`)."
5120 );
5121 return Ok(());
5122 }
5123
5124 if listing.local.is_empty() {
5125 println!(
5126 "no sister sessions reachable via a local relay. \
5127 Re-run `wire session new --with-local` to add a Local endpoint, or \
5128 start a local relay with `wire relay-server --bind 127.0.0.1:8771 --local-only`."
5129 );
5130 } else {
5131 let mut keys: Vec<&String> = listing.local.keys().collect();
5133 keys.sort();
5134 for relay_url in keys {
5135 let group = &listing.local[relay_url];
5136 println!("LOCAL RELAY: {relay_url}");
5137 println!(
5138 " {:<24} {:<32} {:<10} CWD",
5139 "NAME", "HANDLE", "DAEMON"
5140 );
5141 for s in group {
5142 println!(
5143 " {:<24} {:<32} {:<10} {}",
5144 s.name,
5145 s.handle.as_deref().unwrap_or("?"),
5146 if s.daemon_running { "running" } else { "down" },
5147 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5148 );
5149 }
5150 println!();
5151 }
5152 }
5153
5154 if !listing.federation_only.is_empty() {
5155 println!("federation-only (no local endpoint):");
5156 for s in &listing.federation_only {
5157 println!(
5158 " {:<24} {:<32} {}",
5159 s.name,
5160 s.handle.as_deref().unwrap_or("?"),
5161 s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5162 );
5163 }
5164 }
5165 Ok(())
5166}
5167
5168fn cmd_session_env(name_arg: Option<&str>, as_json: bool) -> Result<()> {
5169 let name = resolve_session_name(name_arg)?;
5170 let session_home = crate::session::session_dir(&name)?;
5171 if !session_home.exists() {
5172 bail!(
5173 "no session named {name:?} on this machine. `wire session list` to enumerate, \
5174 `wire session new {name}` to create."
5175 );
5176 }
5177 if as_json {
5178 println!(
5179 "{}",
5180 serde_json::to_string(&json!({
5181 "name": name,
5182 "home_dir": session_home.to_string_lossy(),
5183 "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
5184 }))?
5185 );
5186 } else {
5187 println!("export WIRE_HOME={}", session_home.to_string_lossy());
5188 }
5189 Ok(())
5190}
5191
5192fn cmd_session_current(as_json: bool) -> Result<()> {
5193 let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
5194 let registry = crate::session::read_registry().unwrap_or_default();
5195 let cwd_key = cwd.to_string_lossy().into_owned();
5196 let name = registry.by_cwd.get(&cwd_key).cloned();
5197 if as_json {
5198 println!(
5199 "{}",
5200 serde_json::to_string(&json!({
5201 "cwd": cwd_key,
5202 "session": name,
5203 }))?
5204 );
5205 } else if let Some(n) = name {
5206 println!("{n}");
5207 } else {
5208 println!("(no session registered for this cwd)");
5209 }
5210 Ok(())
5211}
5212
5213fn cmd_session_destroy(name_arg: &str, force: bool, as_json: bool) -> Result<()> {
5214 let name = crate::session::sanitize_name(name_arg);
5215 let session_home = crate::session::session_dir(&name)?;
5216 if !session_home.exists() {
5217 if as_json {
5218 println!(
5219 "{}",
5220 serde_json::to_string(&json!({
5221 "name": name,
5222 "destroyed": false,
5223 "reason": "no such session",
5224 }))?
5225 );
5226 } else {
5227 println!("no session named {name:?} — nothing to destroy.");
5228 }
5229 return Ok(());
5230 }
5231 if !force {
5232 bail!(
5233 "destroying session {name:?} would delete its keypair + state irrecoverably. \
5234 Pass --force to confirm."
5235 );
5236 }
5237
5238 let pidfile = session_home
5240 .join("state")
5241 .join("wire")
5242 .join("daemon.pid");
5243 if let Ok(bytes) = std::fs::read(&pidfile) {
5244 let pid: Option<u32> =
5245 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5246 v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5247 } else {
5248 String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5249 };
5250 if let Some(p) = pid {
5251 let _ = std::process::Command::new("kill")
5252 .args(["-TERM", &p.to_string()])
5253 .output();
5254 }
5255 }
5256
5257 std::fs::remove_dir_all(&session_home)
5258 .with_context(|| format!("removing session dir {session_home:?}"))?;
5259
5260 let mut registry = crate::session::read_registry().unwrap_or_default();
5262 registry.by_cwd.retain(|_, v| v != &name);
5263 crate::session::write_registry(®istry)?;
5264
5265 if as_json {
5266 println!(
5267 "{}",
5268 serde_json::to_string(&json!({
5269 "name": name,
5270 "destroyed": true,
5271 }))?
5272 );
5273 } else {
5274 println!("destroyed session {name:?}.");
5275 }
5276 Ok(())
5277}
5278
5279fn cmd_diag(action: DiagAction) -> Result<()> {
5282 let state = config::state_dir()?;
5283 let knob = state.join("diag.enabled");
5284 let log_path = state.join("diag.jsonl");
5285 match action {
5286 DiagAction::Tail { limit, json } => {
5287 let entries = crate::diag::tail(limit);
5288 if json {
5289 for e in entries {
5290 println!("{}", serde_json::to_string(&e)?);
5291 }
5292 } else if entries.is_empty() {
5293 println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
5294 } else {
5295 for e in entries {
5296 let ts = e["ts"].as_u64().unwrap_or(0);
5297 let ty = e["type"].as_str().unwrap_or("?");
5298 let pid = e["pid"].as_u64().unwrap_or(0);
5299 let payload = e["payload"].to_string();
5300 println!("[{ts}] pid={pid} {ty} {payload}");
5301 }
5302 }
5303 }
5304 DiagAction::Enable => {
5305 config::ensure_dirs()?;
5306 std::fs::write(&knob, "1")?;
5307 println!("wire diag: enabled at {knob:?}");
5308 }
5309 DiagAction::Disable => {
5310 if knob.exists() {
5311 std::fs::remove_file(&knob)?;
5312 }
5313 println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
5314 }
5315 DiagAction::Status { json } => {
5316 let enabled = crate::diag::is_enabled();
5317 let size = std::fs::metadata(&log_path)
5318 .map(|m| m.len())
5319 .unwrap_or(0);
5320 if json {
5321 println!(
5322 "{}",
5323 serde_json::to_string(&serde_json::json!({
5324 "enabled": enabled,
5325 "log_path": log_path,
5326 "log_size_bytes": size,
5327 }))?
5328 );
5329 } else {
5330 println!("wire diag status");
5331 println!(" enabled: {enabled}");
5332 println!(" log: {log_path:?}");
5333 println!(" log size: {size} bytes");
5334 }
5335 }
5336 }
5337 Ok(())
5338}
5339
5340fn cmd_service(action: ServiceAction) -> Result<()> {
5343 let (report, as_json) = match action {
5344 ServiceAction::Install { json } => (crate::service::install()?, json),
5345 ServiceAction::Uninstall { json } => (crate::service::uninstall()?, json),
5346 ServiceAction::Status { json } => (crate::service::status()?, json),
5347 };
5348 if as_json {
5349 println!("{}", serde_json::to_string(&report)?);
5350 } else {
5351 println!("wire service {}", report.action);
5352 println!(" platform: {}", report.platform);
5353 println!(" unit: {}", report.unit_path);
5354 println!(" status: {}", report.status);
5355 println!(" detail: {}", report.detail);
5356 }
5357 Ok(())
5358}
5359
5360fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
5375 let pgrep_out = std::process::Command::new("pgrep")
5377 .args(["-f", "wire daemon"])
5378 .output();
5379 let running_pids: Vec<u32> = match pgrep_out {
5380 Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
5381 .split_whitespace()
5382 .filter_map(|s| s.parse::<u32>().ok())
5383 .collect(),
5384 _ => Vec::new(),
5385 };
5386
5387 let record = crate::ensure_up::read_pid_record("daemon");
5389 let recorded_version: Option<String> = match &record {
5390 crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
5391 crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
5392 _ => None,
5393 };
5394 let cli_version = env!("CARGO_PKG_VERSION").to_string();
5395
5396 if check_only {
5397 let report = json!({
5398 "running_pids": running_pids,
5399 "pidfile_version": recorded_version,
5400 "cli_version": cli_version,
5401 "would_kill": running_pids,
5402 });
5403 if as_json {
5404 println!("{}", serde_json::to_string(&report)?);
5405 } else {
5406 println!("wire upgrade --check");
5407 println!(" cli version: {cli_version}");
5408 println!(" pidfile version: {}", recorded_version.as_deref().unwrap_or("(missing)"));
5409 if running_pids.is_empty() {
5410 println!(" running daemons: none");
5411 } else {
5412 let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
5413 println!(" running daemons: pids {}", pids.join(", "));
5414 println!(" would kill all + spawn fresh");
5415 }
5416 }
5417 return Ok(());
5418 }
5419
5420 let mut killed: Vec<u32> = Vec::new();
5423 for pid in &running_pids {
5424 let _ = std::process::Command::new("kill")
5426 .args(["-15", &pid.to_string()])
5427 .status();
5428 killed.push(*pid);
5429 }
5430 if !killed.is_empty() {
5432 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
5433 loop {
5434 let still_alive: Vec<u32> = killed
5435 .iter()
5436 .copied()
5437 .filter(|p| process_alive_pid(*p))
5438 .collect();
5439 if still_alive.is_empty() {
5440 break;
5441 }
5442 if std::time::Instant::now() >= deadline {
5443 for pid in still_alive {
5445 let _ = std::process::Command::new("kill")
5446 .args(["-9", &pid.to_string()])
5447 .status();
5448 }
5449 break;
5450 }
5451 std::thread::sleep(std::time::Duration::from_millis(50));
5452 }
5453 }
5454
5455 let pidfile = config::state_dir()?.join("daemon.pid");
5458 if pidfile.exists() {
5459 let _ = std::fs::remove_file(&pidfile);
5460 }
5461
5462 let spawned = crate::ensure_up::ensure_daemon_running()?;
5465
5466 let new_record = crate::ensure_up::read_pid_record("daemon");
5467 let new_pid = new_record.pid();
5468 let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
5469 Some(d.version.clone())
5470 } else {
5471 None
5472 };
5473
5474 if as_json {
5475 println!(
5476 "{}",
5477 serde_json::to_string(&json!({
5478 "killed": killed,
5479 "spawned_fresh_daemon": spawned,
5480 "new_pid": new_pid,
5481 "new_version": new_version,
5482 "cli_version": cli_version,
5483 }))?
5484 );
5485 } else {
5486 if killed.is_empty() {
5487 println!("wire upgrade: no stale daemons running");
5488 } else {
5489 println!("wire upgrade: killed {} daemon(s) (pids {})",
5490 killed.len(),
5491 killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
5492 }
5493 if spawned {
5494 println!(
5495 "wire upgrade: spawned fresh daemon (pid {} v{})",
5496 new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
5497 new_version.as_deref().unwrap_or(&cli_version),
5498 );
5499 } else {
5500 println!("wire upgrade: daemon was already running on current binary");
5501 }
5502 }
5503 Ok(())
5504}
5505
/// Report whether a process with the given pid currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry. On every other target
/// it runs `kill -0 <pid>` with all standard streams silenced and treats a
/// successful exit status as "alive"; failing to run `kill` counts as dead.
fn process_alive_pid(pid: u32) -> bool {
    #[cfg(target_os = "linux")]
    {
        let proc_entry = format!("/proc/{pid}");
        std::path::Path::new(&proc_entry).exists()
    }
    #[cfg(not(target_os = "linux"))]
    {
        use std::process::Stdio;

        let probe = std::process::Command::new("kill")
            .arg("-0")
            .arg(pid.to_string())
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        matches!(probe, Ok(exit) if exit.success())
    }
}
5523
5524#[derive(Clone, Debug, serde::Serialize)]
5528pub struct DoctorCheck {
5529 pub id: String,
5532 pub status: String,
5534 pub detail: String,
5536 #[serde(skip_serializing_if = "Option::is_none")]
5538 pub fix: Option<String>,
5539}
5540
5541impl DoctorCheck {
5542 fn pass(id: &str, detail: impl Into<String>) -> Self {
5543 Self {
5544 id: id.into(),
5545 status: "PASS".into(),
5546 detail: detail.into(),
5547 fix: None,
5548 }
5549 }
5550 fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5551 Self {
5552 id: id.into(),
5553 status: "WARN".into(),
5554 detail: detail.into(),
5555 fix: Some(fix.into()),
5556 }
5557 }
5558 fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5559 Self {
5560 id: id.into(),
5561 status: "FAIL".into(),
5562 detail: detail.into(),
5563 fix: Some(fix.into()),
5564 }
5565 }
5566}
5567
5568fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
5573 let mut checks: Vec<DoctorCheck> = Vec::new();
5574
5575 checks.push(check_daemon_health());
5576 checks.push(check_daemon_pid_consistency());
5577 checks.push(check_relay_reachable());
5578 checks.push(check_pair_rejections(recent_rejections));
5579 checks.push(check_cursor_progress());
5580
5581 let fails = checks.iter().filter(|c| c.status == "FAIL").count();
5582 let warns = checks.iter().filter(|c| c.status == "WARN").count();
5583
5584 if as_json {
5585 println!(
5586 "{}",
5587 serde_json::to_string(&json!({
5588 "checks": checks,
5589 "fail_count": fails,
5590 "warn_count": warns,
5591 "ok": fails == 0,
5592 }))?
5593 );
5594 } else {
5595 println!("wire doctor — {} checks", checks.len());
5596 for c in &checks {
5597 let bullet = match c.status.as_str() {
5598 "PASS" => "✓",
5599 "WARN" => "!",
5600 "FAIL" => "✗",
5601 _ => "?",
5602 };
5603 println!(" {bullet} [{}] {}: {}", c.status, c.id, c.detail);
5604 if let Some(fix) = &c.fix {
5605 println!(" fix: {fix}");
5606 }
5607 }
5608 println!();
5609 if fails == 0 && warns == 0 {
5610 println!("ALL GREEN");
5611 } else {
5612 println!("{fails} FAIL, {warns} WARN");
5613 }
5614 }
5615
5616 if fails > 0 {
5617 std::process::exit(1);
5618 }
5619 Ok(())
5620}
5621
5622fn check_daemon_health() -> DoctorCheck {
5629 let snap = crate::ensure_up::daemon_liveness();
5635 let pgrep_pids = &snap.pgrep_pids;
5636 let pidfile_pid = snap.pidfile_pid;
5637 let pidfile_alive = snap.pidfile_alive;
5638 let orphan_pids = &snap.orphan_pids;
5639
5640 let fmt_pids = |xs: &[u32]| -> String {
5641 xs.iter()
5642 .map(|p| p.to_string())
5643 .collect::<Vec<_>>()
5644 .join(", ")
5645 };
5646
5647 match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
5648 (0, _, _) => DoctorCheck::fail(
5649 "daemon",
5650 "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
5651 "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
5652 ),
5653 (1, true, true) => DoctorCheck::pass(
5655 "daemon",
5656 format!(
5657 "one daemon running (pid {}, matches pidfile)",
5658 pgrep_pids[0]
5659 ),
5660 ),
5661 (n, true, false) => DoctorCheck::fail(
5663 "daemon",
5664 format!(
5665 "{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
5666 The orphans race the relay cursor — they advance past events your current binary can't process. \
5667 (Issue #2 exact class.)",
5668 fmt_pids(&pgrep_pids),
5669 pidfile_pid.unwrap(),
5670 fmt_pids(&orphan_pids),
5671 ),
5672 "`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
5673 ),
5674 (n, false, _) => DoctorCheck::fail(
5676 "daemon",
5677 format!(
5678 "{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
5679 every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
5680 (Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
5681 fmt_pids(&pgrep_pids),
5682 match pidfile_pid {
5683 Some(p) => format!("claims pid {p} which is dead"),
5684 None => "is missing".to_string(),
5685 },
5686 ),
5687 "`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
5688 ),
5689 (n, true, true) => DoctorCheck::warn(
5691 "daemon",
5692 format!(
5693 "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
5694 fmt_pids(&pgrep_pids)
5695 ),
5696 "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
5697 ),
5698 }
5699}
5700
5701fn check_daemon_pid_consistency() -> DoctorCheck {
5713 let snap = crate::ensure_up::daemon_liveness();
5714 match &snap.record {
5715 crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
5716 "daemon_pid_consistency",
5717 "no daemon.pid yet — fresh box or daemon never started",
5718 ),
5719 crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
5720 "daemon_pid_consistency",
5721 format!("daemon.pid is corrupt: {reason}"),
5722 "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
5723 ),
5724 crate::ensure_up::PidRecord::LegacyInt(pid) => {
5725 let pid = *pid;
5728 if !crate::ensure_up::pid_is_alive(pid) {
5729 return DoctorCheck::warn(
5730 "daemon_pid_consistency",
5731 format!(
5732 "daemon.pid (legacy-int) points at pid {pid} which is not running. \
5733 Stale pidfile from a crashed pre-0.5.11 daemon. \
5734 (Issue #2: this surface used to PASS while `wire status` said DOWN.)"
5735 ),
5736 "`wire upgrade` (kills any orphan + spawns a fresh daemon with JSON pidfile)",
5737 );
5738 }
5739 DoctorCheck::warn(
5740 "daemon_pid_consistency",
5741 format!(
5742 "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
5743 Daemon was started by a pre-0.5.11 binary."
5744 ),
5745 "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
5746 )
5747 }
5748 crate::ensure_up::PidRecord::Json(d) => {
5749 if !snap.pidfile_alive {
5753 return DoctorCheck::warn(
5754 "daemon_pid_consistency",
5755 format!(
5756 "daemon.pid records pid {pid} (v{version}) but that process is not running — \
5757 pidfile is stale. `wire status` will report DOWN, but pre-v0.5.19 doctor \
5758 silently PASSed this state and ignored any live orphan daemons (#2 root cause).",
5759 pid = d.pid,
5760 version = d.version,
5761 ),
5762 "`wire upgrade` to clean up the stale pidfile + spawn a fresh daemon \
5763 (kills any orphan daemon advancing the cursor without coordination)",
5764 );
5765 }
5766 let mut issues: Vec<String> = Vec::new();
5767 if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
5768 issues.push(format!(
5769 "schema={} (expected {})",
5770 d.schema,
5771 crate::ensure_up::DAEMON_PID_SCHEMA
5772 ));
5773 }
5774 let cli_version = env!("CARGO_PKG_VERSION");
5775 if d.version != cli_version {
5776 issues.push(format!(
5777 "version daemon={} cli={cli_version}",
5778 d.version
5779 ));
5780 }
5781 if !std::path::Path::new(&d.bin_path).exists() {
5782 issues.push(format!("bin_path {} missing on disk", d.bin_path));
5783 }
5784 if let Ok(card) = config::read_agent_card()
5786 && let Some(current_did) = card.get("did").and_then(Value::as_str)
5787 && let Some(recorded_did) = &d.did
5788 && recorded_did != current_did
5789 {
5790 issues.push(format!(
5791 "did daemon={recorded_did} config={current_did} — identity drift"
5792 ));
5793 }
5794 if let Ok(state) = config::read_relay_state()
5795 && let Some(current_relay) = state
5796 .get("self")
5797 .and_then(|s| s.get("relay_url"))
5798 .and_then(Value::as_str)
5799 && let Some(recorded_relay) = &d.relay_url
5800 && recorded_relay != current_relay
5801 {
5802 issues.push(format!(
5803 "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
5804 ));
5805 }
5806 if issues.is_empty() {
5807 DoctorCheck::pass(
5808 "daemon_pid_consistency",
5809 format!(
5810 "daemon v{} bound to {} as {}",
5811 d.version,
5812 d.relay_url.as_deref().unwrap_or("?"),
5813 d.did.as_deref().unwrap_or("?")
5814 ),
5815 )
5816 } else {
5817 DoctorCheck::warn(
5818 "daemon_pid_consistency",
5819 format!("daemon pidfile drift: {}", issues.join("; ")),
5820 "`wire upgrade` to atomically restart daemon with current config".to_string(),
5821 )
5822 }
5823 }
5824 }
5825}
5826
5827fn check_relay_reachable() -> DoctorCheck {
5829 let state = match config::read_relay_state() {
5830 Ok(s) => s,
5831 Err(e) => return DoctorCheck::fail(
5832 "relay",
5833 format!("could not read relay state: {e}"),
5834 "run `wire up <handle>@<relay>` to bootstrap",
5835 ),
5836 };
5837 let url = state
5838 .get("self")
5839 .and_then(|s| s.get("relay_url"))
5840 .and_then(Value::as_str)
5841 .unwrap_or("");
5842 if url.is_empty() {
5843 return DoctorCheck::warn(
5844 "relay",
5845 "no relay bound — wire send/pull will not work",
5846 "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
5847 );
5848 }
5849 let client = crate::relay_client::RelayClient::new(url);
5850 match client.check_healthz() {
5851 Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
5852 Err(e) => DoctorCheck::fail(
5853 "relay",
5854 format!("{url} unreachable: {e}"),
5855 format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
5856 ),
5857 }
5858}
5859
5860fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
5864 let path = match config::state_dir() {
5865 Ok(d) => d.join("pair-rejected.jsonl"),
5866 Err(e) => return DoctorCheck::warn(
5867 "pair_rejections",
5868 format!("could not resolve state dir: {e}"),
5869 "set WIRE_HOME or fix XDG_STATE_HOME",
5870 ),
5871 };
5872 if !path.exists() {
5873 return DoctorCheck::pass(
5874 "pair_rejections",
5875 "no pair-rejected.jsonl — no recorded pair failures",
5876 );
5877 }
5878 let body = match std::fs::read_to_string(&path) {
5879 Ok(b) => b,
5880 Err(e) => return DoctorCheck::warn(
5881 "pair_rejections",
5882 format!("could not read {path:?}: {e}"),
5883 "check file permissions",
5884 ),
5885 };
5886 let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
5887 if lines.is_empty() {
5888 return DoctorCheck::pass(
5889 "pair_rejections",
5890 "pair-rejected.jsonl present but empty",
5891 );
5892 }
5893 let total = lines.len();
5894 let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
5895 let mut summary: Vec<String> = Vec::new();
5896 for line in &recent {
5897 if let Ok(rec) = serde_json::from_str::<Value>(line) {
5898 let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
5899 let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
5900 summary.push(format!("{peer}/{code}"));
5901 }
5902 }
5903 DoctorCheck::warn(
5904 "pair_rejections",
5905 format!(
5906 "{total} pair failures recorded. recent: [{}]",
5907 summary.join(", ")
5908 ),
5909 format!(
5910 "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
5911 ),
5912 )
5913}
5914
5915fn check_cursor_progress() -> DoctorCheck {
5920 let state = match config::read_relay_state() {
5921 Ok(s) => s,
5922 Err(e) => return DoctorCheck::warn(
5923 "cursor",
5924 format!("could not read relay state: {e}"),
5925 "check ~/Library/Application Support/wire/relay.json",
5926 ),
5927 };
5928 let cursor = state
5929 .get("self")
5930 .and_then(|s| s.get("last_pulled_event_id"))
5931 .and_then(Value::as_str)
5932 .map(|s| s.chars().take(16).collect::<String>())
5933 .unwrap_or_else(|| "<none>".to_string());
5934 DoctorCheck::pass(
5935 "cursor",
5936 format!(
5937 "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
5938 ),
5939 )
5940}
5941
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// Each constructor stamps its own status label; only `pass` omits a fix.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passed = DoctorCheck::pass("x", "ok");
        assert_eq!(passed.status, "PASS");
        assert_eq!(passed.fix, None);

        let warned = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warned.status, "WARN");
        assert_eq!(warned.fix, Some("do this".to_string()));

        let failed = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failed.status, "FAIL");
        assert_eq!(failed.fix, Some("fix it".to_string()));
    }

    /// With no rejection log on disk the check passes.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let c = check_pair_rejections(5);
            assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
        });
    }

    /// A recorded rejection turns the check into a WARN that names it.
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let outcome = check_pair_rejections(5);
            assert_eq!(outcome.status, "WARN");
            for needle in ["1 pair failures", "willard/pair_drop_ack_send_failed"] {
                assert!(outcome.detail.contains(needle));
            }
        });
    }
}
5995
5996fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
6008 let (nick, relay_url) = match handle_arg.split_once('@') {
6009 Some((n, host)) => {
6010 let url = if host.starts_with("http://") || host.starts_with("https://") {
6011 host.to_string()
6012 } else {
6013 format!("https://{host}")
6014 };
6015 (n.to_string(), url)
6016 }
6017 None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
6018 };
6019
6020 let mut report: Vec<(String, String)> = Vec::new();
6021 let mut step = |stage: &str, detail: String| {
6022 report.push((stage.to_string(), detail.clone()));
6023 if !as_json {
6024 eprintln!("wire up: {stage} — {detail}");
6025 }
6026 };
6027
6028 if config::is_initialized()? {
6030 let card = config::read_agent_card()?;
6031 let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
6032 let existing_handle =
6033 crate::agent_card::display_handle_from_did(existing_did).to_string();
6034 if existing_handle != nick {
6035 bail!(
6036 "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
6037 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
6038 delete `{:?}` to start fresh.",
6039 config::config_dir()?
6040 );
6041 }
6042 step("init", format!("already initialized as {existing_handle}"));
6043 } else {
6044 cmd_init(&nick, name, Some(&relay_url), false)?;
6045 step("init", format!("created identity {nick} bound to {relay_url}"));
6046 }
6047
6048 let relay_state = config::read_relay_state()?;
6052 let bound_relay = relay_state
6053 .get("self")
6054 .and_then(|s| s.get("relay_url"))
6055 .and_then(Value::as_str)
6056 .unwrap_or("")
6057 .to_string();
6058 if bound_relay.is_empty() {
6059 cmd_bind_relay(&relay_url, false, false)?;
6063 step("bind-relay", format!("bound to {relay_url}"));
6064 } else if bound_relay != relay_url {
6065 step(
6066 "bind-relay",
6067 format!(
6068 "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
6069 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
6070 ),
6071 );
6072 } else {
6073 step("bind-relay", format!("already bound to {bound_relay}"));
6074 }
6075
6076 match cmd_claim(&nick, Some(&relay_url), None, false, false) {
6079 Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
6080 Err(e) => step(
6081 "claim",
6082 format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
6083 ),
6084 }
6085
6086 match crate::ensure_up::ensure_daemon_running() {
6088 Ok(true) => step("daemon", "started fresh background daemon".to_string()),
6089 Ok(false) => step("daemon", "already running".to_string()),
6090 Err(e) => step(
6091 "daemon",
6092 format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
6093 ),
6094 }
6095
6096 let summary = format!(
6098 "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
6099 `wire monitor` to watch incoming events."
6100 );
6101 step("ready", summary.clone());
6102
6103 if as_json {
6104 let steps_json: Vec<_> = report
6105 .iter()
6106 .map(|(k, v)| json!({"stage": k, "detail": v}))
6107 .collect();
6108 println!(
6109 "{}",
6110 serde_json::to_string(&json!({
6111 "nick": nick,
6112 "relay": relay_url,
6113 "steps": steps_json,
6114 }))?
6115 );
6116 }
6117 Ok(())
6118}
6119
/// Return `url` without its leading `https://` or `http://` scheme.
///
/// Exactly one scheme prefix is removed; input without either prefix is
/// returned unchanged. (`trim_start_matches` would strip the prefix
/// repeatedly and then go on to strip the other scheme as well, eating part
/// of the remainder for input such as `https://http://x`.)
fn strip_proto(url: &str) -> String {
    url.strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
        .unwrap_or(url)
        .to_string()
}
6126
6127fn cmd_pair_megacommand(
6141 handle_arg: &str,
6142 relay_override: Option<&str>,
6143 timeout_secs: u64,
6144 _as_json: bool,
6145) -> Result<()> {
6146 let parsed = crate::pair_profile::parse_handle(handle_arg)?;
6147 let peer_handle = parsed.nick.clone();
6148
6149 eprintln!("wire pair: resolving {handle_arg}...");
6150 cmd_add(handle_arg, relay_override, false)?;
6151
6152 eprintln!(
6153 "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
6154 to ack (their daemon must be running + pulling)..."
6155 );
6156
6157 let _ = run_sync_pull();
6161
6162 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
6163 let poll_interval = std::time::Duration::from_millis(500);
6164
6165 loop {
6166 let _ = run_sync_pull();
6168 let relay_state = config::read_relay_state()?;
6169 let peer_entry = relay_state
6170 .get("peers")
6171 .and_then(|p| p.get(&peer_handle))
6172 .cloned();
6173 let token = peer_entry
6174 .as_ref()
6175 .and_then(|e| e.get("slot_token"))
6176 .and_then(Value::as_str)
6177 .unwrap_or("");
6178
6179 if !token.is_empty() {
6180 let trust = config::read_trust()?;
6182 let pinned_in_trust = trust
6183 .get("agents")
6184 .and_then(|a| a.get(&peer_handle))
6185 .is_some();
6186 println!(
6187 "wire pair: paired with {peer_handle}.\n trust: {} bilateral: yes (slot_token recorded)\n next: `wire send {peer_handle} \"<msg>\"`",
6188 if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
6189 );
6190 return Ok(());
6191 }
6192
6193 if std::time::Instant::now() >= deadline {
6194 bail!(
6201 "wire pair: timed out after {timeout_secs}s. \
6202 peer {peer_handle} never sent pair_drop_ack. \
6203 likely causes: (a) their daemon is down — ask them to run \
6204 `wire status` and `wire daemon &`; (b) their binary is older \
6205 than 0.5.x and doesn't understand pair_drop events — ask \
6206 them to `wire upgrade`; (c) network / relay blip — re-run \
6207 `wire pair {handle_arg}` to retry."
6208 );
6209 }
6210
6211 std::thread::sleep(poll_interval);
6212 }
6213}
6214
6215fn cmd_claim(
6216 nick: &str,
6217 relay_override: Option<&str>,
6218 public_url: Option<&str>,
6219 hidden: bool,
6220 as_json: bool,
6221) -> Result<()> {
6222 if !crate::pair_profile::is_valid_nick(nick) {
6223 bail!(
6224 "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
6225 );
6226 }
6227 let (_did, relay_url, slot_id, slot_token) =
6230 crate::pair_invite::ensure_self_with_relay(relay_override)?;
6231 let card = config::read_agent_card()?;
6232
6233 let client = crate::relay_client::RelayClient::new(&relay_url);
6234 let discoverable = if hidden { Some(false) } else { None };
6238 let resp = client.handle_claim_v2(
6239 nick,
6240 &slot_id,
6241 &slot_token,
6242 public_url,
6243 &card,
6244 discoverable,
6245 )?;
6246
6247 if as_json {
6248 println!(
6249 "{}",
6250 serde_json::to_string(&json!({
6251 "nick": nick,
6252 "relay": relay_url,
6253 "response": resp,
6254 }))?
6255 );
6256 } else {
6257 let domain = public_url
6261 .unwrap_or(&relay_url)
6262 .trim_start_matches("https://")
6263 .trim_start_matches("http://")
6264 .trim_end_matches('/')
6265 .split('/')
6266 .next()
6267 .unwrap_or("<this-relay-domain>")
6268 .to_string();
6269 println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
6270 println!("verify with: wire whois {nick}@{domain}");
6271 }
6272 Ok(())
6273}
6274
6275fn cmd_profile(action: ProfileAction) -> Result<()> {
6276 match action {
6277 ProfileAction::Set { field, value, json } => {
6278 let parsed: Value =
6282 serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
6283 let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
6284 if json {
6285 println!(
6286 "{}",
6287 serde_json::to_string(&json!({
6288 "field": field,
6289 "profile": new_profile,
6290 }))?
6291 );
6292 } else {
6293 println!("profile.{field} set");
6294 }
6295 }
6296 ProfileAction::Get { json } => return cmd_whois(None, json, None),
6297 ProfileAction::Clear { field, json } => {
6298 let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
6299 if json {
6300 println!(
6301 "{}",
6302 serde_json::to_string(&json!({
6303 "field": field,
6304 "cleared": true,
6305 "profile": new_profile,
6306 }))?
6307 );
6308 } else {
6309 println!("profile.{field} cleared");
6310 }
6311 }
6312 }
6313 Ok(())
6314}
6315
6316fn cmd_setup(apply: bool) -> Result<()> {
6319 use std::path::PathBuf;
6320
6321 let entry = json!({"command": "wire", "args": ["mcp"]});
6322 let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
6323
6324 let mut targets: Vec<(&str, PathBuf)> = Vec::new();
6327 if let Some(home) = dirs::home_dir() {
6328 targets.push(("Claude Code", home.join(".claude.json")));
6331 targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
6333 #[cfg(target_os = "macos")]
6335 targets.push((
6336 "Claude Desktop (macOS)",
6337 home.join("Library/Application Support/Claude/claude_desktop_config.json"),
6338 ));
6339 #[cfg(target_os = "windows")]
6341 if let Ok(appdata) = std::env::var("APPDATA") {
6342 targets.push((
6343 "Claude Desktop (Windows)",
6344 PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
6345 ));
6346 }
6347 targets.push(("Cursor", home.join(".cursor/mcp.json")));
6349 }
6350 targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
6352
6353 println!("wire setup\n");
6354 println!("MCP server snippet (add this to your client's mcpServers):");
6355 println!();
6356 println!("{entry_pretty}");
6357 println!();
6358
6359 if !apply {
6360 println!("Probable MCP host config locations on this machine:");
6361 for (name, path) in &targets {
6362 let marker = if path.exists() {
6363 "✓ found"
6364 } else {
6365 " (would create)"
6366 };
6367 println!(" {marker:14} {name}: {}", path.display());
6368 }
6369 println!();
6370 println!("Run `wire setup --apply` to merge wire into each config above.");
6371 println!(
6372 "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
6373 );
6374 return Ok(());
6375 }
6376
6377 let mut modified: Vec<String> = Vec::new();
6378 let mut skipped: Vec<String> = Vec::new();
6379 for (name, path) in &targets {
6380 match upsert_mcp_entry(path, "wire", &entry) {
6381 Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
6382 Ok(false) => skipped.push(format!(" {name} ({}): already configured", path.display())),
6383 Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
6384 }
6385 }
6386 if !modified.is_empty() {
6387 println!("Modified:");
6388 for line in &modified {
6389 println!(" {line}");
6390 }
6391 println!();
6392 println!("Restart the app(s) above to load wire MCP.");
6393 }
6394 if !skipped.is_empty() {
6395 println!();
6396 println!("Skipped:");
6397 for line in &skipped {
6398 println!(" {line}");
6399 }
6400 }
6401 Ok(())
6402}
6403
6404fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
6407 let mut cfg: Value = if path.exists() {
6408 let body = std::fs::read_to_string(path).context("reading config")?;
6409 serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
6410 } else {
6411 json!({})
6412 };
6413 if !cfg.is_object() {
6414 cfg = json!({});
6415 }
6416 let root = cfg.as_object_mut().unwrap();
6417 let servers = root
6418 .entry("mcpServers".to_string())
6419 .or_insert_with(|| json!({}));
6420 if !servers.is_object() {
6421 *servers = json!({});
6422 }
6423 let map = servers.as_object_mut().unwrap();
6424 if map.get(server_name) == Some(entry) {
6425 return Ok(false);
6426 }
6427 map.insert(server_name.to_string(), entry.clone());
6428 if let Some(parent) = path.parent()
6429 && !parent.as_os_str().is_empty()
6430 {
6431 std::fs::create_dir_all(parent).context("creating parent dir")?;
6432 }
6433 let out = serde_json::to_string_pretty(&cfg)? + "\n";
6434 std::fs::write(path, out).context("writing config")?;
6435 Ok(true)
6436}
6437
6438#[allow(clippy::too_many_arguments)]
6441fn cmd_reactor(
6442 on_event: &str,
6443 peer_filter: Option<&str>,
6444 kind_filter: Option<&str>,
6445 verified_only: bool,
6446 interval_secs: u64,
6447 once: bool,
6448 dry_run: bool,
6449 max_per_minute: u32,
6450 max_chain_depth: u32,
6451) -> Result<()> {
6452 use crate::inbox_watch::{InboxEvent, InboxWatcher};
6453 use std::collections::{HashMap, HashSet, VecDeque};
6454 use std::io::Write;
6455 use std::process::{Command, Stdio};
6456 use std::time::{Duration, Instant};
6457
6458 let cursor_path = config::state_dir()?.join("reactor.cursor");
6459 let emitted_path = config::state_dir()?.join("reactor-emitted.log");
6468 let mut emitted_ids: HashSet<String> = HashSet::new();
6469 if emitted_path.exists()
6470 && let Ok(body) = std::fs::read_to_string(&emitted_path)
6471 {
6472 for line in body.lines() {
6473 let t = line.trim();
6474 if !t.is_empty() {
6475 emitted_ids.insert(t.to_string());
6476 }
6477 }
6478 }
6479 let outbox_dir = config::outbox_dir()?;
6481 let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
6484
6485 let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6486
6487 let kind_num: Option<u32> = match kind_filter {
6488 Some(k) => Some(parse_kind(k)?),
6489 None => None,
6490 };
6491
6492 let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
6494
6495 let dispatch = |ev: &InboxEvent,
6496 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
6497 emitted_ids: &HashSet<String>|
6498 -> Result<bool> {
6499 if let Some(p) = peer_filter
6500 && ev.peer != p
6501 {
6502 return Ok(false);
6503 }
6504 if verified_only && !ev.verified {
6505 return Ok(false);
6506 }
6507 if let Some(want) = kind_num {
6508 let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
6509 if ev_kind != Some(want) {
6510 return Ok(false);
6511 }
6512 }
6513
6514 if max_chain_depth > 0 {
6518 let body_str = match &ev.raw["body"] {
6519 Value::String(s) => s.clone(),
6520 other => serde_json::to_string(other).unwrap_or_default(),
6521 };
6522 if let Some(referenced) = parse_re_marker(&body_str) {
6523 let matched = emitted_ids.contains(&referenced)
6526 || emitted_ids.iter().any(|full| full.starts_with(&referenced));
6527 if matched {
6528 eprintln!(
6529 "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
6530 ev.event_id, ev.peer, referenced
6531 );
6532 return Ok(false);
6533 }
6534 }
6535 }
6536
6537 if max_per_minute > 0 {
6539 let now = Instant::now();
6540 let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
6541 while let Some(&front) = win.front() {
6542 if now.duration_since(front) > Duration::from_secs(60) {
6543 win.pop_front();
6544 } else {
6545 break;
6546 }
6547 }
6548 if win.len() as u32 >= max_per_minute {
6549 eprintln!(
6550 "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
6551 ev.event_id, ev.peer, max_per_minute
6552 );
6553 return Ok(false);
6554 }
6555 win.push_back(now);
6556 }
6557
6558 if dry_run {
6559 println!("{}", serde_json::to_string(&ev.raw)?);
6560 return Ok(true);
6561 }
6562
6563 let mut child = Command::new("sh")
6564 .arg("-c")
6565 .arg(on_event)
6566 .stdin(Stdio::piped())
6567 .stdout(Stdio::inherit())
6568 .stderr(Stdio::inherit())
6569 .env("WIRE_EVENT_PEER", &ev.peer)
6570 .env("WIRE_EVENT_ID", &ev.event_id)
6571 .env("WIRE_EVENT_KIND", &ev.kind)
6572 .spawn()
6573 .with_context(|| format!("spawning reactor handler: {on_event}"))?;
6574 if let Some(mut stdin) = child.stdin.take() {
6575 let body = serde_json::to_vec(&ev.raw)?;
6576 let _ = stdin.write_all(&body);
6577 let _ = stdin.write_all(b"\n");
6578 }
6579 std::mem::drop(child);
6580 Ok(true)
6581 };
6582
6583 let scan_outbox = |emitted_ids: &mut HashSet<String>,
6585 outbox_cursors: &mut HashMap<String, u64>|
6586 -> Result<usize> {
6587 if !outbox_dir.exists() {
6588 return Ok(0);
6589 }
6590 let mut added = 0;
6591 let mut new_ids: Vec<String> = Vec::new();
6592 for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
6593 let path = entry.path();
6594 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
6595 continue;
6596 }
6597 let peer = match path.file_stem().and_then(|s| s.to_str()) {
6598 Some(s) => s.to_string(),
6599 None => continue,
6600 };
6601 let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
6602 let start = *outbox_cursors.get(&peer).unwrap_or(&0);
6603 if cur_len <= start {
6604 outbox_cursors.insert(peer, start);
6605 continue;
6606 }
6607 let body = std::fs::read_to_string(&path).unwrap_or_default();
6608 let tail = &body[start as usize..];
6609 for line in tail.lines() {
6610 if let Ok(v) = serde_json::from_str::<Value>(line)
6611 && let Some(eid) = v.get("event_id").and_then(Value::as_str)
6612 && emitted_ids.insert(eid.to_string())
6613 {
6614 new_ids.push(eid.to_string());
6615 added += 1;
6616 }
6617 }
6618 outbox_cursors.insert(peer, cur_len);
6619 }
6620 if !new_ids.is_empty() {
6621 let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
6623 if all.len() > 500 {
6624 all.sort();
6625 let drop_n = all.len() - 500;
6626 let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
6627 emitted_ids.retain(|x| !dropped.contains(x));
6628 all = emitted_ids.iter().cloned().collect();
6629 }
6630 let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
6631 }
6632 Ok(added)
6633 };
6634
6635 let sweep = |watcher: &mut InboxWatcher,
6636 emitted_ids: &mut HashSet<String>,
6637 outbox_cursors: &mut HashMap<String, u64>,
6638 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
6639 -> Result<usize> {
6640 let _ = scan_outbox(emitted_ids, outbox_cursors);
6642
6643 let events = watcher.poll()?;
6644 let mut fired = 0usize;
6645 for ev in &events {
6646 match dispatch(ev, peer_dispatch_log, emitted_ids) {
6647 Ok(true) => fired += 1,
6648 Ok(false) => {}
6649 Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
6650 }
6651 }
6652 watcher.save_cursors(&cursor_path)?;
6653 Ok(fired)
6654 };
6655
6656 if once {
6657 sweep(
6658 &mut watcher,
6659 &mut emitted_ids,
6660 &mut outbox_cursors,
6661 &mut peer_dispatch_log,
6662 )?;
6663 return Ok(());
6664 }
6665 let interval = std::time::Duration::from_secs(interval_secs.max(1));
6666 loop {
6667 if let Err(e) = sweep(
6668 &mut watcher,
6669 &mut emitted_ids,
6670 &mut outbox_cursors,
6671 &mut peer_dispatch_log,
6672 ) {
6673 eprintln!("wire reactor: sweep error: {e}");
6674 }
6675 std::thread::sleep(interval);
6676 }
6677}
6678
/// Extract the id from the first `(re:ID)` marker in `body`.
///
/// Only the first `(re:` occurrence is considered. The id is the text up to
/// the next `)`, with surrounding whitespace trimmed. Returns `None` when
/// there is no marker, no closing parenthesis after it, or the trimmed id is
/// empty.
fn parse_re_marker(body: &str) -> Option<String> {
    let (_, after_marker) = body.split_once("(re:")?;
    let (inner, _) = after_marker.split_once(')')?;
    let id = inner.trim();
    (!id.is_empty()).then(|| id.to_string())
}
6692
6693fn cmd_notify(
6696 interval_secs: u64,
6697 peer_filter: Option<&str>,
6698 once: bool,
6699 as_json: bool,
6700) -> Result<()> {
6701 use crate::inbox_watch::InboxWatcher;
6702 let cursor_path = config::state_dir()?.join("notify.cursor");
6703 let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6704
6705 let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
6706 let events = watcher.poll()?;
6707 for ev in events {
6708 if let Some(p) = peer_filter
6709 && ev.peer != p
6710 {
6711 continue;
6712 }
6713 if as_json {
6714 println!("{}", serde_json::to_string(&ev)?);
6715 } else {
6716 os_notify_inbox_event(&ev);
6717 }
6718 }
6719 watcher.save_cursors(&cursor_path)?;
6720 Ok(())
6721 };
6722
6723 if once {
6724 return sweep(&mut watcher);
6725 }
6726
6727 let interval = std::time::Duration::from_secs(interval_secs.max(1));
6728 loop {
6729 if let Err(e) = sweep(&mut watcher) {
6730 eprintln!("wire notify: sweep error: {e}");
6731 }
6732 std::thread::sleep(interval);
6733 }
6734}
6735
6736fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
6737 let title = if ev.verified {
6738 format!("wire ← {}", ev.peer)
6739 } else {
6740 format!("wire ← {} (UNVERIFIED)", ev.peer)
6741 };
6742 let body = format!("{}: {}", ev.kind, ev.body_preview);
6743 crate::os_notify::toast(&title, &body);
6744}
6745
/// Fallback notification for targets other than Linux, macOS and Windows:
/// writes the title and body to stderr instead of raising a desktop toast.
///
/// NOTE(review): nothing in the visible part of this file calls `os_toast`
/// (`os_notify_inbox_event` goes through `crate::os_notify::toast`) — confirm
/// it is still needed.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    eprintln!("[wire notify] {title}\n {body}");
}
6750
6751