Skip to main content

wire/
cli.rs

1//! `wire` CLI surface.
2//!
3//! Every subcommand emits human-readable text by default and structured JSON
4//! when `--json` is passed. Stable JSON shape is part of the API contract —
5//! see `docs/AGENT_INTEGRATION.md`.
6//!
7//! Subcommand split:
8//!   - **agent-safe**: `whoami`, `peers`, `verify`, `send`, `tail` — pure
9//!     message-layer ops, no trust establishment.
10//!   - **trust-establishing**: `init`, `pair-host`, `pair-join`. The CLI
11//!     uses interactive `y/N` prompts here. The MCP equivalents
12//!     (`wire_init`, `wire_pair_initiate`, `wire_pair_join`, `wire_pair_check`,
13//!     `wire_pair_confirm`) preserve the human gate by requiring the user to
14//!     type the 6 SAS digits back into chat — see `docs/THREAT_MODEL.md` T10/T14.
15
16use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21    agent_card::{build_agent_card, sign_agent_card},
22    config,
23    signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24    trust::{add_self_to_trust, empty_trust},
25};
26
27/// Top-level CLI.
28#[derive(Parser, Debug)]
29#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
30pub struct Cli {
31    #[command(subcommand)]
32    pub command: Command,
33}
34
35#[derive(Subcommand, Debug)]
36pub enum Command {
37    /// Generate a keypair, write self-card, and prepare to pair. (HUMAN-ONLY — DO NOT exec from agents.)
38    Init {
39        /// Short handle for this agent (becomes did:wire:<handle>).
40        handle: String,
41        /// Optional display name (defaults to capitalized handle).
42        #[arg(long)]
43        name: Option<String>,
44        /// Optional relay URL — if set, also allocates a relay slot in one step
45        /// (equivalent to running `wire init` then `wire bind-relay <url>`).
46        #[arg(long)]
47        relay: Option<String>,
48        /// Emit JSON.
49        #[arg(long)]
50        json: bool,
51    },
52    // (Old `Join` stub removed in iter 11 — superseded by `pair-join` with
53    // `join` alias. See PairJoin below.)
54    /// Print this agent's identity (DID, fingerprint, mailbox slot).
55    Whoami {
56        #[arg(long)]
57        json: bool,
58    },
59    /// List pinned peers with their tiers and capabilities.
60    Peers {
61        #[arg(long)]
62        json: bool,
63    },
64    /// Sign and queue an event to a peer.
65    ///
66    /// Forms (P0.S 0.5.11):
67    ///   wire send <peer> <body>              # kind defaults to "claim"
68    ///   wire send <peer> <kind> <body>       # explicit kind (back-compat)
69    ///   wire send <peer> -                   # body from stdin (kind=claim)
70    ///   wire send <peer> @/path/to/body.json # body from file
71    Send {
72        /// Peer handle (without `did:wire:` prefix).
73        peer: String,
74        /// When `<body>` is omitted, this is the event body (kind defaults
75        /// to `claim`). When both this and `<body>` are given, this is the
76        /// event kind (`decision`, `claim`, etc., or numeric kind id) and
77        /// the next positional is the body.
78        kind_or_body: String,
79        /// Event body — free-form text, `@/path/to/body.json` to load from
80        /// a file, or `-` to read from stdin. Optional; omit to use
81        /// `<kind_or_body>` as the body with kind=`claim`.
82        body: Option<String>,
83        /// Advisory deadline: duration (`30m`, `2h`, `1d`) or RFC3339 timestamp.
84        #[arg(long)]
85        deadline: Option<String>,
86        /// Emit JSON.
87        #[arg(long)]
88        json: bool,
89    },
90    /// Stream signed events from peers.
91    Tail {
92        /// Optional peer filter; if omitted, tails all peers.
93        peer: Option<String>,
94        /// Emit JSONL (one event per line).
95        #[arg(long)]
96        json: bool,
97        /// Maximum events to read before exiting (0 = stream until SIGINT).
98        #[arg(long, default_value_t = 0)]
99        limit: usize,
100    },
101    /// Live tail of new inbox events across all pinned peers — one line per
102    /// new event, handshake (pair_drop / pair_drop_ack / heartbeat) filtered
103    /// by default.
104    ///
105    /// Designed to be left running in an agent harness's stream-watcher
106    /// (Claude Code Monitor tool, etc.) so peer messages surface in the
107    /// session as they arrive, not on next manual `wire pull`.
108    ///
109    /// See docs/AGENT_INTEGRATION.md for the recommended Monitor invocation
110    /// template.
111    Monitor {
112        /// Only show events from this peer.
113        #[arg(long)]
114        peer: Option<String>,
115        /// Emit JSONL (one InboxEvent per line) for tooling consumption.
116        #[arg(long)]
117        json: bool,
118        /// Include handshake events (pair_drop, pair_drop_ack, heartbeat).
119        /// Default filters them out as noise.
120        #[arg(long)]
121        include_handshake: bool,
122        /// Poll interval in milliseconds. Lower = lower latency, higher CPU.
123        #[arg(long, default_value_t = 500)]
124        interval_ms: u64,
125        /// Replay last N events from history before going live (0 = none).
126        #[arg(long, default_value_t = 0)]
127        replay: usize,
128    },
129    /// Verify a signed event from a JSON file or stdin (`-`).
130    Verify {
131        /// Path to event JSON, or `-` for stdin.
132        path: String,
133        /// Emit JSON.
134        #[arg(long)]
135        json: bool,
136    },
137    /// Run the MCP (Model Context Protocol) server over stdio.
138    /// This is how Claude Desktop / Claude Code / Cursor / etc. expose
139    /// `wire_send`, `wire_tail`, etc. as native tools.
140    Mcp,
141    /// Run a relay server on this host.
142    RelayServer {
143        /// Bind address (e.g. `127.0.0.1:8770`).
144        #[arg(long, default_value = "127.0.0.1:8770")]
145        bind: String,
146        /// v0.5.17: refuse non-loopback binds, skip phonebook listing,
147        /// skip `.well-known/wire/agent` serving. The relay becomes
148        /// invisible from outside the box — only same-machine processes
149        /// can pair through it. Right call for within-machine agent
150        /// coordination where you don't want metadata leaking to a
151        /// public relay. Pair this with `wire session new` which probes
152        /// `127.0.0.1:8771` and allocates a local slot automatically.
153        #[arg(long)]
154        local_only: bool,
155    },
156    /// Allocate a slot on a relay; bind it to this agent's identity.
157    ///
158    /// v0.5.19 (issue #7): if any peers are pinned to this agent's
159    /// current slot, this command refuses by default — silent migration
160    /// silently black-holes their inbound messages. Pass
161    /// `--migrate-pinned` to acknowledge the risk and proceed, or use
162    /// `wire rotate-slot` (which emits a `wire_close` event to peers)
163    /// for safe rotation.
164    BindRelay {
165        /// Relay base URL, e.g. `http://127.0.0.1:8770`.
166        url: String,
167        /// Acknowledge that pinned peers will black-hole until they
168        /// re-pin manually. Required when `state.peers` is non-empty;
169        /// ignored on fresh boxes. Use `wire rotate-slot` instead for
170        /// the supported same-relay rotation path.
171        #[arg(long)]
172        migrate_pinned: bool,
173        #[arg(long)]
174        json: bool,
175    },
176    /// Manually pin a peer's relay slot. (Replaces SAS pairing for v0.1 bootstrap;
177    /// real `wire join` lands in the SPAKE2 iter.)
178    AddPeerSlot {
179        /// Peer handle (becomes did:wire:<handle>).
180        handle: String,
181        /// Peer's relay base URL.
182        url: String,
183        /// Peer's slot id.
184        slot_id: String,
185        /// Slot bearer token (shared between paired peers in v0.1).
186        slot_token: String,
187        #[arg(long)]
188        json: bool,
189    },
190    /// Drain outbox JSONL files to peers' relay slots.
191    Push {
192        /// Optional peer filter; default = all peers with outbox entries.
193        peer: Option<String>,
194        #[arg(long)]
195        json: bool,
196    },
197    /// Pull events from our relay slot, verify, write to inbox.
198    Pull {
199        #[arg(long)]
200        json: bool,
201    },
202    /// Print a summary of identity, relay binding, peers, inbox/outbox queue depth.
203    /// Useful as a single "where am I" check.
204    Status {
205        /// Inspect a paired peer's transport / attention / responder health.
206        #[arg(long)]
207        peer: Option<String>,
208        #[arg(long)]
209        json: bool,
210    },
211    /// Publish or inspect auto-responder health for this slot.
212    Responder {
213        #[command(subcommand)]
214        command: ResponderCommand,
215    },
216    /// Pin a peer's signed agent-card from a file. (Manual out-of-band pairing
217    /// — fallback path; the magic-wormhole flow is `pair-host` / `pair-join`.)
218    Pin {
219        /// Path to peer's signed agent-card JSON.
220        card_file: String,
221        #[arg(long)]
222        json: bool,
223    },
224    /// Allocate a NEW slot on the same relay and abandon the old one.
225    /// Sends a kind=1201 wire_close event to every paired peer over the OLD
226    /// slot announcing the new mailbox before swapping. After rotation,
227    /// peers must re-pair (or operator runs `add-peer-slot` with the new
228    /// coords) — auto-update via wire_close is a v0.2 daemon feature.
229    ///
230    /// Use case: a paired peer turned hostile (T11 in THREAT_MODEL.md —
231    /// abusive bearer-holder spamming your slot). Rotate → old slot is
232    /// orphaned → attacker's leverage gone. Operator pairs again with
233    /// peers they still want.
234    RotateSlot {
235        /// Skip the wire_close announcement to peers (faster but they won't know
236        /// where you went).
237        #[arg(long)]
238        no_announce: bool,
239        #[arg(long)]
240        json: bool,
241    },
242    /// Remove a peer from trust + relay state. Inbox/outbox files for that
243    /// peer are NOT deleted (operator can grep history); pass --purge to
244    /// also wipe the JSONL files.
245    ForgetPeer {
246        /// Peer handle to forget.
247        handle: String,
248        /// Also delete inbox/<handle>.jsonl and outbox/<handle>.jsonl.
249        #[arg(long)]
250        purge: bool,
251        #[arg(long)]
252        json: bool,
253    },
254    /// Run a long-lived sync loop: every <interval> seconds, push outbox to
255    /// peers' relay slots and pull inbox from our own slot. Foreground process;
256    /// background it with systemd / `&` / tmux as you prefer.
257    Daemon {
258        /// Sync interval in seconds. Default 5.
259        #[arg(long, default_value_t = 5)]
260        interval: u64,
261        /// Run a single sync cycle and exit (useful for cron-driven setups).
262        #[arg(long)]
263        once: bool,
264        #[arg(long)]
265        json: bool,
266    },
267    /// Host a SAS-confirmed pairing. Generates a code phrase, prints it, waits
268    /// for a peer to `pair-join`, exchanges signed agent-cards via SPAKE2 +
269    /// ChaCha20-Poly1305. Auto-pins on success. (HUMAN-ONLY — operator must
270    /// read the SAS digits aloud and confirm.)
271    PairHost {
272        /// Relay base URL.
273        #[arg(long)]
274        relay: String,
275        /// Skip the SAS confirmation prompt. ONLY use when piping under
276        /// automated tests or when the SAS has already been verified by
277        /// another channel. Documented as test-only.
278        #[arg(long)]
279        yes: bool,
280        /// How long (seconds) to wait for the peer to join before timing out.
281        #[arg(long, default_value_t = 300)]
282        timeout: u64,
283        /// Detach: write a pending-pair file, print the code phrase, and exit
284        /// immediately. The running `wire daemon` does the handshake in the
285        /// background; confirm SAS later via `wire pair-confirm <code> <digits>`.
286        /// `wire pair-list` shows pending sessions. Default is foreground
287        /// blocking behavior for backward compat.
288        #[arg(long)]
289        detach: bool,
290        /// Emit JSON instead of text. Currently only meaningful with --detach.
291        #[arg(long)]
292        json: bool,
293    },
294    /// Join a pair-slot using a code phrase from the host. (HUMAN-ONLY.)
295    ///
296    /// Aliased as `wire join <code>` for magic-wormhole muscle-memory.
297    #[command(alias = "join")]
298    PairJoin {
299        /// Code phrase from the host's `pair-host` output (e.g. `73-2QXC4P`).
300        code_phrase: String,
301        /// Relay base URL (must match the host's relay).
302        #[arg(long)]
303        relay: String,
304        #[arg(long)]
305        yes: bool,
306        #[arg(long, default_value_t = 300)]
307        timeout: u64,
308        /// Detach: see `pair-host --detach`.
309        #[arg(long)]
310        detach: bool,
311        /// Emit JSON instead of text. Currently only meaningful with --detach.
312        #[arg(long)]
313        json: bool,
314    },
315    /// Confirm SAS digits for a detached pending pair. The daemon must be
316    /// running for this to do anything — it picks up the confirmation on its
317    /// next tick. Mismatch aborts the pair.
318    PairConfirm {
319        /// The code phrase the original `wire pair-host --detach` printed.
320        code_phrase: String,
321        /// 6 digits as displayed by `wire pair-list` (dashes/spaces stripped).
322        digits: String,
323        /// Emit JSON instead of human-readable text.
324        #[arg(long)]
325        json: bool,
326    },
327    /// List all pending detached pair sessions and their state.
328    PairList {
329        /// Emit JSON instead of the table.
330        #[arg(long)]
331        json: bool,
332        /// Stream mode: never exit; print one JSON line per status transition
333        /// (creation, status change, deletion) across all pending pairs.
334        /// Compose with bash `while read` to react in shell. Implies --json.
335        #[arg(long)]
336        watch: bool,
337        /// Poll interval in seconds for --watch.
338        #[arg(long, default_value_t = 1)]
339        watch_interval: u64,
340    },
341    /// Cancel a pending pair. Releases the relay slot and removes the pending file.
342    PairCancel {
343        code_phrase: String,
344        #[arg(long)]
345        json: bool,
346    },
347    /// Block until a pending pair reaches a target status (default sas_ready),
348    /// or terminates (finalized = file removed, aborted, aborted_restart), or
349    /// the timeout expires. Useful for shell scripts that want to drive the
350    /// detached flow without polling pair-list themselves.
351    ///
352    /// Exit codes:
353    ///   0 — reached target status (or finalized, if target was sas_ready)
354    ///   1 — terminated abnormally (aborted, aborted_restart, no such code)
355    ///   2 — timeout
356    PairWatch {
357        code_phrase: String,
358        /// Target status to wait for. Default: sas_ready.
359        #[arg(long, default_value = "sas_ready")]
360        status: String,
361        /// Max seconds to wait.
362        #[arg(long, default_value_t = 300)]
363        timeout: u64,
364        /// Emit JSON on each status change (one per line) instead of just on exit.
365        #[arg(long)]
366        json: bool,
367    },
368    /// One-shot bootstrap. Inits identity (idempotent), opens pair-host or
369    /// pair-join, then registers wire as an MCP server. Single command from
370    /// nothing to paired and ready — no separate init/pair-host/setup steps.
371    /// Operator still must confirm SAS digits.
372    ///
373    /// Examples:
374    ///   wire pair paul                          # host a new pair on default relay
375    ///   wire pair willard --code 58-NMTY7A      # join paul's pair
376    Pair {
377        /// Short handle for this agent (becomes did:wire:<handle>). Used by init
378        /// step if no identity exists; ignored if already initialized.
379        handle: String,
380        /// Code phrase from peer's pair-host output. Omit to be the host
381        /// (this command will print one for you to share).
382        #[arg(long)]
383        code: Option<String>,
384        /// Relay base URL. Defaults to the laulpogan public-good relay.
385        #[arg(long, default_value = "https://wireup.net")]
386        relay: String,
387        /// Skip SAS prompt. Test-only.
388        #[arg(long)]
389        yes: bool,
390        /// Pair-step timeout in seconds.
391        #[arg(long, default_value_t = 300)]
392        timeout: u64,
393        /// Skip the post-pair `setup --apply` step (don't register wire as
394        /// an MCP server in detected client configs).
395        #[arg(long)]
396        no_setup: bool,
397        /// Run via the daemon-orchestrated detached path (auto-starts daemon,
398        /// exits immediately, daemon does the handshake). Confirm via
399        /// `wire pair-confirm <code> <digits>` from any terminal. See
400        /// `pair-host --detach` for details.
401        #[arg(long)]
402        detach: bool,
403    },
404    /// Forget a half-finished pair-slot on the relay. Use this if `pair-host`
405    /// or `pair-join` crashed (process killed, network blip, OOM) before SAS
406    /// confirmation, leaving the relay-side slot stuck with "guest already
407    /// registered" or "host already registered" until the 5-minute TTL expires.
408    /// Either side can call. Idempotent.
409    PairAbandon {
410        /// The code phrase from the original pair-host (e.g. `58-NMTY7A`).
411        code_phrase: String,
412        /// Relay base URL.
413        #[arg(long, default_value = "https://wireup.net")]
414        relay: String,
415    },
416    /// Accept a pending-inbound pair request (v0.5.14). Explicit alias for
417    /// the bilateral-completion path that `wire add <peer>@<relay>` also
418    /// drives — but doesn't require remembering the peer's relay domain
419    /// (the relay coords come from the stored pair_drop). Errors if no
420    /// pending-inbound record exists for that peer.
421    PairAccept {
422        /// Bare peer handle (without `@<relay>`).
423        peer: String,
424        /// Emit JSON.
425        #[arg(long)]
426        json: bool,
427    },
428    /// Reject a pending pair request (v0.5.14). When someone runs `wire add
429    /// you@<your-relay>` against your handle, their signed pair_drop lands
430    /// in pending-inbound — visible via `wire pair-list`. Run `wire pair-reject
431    /// <peer>` to delete the record without pairing. The peer never receives
432    /// our slot_token; from their side the pair stays pending until they
433    /// time out.
434    PairReject {
435        /// Bare peer handle (without `@<relay>`).
436        peer: String,
437        /// Emit JSON.
438        #[arg(long)]
439        json: bool,
440    },
441    /// Programmatic-shape list of pending-inbound pair requests (v0.5.14).
442    /// `--json` returns a flat array (matching the v0.5.13-and-earlier
443    /// `pair-list --json` shape but for inbound). Use this in scripts that
444    /// need to enumerate inbound pair requests without parsing the SPAKE2
445    /// table format from `wire pair-list`.
446    PairListInbound {
447        /// Emit JSON.
448        #[arg(long)]
449        json: bool,
450    },
451    /// Manage isolated wire sessions on this machine (v0.5.16).
452    ///
453    /// Each session = its own DID + handle + relay slot + daemon + inbox/
454    /// outbox tree. Use when multiple agents (e.g. Claude Code sessions
455    /// in different projects) run on the same machine — without sessions
456    /// they all share one identity and race the inbox cursor.
457    ///
458    /// Names are derived from `basename(cwd)` and cached in a registry,
459    /// so re-entering the same project reuses the same identity.
460    #[command(subcommand)]
461    Session(SessionCommand),
462    /// Detect known MCP host config locations (Claude Desktop, Claude Code,
463    /// Cursor, project-local) and either print or auto-merge the wire MCP
464    /// server entry. Default prints; pass `--apply` to actually modify config
465    /// files. Idempotent — re-running is safe.
466    Setup {
467        /// Actually write the changes (default = print only).
468        #[arg(long)]
469        apply: bool,
470    },
471    /// Show an agent's profile. With no arg, prints local self. With a
472    /// `nick@domain` arg, resolves via that domain's `.well-known/wire/agent`
473    /// endpoint and verifies the returned signed card before display.
474    Whois {
475        /// Optional handle (`nick@domain`). Omit to show self.
476        handle: Option<String>,
477        #[arg(long)]
478        json: bool,
479        /// Override the relay base URL used for resolution (default:
480        /// `https://<domain>` from the handle).
481        #[arg(long)]
482        relay: Option<String>,
483    },
484    /// Zero-paste pair with a known handle. Resolves `nick@domain` via that
485    /// domain's `.well-known/wire/agent`, then delivers a signed pair-intro
486    /// to the peer's slot via `/v1/handle/intro`. Peer's daemon completes
487    /// the bilateral pin on its next pull (sends back pair_drop_ack carrying
488    /// their slot_token so we can `wire send` to them).
489    Add {
490        /// Peer handle (`nick@domain`).
491        handle: String,
492        /// Override the relay base URL used for resolution.
493        #[arg(long)]
494        relay: Option<String>,
495        #[arg(long)]
496        json: bool,
497    },
498    /// One-shot full bootstrap — `wire up <nick@relay-host>` does in one
499    /// command what 0.5.10 took five (init + bind-relay + claim + daemon-
500    /// background + remember-to-restart-on-login). Idempotent: re-run on
501    /// an already-set-up box prints state without churn.
502    ///
503    /// Examples:
504    ///   wire up paul@wireup.net           # full bootstrap
505    ///   wire up paul-mac@wireup.net       # ditto, nick = paul-mac
506    ///   wire up paul                      # bootstrap, default relay
507    Up {
508        /// Full handle in `nick@relay-host` form, or just `nick` (defaults
509        /// to the configured public relay wireup.net).
510        handle: String,
511        /// Optional display name (defaults to capitalized nick).
512        #[arg(long)]
513        name: Option<String>,
514        #[arg(long)]
515        json: bool,
516    },
517    /// Diagnose wire setup health. Single command that surfaces every
518    /// silent-fail class — daemon down or duplicated, relay unreachable,
519    /// cursor stuck, pair rejections piling up, trust ↔ directory drift.
520    /// Replaces today's 30-minute manual debug.
521    ///
522    /// Exit code non-zero if any FAIL findings.
523    Doctor {
524        /// Emit JSON.
525        #[arg(long)]
526        json: bool,
527        /// Show last N entries from pair-rejected.jsonl in the report.
528        #[arg(long, default_value_t = 5)]
529        recent_rejections: usize,
530    },
531    /// Atomic upgrade: kill every `wire daemon` process, spawn a fresh
532    /// one from the current binary, write a new pidfile. Eliminates the
533    /// "stale binary text in memory under a fresh symlink" bug class that
534    /// burned 30 minutes today.
535    Upgrade {
536        /// Report drift without taking action (lists processes that would
537        /// be killed + the version of each).
538        #[arg(long)]
539        check: bool,
540        #[arg(long)]
541        json: bool,
542    },
543    /// Install / inspect / remove a launchd plist (macOS) or systemd
544    /// user unit (linux) that runs `wire daemon` on login + restarts
545    /// on crash. Replaces today's "background it with tmux/&/systemd
546    /// as you prefer" footgun.
547    Service {
548        #[command(subcommand)]
549        action: ServiceAction,
550    },
551    /// Inspect or toggle the structured diagnostic trace
552    /// (`$WIRE_HOME/state/wire/diag.jsonl`). Off by default. Enable per
553    /// process via `WIRE_DIAG=1`, or per-machine via `wire diag enable`
554    /// (writes the file knob a running daemon picks up automatically).
555    Diag {
556        #[command(subcommand)]
557        action: DiagAction,
558    },
559    /// Claim a nick on a relay's handle directory. Anyone can then reach
560    /// this agent by `<nick>@<relay-domain>` via the relay's
561    /// `.well-known/wire/agent` endpoint. FCFS; same-DID re-claims allowed.
562    Claim {
563        nick: String,
564        /// Relay to claim the nick on. Default = relay our slot is on.
565        #[arg(long)]
566        relay: Option<String>,
567        /// Public URL the relay should advertise to resolvers (default = relay).
568        #[arg(long)]
569        public_url: Option<String>,
570        /// v0.5.19 (#9.1): opt out of the relay's bulk `/v1/handles`
571        /// directory listing. The handle stays claimed (FCFS still
572        /// applies) and direct `.well-known/wire/agent?handle=X` lookup
573        /// still resolves, so peers you share the handle with out-of-band
574        /// can still pair. Bulk scrapers / phonebook crawlers will not
575        /// see the nick. Use this for handles meant for known-peer
576        /// pairing only — see issue #9.
577        #[arg(long)]
578        hidden: bool,
579        #[arg(long)]
580        json: bool,
581    },
582    /// Edit profile fields (display_name, emoji, motto, vibe, pronouns,
583    /// avatar_url, handle, now). Re-signs the agent-card atomically.
584    ///
585    /// Examples:
586    ///   wire profile set motto "compiles or dies trying"
587    ///   wire profile set emoji "🦀"
588    ///   wire profile set vibe '["rust","late-night","no-async-please"]'
589    ///   wire profile set handle "coffee-ghost@anthropic.dev"
590    ///   wire profile get
591    Profile {
592        #[command(subcommand)]
593        action: ProfileAction,
594    },
595    /// Mint a one-paste invite URL. Anyone with this URL can pair to us in a
596    /// single step (no SAS digits, no code typing). Auto-inits + auto-allocates
597    /// a relay slot on first use. Default TTL 24h, single-use.
598    Invite {
599        /// Override the relay URL for first-time auto-allocation.
600        #[arg(long, default_value = "https://wireup.net")]
601        relay: String,
602        /// Invite lifetime in seconds (default 86400 = 24h).
603        #[arg(long, default_value_t = 86_400)]
604        ttl: u64,
605        /// Number of distinct peers that can accept this invite before it's
606        /// consumed (default 1).
607        #[arg(long, default_value_t = 1)]
608        uses: u32,
609        /// Register the invite at the relay's short-URL endpoint and print
610        /// a `curl ... | sh` one-liner the peer can run on a fresh machine.
611        /// Installs wire if missing, then accepts the invite, then pairs.
612        #[arg(long)]
613        share: bool,
614        /// Emit JSON.
615        #[arg(long)]
616        json: bool,
617    },
618    /// Accept a wire invite URL. Single-step pair — pins issuer, sends our
619    /// signed card to issuer's slot. Auto-inits + auto-allocates if needed.
620    Accept {
621        /// The full invite URL (starts with `wire://pair?v=1&inv=...`).
622        url: String,
623        /// Emit JSON.
624        #[arg(long)]
625        json: bool,
626    },
627    /// Long-running event dispatcher. Watches inbox for new verified events
628    /// and spawns the given shell command per event, passing the event JSON
629    /// on stdin. Use to wire up autonomous reply loops:
630    ///   wire reactor --on-event 'claude -p "respond via wire send"'
631    /// Cursor persisted to `$WIRE_HOME/state/wire/reactor.cursor`.
632    Reactor {
633        /// Shell command to spawn per event. Event JSON written to its stdin.
634        #[arg(long)]
635        on_event: String,
636        /// Only fire for events from this peer.
637        #[arg(long)]
638        peer: Option<String>,
639        /// Only fire for events of this kind (numeric or name, e.g. 1 / decision).
640        #[arg(long)]
641        kind: Option<String>,
642        /// Skip events whose verified flag is false (default true).
643        #[arg(long, default_value_t = true)]
644        verified_only: bool,
645        /// Poll interval in seconds.
646        #[arg(long, default_value_t = 2)]
647        interval: u64,
648        /// Process one sweep and exit.
649        #[arg(long)]
650        once: bool,
651        /// Don't actually spawn — print one JSONL line per event for smoke-testing.
652        #[arg(long)]
653        dry_run: bool,
654        /// Hard rate-limit: max events handler is fired for per peer per minute.
655        /// 0 = unlimited. Default 6 — covers normal conversational tempo, kills
656        /// LLM-vs-LLM feedback loops (which fire 10+/sec).
657        #[arg(long, default_value_t = 6)]
658        max_per_minute: u32,
659        /// Anti-loop chain depth. Track event_ids this reactor emitted; if an
660        /// incoming event body contains `(re:X)` where X is in our emitted log,
661        /// skip — that's a reply-to-our-reply, depth ≥ 2. Disable with 0.
662        #[arg(long, default_value_t = 1)]
663        max_chain_depth: u32,
664    },
665    /// Watch the inbox for new verified events and fire an OS notification per
666    /// event. Long-running; background under systemd / `&` / tmux. Cursor is
667    /// persisted to `$WIRE_HOME/state/wire/notify.cursor` so restarts don't
668    /// re-emit history.
669    Notify {
670        /// Poll interval in seconds.
671        #[arg(long, default_value_t = 2)]
672        interval: u64,
673        /// Only notify for events from this peer (handle, no did: prefix).
674        #[arg(long)]
675        peer: Option<String>,
676        /// Run a single sweep and exit (useful for cron / tests).
677        #[arg(long)]
678        once: bool,
679        /// Suppress the OS notification call; print one JSON line per event to
680        /// stdout instead (for piping into other tooling or smoke-testing
681        /// without a desktop session).
682        #[arg(long)]
683        json: bool,
684    },
685}
686
687#[derive(Subcommand, Debug)]
688pub enum DiagAction {
689    /// Tail the last N entries from diag.jsonl.
690    Tail {
691        #[arg(long, default_value_t = 20)]
692        limit: usize,
693        #[arg(long)]
694        json: bool,
695    },
696    /// Flip the file-based knob ON. Running daemons pick this up on
697    /// the next emit call without restart.
698    Enable,
699    /// Flip the file-based knob OFF.
700    Disable,
701    /// Report whether diag is currently enabled + the file's size.
702    Status {
703        #[arg(long)]
704        json: bool,
705    },
706}
707
708#[derive(Subcommand, Debug)]
709pub enum SessionCommand {
710    /// Bootstrap a new isolated session in this machine's sessions root.
711    /// With no name, derives one from `basename(cwd)` and caches it in
712    /// the registry so re-running from the same project reuses it.
713    /// Runs `init` + `claim` + spawns a session-local daemon, all inside
714    /// the new session's WIRE_HOME. Output includes the `export
715    /// WIRE_HOME=...` line operators paste into their shell to activate
716    /// it.
717    New {
718        /// Optional session name. Default = derived from `basename(cwd)`.
719        name: Option<String>,
720        /// Relay URL for the session's slot allocation + handle claim.
721        #[arg(long, default_value = "https://wireup.net")]
722        relay: String,
723        /// v0.5.17: also allocate a second slot on a same-machine local
724        /// relay (defaults to `http://127.0.0.1:8771`). Within-machine
725        /// sister-session traffic prefers this path: zero round-trip
726        /// latency, zero metadata exposure to the public relay. Probes
727        /// `<local-relay>/healthz` first; silently skips if the local
728        /// relay isn't running.
729        #[arg(long)]
730        with_local: bool,
731        /// v0.5.17: override the local relay URL probed by `--with-local`.
732        /// Default is `http://127.0.0.1:8771` to match
733        /// `wire relay-server --bind 127.0.0.1:8771 --local-only`.
734        #[arg(long, default_value = "http://127.0.0.1:8771")]
735        local_relay: String,
736        /// Skip spawning the session-local daemon. Use when you want
737        /// to drive sync explicitly from the agent or test rig.
738        #[arg(long)]
739        no_daemon: bool,
740        /// Emit JSON.
741        #[arg(long)]
742        json: bool,
743    },
744    /// List all sessions on this machine with their handle, DID,
745    /// daemon liveness, and the cwd they're associated with.
746    List {
747        #[arg(long)]
748        json: bool,
749    },
750    /// List sister sessions reachable via a same-machine local relay
751    /// (v0.5.17 dual-slot). Groups sessions by the local-relay URL they
752    /// share. Sessions without a Local-scope endpoint are listed
753    /// separately so the operator can tell which are federation-only.
754    /// Read-only — does not probe any relay or touch daemons.
755    ListLocal {
756        #[arg(long)]
757        json: bool,
758    },
759    /// v0.6.0 (issue #12): mesh-pair every sister session against every
760    /// other in O(N²) handshakes. For each unordered pair (A, B) that
761    /// is not already paired, drives the bilateral flow end-to-end:
762    /// `wire add` from A → B (queued + pushed), `wire pair-accept` on
763    /// B's side, then a final pull on A so the ack lands. Idempotent —
764    /// re-running skips pairs already in `state.peers`.
765    ///
766    /// **Trust anchor:** the operator running this command owns every
767    /// session listed in `wire session list-local` (they all live under
768    /// the same `$WIRE_HOME/sessions/` directory the operator chose).
769    /// That filesystem-permission boundary IS the consent for both
770    /// sides — the bilateral SAS / network-level handshake assumes
771    /// strangers; same-uid sister sessions are by definition not
772    /// strangers. Cross-uid sister sessions are out of scope; today
773    /// `wire session list-local` only enumerates this user's sessions.
774    PairAllLocal {
775        /// Seconds to wait between handshake stages for pair_drop /
776        /// pair_drop_ack to propagate over the relay. Default 1s
777        /// (local-relay is typically <100ms RTT). Bump if you see
778        /// "pending-inbound never arrived" errors on a slow relay.
779        #[arg(long, default_value_t = 1)]
780        settle_secs: u64,
781        /// Federation relay to bind each `wire add` against. Default
782        /// `https://wireup.net`. Sister sessions should be bound to
783        /// the same federation relay; the pair handshake routes through
784        /// it for the .well-known resolution + pair_drop deposit.
785        #[arg(long, default_value = "https://wireup.net")]
786        federation_relay: String,
787        #[arg(long)]
788        json: bool,
789    },
790    /// Print the `export WIRE_HOME=...` line for a session, so a shell
791    /// can `eval $(wire session env <name>)` to activate it. With no
792    /// name, resolves the cwd through the registry.
793    Env {
794        /// Session name. Default = derived from cwd via the registry.
795        name: Option<String>,
796        #[arg(long)]
797        json: bool,
798    },
799    /// Identify which session the current cwd maps to in the registry.
800    /// Prints `(none)` if cwd isn't registered — `wire session new`
801    /// would create one.
802    Current {
803        #[arg(long)]
804        json: bool,
805    },
806    /// Tear down a session: kills its daemon (if running), deletes its
807    /// state directory, and removes it from the registry. Requires
808    /// `--force` because state loss is unrecoverable (keypair gone).
809    Destroy {
810        name: String,
811        /// Confirm state-deleting operation.
812        #[arg(long)]
813        force: bool,
814        #[arg(long)]
815        json: bool,
816    },
817}
818
819#[derive(Subcommand, Debug)]
820pub enum ServiceAction {
821    /// Write the launchd plist (macOS) or systemd user unit (linux) and
822    /// load it. Idempotent — re-running re-bootstraps an existing service.
823    ///
824    /// v0.5.22: with no flags, installs the `wire daemon` (your sync
825    /// process). Pass `--local-relay` to install the loopback relay
826    /// (`wire relay-server --bind 127.0.0.1:8771 --local-only`) — the
827    /// transport sister-Claudes use to coordinate on the same machine
828    /// (v0.5.17 dual-slot). The two services have distinct labels +
829    /// log files, so you can install both.
830    Install {
831        /// Install the local-relay service instead of the daemon.
832        #[arg(long)]
833        local_relay: bool,
834        #[arg(long)]
835        json: bool,
836    },
837    /// Unload + delete the service unit. Daemon keeps running until the
838    /// next reboot or `wire upgrade`; this only changes the boot-time
839    /// behaviour.
840    Uninstall {
841        /// Uninstall the local-relay service instead of the daemon.
842        #[arg(long)]
843        local_relay: bool,
844        #[arg(long)]
845        json: bool,
846    },
847    /// Report whether the unit is installed + active.
848    Status {
849        /// Show status of the local-relay service instead of the daemon.
850        #[arg(long)]
851        local_relay: bool,
852        #[arg(long)]
853        json: bool,
854    },
855}
856
// Subcommands behind `Command::Responder`. `run()` routes `Set` to
// `cmd_responder_set` and `Get` to `cmd_responder_get`.
//
// NOTE(review): these notes use plain `//` rather than `///` so that the
// clap-derived help text stays exactly as it is today.
#[derive(Subcommand, Debug)]
pub enum ResponderCommand {
    /// Publish this agent's auto-responder health.
    Set {
        // Accepted by clap as a free-form string; `cmd_responder_set`
        // rejects anything `responder_status_allowed` does not list.
        /// One of: online, offline, oauth_locked, rate_limited, degraded.
        status: String,
        /// Optional operator-facing reason.
        #[arg(long)]
        reason: Option<String>,
        /// Emit JSON.
        #[arg(long)]
        json: bool,
    },
    /// Read responder health for self, or for a paired peer.
    Get {
        // Handed to `cmd_responder_get` as `Option<&str>`; `None` means "self".
        /// Optional peer handle; omitted means this agent's own slot.
        peer: Option<String>,
        /// Emit JSON.
        #[arg(long)]
        json: bool,
    },
}
879
880#[derive(Subcommand, Debug)]
881pub enum ProfileAction {
882    /// Set a profile field. Field names: display_name, emoji, motto, vibe,
883    /// pronouns, avatar_url, handle, now. Values are strings except `vibe`
884    /// (JSON array) and `now` (JSON object).
885    Set {
886        field: String,
887        value: String,
888        #[arg(long)]
889        json: bool,
890    },
891    /// Show all profile fields. Equivalent to `wire whois`.
892    Get {
893        #[arg(long)]
894        json: bool,
895    },
896    /// Clear a profile field.
897    Clear {
898        field: String,
899        #[arg(long)]
900        json: bool,
901    },
902}
903
904/// Entry point — parse and dispatch.
905pub fn run() -> Result<()> {
906    let cli = Cli::parse();
907    match cli.command {
908        Command::Init {
909            handle,
910            name,
911            relay,
912            json,
913        } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
914        Command::Status { peer, json } => {
915            if let Some(peer) = peer {
916                cmd_status_peer(&peer, json)
917            } else {
918                cmd_status(json)
919            }
920        }
921        Command::Whoami { json } => cmd_whoami(json),
922        Command::Peers { json } => cmd_peers(json),
923        Command::Send {
924            peer,
925            kind_or_body,
926            body,
927            deadline,
928            json,
929        } => {
930            // P0.S: smart-positional API. `wire send peer body` =
931            // kind=claim. `wire send peer kind body` = explicit kind.
932            let (kind, body) = match body {
933                Some(real_body) => (kind_or_body, real_body),
934                None => ("claim".to_string(), kind_or_body),
935            };
936            cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
937        }
938        Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
939        Command::Monitor {
940            peer,
941            json,
942            include_handshake,
943            interval_ms,
944            replay,
945        } => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
946        Command::Verify { path, json } => cmd_verify(&path, json),
947        Command::Responder { command } => match command {
948            ResponderCommand::Set {
949                status,
950                reason,
951                json,
952            } => cmd_responder_set(&status, reason.as_deref(), json),
953            ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
954        },
955        Command::Mcp => cmd_mcp(),
956        Command::RelayServer { bind, local_only } => cmd_relay_server(&bind, local_only),
957        Command::BindRelay {
958            url,
959            migrate_pinned,
960            json,
961        } => cmd_bind_relay(&url, migrate_pinned, json),
962        Command::AddPeerSlot {
963            handle,
964            url,
965            slot_id,
966            slot_token,
967            json,
968        } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
969        Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
970        Command::Pull { json } => cmd_pull(json),
971        Command::Pin { card_file, json } => cmd_pin(&card_file, json),
972        Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
973        Command::ForgetPeer {
974            handle,
975            purge,
976            json,
977        } => cmd_forget_peer(&handle, purge, json),
978        Command::Daemon {
979            interval,
980            once,
981            json,
982        } => cmd_daemon(interval, once, json),
983        Command::PairHost {
984            relay,
985            yes,
986            timeout,
987            detach,
988            json,
989        } => {
990            if detach {
991                cmd_pair_host_detach(&relay, json)
992            } else {
993                cmd_pair_host(&relay, yes, timeout)
994            }
995        }
996        Command::PairJoin {
997            code_phrase,
998            relay,
999            yes,
1000            timeout,
1001            detach,
1002            json,
1003        } => {
1004            if detach {
1005                cmd_pair_join_detach(&code_phrase, &relay, json)
1006            } else {
1007                cmd_pair_join(&code_phrase, &relay, yes, timeout)
1008            }
1009        }
1010        Command::PairConfirm {
1011            code_phrase,
1012            digits,
1013            json,
1014        } => cmd_pair_confirm(&code_phrase, &digits, json),
1015        Command::PairList {
1016            json,
1017            watch,
1018            watch_interval,
1019        } => cmd_pair_list(json, watch, watch_interval),
1020        Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
1021        Command::PairWatch {
1022            code_phrase,
1023            status,
1024            timeout,
1025            json,
1026        } => cmd_pair_watch(&code_phrase, &status, timeout, json),
1027        Command::Pair {
1028            handle,
1029            code,
1030            relay,
1031            yes,
1032            timeout,
1033            no_setup,
1034            detach,
1035        } => {
1036            // P0.P (0.5.11): if the handle is in `nick@domain` form, route to
1037            // the zero-paste megacommand path — `wire pair slancha-spark@
1038            // wireup.net` does add + poll-for-ack + verify in one shot. The
1039            // SAS / code-based pair flow stays available for handles without
1040            // `@` (bootstrap pairing between two boxes that don't yet share a
1041            // relay directory).
1042            if handle.contains('@') && code.is_none() {
1043                cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
1044            } else if detach {
1045                cmd_pair_detach(&handle, code.as_deref(), &relay)
1046            } else {
1047                cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
1048            }
1049        }
1050        Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
1051        Command::PairAccept { peer, json } => cmd_pair_accept(&peer, json),
1052        Command::PairReject { peer, json } => cmd_pair_reject(&peer, json),
1053        Command::PairListInbound { json } => cmd_pair_list_inbound(json),
1054        Command::Session(cmd) => cmd_session(cmd),
1055        Command::Invite {
1056            relay,
1057            ttl,
1058            uses,
1059            share,
1060            json,
1061        } => cmd_invite(&relay, ttl, uses, share, json),
1062        Command::Accept { url, json } => cmd_accept(&url, json),
1063        Command::Whois {
1064            handle,
1065            json,
1066            relay,
1067        } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
1068        Command::Add {
1069            handle,
1070            relay,
1071            json,
1072        } => cmd_add(&handle, relay.as_deref(), json),
1073        Command::Up {
1074            handle,
1075            name,
1076            json,
1077        } => cmd_up(&handle, name.as_deref(), json),
1078        Command::Doctor {
1079            json,
1080            recent_rejections,
1081        } => cmd_doctor(json, recent_rejections),
1082        Command::Upgrade { check, json } => cmd_upgrade(check, json),
1083        Command::Service { action } => cmd_service(action),
1084        Command::Diag { action } => cmd_diag(action),
1085        Command::Claim {
1086            nick,
1087            relay,
1088            public_url,
1089            hidden,
1090            json,
1091        } => cmd_claim(
1092            &nick,
1093            relay.as_deref(),
1094            public_url.as_deref(),
1095            hidden,
1096            json,
1097        ),
1098        Command::Profile { action } => cmd_profile(action),
1099        Command::Setup { apply } => cmd_setup(apply),
1100        Command::Reactor {
1101            on_event,
1102            peer,
1103            kind,
1104            verified_only,
1105            interval,
1106            once,
1107            dry_run,
1108            max_per_minute,
1109            max_chain_depth,
1110        } => cmd_reactor(
1111            &on_event,
1112            peer.as_deref(),
1113            kind.as_deref(),
1114            verified_only,
1115            interval,
1116            once,
1117            dry_run,
1118            max_per_minute,
1119            max_chain_depth,
1120        ),
1121        Command::Notify {
1122            interval,
1123            peer,
1124            once,
1125            json,
1126        } => cmd_notify(interval, peer.as_deref(), once, json),
1127    }
1128}
1129
1130// ---------- init ----------
1131
1132fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
1133    if !handle
1134        .chars()
1135        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1136    {
1137        bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
1138    }
1139    if config::is_initialized()? {
1140        bail!(
1141            "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
1142            config::config_dir()?
1143        );
1144    }
1145
1146    config::ensure_dirs()?;
1147    let (sk_seed, pk_bytes) = generate_keypair();
1148    config::write_private_key(&sk_seed)?;
1149
1150    let card = build_agent_card(handle, &pk_bytes, name, None, None);
1151    let signed = sign_agent_card(&card, &sk_seed);
1152    config::write_agent_card(&signed)?;
1153
1154    let mut trust = empty_trust();
1155    add_self_to_trust(&mut trust, handle, &pk_bytes);
1156    config::write_trust(&trust)?;
1157
1158    let fp = fingerprint(&pk_bytes);
1159    let key_id = make_key_id(handle, &pk_bytes);
1160
1161    // If --relay was passed, also bind a slot inline so init+bind happen in one step.
1162    let mut relay_info: Option<(String, String)> = None;
1163    if let Some(url) = relay {
1164        let normalized = url.trim_end_matches('/');
1165        let client = crate::relay_client::RelayClient::new(normalized);
1166        client.check_healthz()?;
1167        let alloc = client.allocate_slot(Some(handle))?;
1168        let mut state = config::read_relay_state()?;
1169        state["self"] = json!({
1170            "relay_url": normalized,
1171            "slot_id": alloc.slot_id.clone(),
1172            "slot_token": alloc.slot_token,
1173        });
1174        config::write_relay_state(&state)?;
1175        relay_info = Some((normalized.to_string(), alloc.slot_id));
1176    }
1177
1178    let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
1179    if as_json {
1180        let mut out = json!({
1181            "did": did_str.clone(),
1182            "fingerprint": fp,
1183            "key_id": key_id,
1184            "config_dir": config::config_dir()?.to_string_lossy(),
1185        });
1186        if let Some((url, slot_id)) = &relay_info {
1187            out["relay_url"] = json!(url);
1188            out["slot_id"] = json!(slot_id);
1189        }
1190        println!("{}", serde_json::to_string(&out)?);
1191    } else {
1192        println!("generated {did_str} (ed25519:{key_id})");
1193        println!(
1194            "config written to {}",
1195            config::config_dir()?.to_string_lossy()
1196        );
1197        if let Some((url, slot_id)) = &relay_info {
1198            println!("bound to relay {url} (slot {slot_id})");
1199            println!();
1200            println!(
1201                "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
1202            );
1203        } else {
1204            println!();
1205            println!(
1206                "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
1207            );
1208        }
1209    }
1210    Ok(())
1211}
1212
1213// ---------- status ----------
1214
1215fn cmd_status(as_json: bool) -> Result<()> {
1216    let initialized = config::is_initialized()?;
1217
1218    let mut summary = json!({
1219        "initialized": initialized,
1220    });
1221
1222    if initialized {
1223        let card = config::read_agent_card()?;
1224        let did = card
1225            .get("did")
1226            .and_then(Value::as_str)
1227            .unwrap_or("")
1228            .to_string();
1229        // Prefer the explicit `handle` field added in v0.5.7. Fall back to
1230        // stripping the DID prefix (and the v0.5.7+ pubkey suffix) for
1231        // legacy cards.
1232        let handle = card
1233            .get("handle")
1234            .and_then(Value::as_str)
1235            .map(str::to_string)
1236            .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1237        let pk_b64 = card
1238            .get("verify_keys")
1239            .and_then(Value::as_object)
1240            .and_then(|m| m.values().next())
1241            .and_then(|v| v.get("key"))
1242            .and_then(Value::as_str)
1243            .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1244        let pk_bytes = crate::signing::b64decode(pk_b64)?;
1245        summary["did"] = json!(did);
1246        summary["handle"] = json!(handle);
1247        summary["fingerprint"] = json!(fingerprint(&pk_bytes));
1248        summary["capabilities"] = card
1249            .get("capabilities")
1250            .cloned()
1251            .unwrap_or_else(|| json!([]));
1252
1253        let trust = config::read_trust()?;
1254        let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1255        let mut peers = Vec::new();
1256        if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
1257            for (peer_handle, _agent) in agents {
1258                if peer_handle == &handle {
1259                    continue; // self
1260                }
1261                // P0.Y (0.5.11): use effective tier — surfaces PENDING_ACK
1262                // for peers we've pinned but never received a pair_drop_ack
1263                // from, so the operator sees the "we can't send to them yet"
1264                // state instead of seeing a misleading VERIFIED.
1265                peers.push(json!({
1266                    "handle": peer_handle,
1267                    "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
1268                }));
1269            }
1270        }
1271        summary["peers"] = json!(peers);
1272
1273        let relay_state = config::read_relay_state()?;
1274        summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
1275        if !summary["self_relay"].is_null() {
1276            // Hide slot_token from default view.
1277            if let Some(obj) = summary["self_relay"].as_object_mut() {
1278                obj.remove("slot_token");
1279            }
1280        }
1281        summary["peer_slots_count"] = json!(
1282            relay_state
1283                .get("peers")
1284                .and_then(Value::as_object)
1285                .map(|m| m.len())
1286                .unwrap_or(0)
1287        );
1288
1289        // Outbox / inbox queue depth (file count + total events)
1290        let outbox = config::outbox_dir()?;
1291        let inbox = config::inbox_dir()?;
1292        summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
1293        summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
1294
1295        // v0.5.19: liveness snapshot through a single helper so this
1296        // surface and `wire doctor` agree by construction. Issue #2:
1297        // doctor PASSed while status said DOWN for 25 min because each
1298        // computed liveness independently. ensure_up::daemon_liveness
1299        // is the only path now.
1300        let snap = crate::ensure_up::daemon_liveness();
1301        let mut daemon = json!({
1302            "running": snap.pidfile_alive,
1303            "pid": snap.pidfile_pid,
1304            "all_running_pids": snap.pgrep_pids,
1305            "orphans": snap.orphan_pids,
1306        });
1307        if let crate::ensure_up::PidRecord::Json(d) = &snap.record {
1308            daemon["version"] = json!(d.version);
1309            daemon["bin_path"] = json!(d.bin_path);
1310            daemon["did"] = json!(d.did);
1311            daemon["relay_url"] = json!(d.relay_url);
1312            daemon["started_at"] = json!(d.started_at);
1313            daemon["schema"] = json!(d.schema);
1314            if d.version != env!("CARGO_PKG_VERSION") {
1315                daemon["version_mismatch"] = json!({
1316                    "daemon": d.version.clone(),
1317                    "cli": env!("CARGO_PKG_VERSION"),
1318                });
1319            }
1320        } else if matches!(snap.record, crate::ensure_up::PidRecord::LegacyInt(_)) {
1321            daemon["pidfile_form"] = json!("legacy-int");
1322            daemon["version_mismatch"] = json!({
1323                "daemon": "<pre-0.5.11>",
1324                "cli": env!("CARGO_PKG_VERSION"),
1325            });
1326        }
1327        summary["daemon"] = daemon;
1328
1329        // Pending pair sessions — counts by status.
1330        let pending = crate::pending_pair::list_pending().unwrap_or_default();
1331        let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
1332        for p in &pending {
1333            *counts.entry(p.status.clone()).or_default() += 1;
1334        }
1335        // v0.5.14: pending-inbound zero-paste pair_drops awaiting accept.
1336        let pending_inbound =
1337            crate::pending_inbound_pair::list_pending_inbound().unwrap_or_default();
1338        let inbound_handles: Vec<&str> = pending_inbound
1339            .iter()
1340            .map(|p| p.peer_handle.as_str())
1341            .collect();
1342        summary["pending_pairs"] = json!({
1343            "total": pending.len(),
1344            "by_status": counts,
1345            "inbound_count": pending_inbound.len(),
1346            "inbound_handles": inbound_handles,
1347        });
1348    }
1349
1350    if as_json {
1351        println!("{}", serde_json::to_string(&summary)?);
1352    } else if !initialized {
1353        println!("not initialized — run `wire init <handle>` first");
1354    } else {
1355        println!("did:           {}", summary["did"].as_str().unwrap_or("?"));
1356        println!(
1357            "fingerprint:   {}",
1358            summary["fingerprint"].as_str().unwrap_or("?")
1359        );
1360        println!("capabilities:  {}", summary["capabilities"]);
1361        if !summary["self_relay"].is_null() {
1362            println!(
1363                "self relay:    {} (slot {})",
1364                summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
1365                summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
1366            );
1367        } else {
1368            println!("self relay:    (not bound — run `wire pair-host --relay <url>` to bind)");
1369        }
1370        println!(
1371            "peers:         {}",
1372            summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
1373        );
1374        for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
1375            println!(
1376                "  - {:<20} tier={}",
1377                p["handle"].as_str().unwrap_or(""),
1378                p["tier"].as_str().unwrap_or("?")
1379            );
1380        }
1381        println!(
1382            "outbox:        {} file(s), {} event(s) queued",
1383            summary["outbox"]["files"].as_u64().unwrap_or(0),
1384            summary["outbox"]["events"].as_u64().unwrap_or(0)
1385        );
1386        println!(
1387            "inbox:         {} file(s), {} event(s) received",
1388            summary["inbox"]["files"].as_u64().unwrap_or(0),
1389            summary["inbox"]["events"].as_u64().unwrap_or(0)
1390        );
1391        let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
1392        let daemon_pid = summary["daemon"]["pid"]
1393            .as_u64()
1394            .map(|p| p.to_string())
1395            .unwrap_or_else(|| "—".to_string());
1396        let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
1397        let version_suffix = if !daemon_version.is_empty() {
1398            format!(" v{daemon_version}")
1399        } else {
1400            String::new()
1401        };
1402        println!(
1403            "daemon:        {} (pid {}{})",
1404            if daemon_running { "running" } else { "DOWN" },
1405            daemon_pid,
1406            version_suffix,
1407        );
1408        // P1.7: surface version mismatch + orphan procs loudly.
1409        if let Some(mm) = summary["daemon"].get("version_mismatch") {
1410            println!(
1411                "               !! version mismatch: daemon={} CLI={}. \
1412                 run `wire upgrade` to swap atomically.",
1413                mm["daemon"].as_str().unwrap_or("?"),
1414                mm["cli"].as_str().unwrap_or("?"),
1415            );
1416        }
1417        if let Some(orphans) = summary["daemon"]["orphans"].as_array()
1418            && !orphans.is_empty()
1419        {
1420            let pids: Vec<String> = orphans
1421                .iter()
1422                .filter_map(|v| v.as_u64().map(|p| p.to_string()))
1423                .collect();
1424            println!(
1425                "               !! orphan daemon process(es): pids {}. \
1426                 pgrep saw them but pidfile didn't — likely stale process from \
1427                 prior install. Multiple daemons race the relay cursor.",
1428                pids.join(", ")
1429            );
1430        }
1431        let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
1432        let inbound_count = summary["pending_pairs"]["inbound_count"]
1433            .as_u64()
1434            .unwrap_or(0);
1435        if pending_total > 0 {
1436            print!("pending pairs: {pending_total}");
1437            if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
1438                let parts: Vec<String> = obj
1439                    .iter()
1440                    .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
1441                    .collect();
1442                if !parts.is_empty() {
1443                    print!(" ({})", parts.join(", "));
1444                }
1445            }
1446            println!();
1447        } else if inbound_count == 0 {
1448            println!("pending pairs: none");
1449        }
1450        // v0.5.14: separate line for pending-inbound zero-paste requests.
1451        // Loud because each one is awaiting an operator gesture and the
1452        // capability hasn't flowed yet.
1453        if inbound_count > 0 {
1454            let handles: Vec<String> = summary["pending_pairs"]["inbound_handles"]
1455                .as_array()
1456                .map(|a| {
1457                    a.iter()
1458                        .filter_map(|v| v.as_str().map(str::to_string))
1459                        .collect()
1460                })
1461                .unwrap_or_default();
1462            println!(
1463                "inbound pair requests ({inbound_count}): {} — `wire pair-list` to inspect, `wire pair-accept <peer>` to accept, `wire pair-reject <peer>` to refuse",
1464                handles.join(", "),
1465            );
1466        }
1467    }
1468    Ok(())
1469}
1470
1471fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1472    if !dir.exists() {
1473        return Ok(json!({"files": 0, "events": 0}));
1474    }
1475    let mut files = 0usize;
1476    let mut events = 0usize;
1477    for entry in std::fs::read_dir(dir)? {
1478        let path = entry?.path();
1479        if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1480            files += 1;
1481            if let Ok(body) = std::fs::read_to_string(&path) {
1482                events += body.lines().filter(|l| !l.trim().is_empty()).count();
1483            }
1484        }
1485    }
1486    Ok(json!({"files": files, "events": events}))
1487}
1488
1489// ---------- responder health ----------
1490
/// Returns `true` when `status` is exactly one of the responder-health
/// values accepted by `wire responder set`. The comparison is
/// case-sensitive and does no trimming.
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.contains(&status)
}
1497
1498fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1499    let state = config::read_relay_state()?;
1500    let (label, slot_info) = match peer {
1501        Some(peer) => (
1502            peer.to_string(),
1503            state
1504                .get("peers")
1505                .and_then(|p| p.get(peer))
1506                .ok_or_else(|| {
1507                    anyhow!(
1508                        "unknown peer {peer:?} in relay state — pair with them first:\n  \
1509                         wire add {peer}@wireup.net   (or {peer}@<their-relay>)\n\
1510                         (`wire peers` lists who you've already paired with.)"
1511                    )
1512                })?,
1513        ),
1514        None => (
1515            "self".to_string(),
1516            state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1517                anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1518            })?,
1519        ),
1520    };
1521    let relay_url = slot_info["relay_url"]
1522        .as_str()
1523        .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1524        .to_string();
1525    let slot_id = slot_info["slot_id"]
1526        .as_str()
1527        .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1528        .to_string();
1529    let slot_token = slot_info["slot_token"]
1530        .as_str()
1531        .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1532        .to_string();
1533    Ok((label, relay_url, slot_id, slot_token))
1534}
1535
1536fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1537    if !responder_status_allowed(status) {
1538        bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1539    }
1540    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1541    let now = time::OffsetDateTime::now_utc()
1542        .format(&time::format_description::well_known::Rfc3339)
1543        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1544    let mut record = json!({
1545        "status": status,
1546        "set_at": now,
1547    });
1548    if let Some(reason) = reason {
1549        record["reason"] = json!(reason);
1550    }
1551    if status == "online" {
1552        record["last_success_at"] = json!(now);
1553    }
1554    let client = crate::relay_client::RelayClient::new(&relay_url);
1555    let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1556    if as_json {
1557        println!("{}", serde_json::to_string(&saved)?);
1558    } else {
1559        let reason = saved
1560            .get("reason")
1561            .and_then(Value::as_str)
1562            .map(|r| format!(" — {r}"))
1563            .unwrap_or_default();
1564        println!(
1565            "responder {}{}",
1566            saved
1567                .get("status")
1568                .and_then(Value::as_str)
1569                .unwrap_or(status),
1570            reason
1571        );
1572    }
1573    Ok(())
1574}
1575
1576fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1577    let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1578    let client = crate::relay_client::RelayClient::new(&relay_url);
1579    let health = client.responder_health_get(&slot_id, &slot_token)?;
1580    if as_json {
1581        println!(
1582            "{}",
1583            serde_json::to_string(&json!({
1584                "target": label,
1585                "responder_health": health,
1586            }))?
1587        );
1588    } else if health.is_null() {
1589        println!("{label}: responder health not reported");
1590    } else {
1591        let status = health
1592            .get("status")
1593            .and_then(Value::as_str)
1594            .unwrap_or("unknown");
1595        let reason = health
1596            .get("reason")
1597            .and_then(Value::as_str)
1598            .map(|r| format!(" — {r}"))
1599            .unwrap_or_default();
1600        let last_success = health
1601            .get("last_success_at")
1602            .and_then(Value::as_str)
1603            .map(|t| format!(" (last_success: {t})"))
1604            .unwrap_or_default();
1605        println!("{label}: {status}{reason}{last_success}");
1606    }
1607    Ok(())
1608}
1609
1610fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1611    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1612    let client = crate::relay_client::RelayClient::new(&relay_url);
1613
1614    let started = std::time::Instant::now();
1615    let transport_ok = client.healthz().unwrap_or(false);
1616    let latency_ms = started.elapsed().as_millis() as u64;
1617
1618    let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1619    let now = std::time::SystemTime::now()
1620        .duration_since(std::time::UNIX_EPOCH)
1621        .map(|d| d.as_secs())
1622        .unwrap_or(0);
1623    let attention = match last_pull_at_unix {
1624        Some(last) if now.saturating_sub(last) <= 300 => json!({
1625            "status": "ok",
1626            "last_pull_at_unix": last,
1627            "age_seconds": now.saturating_sub(last),
1628            "event_count": event_count,
1629        }),
1630        Some(last) => json!({
1631            "status": "stale",
1632            "last_pull_at_unix": last,
1633            "age_seconds": now.saturating_sub(last),
1634            "event_count": event_count,
1635        }),
1636        None => json!({
1637            "status": "never_pulled",
1638            "last_pull_at_unix": Value::Null,
1639            "event_count": event_count,
1640        }),
1641    };
1642
1643    let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1644    let responder = if responder_health.is_null() {
1645        json!({"status": "not_reported", "record": Value::Null})
1646    } else {
1647        json!({
1648            "status": responder_health
1649                .get("status")
1650                .and_then(Value::as_str)
1651                .unwrap_or("unknown"),
1652            "record": responder_health,
1653        })
1654    };
1655
1656    let report = json!({
1657        "peer": peer,
1658        "transport": {
1659            "status": if transport_ok { "ok" } else { "error" },
1660            "relay_url": relay_url,
1661            "latency_ms": latency_ms,
1662        },
1663        "attention": attention,
1664        "responder": responder,
1665    });
1666
1667    if as_json {
1668        println!("{}", serde_json::to_string(&report)?);
1669    } else {
1670        let transport_line = if transport_ok {
1671            format!("ok relay reachable ({latency_ms}ms)")
1672        } else {
1673            "error relay unreachable".to_string()
1674        };
1675        println!("transport      {transport_line}");
1676        match report["attention"]["status"].as_str().unwrap_or("unknown") {
1677            "ok" => println!(
1678                "attention      ok last pull {}s ago",
1679                report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1680            ),
1681            "stale" => println!(
1682                "attention      stale last pull {}m ago",
1683                report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1684            ),
1685            "never_pulled" => println!("attention      never pulled since relay reset"),
1686            other => println!("attention      {other}"),
1687        }
1688        if report["responder"]["status"] == "not_reported" {
1689            println!("auto-responder not reported");
1690        } else {
1691            let record = &report["responder"]["record"];
1692            let status = record
1693                .get("status")
1694                .and_then(Value::as_str)
1695                .unwrap_or("unknown");
1696            let reason = record
1697                .get("reason")
1698                .and_then(Value::as_str)
1699                .map(|r| format!(" — {r}"))
1700                .unwrap_or_default();
1701            println!("auto-responder {status}{reason}");
1702        }
1703    }
1704    Ok(())
1705}
1706
1707// (Old cmd_join stub removed — superseded by cmd_pair_join below.)
1708
1709// ---------- whoami ----------
1710
1711fn cmd_whoami(as_json: bool) -> Result<()> {
1712    if !config::is_initialized()? {
1713        bail!("not initialized — run `wire init <handle>` first");
1714    }
1715    let card = config::read_agent_card()?;
1716    let did = card
1717        .get("did")
1718        .and_then(Value::as_str)
1719        .unwrap_or("")
1720        .to_string();
1721    let handle = card
1722        .get("handle")
1723        .and_then(Value::as_str)
1724        .map(str::to_string)
1725        .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1726    let pk_b64 = card
1727        .get("verify_keys")
1728        .and_then(Value::as_object)
1729        .and_then(|m| m.values().next())
1730        .and_then(|v| v.get("key"))
1731        .and_then(Value::as_str)
1732        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1733    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1734    let fp = fingerprint(&pk_bytes);
1735    let key_id = make_key_id(&handle, &pk_bytes);
1736    let capabilities = card
1737        .get("capabilities")
1738        .cloned()
1739        .unwrap_or_else(|| json!(["wire/v3.1"]));
1740
1741    if as_json {
1742        println!(
1743            "{}",
1744            serde_json::to_string(&json!({
1745                "did": did,
1746                "handle": handle,
1747                "fingerprint": fp,
1748                "key_id": key_id,
1749                "public_key_b64": pk_b64,
1750                "capabilities": capabilities,
1751                "config_dir": config::config_dir()?.to_string_lossy(),
1752            }))?
1753        );
1754    } else {
1755        println!("{did} (ed25519:{key_id})");
1756        println!("fingerprint: {fp}");
1757        println!("capabilities: {capabilities}");
1758    }
1759    Ok(())
1760}
1761
1762// ---------- peers ----------
1763
1764/// P0.Y (0.5.11): effective tier shown to operators. `wire add` pins a
1765/// peer's card into trust at VERIFIED immediately, but the bilateral pin
1766/// isn't complete until that peer's `pair_drop_ack` arrives carrying their
1767/// slot_token. Until then we CAN'T send to them. Displaying VERIFIED is
1768/// misleading — spark observed this in real usage.
1769///
1770/// Effective rules:
1771///   trust.tier == VERIFIED + relay_state.peers[h].slot_token empty -> "PENDING_ACK"
1772///   otherwise -> raw trust tier (UNTRUSTED / VERIFIED / etc.)
1773///
1774/// Strictly a display concern — trust state machine itself is untouched
1775/// so existing promote/demote logic still works.
1776fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1777    let raw = crate::trust::get_tier(trust, handle);
1778    if raw != "VERIFIED" {
1779        return raw.to_string();
1780    }
1781    let token = relay_state
1782        .get("peers")
1783        .and_then(|p| p.get(handle))
1784        .and_then(|p| p.get("slot_token"))
1785        .and_then(Value::as_str)
1786        .unwrap_or("");
1787    if token.is_empty() {
1788        "PENDING_ACK".to_string()
1789    } else {
1790        raw.to_string()
1791    }
1792}
1793
1794fn cmd_peers(as_json: bool) -> Result<()> {
1795    let trust = config::read_trust()?;
1796    let agents = trust
1797        .get("agents")
1798        .and_then(Value::as_object)
1799        .cloned()
1800        .unwrap_or_default();
1801    let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1802
1803    let mut self_did: Option<String> = None;
1804    if let Ok(card) = config::read_agent_card() {
1805        self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1806    }
1807
1808    let mut peers = Vec::new();
1809    for (handle, agent) in agents.iter() {
1810        let did = agent
1811            .get("did")
1812            .and_then(Value::as_str)
1813            .unwrap_or("")
1814            .to_string();
1815        if Some(did.as_str()) == self_did.as_deref() {
1816            continue; // skip self-attestation
1817        }
1818        let tier = effective_peer_tier(&trust, &relay_state, handle);
1819        let capabilities = agent
1820            .get("card")
1821            .and_then(|c| c.get("capabilities"))
1822            .cloned()
1823            .unwrap_or_else(|| json!([]));
1824        peers.push(json!({
1825            "handle": handle,
1826            "did": did,
1827            "tier": tier,
1828            "capabilities": capabilities,
1829        }));
1830    }
1831
1832    if as_json {
1833        println!("{}", serde_json::to_string(&peers)?);
1834    } else if peers.is_empty() {
1835        println!("no peers pinned (run `wire join <code>` to pair)");
1836    } else {
1837        for p in &peers {
1838            println!(
1839                "{:<20} {:<10} {}",
1840                p["handle"].as_str().unwrap_or(""),
1841                p["tier"].as_str().unwrap_or(""),
1842                p["did"].as_str().unwrap_or(""),
1843            );
1844        }
1845    }
1846    Ok(())
1847}
1848
1849// ---------- send ----------
1850
1851/// R4 attentiveness pre-flight. Best-effort: any failure is silent.
1852///
1853/// Looks up `peer` in relay-state for slot_id + slot_token + relay_url, asks
1854/// the relay for the slot's `last_pull_at_unix`, and prints a warning to
1855/// stderr if the peer hasn't polled in > 5min (or never has). Threshold of
1856/// 300s is the same wire daemon polling cadence rule-of-thumb — a peer
1857/// hasn't crossed two heartbeats means probably degraded.
1858fn maybe_warn_peer_attentiveness(peer: &str) {
1859    let state = match config::read_relay_state() {
1860        Ok(s) => s,
1861        Err(_) => return,
1862    };
1863    let p = state.get("peers").and_then(|p| p.get(peer));
1864    let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
1865        Some(s) if !s.is_empty() => s,
1866        _ => return,
1867    };
1868    let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
1869        Some(s) if !s.is_empty() => s,
1870        _ => return,
1871    };
1872    let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
1873        Some(s) if !s.is_empty() => s.to_string(),
1874        _ => match state
1875            .get("self")
1876            .and_then(|s| s.get("relay_url"))
1877            .and_then(Value::as_str)
1878        {
1879            Some(s) if !s.is_empty() => s.to_string(),
1880            _ => return,
1881        },
1882    };
1883    let client = crate::relay_client::RelayClient::new(&relay_url);
1884    let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
1885        Ok(t) => t,
1886        Err(_) => return,
1887    };
1888    let now = std::time::SystemTime::now()
1889        .duration_since(std::time::UNIX_EPOCH)
1890        .map(|d| d.as_secs())
1891        .unwrap_or(0);
1892    match last_pull {
1893        None => {
1894            eprintln!(
1895                "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
1896            );
1897        }
1898        Some(t) if now.saturating_sub(t) > 300 => {
1899            let mins = now.saturating_sub(t) / 60;
1900            eprintln!(
1901                "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
1902            );
1903        }
1904        _ => {}
1905    }
1906}
1907
1908pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
1909    let trimmed = input.trim();
1910    if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
1911    {
1912        return Ok(trimmed.to_string());
1913    }
1914    let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
1915    let n: i64 = amount
1916        .parse()
1917        .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
1918    if n <= 0 {
1919        bail!("deadline duration must be positive: {input:?}");
1920    }
1921    let duration = match unit {
1922        "m" => time::Duration::minutes(n),
1923        "h" => time::Duration::hours(n),
1924        "d" => time::Duration::days(n),
1925        _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
1926    };
1927    Ok((time::OffsetDateTime::now_utc() + duration)
1928        .format(&time::format_description::well_known::Rfc3339)
1929        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
1930}
1931
1932fn cmd_send(
1933    peer: &str,
1934    kind: &str,
1935    body_arg: &str,
1936    deadline: Option<&str>,
1937    as_json: bool,
1938) -> Result<()> {
1939    if !config::is_initialized()? {
1940        bail!("not initialized — run `wire init <handle>` first");
1941    }
1942    let peer = crate::agent_card::bare_handle(peer);
1943    let sk_seed = config::read_private_key()?;
1944    let card = config::read_agent_card()?;
1945    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
1946    let handle = crate::agent_card::display_handle_from_did(did).to_string();
1947    let pk_b64 = card
1948        .get("verify_keys")
1949        .and_then(Value::as_object)
1950        .and_then(|m| m.values().next())
1951        .and_then(|v| v.get("key"))
1952        .and_then(Value::as_str)
1953        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1954    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1955
1956    // Body: literal string, `@/path/to/body.json`, or `-` for stdin.
1957    // P0.S (0.5.11): stdin support lets shells pipe in long content
1958    // without quoting/escaping ceremony, and supports heredocs naturally:
1959    //   wire send peer - <<EOF ... EOF
1960    let body_value: Value = if body_arg == "-" {
1961        use std::io::Read;
1962        let mut raw = String::new();
1963        std::io::stdin()
1964            .read_to_string(&mut raw)
1965            .with_context(|| "reading body from stdin")?;
1966        // Try parsing as JSON first; fall back to string literal for
1967        // plain-text bodies.
1968        serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
1969    } else if let Some(path) = body_arg.strip_prefix('@') {
1970        let raw =
1971            std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
1972        serde_json::from_str(&raw).unwrap_or(Value::String(raw))
1973    } else {
1974        Value::String(body_arg.to_string())
1975    };
1976
1977    let kind_id = parse_kind(kind)?;
1978
1979    let now = time::OffsetDateTime::now_utc()
1980        .format(&time::format_description::well_known::Rfc3339)
1981        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1982
1983    let mut event = json!({
1984        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
1985        "timestamp": now,
1986        "from": did,
1987        "to": format!("did:wire:{peer}"),
1988        "type": kind,
1989        "kind": kind_id,
1990        "body": body_value,
1991    });
1992    if let Some(deadline) = deadline {
1993        event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
1994    }
1995    let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
1996    let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
1997
1998    // R4: best-effort attentiveness pre-flight. Look up the peer's slot
1999    // coords in relay-state and ask the relay how recently the peer pulled.
2000    // Warn on stderr if the peer hasn't pulled in >5min OR has never pulled.
2001    // Never blocks the send — the event still queues to outbox.
2002    maybe_warn_peer_attentiveness(peer);
2003
2004    // For now we append to outbox JSONL and rely on a future daemon to push
2005    // to the relay. That's the file-system contract from AGENT_INTEGRATION.md.
2006    // Append goes through `config::append_outbox_record` which holds a per-
2007    // path mutex so concurrent senders cannot interleave bytes mid-line.
2008    let line = serde_json::to_vec(&signed)?;
2009    let outbox = config::append_outbox_record(peer, &line)?;
2010
2011    if as_json {
2012        println!(
2013            "{}",
2014            serde_json::to_string(&json!({
2015                "event_id": event_id,
2016                "status": "queued",
2017                "peer": peer,
2018                "outbox": outbox.to_string_lossy(),
2019            }))?
2020        );
2021    } else {
2022        println!(
2023            "queued event {event_id} → {peer} (outbox: {})",
2024            outbox.display()
2025        );
2026    }
2027    Ok(())
2028}
2029
2030fn parse_kind(s: &str) -> Result<u32> {
2031    if let Ok(n) = s.parse::<u32>() {
2032        return Ok(n);
2033    }
2034    for (id, name) in crate::signing::kinds() {
2035        if *name == s {
2036            return Ok(*id);
2037        }
2038    }
2039    // Unknown name — default to kind 1 (decision) for v0.1.
2040    Ok(1)
2041}
2042
2043// ---------- tail ----------
2044
2045fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
2046    let inbox = config::inbox_dir()?;
2047    if !inbox.exists() {
2048        if !as_json {
2049            eprintln!("no inbox yet — daemon hasn't run, or no events received");
2050        }
2051        return Ok(());
2052    }
2053    let trust = config::read_trust()?;
2054    let mut count = 0usize;
2055
2056    let entries: Vec<_> = std::fs::read_dir(&inbox)?
2057        .filter_map(|e| e.ok())
2058        .map(|e| e.path())
2059        .filter(|p| {
2060            p.extension().map(|x| x == "jsonl").unwrap_or(false)
2061                && match peer {
2062                    Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
2063                    None => true,
2064                }
2065        })
2066        .collect();
2067
2068    for path in entries {
2069        let body = std::fs::read_to_string(&path)?;
2070        for line in body.lines() {
2071            let event: Value = match serde_json::from_str(line) {
2072                Ok(v) => v,
2073                Err(_) => continue,
2074            };
2075            let verified = verify_message_v31(&event, &trust).is_ok();
2076            if as_json {
2077                let mut event_with_meta = event.clone();
2078                if let Some(obj) = event_with_meta.as_object_mut() {
2079                    obj.insert("verified".into(), json!(verified));
2080                }
2081                println!("{}", serde_json::to_string(&event_with_meta)?);
2082            } else {
2083                let ts = event
2084                    .get("timestamp")
2085                    .and_then(Value::as_str)
2086                    .unwrap_or("?");
2087                let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
2088                let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
2089                let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
2090                let summary = event
2091                    .get("body")
2092                    .map(|b| match b {
2093                        Value::String(s) => s.clone(),
2094                        _ => b.to_string(),
2095                    })
2096                    .unwrap_or_default();
2097                let mark = if verified { "✓" } else { "✗" };
2098                let deadline = event
2099                    .get("time_sensitive_until")
2100                    .and_then(Value::as_str)
2101                    .map(|d| format!(" deadline: {d}"))
2102                    .unwrap_or_default();
2103                println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
2104            }
2105            count += 1;
2106            if limit > 0 && count >= limit {
2107                return Ok(());
2108            }
2109        }
2110    }
2111    Ok(())
2112}
2113
2114// ---------- monitor (live-tail across all peers, harness-friendly) ----------
2115
/// Returns `true` for the event kinds `wire monitor` hides by default: the
/// pair handshake (`pair_drop`, `pair_drop_ack`) and `heartbeat` liveness
/// pings. `--include-handshake` brings them back. The match is exact.
fn monitor_is_noise_kind(kind: &str) -> bool {
    ["pair_drop", "pair_drop_ack", "heartbeat"].contains(&kind)
}
2122
2123/// Render a single InboxEvent for `wire monitor` output. JSON form emits the
2124/// full structured event for tooling consumption; the plain form is a tight
2125/// one-line summary suitable as a harness stream-watcher notification.
2126fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
2127    if as_json {
2128        Ok(serde_json::to_string(e)?)
2129    } else {
2130        let eid_short: String = e.event_id.chars().take(12).collect();
2131        let body = e.body_preview.replace('\n', " ");
2132        let ts: String = e.timestamp.chars().take(19).collect();
2133        Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
2134    }
2135}
2136
2137/// `wire monitor` — long-running line-per-event stream of new inbox events.
2138///
2139/// Built for agent harnesses that have an "every stdout line is a chat
2140/// notification" stream watcher (Claude Code Monitor tool, etc.). One
2141/// command, persistent, filtered. Replaces the manual `tail -F inbox/*.jsonl
2142/// | python parse | grep -v pair_drop` pipeline operators improvise on day
2143/// one of every wire session.
2144///
2145/// Default filter strips `pair_drop`, `pair_drop_ack`, and `heartbeat` —
2146/// pure handshake / liveness noise that operators almost never want
2147/// surfaced. Pass `--include-handshake` if you do.
2148///
2149/// Cursor: in-memory only. Starts from EOF (so a fresh `wire monitor`
2150/// doesn't drown the operator in replay), with optional `--replay N` to
2151/// emit the last N events first.
2152fn cmd_monitor(
2153    peer_filter: Option<&str>,
2154    as_json: bool,
2155    include_handshake: bool,
2156    interval_ms: u64,
2157    replay: usize,
2158) -> Result<()> {
2159    let inbox_dir = config::inbox_dir()?;
2160    if !inbox_dir.exists() {
2161        if !as_json {
2162            eprintln!(
2163                "wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
2164            );
2165        }
2166        // Still proceed — InboxWatcher::from_dir_head handles missing dir.
2167    }
2168
2169    // Optional replay — read existing files and emit the last `replay` events
2170    // (post-filter) before going live. Useful when the harness restarts and
2171    // wants recent context.
2172    if replay > 0 && inbox_dir.exists() {
2173        let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
2174        for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
2175            let path = entry.path();
2176            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2177                continue;
2178            }
2179            let peer = match path.file_stem().and_then(|s| s.to_str()) {
2180                Some(s) => s.to_string(),
2181                None => continue,
2182            };
2183            if let Some(filter) = peer_filter {
2184                if peer != filter {
2185                    continue;
2186                }
2187            }
2188            let body = std::fs::read_to_string(&path).unwrap_or_default();
2189            for line in body.lines() {
2190                let line = line.trim();
2191                if line.is_empty() {
2192                    continue;
2193                }
2194                let signed: Value = match serde_json::from_str(line) {
2195                    Ok(v) => v,
2196                    Err(_) => continue,
2197                };
2198                let ev = crate::inbox_watch::InboxEvent::from_signed(
2199                    &peer,
2200                    signed,
2201                    /* verified */ true,
2202                );
2203                if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2204                    continue;
2205                }
2206                all.push(ev);
2207            }
2208        }
2209        // Sort by timestamp string (RFC3339-ish — lexicographic order matches
2210        // chronological for same-zoned timestamps).
2211        all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2212        let start = all.len().saturating_sub(replay);
2213        for ev in &all[start..] {
2214            println!("{}", monitor_render(ev, as_json)?);
2215        }
2216        use std::io::Write;
2217        std::io::stdout().flush().ok();
2218    }
2219
2220    // Live loop. InboxWatcher::from_head() seeds cursors at current EOF, so
2221    // the first poll only returns events that arrived AFTER startup.
2222    let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2223    let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2224
2225    loop {
2226        let events = w.poll()?;
2227        let mut wrote = false;
2228        for ev in events {
2229            if let Some(filter) = peer_filter {
2230                if ev.peer != filter {
2231                    continue;
2232                }
2233            }
2234            if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2235                continue;
2236            }
2237            println!("{}", monitor_render(&ev, as_json)?);
2238            wrote = true;
2239        }
2240        if wrote {
2241            use std::io::Write;
2242            std::io::stdout().flush().ok();
2243        }
2244        std::thread::sleep(sleep_dur);
2245    }
2246}
2247
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Trust store holding a single agent `handle` pinned at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        json!({
            "version": 1,
            "agents": {
                handle: {
                    "tier": tier,
                    "did": format!("did:wire:{handle}"),
                    "card": {"capabilities": ["wire/v3.1"]}
                }
            }
        })
    }

    /// Relay state with a fully populated entry for `handle` whose
    /// `slot_token` is `slot_token`.
    fn relay_with_token(handle: &str, slot_token: &str) -> Value {
        json!({
            "peers": {
                handle: {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": slot_token,
                }
            }
        })
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // P0.Y rule: right after `wire add`, trust says VERIFIED but the
        // peer's slot_token has not arrived. PENDING_ACK tells the operator
        // that wire send will not work yet.
        let tier = effective_peer_tier(
            &trust_with("willard", "VERIFIED"),
            &relay_with_token("willard", ""),
            "willard",
        );
        assert_eq!(tier, "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let tier = effective_peer_tier(
            &trust_with("willard", "VERIFIED"),
            &relay_with_token("willard", "tok123"),
            "willard",
        );
        assert_eq!(tier, "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // PENDING_ACK decorates VERIFIED only. UNTRUSTED stays UNTRUSTED
        // whatever the slot_token state is.
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        let tier = effective_peer_tier(
            &trust_with("willard", "UNTRUSTED"),
            &relay_state,
            "willard",
        );
        assert_eq!(tier, "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // After wire add, trust is updated BEFORE relay_state.peers is. With
        // no relay entry for the peer at all, the bilateral pin is still
        // incomplete — show PENDING_ACK.
        let relay_state = json!({"peers": {}});
        let tier = effective_peer_tier(
            &trust_with("willard", "VERIFIED"),
            &relay_state,
            "willard",
        );
        assert_eq!(tier, "PENDING_ACK");
    }
}
2332
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Fixture: a verified inbox event with a fixed id and timestamp and a
    /// null raw payload; only peer, kind and body preview vary.
    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: peer.to_owned(),
            event_id: String::from("abcd1234567890ef"),
            kind: kind.to_owned(),
            body_preview: body.to_owned(),
            verified: true,
            timestamp: String::from("2026-05-15T23:14:07.123456Z"),
            raw: Value::Null,
        }
    }

    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        // pair_drop / pair_drop_ack / heartbeat are protocol noise. If they
        // reach the operator's chat stream by default the recipe is useless
        // ("wire monitor talks too much, disabled it"), so pin the rule.
        for noisy in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(
                monitor_is_noise_kind(noisy),
                "`{noisy}` must be classified as noise"
            );
        }

        // Real-payload kinds: the operator wants every one of them.
        for payload in ["claim", "decision", "ack", "request", "note"] {
            assert!(
                !monitor_is_noise_kind(payload),
                "`{payload}` must not be classified as noise"
            );
        }

        // Kinds we do not know yet must not be filtered either: better to
        // show something unrecognised than to drop it silently (the P0.1
        // lesson, applied at the UX layer).
        assert!(!monitor_is_noise_kind("future_kind_we_dont_know"));
    }

    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let event = ev("willard", "claim", "real v8 train shipped 1350 steps");
        let line = monitor_render(&event, false).unwrap();

        // Exactly one line.
        assert!(!line.contains('\n'), "render must be one line: {line}");

        // Peer, kind, a body fragment, the 12-char short event id, and the
        // timestamp at second precision must all appear.
        for needle in [
            "willard",
            "claim",
            "real v8 train",
            "abcd12345678",
            "2026-05-15T23:14:07",
        ] {
            assert!(line.contains(needle));
        }

        // The full 16-char id must have been shortened.
        assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
    }

    #[test]
    fn monitor_render_strips_newlines_from_body() {
        // Multi-line bodies (markdown lists, code, ...) have to collapse to
        // a single line; otherwise one message becomes several harness
        // notifications and breaks the "one event = one line" contract the
        // Monitor tool depends on.
        let event = ev("spark", "claim", "line one\nline two\nline three");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let event = ev("spark", "claim", "hi");
        let line = monitor_render(&event, true).unwrap();
        assert!(!line.contains('\n'));

        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        for (field, expected) in [("peer", "spark"), ("kind", "claim"), ("body_preview", "hi")] {
            assert_eq!(parsed[field], expected);
        }
    }

    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        // Spark's bug confession on 2026-05-15: their monitor pipeline ran
        // `select(.verified == true)` over inbox JSONL. The daemon writes
        // events with verified=null (verification happens at tail-time, not
        // write-time), so that filter silently rejected everything: the
        // P0.1 anti-pattern again, at the JSON/jq level. Cost: 4 events
        // that never surfaced for ~30min.
        //
        // The render path must NOT consult `.verified` for any filtering
        // decision. Pin that here so a later "be conservative, only emit
        // verified" patch cannot land unnoticed.
        let event = InboxEvent {
            // Worst case: even when disk says unverified, still emit.
            verified: false,
            ..ev("spark", "claim", "from disk with verified=null")
        };
        let line = monitor_render(&event, false).unwrap();
        assert!(line.contains("from disk with verified=null"));
        // The noise filter looks at the kind only, never at `verified`.
        assert!(!monitor_is_noise_kind("claim"));
    }
}
2433
2434// ---------- verify ----------
2435
2436fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2437    let body = if path == "-" {
2438        let mut buf = String::new();
2439        use std::io::Read;
2440        std::io::stdin().read_to_string(&mut buf)?;
2441        buf
2442    } else {
2443        std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2444    };
2445    let event: Value = serde_json::from_str(&body)?;
2446    let trust = config::read_trust()?;
2447    match verify_message_v31(&event, &trust) {
2448        Ok(()) => {
2449            if as_json {
2450                println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2451            } else {
2452                println!("verified ✓");
2453            }
2454            Ok(())
2455        }
2456        Err(e) => {
2457            let reason = e.to_string();
2458            if as_json {
2459                println!(
2460                    "{}",
2461                    serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2462                );
2463            } else {
2464                eprintln!("FAILED: {reason}");
2465            }
2466            std::process::exit(1);
2467        }
2468    }
2469}
2470
2471// ---------- mcp / relay-server stubs ----------
2472
2473fn cmd_mcp() -> Result<()> {
2474    crate::mcp::run()
2475}
2476
2477fn cmd_relay_server(bind: &str, local_only: bool) -> Result<()> {
2478    // v0.5.17: --local-only refuses non-loopback binds. Catches the
2479    // "wait did I just bind a publicly-reachable local-only relay" mistake
2480    // at startup rather than discovering it via an empty phonebook later.
2481    if local_only {
2482        validate_loopback_bind(bind)?;
2483    }
2484    // Default state dir for the relay process: $WIRE_HOME/state/wire-relay
2485    // (or `dirs::state_dir()/wire-relay`). Distinct from the CLI's state dir
2486    // so a single user can run both client and server on one machine.
2487    // For --local-only, suffix with /local so a single operator can run
2488    // both a federation relay and a local-only relay without state collision.
2489    let base = if let Ok(home) = std::env::var("WIRE_HOME") {
2490        std::path::PathBuf::from(home)
2491            .join("state")
2492            .join("wire-relay")
2493    } else {
2494        dirs::state_dir()
2495            .or_else(dirs::data_local_dir)
2496            .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2497            .join("wire-relay")
2498    };
2499    let state_dir = if local_only { base.join("local") } else { base };
2500    let runtime = tokio::runtime::Builder::new_multi_thread()
2501        .enable_all()
2502        .build()?;
2503    runtime.block_on(crate::relay_server::serve_with_mode(
2504        bind,
2505        state_dir,
2506        crate::relay_server::ServerMode { local_only },
2507    ))
2508}
2509
2510/// v0.5.17 loopback-bind guard. Refuses any address whose host portion
2511/// resolves to something outside `127.0.0.0/8` or `::1`. Specifically
2512/// rejects `0.0.0.0`, `::`, `0:0:0:0:0:0:0:0`, and any non-loopback
2513/// IPv4/IPv6 literal. Hostname-form addresses (e.g. `localhost`) are
2514/// accepted only if they resolve to a loopback address.
2515fn validate_loopback_bind(bind: &str) -> Result<()> {
2516    // Split host:port. IPv6 literals use `[::]:port` form.
2517    let host = if let Some(stripped) = bind.strip_prefix('[') {
2518        let close = stripped
2519            .find(']')
2520            .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
2521        stripped[..close].to_string()
2522    } else {
2523        bind.rsplit_once(':')
2524            .map(|(h, _)| h.to_string())
2525            .unwrap_or_else(|| bind.to_string())
2526    };
2527    use std::net::ToSocketAddrs;
2528    let probe = format!("{host}:0");
2529    let resolved: Vec<_> = probe
2530        .to_socket_addrs()
2531        .with_context(|| format!("resolving bind host {host:?}"))?
2532        .collect();
2533    if resolved.is_empty() {
2534        bail!("--local-only: bind host {host:?} resolved to no addresses");
2535    }
2536    for addr in &resolved {
2537        if !addr.ip().is_loopback() {
2538            bail!(
2539                "--local-only refuses non-loopback bind: {host:?} resolves to {} \
2540                 which is not in 127.0.0.0/8 or [::1]. Remove --local-only to bind \
2541                 publicly, or use 127.0.0.1 / [::1] / localhost.",
2542                addr.ip()
2543            );
2544        }
2545    }
2546    Ok(())
2547}
2548
2549// ---------- bind-relay ----------
2550
2551fn cmd_bind_relay(url: &str, migrate_pinned: bool, as_json: bool) -> Result<()> {
2552    if !config::is_initialized()? {
2553        bail!("not initialized — run `wire init <handle>` first");
2554    }
2555    let card = config::read_agent_card()?;
2556    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2557    let handle = crate::agent_card::display_handle_from_did(did).to_string();
2558
2559    // v0.5.19 (issue #7): refuse silent migration that would black-hole
2560    // pinned peers. The peer's relay-state still points at our OLD slot;
2561    // they will keep POSTing successfully to a slot we no longer read,
2562    // and their messages disappear. Pre-fix this command silently
2563    // replaced state.self, the incident report logged 26 events lost
2564    // over 2 days.
2565    let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
2566    let pinned: Vec<String> = existing
2567        .get("peers")
2568        .and_then(|p| p.as_object())
2569        .map(|o| o.keys().cloned().collect())
2570        .unwrap_or_default();
2571    if !pinned.is_empty() && !migrate_pinned {
2572        let list = pinned.join(", ");
2573        bail!(
2574            "bind-relay would silently black-hole {n} pinned peer(s): {list}. \
2575             They are pinned to your CURRENT slot; without coordination they will keep \
2576             pushing to a slot you no longer read.\n\n\
2577             SAFE PATHS:\n\
2578             • `wire rotate-slot` — rotates slot on the SAME relay and emits a \
2579             wire_close event to every pinned peer so their daemons drop the stale \
2580             coords cleanly. This is the supported migration path.\n\
2581             • `wire bind-relay {url} --migrate-pinned` — acknowledges that pinned \
2582             peers will need to re-pin manually (you must notify them out-of-band, \
2583             via a fresh `wire add` from each peer or a re-shared invite). Use this \
2584             only when the current slot is unreachable so rotate-slot can't ack.\n\n\
2585             Issue #7 (silent black-hole on relay change) caught this — proceed only \
2586             if you understand the consequences.",
2587            n = pinned.len(),
2588        );
2589    }
2590
2591    let normalized = url.trim_end_matches('/');
2592    let client = crate::relay_client::RelayClient::new(normalized);
2593    client.check_healthz()?;
2594    let alloc = client.allocate_slot(Some(&handle))?;
2595    let mut state = existing;
2596    if !pinned.is_empty() {
2597        // We're committing to the migration. Surface a final stderr
2598        // banner naming the peers operators must notify out-of-band so
2599        // there's a record in their shell history.
2600        eprintln!(
2601            "wire bind-relay: migrating with {n} pinned peer(s) — they will black-hole \
2602             until they re-pin: {peers}",
2603            n = pinned.len(),
2604            peers = pinned.join(", "),
2605        );
2606    }
2607    state["self"] = json!({
2608        "relay_url": url,
2609        "slot_id": alloc.slot_id,
2610        "slot_token": alloc.slot_token,
2611    });
2612    config::write_relay_state(&state)?;
2613
2614    if as_json {
2615        println!(
2616            "{}",
2617            serde_json::to_string(&json!({
2618                "relay_url": url,
2619                "slot_id": alloc.slot_id,
2620                "slot_token_present": true,
2621            }))?
2622        );
2623    } else {
2624        println!("bound to relay {url}");
2625        println!("slot_id: {}", alloc.slot_id);
2626        println!(
2627            "(slot_token written to {} mode 0600)",
2628            config::relay_state_path()?.display()
2629        );
2630    }
2631    Ok(())
2632}
2633
2634// ---------- add-peer-slot ----------
2635
2636fn cmd_add_peer_slot(
2637    handle: &str,
2638    url: &str,
2639    slot_id: &str,
2640    slot_token: &str,
2641    as_json: bool,
2642) -> Result<()> {
2643    let mut state = config::read_relay_state()?;
2644    let peers = state["peers"]
2645        .as_object_mut()
2646        .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2647    peers.insert(
2648        handle.to_string(),
2649        json!({
2650            "relay_url": url,
2651            "slot_id": slot_id,
2652            "slot_token": slot_token,
2653        }),
2654    );
2655    config::write_relay_state(&state)?;
2656    if as_json {
2657        println!(
2658            "{}",
2659            serde_json::to_string(&json!({
2660                "handle": handle,
2661                "relay_url": url,
2662                "slot_id": slot_id,
2663                "added": true,
2664            }))?
2665        );
2666    } else {
2667        println!("pinned peer slot for {handle} at {url} ({slot_id})");
2668    }
2669    Ok(())
2670}
2671
2672// ---------- push ----------
2673
2674fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
2675    let state = config::read_relay_state()?;
2676    let peers = state["peers"].as_object().cloned().unwrap_or_default();
2677    if peers.is_empty() {
2678        bail!(
2679            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
2680        );
2681    }
2682    let outbox_dir = config::outbox_dir()?;
2683    // v0.5.13 loud-fail: warn on outbox files that don't match a pinned peer.
2684    // Pre-v0.5.13 `wire send peer@relay` wrote to `peer@relay.jsonl` while
2685    // push only enumerated bare-handle files. After upgrade, stale FQDN-named
2686    // files sit on disk forever; warn so operator can `cat fqdn.jsonl >> handle.jsonl`.
2687    if outbox_dir.exists() {
2688        let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
2689        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
2690            let path = entry.path();
2691            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2692                continue;
2693            }
2694            let stem = match path.file_stem().and_then(|s| s.to_str()) {
2695                Some(s) => s.to_string(),
2696                None => continue,
2697            };
2698            if pinned.contains(&stem) {
2699                continue;
2700            }
2701            // Try the bare-handle of the orphaned stem — if THAT matches a
2702            // pinned peer, the stem is a stale FQDN-suffixed file.
2703            let bare = crate::agent_card::bare_handle(&stem);
2704            if pinned.contains(bare) {
2705                eprintln!(
2706                    "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
2707                     Merge with: `cat {} >> {}` then delete the FQDN file.",
2708                    stem,
2709                    path.display(),
2710                    outbox_dir.join(format!("{bare}.jsonl")).display(),
2711                );
2712            }
2713        }
2714    }
2715    if !outbox_dir.exists() {
2716        if as_json {
2717            println!(
2718                "{}",
2719                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
2720            );
2721        } else {
2722            println!("phyllis: nothing to dial out — write a message first with `wire send`");
2723        }
2724        return Ok(());
2725    }
2726
2727    let mut pushed = Vec::new();
2728    let mut skipped = Vec::new();
2729
2730    // v0.5.17: walk each peer's pinned endpoints in priority order (local
2731    // first if we share a local relay, federation second). Try POST on the
2732    // first endpoint; on transport failure, fall through to the next.
2733    // Falls back to the v0.5.16 legacy single-endpoint code path when the
2734    // peer record carries no `endpoints[]` array (back-compat).
2735    for (peer_handle, _) in peers.iter() {
2736        if let Some(want) = peer_filter
2737            && peer_handle != want
2738        {
2739            continue;
2740        }
2741        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2742        if !outbox.exists() {
2743            continue;
2744        }
2745        let ordered_endpoints =
2746            crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
2747        if ordered_endpoints.is_empty() {
2748            // Unreachable peer (no federation endpoint AND our local
2749            // relay doesn't match the peer's). Skip with a loud reason
2750            // rather than silently dropping events.
2751            for line in std::fs::read_to_string(&outbox)
2752                .unwrap_or_default()
2753                .lines()
2754            {
2755                let event: Value = match serde_json::from_str(line) {
2756                    Ok(v) => v,
2757                    Err(_) => continue,
2758                };
2759                let event_id = event
2760                    .get("event_id")
2761                    .and_then(Value::as_str)
2762                    .unwrap_or("")
2763                    .to_string();
2764                skipped.push(json!({
2765                    "peer": peer_handle,
2766                    "event_id": event_id,
2767                    "reason": "no reachable endpoint pinned for peer",
2768                }));
2769            }
2770            continue;
2771        }
2772        let body = std::fs::read_to_string(&outbox)?;
2773        for line in body.lines() {
2774            let event: Value = match serde_json::from_str(line) {
2775                Ok(v) => v,
2776                Err(_) => continue,
2777            };
2778            let event_id = event
2779                .get("event_id")
2780                .and_then(Value::as_str)
2781                .unwrap_or("")
2782                .to_string();
2783
2784            let mut delivered = false;
2785            let mut last_err_reason: Option<String> = None;
2786            for endpoint in &ordered_endpoints {
2787                let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2788                match client.post_event(&endpoint.slot_id, &endpoint.slot_token, &event) {
2789                    Ok(resp) => {
2790                        if resp.status == "duplicate" {
2791                            skipped.push(json!({
2792                                "peer": peer_handle,
2793                                "event_id": event_id,
2794                                "reason": "duplicate",
2795                                "endpoint": endpoint.relay_url,
2796                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2797                            }));
2798                        } else {
2799                            pushed.push(json!({
2800                                "peer": peer_handle,
2801                                "event_id": event_id,
2802                                "endpoint": endpoint.relay_url,
2803                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2804                            }));
2805                        }
2806                        delivered = true;
2807                        break;
2808                    }
2809                    Err(e) => {
2810                        // Local-first endpoint failed; record reason and
2811                        // try the next endpoint silently (operator sees
2812                        // the federation success). If every endpoint
2813                        // fails, the last reason is what gets reported.
2814                        last_err_reason =
2815                            Some(crate::relay_client::format_transport_error(&e));
2816                    }
2817                }
2818            }
2819            if !delivered {
2820                skipped.push(json!({
2821                    "peer": peer_handle,
2822                    "event_id": event_id,
2823                    "reason": last_err_reason.unwrap_or_else(|| "all endpoints failed".to_string()),
2824                }));
2825            }
2826        }
2827    }
2828
2829    if as_json {
2830        println!(
2831            "{}",
2832            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
2833        );
2834    } else {
2835        println!(
2836            "pushed {} event(s); skipped {} ({})",
2837            pushed.len(),
2838            skipped.len(),
2839            if skipped.is_empty() {
2840                "none"
2841            } else {
2842                "see --json for detail"
2843            }
2844        );
2845    }
2846    Ok(())
2847}
2848
2849// ---------- pull ----------
2850
2851fn cmd_pull(as_json: bool) -> Result<()> {
2852    let state = config::read_relay_state()?;
2853    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2854    if self_state.is_null() {
2855        bail!("self slot not bound — run `wire bind-relay <url>` first");
2856    }
2857
2858    // v0.5.17: pull from every endpoint in self.endpoints (federation +
2859    // optional local). Each endpoint has its own per-scope cursor so we
2860    // don't re-pull events we've already seen on that path. Events from
2861    // all endpoints feed into the same inbox JSONL via process_events;
2862    // dedup by event_id is the last line of defense.
2863    // Falls back to a single federation endpoint synthesized from the
2864    // top-level legacy fields when self.endpoints is absent (v0.5.16
2865    // back-compat).
2866    let endpoints = crate::endpoints::self_endpoints(&state);
2867    if endpoints.is_empty() {
2868        bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
2869    }
2870
2871    let inbox_dir = config::inbox_dir()?;
2872    config::ensure_dirs()?;
2873
2874    let mut total_seen = 0usize;
2875    let mut all_written: Vec<Value> = Vec::new();
2876    let mut all_rejected: Vec<Value> = Vec::new();
2877    let mut all_blocked = false;
2878    let mut all_advance_cursor_to: Option<String> = None;
2879
2880    for endpoint in &endpoints {
2881        let cursor_key = endpoint_cursor_key(endpoint.scope);
2882        let last_event_id = self_state
2883            .get(&cursor_key)
2884            .and_then(Value::as_str)
2885            .map(str::to_string);
2886        let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2887        let events = match client.list_events(
2888            &endpoint.slot_id,
2889            &endpoint.slot_token,
2890            last_event_id.as_deref(),
2891            Some(1000),
2892        ) {
2893            Ok(ev) => ev,
2894            Err(e) => {
2895                // One endpoint's failure shouldn't kill the whole pull.
2896                // The local-relay-down case in particular needs to
2897                // gracefully continue against federation.
2898                eprintln!(
2899                    "wire pull: endpoint {} ({:?}) errored: {}; continuing",
2900                    endpoint.relay_url,
2901                    endpoint.scope,
2902                    crate::relay_client::format_transport_error(&e),
2903                );
2904                continue;
2905            }
2906        };
2907        total_seen += events.len();
2908        let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
2909        all_written.extend(result.written.iter().cloned());
2910        all_rejected.extend(result.rejected.iter().cloned());
2911        if result.blocked {
2912            all_blocked = true;
2913        }
2914        // Advance per-endpoint cursor. The cursor key is scope-specific
2915        // so federation and local don't trample each other.
2916        if let Some(eid) = result.advance_cursor_to.clone() {
2917            if endpoint.scope == crate::endpoints::EndpointScope::Federation {
2918                all_advance_cursor_to = Some(eid.clone());
2919            }
2920            let key = cursor_key.clone();
2921            config::update_relay_state(|state| {
2922                if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
2923                    self_obj.insert(key, Value::String(eid));
2924                }
2925                Ok(())
2926            })?;
2927        }
2928    }
2929
2930    // Compatibility shim for the legacy single-cursor code paths below:
2931    // `result` used to come from one process_events call; we now have
2932    // per-endpoint results aggregated into the all_* accumulators.
2933    // Reconstruct a synthetic result for the remaining display logic.
2934    let result = crate::pull::PullResult {
2935        written: all_written,
2936        rejected: all_rejected,
2937        blocked: all_blocked,
2938        advance_cursor_to: all_advance_cursor_to,
2939    };
2940    let events_len = total_seen;
2941
2942    // Cursor advance happened per-endpoint above; no aggregate cursor
2943    // write needed here.
2944
2945    if as_json {
2946        println!(
2947            "{}",
2948            serde_json::to_string(&json!({
2949                "written": result.written,
2950                "rejected": result.rejected,
2951                "total_seen": events_len,
2952                "cursor_blocked": result.blocked,
2953                "cursor_advanced_to": result.advance_cursor_to,
2954            }))?
2955        );
2956    } else {
2957        let blocking = result
2958            .rejected
2959            .iter()
2960            .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
2961            .count();
2962        if blocking > 0 {
2963            println!(
2964                "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
2965                events_len,
2966                result.written.len(),
2967                result.rejected.len(),
2968                blocking,
2969            );
2970        } else {
2971            println!(
2972                "pulled {} event(s); wrote {}; rejected {}",
2973                events_len,
2974                result.written.len(),
2975                result.rejected.len(),
2976            );
2977        }
2978    }
2979    Ok(())
2980}
2981
2982/// v0.5.17: cursor key for an endpoint's per-scope read position.
2983/// Federation keeps the v0.5.16 legacy key `last_pulled_event_id` for
2984/// back-compat with on-disk relay_state files; local uses a
2985/// `_local` suffix.
2986fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
2987    match scope {
2988        crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
2989        crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
2990    }
2991}
2992
2993// ---------- rotate-slot ----------
2994
2995fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
2996    if !config::is_initialized()? {
2997        bail!("not initialized — run `wire init <handle>` first");
2998    }
2999    let mut state = config::read_relay_state()?;
3000    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3001    if self_state.is_null() {
3002        bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
3003    }
3004    let url = self_state["relay_url"]
3005        .as_str()
3006        .ok_or_else(|| anyhow!("self.relay_url missing"))?
3007        .to_string();
3008    let old_slot_id = self_state["slot_id"]
3009        .as_str()
3010        .ok_or_else(|| anyhow!("self.slot_id missing"))?
3011        .to_string();
3012    let old_slot_token = self_state["slot_token"]
3013        .as_str()
3014        .ok_or_else(|| anyhow!("self.slot_token missing"))?
3015        .to_string();
3016
3017    // Read identity to sign the announcement.
3018    let card = config::read_agent_card()?;
3019    let did = card
3020        .get("did")
3021        .and_then(Value::as_str)
3022        .unwrap_or("")
3023        .to_string();
3024    let handle = crate::agent_card::display_handle_from_did(&did).to_string();
3025    let pk_b64 = card
3026        .get("verify_keys")
3027        .and_then(Value::as_object)
3028        .and_then(|m| m.values().next())
3029        .and_then(|v| v.get("key"))
3030        .and_then(Value::as_str)
3031        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
3032        .to_string();
3033    let pk_bytes = crate::signing::b64decode(&pk_b64)?;
3034    let sk_seed = config::read_private_key()?;
3035
3036    // Allocate new slot on the same relay.
3037    let normalized = url.trim_end_matches('/').to_string();
3038    let client = crate::relay_client::RelayClient::new(&normalized);
3039    client
3040        .check_healthz()
3041        .context("aborting rotation; old slot still valid")?;
3042    let alloc = client.allocate_slot(Some(&handle))?;
3043    let new_slot_id = alloc.slot_id.clone();
3044    let new_slot_token = alloc.slot_token.clone();
3045
3046    // Optionally announce the rotation to every paired peer via the OLD slot.
3047    // Each peer's recipient-side `wire pull` will pick up this event before
3048    // their daemon next polls the new slot — but auto-update of peer's
3049    // relay.json from a wire_close event is a v0.2 daemon feature; for now
3050    // peers see the event and an operator must manually `add-peer-slot` the
3051    // new coords, OR re-pair via SAS.
3052    let mut announced: Vec<String> = Vec::new();
3053    if !no_announce {
3054        let now = time::OffsetDateTime::now_utc()
3055            .format(&time::format_description::well_known::Rfc3339)
3056            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
3057        let body = json!({
3058            "reason": "operator-initiated slot rotation",
3059            "new_relay_url": url,
3060            "new_slot_id": new_slot_id,
3061            // NOTE: new_slot_token deliberately NOT shared in the broadcast.
3062            // In v0.1 slot tokens are bilateral-shared, so peer can post via
3063            // existing add-peer-slot flow if operator chooses to re-issue.
3064        });
3065        let peers = state["peers"].as_object().cloned().unwrap_or_default();
3066        for (peer_handle, _peer_info) in peers.iter() {
3067            let event = json!({
3068                "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
3069                "timestamp": now.clone(),
3070                "from": did,
3071                "to": format!("did:wire:{peer_handle}"),
3072                "type": "wire_close",
3073                "kind": 1201,
3074                "body": body.clone(),
3075            });
3076            let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
3077                Ok(s) => s,
3078                Err(e) => {
3079                    eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
3080                    continue;
3081                }
3082            };
3083            // Post to OUR old slot (we're announcing on our own slot, NOT
3084            // peer's slot — peer reads from us). Wait, this is wrong: peers
3085            // read from THEIR OWN slot via wire pull. To reach peer A, we
3086            // post to peer A's slot. Use the existing per-peer slot mapping.
3087            let peer_info = match state["peers"].get(peer_handle) {
3088                Some(p) => p.clone(),
3089                None => continue,
3090            };
3091            let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
3092            let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
3093            let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
3094            if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
3095                continue;
3096            }
3097            let peer_client = if peer_url == url {
3098                client.clone()
3099            } else {
3100                crate::relay_client::RelayClient::new(peer_url)
3101            };
3102            match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
3103                Ok(_) => announced.push(peer_handle.clone()),
3104                Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
3105            }
3106        }
3107    }
3108
3109    // Swap the self-slot to the new one.
3110    state["self"] = json!({
3111        "relay_url": url,
3112        "slot_id": new_slot_id,
3113        "slot_token": new_slot_token,
3114    });
3115    config::write_relay_state(&state)?;
3116
3117    if as_json {
3118        println!(
3119            "{}",
3120            serde_json::to_string(&json!({
3121                "rotated": true,
3122                "old_slot_id": old_slot_id,
3123                "new_slot_id": new_slot_id,
3124                "relay_url": url,
3125                "announced_to": announced,
3126            }))?
3127        );
3128    } else {
3129        println!("rotated slot on {url}");
3130        println!(
3131            "  old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
3132        );
3133        println!("  new slot_id: {new_slot_id}");
3134        if !announced.is_empty() {
3135            println!(
3136                "  announced wire_close (kind=1201) to: {}",
3137                announced.join(", ")
3138            );
3139        }
3140        println!();
3141        println!("next steps:");
3142        println!("  - peers see the wire_close event in their next `wire pull`");
3143        println!(
3144            "  - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
3145        );
3146        println!("    (or full re-pair via `wire pair-host`/`wire join`)");
3147        println!("  - until they do, you'll receive but they won't be able to reach you");
3148        // Suppress unused warning
3149        let _ = old_slot_token;
3150    }
3151    Ok(())
3152}
3153
3154// ---------- forget-peer ----------
3155
3156fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
3157    let mut trust = config::read_trust()?;
3158    let mut removed_from_trust = false;
3159    if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
3160        && agents.remove(handle).is_some()
3161    {
3162        removed_from_trust = true;
3163    }
3164    config::write_trust(&trust)?;
3165
3166    let mut state = config::read_relay_state()?;
3167    let mut removed_from_relay = false;
3168    if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
3169        && peers.remove(handle).is_some()
3170    {
3171        removed_from_relay = true;
3172    }
3173    config::write_relay_state(&state)?;
3174
3175    let mut purged: Vec<String> = Vec::new();
3176    if purge {
3177        for dir in [config::inbox_dir()?, config::outbox_dir()?] {
3178            let path = dir.join(format!("{handle}.jsonl"));
3179            if path.exists() {
3180                std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
3181                purged.push(path.to_string_lossy().into());
3182            }
3183        }
3184    }
3185
3186    if !removed_from_trust && !removed_from_relay {
3187        if as_json {
3188            println!(
3189                "{}",
3190                serde_json::to_string(&json!({
3191                    "removed": false,
3192                    "reason": format!("peer {handle:?} not pinned"),
3193                }))?
3194            );
3195        } else {
3196            eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
3197        }
3198        return Ok(());
3199    }
3200
3201    if as_json {
3202        println!(
3203            "{}",
3204            serde_json::to_string(&json!({
3205                "handle": handle,
3206                "removed_from_trust": removed_from_trust,
3207                "removed_from_relay_state": removed_from_relay,
3208                "purged_files": purged,
3209            }))?
3210        );
3211    } else {
3212        println!("forgot peer {handle:?}");
3213        if removed_from_trust {
3214            println!("  - removed from trust.json");
3215        }
3216        if removed_from_relay {
3217            println!("  - removed from relay.json");
3218        }
3219        if !purged.is_empty() {
3220            for p in &purged {
3221                println!("  - deleted {p}");
3222            }
3223        } else if !purge {
3224            println!("  (inbox/outbox files preserved; pass --purge to delete them)");
3225        }
3226    }
3227    Ok(())
3228}
3229
3230// ---------- daemon (long-lived push+pull sync) ----------
3231
3232fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
3233    if !config::is_initialized()? {
3234        bail!("not initialized — run `wire init <handle>` first");
3235    }
3236    let interval = std::time::Duration::from_secs(interval_secs.max(1));
3237
3238    if !as_json {
3239        if once {
3240            eprintln!("wire daemon: single sync cycle, then exit");
3241        } else {
3242            eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
3243        }
3244    }
3245
3246    // Recover from prior crash: any pending pair in transient state had its
3247    // in-memory SPAKE2 secret lost when the previous daemon exited. Release
3248    // the relay slots and mark the files so the operator can re-issue.
3249    if let Err(e) = crate::pending_pair::cleanup_on_startup() {
3250        eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
3251    }
3252
3253    // R1 phase 2: spawn the SSE stream subscriber. On every event pushed
3254    // to our slot, the subscriber signals `wake_rx`; we use it as the
3255    // sleep-or-wake gate of the polling loop. Polling stays as the
3256    // safety net — stream errors fall back transparently to the existing
3257    // interval-based cadence.
3258    let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
3259    if !once {
3260        crate::daemon_stream::spawn_stream_subscriber(wake_tx);
3261    }
3262
3263    loop {
3264        let pushed = run_sync_push().unwrap_or_else(|e| {
3265            eprintln!("daemon: push error: {e:#}");
3266            json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
3267        });
3268        let pulled = run_sync_pull().unwrap_or_else(|e| {
3269            eprintln!("daemon: pull error: {e:#}");
3270            json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
3271        });
3272        let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
3273            eprintln!("daemon: pending-pair tick error: {e:#}");
3274            json!({"transitions": []})
3275        });
3276
3277        if as_json {
3278            println!(
3279                "{}",
3280                serde_json::to_string(&json!({
3281                    "ts": time::OffsetDateTime::now_utc()
3282                        .format(&time::format_description::well_known::Rfc3339)
3283                        .unwrap_or_default(),
3284                    "push": pushed,
3285                    "pull": pulled,
3286                    "pairs": pairs,
3287                }))?
3288            );
3289        } else {
3290            let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
3291            let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
3292            let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
3293            let pair_transitions = pairs["transitions"]
3294                .as_array()
3295                .map(|a| a.len())
3296                .unwrap_or(0);
3297            if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
3298                eprintln!(
3299                    "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
3300                );
3301            }
3302            // Loud per-transition logging so operator sees pair progress live.
3303            if let Some(arr) = pairs["transitions"].as_array() {
3304                for t in arr {
3305                    eprintln!(
3306                        "  pair {} : {} → {}",
3307                        t.get("code").and_then(Value::as_str).unwrap_or("?"),
3308                        t.get("from").and_then(Value::as_str).unwrap_or("?"),
3309                        t.get("to").and_then(Value::as_str).unwrap_or("?")
3310                    );
3311                    if let Some(sas) = t.get("sas").and_then(Value::as_str)
3312                        && t.get("to").and_then(Value::as_str) == Some("sas_ready")
3313                    {
3314                        eprintln!("    SAS digits: {}-{}", &sas[..3], &sas[3..]);
3315                        eprintln!(
3316                            "    Run: wire pair-confirm {} {}",
3317                            t.get("code").and_then(Value::as_str).unwrap_or("?"),
3318                            sas
3319                        );
3320                    }
3321                }
3322            }
3323        }
3324
3325        if once {
3326            return Ok(());
3327        }
3328        // Wait either for the next poll-interval tick OR for a stream
3329        // wake signal — whichever comes first. Drain any additional
3330        // wake-ups that accumulated during the previous cycle since one
3331        // pull catches up everything.
3332        let _ = wake_rx.recv_timeout(interval);
3333        while wake_rx.try_recv().is_ok() {}
3334    }
3335}
3336
3337/// Programmatic push (no stdout, no exit on errors). Returns the same JSON
3338/// shape `wire push --json` emits.
3339fn run_sync_push() -> Result<Value> {
3340    let state = config::read_relay_state()?;
3341    let peers = state["peers"].as_object().cloned().unwrap_or_default();
3342    if peers.is_empty() {
3343        return Ok(json!({"pushed": [], "skipped": []}));
3344    }
3345    let outbox_dir = config::outbox_dir()?;
3346    if !outbox_dir.exists() {
3347        return Ok(json!({"pushed": [], "skipped": []}));
3348    }
3349    let mut pushed = Vec::new();
3350    let mut skipped = Vec::new();
3351    for (peer_handle, slot_info) in peers.iter() {
3352        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
3353        if !outbox.exists() {
3354            continue;
3355        }
3356        let url = slot_info["relay_url"].as_str().unwrap_or("");
3357        let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
3358        let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
3359        if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
3360            continue;
3361        }
3362        let client = crate::relay_client::RelayClient::new(url);
3363        let body = std::fs::read_to_string(&outbox)?;
3364        for line in body.lines() {
3365            let event: Value = match serde_json::from_str(line) {
3366                Ok(v) => v,
3367                Err(_) => continue,
3368            };
3369            let event_id = event
3370                .get("event_id")
3371                .and_then(Value::as_str)
3372                .unwrap_or("")
3373                .to_string();
3374            match client.post_event(slot_id, slot_token, &event) {
3375                Ok(resp) => {
3376                    if resp.status == "duplicate" {
3377                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
3378                    } else {
3379                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
3380                    }
3381                }
3382                Err(e) => {
3383                    // v0.5.13: flatten the anyhow chain so TLS / DNS / timeout
3384                    // errors aren't hidden behind the topmost-context URL string.
3385                    // Issue #6 highest-impact silent-fail fix.
3386                    let reason = crate::relay_client::format_transport_error(&e);
3387                    skipped.push(
3388                        json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
3389                    );
3390                }
3391            }
3392        }
3393    }
3394    Ok(json!({"pushed": pushed, "skipped": skipped}))
3395}
3396
3397/// Programmatic pull. Same shape as `wire pull --json`.
3398fn run_sync_pull() -> Result<Value> {
3399    let state = config::read_relay_state()?;
3400    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3401    if self_state.is_null() {
3402        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3403    }
3404    let url = self_state["relay_url"].as_str().unwrap_or("");
3405    let slot_id = self_state["slot_id"].as_str().unwrap_or("");
3406    let slot_token = self_state["slot_token"].as_str().unwrap_or("");
3407    let last_event_id = self_state
3408        .get("last_pulled_event_id")
3409        .and_then(Value::as_str)
3410        .map(str::to_string);
3411    if url.is_empty() {
3412        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3413    }
3414    let client = crate::relay_client::RelayClient::new(url);
3415    let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
3416    let inbox_dir = config::inbox_dir()?;
3417    config::ensure_dirs()?;
3418
3419    // P0.1 (0.5.11): shared cursor-blocking logic. Daemon's --once path
3420    // must match the CLI's `wire pull` semantics or version-skew bugs
3421    // re-emerge by another route.
3422    let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
3423
3424    // P0.3 (0.5.11): same flock-protected RMW as cmd_pull.
3425    if let Some(eid) = &result.advance_cursor_to {
3426        let eid = eid.clone();
3427        config::update_relay_state(|state| {
3428            if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3429                self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
3430            }
3431            Ok(())
3432        })?;
3433    }
3434
3435    Ok(json!({
3436        "written": result.written,
3437        "rejected": result.rejected,
3438        "total_seen": events.len(),
3439        "cursor_blocked": result.blocked,
3440        "cursor_advanced_to": result.advance_cursor_to,
3441    }))
3442}
3443
3444// ---------- pin (manual out-of-band peer pairing) ----------
3445
3446fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
3447    let body =
3448        std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
3449    let card: Value =
3450        serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
3451    crate::agent_card::verify_agent_card(&card)
3452        .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3453
3454    let mut trust = config::read_trust()?;
3455    crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3456
3457    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3458    let handle = crate::agent_card::display_handle_from_did(did).to_string();
3459    config::write_trust(&trust)?;
3460
3461    if as_json {
3462        println!(
3463            "{}",
3464            serde_json::to_string(&json!({
3465                "handle": handle,
3466                "did": did,
3467                "tier": "VERIFIED",
3468                "pinned": true,
3469            }))?
3470        );
3471    } else {
3472        println!("pinned {handle} ({did}) at tier VERIFIED");
3473    }
3474    Ok(())
3475}
3476
3477// ---------- pair-host / pair-join (the magic-wormhole flow) ----------
3478
3479fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3480    pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3481}
3482
3483fn cmd_pair_join(
3484    code_phrase: &str,
3485    relay_url: &str,
3486    auto_yes: bool,
3487    timeout_secs: u64,
3488) -> Result<()> {
3489    pair_orchestrate(
3490        relay_url,
3491        Some(code_phrase),
3492        "guest",
3493        auto_yes,
3494        timeout_secs,
3495    )
3496}
3497
3498/// Shared orchestration for both sides of the SAS pairing.
3499///
3500/// Now thin: delegates to `pair_session::pair_session_open` / `_try_sas` /
3501/// `_finalize`. CLI keeps its interactive y/N prompt; MCP uses
3502/// `pair_session_confirm_sas` instead.
3503fn pair_orchestrate(
3504    relay_url: &str,
3505    code_in: Option<&str>,
3506    role: &str,
3507    auto_yes: bool,
3508    timeout_secs: u64,
3509) -> Result<()> {
3510    use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3511
3512    let mut s = pair_session_open(role, relay_url, code_in)?;
3513
3514    if role == "host" {
3515        eprintln!();
3516        eprintln!("share this code phrase with your peer:");
3517        eprintln!();
3518        eprintln!("    {}", s.code);
3519        eprintln!();
3520        eprintln!(
3521            "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3522            s.code
3523        );
3524    } else {
3525        eprintln!();
3526        eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3527    }
3528
3529    // Stage 2 — poll for SAS-ready with periodic progress heartbeat. The bare
3530    // pair_session_wait_for_sas helper is silent; the CLI wraps it in a loop
3531    // that emits a "waiting (Ns / Ts)" line every HEARTBEAT_SECS so operators
3532    // see the process is alive while the other side connects.
3533    const HEARTBEAT_SECS: u64 = 10;
3534    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3535    let started = std::time::Instant::now();
3536    let mut last_heartbeat = started;
3537    let formatted = loop {
3538        if let Some(sas) = pair_session_try_sas(&mut s)? {
3539            break sas;
3540        }
3541        let now = std::time::Instant::now();
3542        if now >= deadline {
3543            return Err(anyhow!(
3544                "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3545            ));
3546        }
3547        if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3548            let elapsed = now.duration_since(started).as_secs();
3549            eprintln!("  ... still waiting ({elapsed}s / {timeout_secs}s)");
3550            last_heartbeat = now;
3551        }
3552        std::thread::sleep(std::time::Duration::from_millis(250));
3553    };
3554
3555    eprintln!();
3556    eprintln!("SAS digits (must match peer's terminal):");
3557    eprintln!();
3558    eprintln!("    {formatted}");
3559    eprintln!();
3560
3561    // Stage 3 — operator confirmation. CLI uses interactive y/N for backward
3562    // compatibility; MCP uses pair_session_confirm_sas with the typed digits.
3563    if !auto_yes {
3564        eprint!("does this match your peer's terminal? [y/N]: ");
3565        use std::io::Write;
3566        std::io::stderr().flush().ok();
3567        let mut input = String::new();
3568        std::io::stdin().read_line(&mut input)?;
3569        let trimmed = input.trim().to_lowercase();
3570        if trimmed != "y" && trimmed != "yes" {
3571            bail!("SAS confirmation declined — aborting pairing");
3572        }
3573    }
3574    s.sas_confirmed = true;
3575
3576    // Stage 4 — seal+exchange bootstrap, pin peer.
3577    let result = pair_session_finalize(&mut s, timeout_secs)?;
3578
3579    let peer_did = result["paired_with"].as_str().unwrap_or("");
3580    let peer_role = if role == "host" { "guest" } else { "host" };
3581    eprintln!("paired with {peer_did} (peer role: {peer_role})");
3582    eprintln!("peer card pinned at tier VERIFIED");
3583    eprintln!(
3584        "peer relay slot saved to {}",
3585        config::relay_state_path()?.display()
3586    );
3587
3588    println!("{}", serde_json::to_string(&result)?);
3589    Ok(())
3590}
3591
// (poll_until helper removed — the CLI pair flow now polls
// pair_session::pair_session_try_sas in its own heartbeat loop, and
// pair_session_finalize is handed the timeout directly.)
3594
3595// ---------- pair — single-shot init + pair-* + setup ----------
3596
3597fn cmd_pair(
3598    handle: &str,
3599    code: Option<&str>,
3600    relay: &str,
3601    auto_yes: bool,
3602    timeout_secs: u64,
3603    no_setup: bool,
3604) -> Result<()> {
3605    // Step 1 — idempotent identity. Safe if already initialized with the SAME handle;
3606    // bails loudly if a different handle is already set (operator must explicitly delete).
3607    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3608    let did = init_result
3609        .get("did")
3610        .and_then(|v| v.as_str())
3611        .unwrap_or("(unknown)")
3612        .to_string();
3613    let already = init_result
3614        .get("already_initialized")
3615        .and_then(|v| v.as_bool())
3616        .unwrap_or(false);
3617    if already {
3618        println!("(identity {did} already initialized — reusing)");
3619    } else {
3620        println!("initialized {did}");
3621    }
3622    println!();
3623
3624    // Step 2 — pair-host or pair-join based on code presence.
3625    match code {
3626        None => {
3627            println!("hosting pair on {relay} (no code = host) ...");
3628            cmd_pair_host(relay, auto_yes, timeout_secs)?;
3629        }
3630        Some(c) => {
3631            println!("joining pair with code {c} on {relay} ...");
3632            cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3633        }
3634    }
3635
3636    // Step 3 — register wire as MCP server in detected client configs (idempotent).
3637    if !no_setup {
3638        println!();
3639        println!("registering wire as MCP server in detected client configs ...");
3640        if let Err(e) = cmd_setup(true) {
3641            // Non-fatal — pair succeeded, just print the warning.
3642            eprintln!("warn: setup --apply failed: {e}");
3643            eprintln!("      pair succeeded; you can re-run `wire setup --apply` manually.");
3644        }
3645    }
3646
3647    println!();
3648    println!("pair complete. Next steps:");
3649    println!("  wire daemon start              # background sync of inbox/outbox vs relay");
3650    println!("  wire send <peer> claim <msg>   # send your peer something");
3651    println!("  wire tail                      # watch incoming events");
3652    Ok(())
3653}
3654
3655// ---------- detached pair (daemon-orchestrated) ----------
3656
3657/// `wire pair <handle> [--code <phrase>] --detach` — wraps init + detach
3658/// pair-host/-join into a single command. The non-detached variant lives in
3659/// `cmd_pair`; this one short-circuits to the daemon-orchestrated path.
3660fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3661    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3662    let did = init_result
3663        .get("did")
3664        .and_then(|v| v.as_str())
3665        .unwrap_or("(unknown)")
3666        .to_string();
3667    let already = init_result
3668        .get("already_initialized")
3669        .and_then(|v| v.as_bool())
3670        .unwrap_or(false);
3671    if already {
3672        println!("(identity {did} already initialized — reusing)");
3673    } else {
3674        println!("initialized {did}");
3675    }
3676    println!();
3677    match code {
3678        None => cmd_pair_host_detach(relay, false),
3679        Some(c) => cmd_pair_join_detach(c, relay, false),
3680    }
3681}
3682
3683fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3684    if !config::is_initialized()? {
3685        bail!("not initialized — run `wire init <handle>` first");
3686    }
3687    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3688        Ok(b) => b,
3689        Err(e) => {
3690            if !as_json {
3691                eprintln!(
3692                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3693                );
3694            }
3695            false
3696        }
3697    };
3698    let code = crate::sas::generate_code_phrase();
3699    let code_hash = crate::pair_session::derive_code_hash(&code);
3700    let now = time::OffsetDateTime::now_utc()
3701        .format(&time::format_description::well_known::Rfc3339)
3702        .unwrap_or_default();
3703    let p = crate::pending_pair::PendingPair {
3704        code: code.clone(),
3705        code_hash,
3706        role: "host".to_string(),
3707        relay_url: relay_url.to_string(),
3708        status: "request_host".to_string(),
3709        sas: None,
3710        peer_did: None,
3711        created_at: now,
3712        last_error: None,
3713        pair_id: None,
3714        our_slot_id: None,
3715        our_slot_token: None,
3716        spake2_seed_b64: None,
3717    };
3718    crate::pending_pair::write_pending(&p)?;
3719    if as_json {
3720        println!(
3721            "{}",
3722            serde_json::to_string(&json!({
3723                "state": "queued",
3724                "code_phrase": code,
3725                "relay_url": relay_url,
3726                "role": "host",
3727                "daemon_spawned": daemon_spawned,
3728            }))?
3729        );
3730    } else {
3731        if daemon_spawned {
3732            println!("(started wire daemon in background)");
3733        }
3734        println!("detached pair-host queued. Share this code with your peer:\n");
3735        println!("    {code}\n");
3736        println!("Next steps:");
3737        println!("  wire pair-list                                # check status");
3738        println!("  wire pair-confirm {code} <digits>   # when SAS shows up");
3739        println!("  wire pair-cancel  {code}            # to abort");
3740    }
3741    Ok(())
3742}
3743
3744fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3745    if !config::is_initialized()? {
3746        bail!("not initialized — run `wire init <handle>` first");
3747    }
3748    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3749        Ok(b) => b,
3750        Err(e) => {
3751            if !as_json {
3752                eprintln!(
3753                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3754                );
3755            }
3756            false
3757        }
3758    };
3759    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3760    let code_hash = crate::pair_session::derive_code_hash(&code);
3761    let now = time::OffsetDateTime::now_utc()
3762        .format(&time::format_description::well_known::Rfc3339)
3763        .unwrap_or_default();
3764    let p = crate::pending_pair::PendingPair {
3765        code: code.clone(),
3766        code_hash,
3767        role: "guest".to_string(),
3768        relay_url: relay_url.to_string(),
3769        status: "request_guest".to_string(),
3770        sas: None,
3771        peer_did: None,
3772        created_at: now,
3773        last_error: None,
3774        pair_id: None,
3775        our_slot_id: None,
3776        our_slot_token: None,
3777        spake2_seed_b64: None,
3778    };
3779    crate::pending_pair::write_pending(&p)?;
3780    if as_json {
3781        println!(
3782            "{}",
3783            serde_json::to_string(&json!({
3784                "state": "queued",
3785                "code_phrase": code,
3786                "relay_url": relay_url,
3787                "role": "guest",
3788                "daemon_spawned": daemon_spawned,
3789            }))?
3790        );
3791    } else {
3792        if daemon_spawned {
3793            println!("(started wire daemon in background)");
3794        }
3795        println!("detached pair-join queued for code {code}.");
3796        println!(
3797            "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3798        );
3799    }
3800    Ok(())
3801}
3802
3803fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3804    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3805    let typed: String = typed_digits
3806        .chars()
3807        .filter(|c| c.is_ascii_digit())
3808        .collect();
3809    if typed.len() != 6 {
3810        bail!(
3811            "expected 6 digits (got {} after stripping non-digits)",
3812            typed.len()
3813        );
3814    }
3815    let mut p = crate::pending_pair::read_pending(&code)?
3816        .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
3817    if p.status != "sas_ready" {
3818        bail!(
3819            "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
3820            p.status
3821        );
3822    }
3823    let stored = p
3824        .sas
3825        .as_ref()
3826        .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
3827        .clone();
3828    if stored == typed {
3829        p.status = "confirmed".to_string();
3830        crate::pending_pair::write_pending(&p)?;
3831        if as_json {
3832            println!(
3833                "{}",
3834                serde_json::to_string(&json!({
3835                    "state": "confirmed",
3836                    "code_phrase": code,
3837                }))?
3838            );
3839        } else {
3840            println!("digits match. Daemon will finalize the handshake on its next tick.");
3841            println!("Run `wire peers` after a few seconds to confirm.");
3842        }
3843    } else {
3844        p.status = "aborted".to_string();
3845        p.last_error = Some(format!(
3846            "SAS digit mismatch (typed {typed}, expected {stored})"
3847        ));
3848        let client = crate::relay_client::RelayClient::new(&p.relay_url);
3849        let _ = client.pair_abandon(&p.code_hash);
3850        crate::pending_pair::write_pending(&p)?;
3851        crate::os_notify::toast(
3852            &format!("wire — pair aborted ({})", p.code),
3853            p.last_error.as_deref().unwrap_or("digits mismatch"),
3854        );
3855        if as_json {
3856            println!(
3857                "{}",
3858                serde_json::to_string(&json!({
3859                    "state": "aborted",
3860                    "code_phrase": code,
3861                    "error": "digits mismatch",
3862                }))?
3863            );
3864        }
3865        bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
3866    }
3867    Ok(())
3868}
3869
3870fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
3871    if watch {
3872        return cmd_pair_list_watch(watch_interval_secs);
3873    }
3874    let spake2_items = crate::pending_pair::list_pending()?;
3875    let inbound_items = crate::pending_inbound_pair::list_pending_inbound()?;
3876    if as_json {
3877        // Backwards-compat: flat SPAKE2 array (the shape every existing
3878        // script + e2e test parses since v0.5.x). v0.5.14 inbound items
3879        // surface programmatically via `wire pair-list-inbound --json`
3880        // and via `wire status --json` `pending_pairs.inbound_*` fields.
3881        println!("{}", serde_json::to_string(&spake2_items)?);
3882        return Ok(());
3883    }
3884    if spake2_items.is_empty() && inbound_items.is_empty() {
3885        println!("no pending pair sessions.");
3886        return Ok(());
3887    }
3888    // v0.5.14: inbound section first — these need operator action right now.
3889    // SPAKE2 sessions are typically already mid-flow.
3890    if !inbound_items.is_empty() {
3891        println!("PENDING INBOUND (v0.5.14 zero-paste pair_drop awaiting your accept)");
3892        println!(
3893            "{:<20} {:<35} {:<25} NEXT STEP",
3894            "PEER", "RELAY", "RECEIVED"
3895        );
3896        for p in &inbound_items {
3897            println!(
3898                "{:<20} {:<35} {:<25} `wire pair-accept {peer}` to accept; `wire pair-reject {peer}` to refuse",
3899                p.peer_handle,
3900                p.peer_relay_url,
3901                p.received_at,
3902                peer = p.peer_handle,
3903            );
3904        }
3905        println!();
3906    }
3907    if !spake2_items.is_empty() {
3908        println!("SPAKE2 SESSIONS");
3909        println!(
3910            "{:<15} {:<8} {:<18} {:<10} NOTE",
3911            "CODE", "ROLE", "STATUS", "SAS"
3912        );
3913        for p in spake2_items {
3914            let sas = p
3915                .sas
3916                .as_ref()
3917                .map(|d| format!("{}-{}", &d[..3], &d[3..]))
3918                .unwrap_or_else(|| "—".to_string());
3919            let note = p
3920                .last_error
3921                .as_deref()
3922                .or(p.peer_did.as_deref())
3923                .unwrap_or("");
3924            println!(
3925                "{:<15} {:<8} {:<18} {:<10} {}",
3926                p.code, p.role, p.status, sas, note
3927            );
3928        }
3929    }
3930    Ok(())
3931}
3932
3933/// Stream-mode pair-list: never exits. Diffs per-code state every
3934/// `interval_secs` and prints one JSON line per transition (creation,
3935/// status flip, deletion). Useful for shell pipelines:
3936///
3937/// ```text
3938/// wire pair-list --watch | while read line; do
3939///     CODE=$(echo "$line" | jq -r .code)
3940///     STATUS=$(echo "$line" | jq -r .status)
3941///     ...
3942/// done
3943/// ```
3944fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
3945    use std::collections::HashMap;
3946    use std::io::Write;
3947    let interval = std::time::Duration::from_secs(interval_secs.max(1));
3948    // Emit a snapshot synthetic event for every currently-pending pair on
3949    // startup so a consumer that arrives mid-flight sees the current state.
3950    let mut prev: HashMap<String, String> = HashMap::new();
3951    {
3952        let items = crate::pending_pair::list_pending()?;
3953        for p in &items {
3954            println!("{}", serde_json::to_string(&p)?);
3955            prev.insert(p.code.clone(), p.status.clone());
3956        }
3957        // Flush so the consumer's `while read` gets the snapshot promptly.
3958        let _ = std::io::stdout().flush();
3959    }
3960    loop {
3961        std::thread::sleep(interval);
3962        let items = match crate::pending_pair::list_pending() {
3963            Ok(v) => v,
3964            Err(_) => continue,
3965        };
3966        let mut cur: HashMap<String, String> = HashMap::new();
3967        for p in &items {
3968            cur.insert(p.code.clone(), p.status.clone());
3969            match prev.get(&p.code) {
3970                None => {
3971                    // New code appeared.
3972                    println!("{}", serde_json::to_string(&p)?);
3973                }
3974                Some(prev_status) if prev_status != &p.status => {
3975                    // Status flipped.
3976                    println!("{}", serde_json::to_string(&p)?);
3977                }
3978                _ => {}
3979            }
3980        }
3981        for code in prev.keys() {
3982            if !cur.contains_key(code) {
3983                // File disappeared → finalized or cancelled. Emit a synthetic
3984                // "removed" marker so the consumer sees the terminal event.
3985                println!(
3986                    "{}",
3987                    serde_json::to_string(&json!({
3988                        "code": code,
3989                        "status": "removed",
3990                        "_synthetic": true,
3991                    }))?
3992                );
3993            }
3994        }
3995        let _ = std::io::stdout().flush();
3996        prev = cur;
3997    }
3998}
3999
4000/// Block until a pending pair reaches `target_status` or terminates. Process
4001/// exit code carries the outcome (0 success, 1 terminated abnormally, 2
4002/// timeout) so shell scripts can branch directly.
4003fn cmd_pair_watch(
4004    code_phrase: &str,
4005    target_status: &str,
4006    timeout_secs: u64,
4007    as_json: bool,
4008) -> Result<()> {
4009    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
4010    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
4011    let mut last_seen_status: Option<String> = None;
4012    loop {
4013        let p_opt = crate::pending_pair::read_pending(&code)?;
4014        let now = std::time::Instant::now();
4015        match p_opt {
4016            None => {
4017                // File gone — either finalized (success if target=sas_ready
4018                // since finalization implies it passed sas_ready) or never
4019                // existed. Distinguish by whether we ever saw it.
4020                if last_seen_status.is_some() {
4021                    if as_json {
4022                        println!(
4023                            "{}",
4024                            serde_json::to_string(&json!({"state": "finalized", "code": code}))?
4025                        );
4026                    } else {
4027                        println!("pair {code} finalized (file removed)");
4028                    }
4029                    return Ok(());
4030                } else {
4031                    if as_json {
4032                        println!(
4033                            "{}",
4034                            serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
4035                        );
4036                    }
4037                    std::process::exit(1);
4038                }
4039            }
4040            Some(p) => {
4041                let cur = p.status.clone();
4042                if Some(cur.clone()) != last_seen_status {
4043                    if as_json {
4044                        // Emit per-transition line so scripts can stream.
4045                        println!("{}", serde_json::to_string(&p)?);
4046                    }
4047                    last_seen_status = Some(cur.clone());
4048                }
4049                if cur == target_status {
4050                    if !as_json {
4051                        let sas_str = p
4052                            .sas
4053                            .as_ref()
4054                            .map(|s| format!("{}-{}", &s[..3], &s[3..]))
4055                            .unwrap_or_else(|| "—".to_string());
4056                        println!("pair {code} reached {target_status} (SAS: {sas_str})");
4057                    }
4058                    return Ok(());
4059                }
4060                if cur == "aborted" || cur == "aborted_restart" {
4061                    if !as_json {
4062                        let err = p.last_error.as_deref().unwrap_or("(no detail)");
4063                        eprintln!("pair {code} {cur}: {err}");
4064                    }
4065                    std::process::exit(1);
4066                }
4067            }
4068        }
4069        if now >= deadline {
4070            if !as_json {
4071                eprintln!(
4072                    "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
4073                );
4074            }
4075            std::process::exit(2);
4076        }
4077        std::thread::sleep(std::time::Duration::from_millis(250));
4078    }
4079}
4080
4081fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
4082    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
4083    let p = crate::pending_pair::read_pending(&code)?
4084        .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
4085    let client = crate::relay_client::RelayClient::new(&p.relay_url);
4086    let _ = client.pair_abandon(&p.code_hash);
4087    crate::pending_pair::delete_pending(&code)?;
4088    if as_json {
4089        println!(
4090            "{}",
4091            serde_json::to_string(&json!({
4092                "state": "cancelled",
4093                "code_phrase": code,
4094            }))?
4095        );
4096    } else {
4097        println!("cancelled pending pair {code} (relay slot released, file removed).");
4098    }
4099    Ok(())
4100}
4101
4102// ---------- pair-abandon — release stuck pair-slot ----------
4103
4104fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
4105    // Accept either the raw phrase (e.g. "53-CKWIA5") or whatever the user
4106    // typed — normalize via the existing parser.
4107    let code = crate::sas::parse_code_phrase(code_phrase)?;
4108    let code_hash = crate::pair_session::derive_code_hash(code);
4109    let client = crate::relay_client::RelayClient::new(relay_url);
4110    client.pair_abandon(&code_hash)?;
4111    println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
4112    println!("host can now issue a fresh code; guest can re-join.");
4113    Ok(())
4114}
4115
4116// ---------- invite / accept — one-paste pair (v0.4.0) ----------
4117
4118fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
4119    let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
4120
4121    // If --share, register the invite at the relay's short-URL endpoint and
4122    // build the one-curl onboarding line for the peer to paste.
4123    let share_payload: Option<Value> = if share {
4124        let client = reqwest::blocking::Client::new();
4125        let single_use = if uses == 1 { Some(1u32) } else { None };
4126        let body = json!({
4127            "invite_url": url,
4128            "ttl_seconds": ttl,
4129            "uses": single_use,
4130        });
4131        let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
4132        let resp = client.post(&endpoint).json(&body).send()?;
4133        if !resp.status().is_success() {
4134            let code = resp.status();
4135            let txt = resp.text().unwrap_or_default();
4136            bail!("relay {code} on /v1/invite/register: {txt}");
4137        }
4138        let parsed: Value = resp.json()?;
4139        let token = parsed
4140            .get("token")
4141            .and_then(Value::as_str)
4142            .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
4143            .to_string();
4144        let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
4145        let curl_line = format!("curl -fsSL {share_url} | sh");
4146        Some(json!({
4147            "token": token,
4148            "share_url": share_url,
4149            "curl": curl_line,
4150            "expires_unix": parsed.get("expires_unix"),
4151        }))
4152    } else {
4153        None
4154    };
4155
4156    if as_json {
4157        let mut out = json!({
4158            "invite_url": url,
4159            "ttl_secs": ttl,
4160            "uses": uses,
4161            "relay": relay,
4162        });
4163        if let Some(s) = &share_payload {
4164            out["share"] = s.clone();
4165        }
4166        println!("{}", serde_json::to_string(&out)?);
4167    } else if let Some(s) = share_payload {
4168        let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
4169        eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
4170        eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
4171        println!("{curl}");
4172    } else {
4173        eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
4174        eprintln!("# TTL: {ttl}s. Uses: {uses}.");
4175        println!("{url}");
4176    }
4177    Ok(())
4178}
4179
4180fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
4181    // If the user pasted an HTTP(S) short URL (e.g. https://wireup.net/i/AB12),
4182    // resolve it to the underlying wire://pair?... URL via ?format=url before
4183    // accepting. Saves them from having to know which URL shape goes where.
4184    let resolved = if url.starts_with("http://") || url.starts_with("https://") {
4185        let sep = if url.contains('?') { '&' } else { '?' };
4186        let resolve_url = format!("{url}{sep}format=url");
4187        let client = reqwest::blocking::Client::new();
4188        let resp = client
4189            .get(&resolve_url)
4190            .send()
4191            .with_context(|| format!("GET {resolve_url}"))?;
4192        if !resp.status().is_success() {
4193            bail!("could not resolve short URL {url} (HTTP {})", resp.status());
4194        }
4195        let body = resp.text().unwrap_or_default().trim().to_string();
4196        if !body.starts_with("wire://pair?") {
4197            bail!(
4198                "short URL {url} did not resolve to a wire:// invite. \
4199                 (got: {}{})",
4200                body.chars().take(80).collect::<String>(),
4201                if body.chars().count() > 80 { "…" } else { "" }
4202            );
4203        }
4204        body
4205    } else {
4206        url.to_string()
4207    };
4208
4209    let result = crate::pair_invite::accept_invite(&resolved)?;
4210    if as_json {
4211        println!("{}", serde_json::to_string(&result)?);
4212    } else {
4213        let did = result
4214            .get("paired_with")
4215            .and_then(Value::as_str)
4216            .unwrap_or("?");
4217        println!("paired with {did}");
4218        println!(
4219            "you can now: wire send {} <kind> <body>",
4220            crate::agent_card::display_handle_from_did(did)
4221        );
4222    }
4223    Ok(())
4224}
4225
4226// ---------- whois / profile (v0.5) ----------
4227
4228fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
4229    if let Some(h) = handle {
4230        let parsed = crate::pair_profile::parse_handle(h)?;
4231        // Special-case: if the supplied handle matches our own, skip the
4232        // network round-trip and print local.
4233        if config::is_initialized()? {
4234            let card = config::read_agent_card()?;
4235            let local_handle = card
4236                .get("profile")
4237                .and_then(|p| p.get("handle"))
4238                .and_then(Value::as_str)
4239                .map(str::to_string);
4240            if local_handle.as_deref() == Some(h) {
4241                return cmd_whois(None, as_json, None);
4242            }
4243        }
4244        // Remote resolution via .well-known/wire/agent on the handle's domain.
4245        let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4246        if as_json {
4247            println!("{}", serde_json::to_string(&resolved)?);
4248        } else {
4249            print_resolved_profile(&resolved);
4250        }
4251        return Ok(());
4252    }
4253    let card = config::read_agent_card()?;
4254    if as_json {
4255        let profile = card.get("profile").cloned().unwrap_or(Value::Null);
4256        println!(
4257            "{}",
4258            serde_json::to_string(&json!({
4259                "did": card.get("did").cloned().unwrap_or(Value::Null),
4260                "profile": profile,
4261            }))?
4262        );
4263    } else {
4264        print!("{}", crate::pair_profile::render_self_summary()?);
4265    }
4266    Ok(())
4267}
4268
4269fn print_resolved_profile(resolved: &Value) {
4270    let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
4271    let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
4272    let relay = resolved
4273        .get("relay_url")
4274        .and_then(Value::as_str)
4275        .unwrap_or("");
4276    let slot = resolved
4277        .get("slot_id")
4278        .and_then(Value::as_str)
4279        .unwrap_or("");
4280    let profile = resolved
4281        .get("card")
4282        .and_then(|c| c.get("profile"))
4283        .cloned()
4284        .unwrap_or(Value::Null);
4285    println!("{did}");
4286    println!("  nick:         {nick}");
4287    if !relay.is_empty() {
4288        println!("  relay_url:    {relay}");
4289    }
4290    if !slot.is_empty() {
4291        println!("  slot_id:      {slot}");
4292    }
4293    let pick =
4294        |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
4295    if let Some(s) = pick("display_name") {
4296        println!("  display_name: {s}");
4297    }
4298    if let Some(s) = pick("emoji") {
4299        println!("  emoji:        {s}");
4300    }
4301    if let Some(s) = pick("motto") {
4302        println!("  motto:        {s}");
4303    }
4304    if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
4305        let joined: Vec<String> = arr
4306            .iter()
4307            .filter_map(|v| v.as_str().map(str::to_string))
4308            .collect();
4309        println!("  vibe:         {}", joined.join(", "));
4310    }
4311    if let Some(s) = pick("pronouns") {
4312        println!("  pronouns:     {s}");
4313    }
4314}
4315
// NOTE: the following overview describes `cmd_add` (defined further down),
// not `host_of_url`; it is kept as a plain comment so rustdoc does not
// attach it to the wrong item.
//
// `wire add <nick@domain>` — zero-paste pair. Resolve handle, build a
// signed pair_drop event with our card + slot coords, deliver via the
// peer relay's `/v1/handle/intro/<nick>` endpoint (no slot_token needed).
// Peer's daemon completes the bilateral pin on its next pull and emits a
// pair_drop_ack carrying their slot_token so we can send back.

/// Extract just the host portion from `https://host:port/path` → `host`.
///
/// Handles, in order:
/// * an optional `<scheme>://` prefix (any scheme, any letter case);
/// * the end of the authority at the first `/`, `?` or `#`;
/// * an optional `userinfo@` prefix;
/// * a bracketed IPv6 literal, returned with its brackets (`[::1]`);
/// * an optional `:port` suffix.
///
/// The host is returned as written (no case folding). Returns an empty
/// string when no host can be found, e.g. for empty input or an
/// unterminated `[` literal.
fn host_of_url(url: &str) -> String {
    let url = url.trim();

    // Only treat the text before "://" as a scheme if it looks like one;
    // otherwise a "://" inside a query string would be mistaken for it.
    let looks_like_scheme = |s: &str| {
        !s.is_empty()
            && s.chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
    };
    let rest = match url.split_once("://") {
        Some((scheme, rest)) if looks_like_scheme(scheme) => rest,
        _ => url,
    };

    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");
    // Anything up to the last '@' is userinfo, not host.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);

    // IPv6 literals contain ':' themselves, so cut at the closing bracket
    // instead of at the first colon.
    if host_port.starts_with('[') {
        return match host_port.find(']') {
            Some(end) => host_port[..=end].to_string(),
            None => String::new(),
        };
    }
    host_port.split(':').next().unwrap_or("").to_string()
}
4336
4337/// v0.5.19 (#9.4): is this relay domain on the known-good list, or the
4338/// operator's own relay? Used to suppress the cross-relay phishing
4339/// warning in `wire add` for the happy path.
4340fn is_known_relay_domain(peer_domain: &str, our_relay_url: &str) -> bool {
4341    // Hard-coded known-good list. wireup.net is the default relay.
4342    const KNOWN_GOOD: &[&str] = &["wireup.net", "wire.laulpogan.com"];
4343    let peer_domain = peer_domain.trim().to_ascii_lowercase();
4344    if KNOWN_GOOD.iter().any(|k| *k == peer_domain) {
4345        return true;
4346    }
4347    // Operator's OWN relay is implicitly trusted — they're already
4348    // bound to it; pairing same-relay peers is the common case.
4349    let our_host = host_of_url(our_relay_url).to_ascii_lowercase();
4350    if !our_host.is_empty() && our_host == peer_domain {
4351        return true;
4352    }
4353    false
4354}
4355
4356fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
4357    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
4358
4359    // 1. Auto-init self if needed + ensure a relay slot.
4360    let (our_did, our_relay, our_slot_id, our_slot_token) =
4361        crate::pair_invite::ensure_self_with_relay(relay_override)?;
4362    if our_did == format!("did:wire:{}", parsed.nick) {
4363        // Lazy guard — actual self-add would also be caught by FCFS later.
4364        bail!("refusing to add self (handle matches own DID)");
4365    }
4366
4367    // v0.5.14 bilateral-completion path: if a pair_drop from this peer is
4368    // already sitting in pending-inbound, the operator is now accepting it.
4369    // Pin trust, save relay coords + slot_token from the stored drop, ship
4370    // our own slot_token back via pair_drop_ack, delete the pending record.
4371    //
4372    // This branch is the OTHER half of the v0.5.14 fix to maybe_consume_pair_drop:
4373    // receiver-side auto-promote was removed there; operator consent flows
4374    // through here. After this branch returns, both sides are bilaterally
4375    // pinned and capability flows in both directions.
4376    if let Some(pending) = crate::pending_inbound_pair::read_pending_inbound(&parsed.nick)? {
4377        return cmd_add_accept_pending(
4378            handle_arg,
4379            &parsed.nick,
4380            &pending,
4381            &our_relay,
4382            &our_slot_id,
4383            &our_slot_token,
4384            as_json,
4385        );
4386    }
4387
4388    // v0.5.19 (#9.4): cross-relay phishing guardrail.
4389    //
4390    // Threat: operator wants to add `boss@wireup.net` but types
4391    // `boss@evil-relay.example` (typo, malicious link, look-alike domain).
4392    // The .well-known resolution returns whoever claimed the nick on the
4393    // *typo* relay, the bilateral gate still completes (the attacker
4394    // accepts the pair on their side), and the operator pins the
4395    // attacker as "boss". v0.5.14 bilateral gate doesn't catch this —
4396    // there's no asymmetry to detect when the attacker WANTS to be
4397    // paired.
4398    //
4399    // Mitigation: warn loudly when the peer's relay domain is novel
4400    // (not the operator's own relay, not in a small known-good set).
4401    // Doesn't block — operators have legitimate reasons to pair across
4402    // relays. The signal lands in shell history so a phished operator
4403    // can find it in retrospect.
4404    if !is_known_relay_domain(&parsed.domain, &our_relay) {
4405        eprintln!(
4406            "wire add: WARN unfamiliar relay domain `{}`.",
4407            parsed.domain
4408        );
4409        eprintln!(
4410            "  This is NOT `wireup.net` (the default), NOT your own relay (`{}`), "
4411            ,
4412            host_of_url(&our_relay)
4413        );
4414        eprintln!(
4415            "  and not on the known-good list. If you meant `{}@wireup.net`, "
4416            ,
4417            parsed.nick
4418        );
4419        eprintln!(
4420            "  run `wire add {}@wireup.net` instead. Otherwise verify with your",
4421            parsed.nick
4422        );
4423        eprintln!("  peer out-of-band that they actually run a relay at this domain");
4424        eprintln!("  before relying on the pair. (See issue #9.4.)");
4425    }
4426
4427    // 2. Resolve peer via .well-known on their relay.
4428    let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4429    let peer_card = resolved
4430        .get("card")
4431        .cloned()
4432        .ok_or_else(|| anyhow!("resolved missing card"))?;
4433    let peer_did = resolved
4434        .get("did")
4435        .and_then(Value::as_str)
4436        .ok_or_else(|| anyhow!("resolved missing did"))?
4437        .to_string();
4438    let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
4439    let peer_slot_id = resolved
4440        .get("slot_id")
4441        .and_then(Value::as_str)
4442        .ok_or_else(|| anyhow!("resolved missing slot_id"))?
4443        .to_string();
4444    let peer_relay = resolved
4445        .get("relay_url")
4446        .and_then(Value::as_str)
4447        .map(str::to_string)
4448        .or_else(|| relay_override.map(str::to_string))
4449        .unwrap_or_else(|| format!("https://{}", parsed.domain));
4450
4451    // 3. Pin peer in trust + relay-state. slot_token will arrive via ack.
4452    let mut trust = config::read_trust()?;
4453    crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
4454    config::write_trust(&trust)?;
4455    let mut relay_state = config::read_relay_state()?;
4456    let existing_token = relay_state
4457        .get("peers")
4458        .and_then(|p| p.get(&peer_handle))
4459        .and_then(|p| p.get("slot_token"))
4460        .and_then(Value::as_str)
4461        .map(str::to_string)
4462        .unwrap_or_default();
4463    relay_state["peers"][&peer_handle] = json!({
4464        "relay_url": peer_relay,
4465        "slot_id": peer_slot_id,
4466        "slot_token": existing_token, // empty until pair_drop_ack lands
4467    });
4468    config::write_relay_state(&relay_state)?;
4469
4470    // 4. Build signed pair_drop with our card + coords (no pair_nonce — this
4471    // is the v0.5 zero-paste open-mode path).
4472    let our_card = config::read_agent_card()?;
4473    let sk_seed = config::read_private_key()?;
4474    let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
4475    let pk_b64 = our_card
4476        .get("verify_keys")
4477        .and_then(Value::as_object)
4478        .and_then(|m| m.values().next())
4479        .and_then(|v| v.get("key"))
4480        .and_then(Value::as_str)
4481        .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
4482    let pk_bytes = crate::signing::b64decode(pk_b64)?;
4483    let now = time::OffsetDateTime::now_utc()
4484        .format(&time::format_description::well_known::Rfc3339)
4485        .unwrap_or_default();
4486    // v0.5.17: advertise all our endpoints (federation + optional local)
4487    // to the peer in the pair_drop body. Back-compat: top-level
4488    // relay_url/slot_id/slot_token still point at the federation
4489    // endpoint so v0.5.16-and-earlier peers ingest unchanged.
4490    let our_relay_state = config::read_relay_state().unwrap_or_else(|_| json!({}));
4491    let our_endpoints = crate::endpoints::self_endpoints(&our_relay_state);
4492    let mut body = json!({
4493        "card": our_card,
4494        "relay_url": our_relay,
4495        "slot_id": our_slot_id,
4496        "slot_token": our_slot_token,
4497    });
4498    if !our_endpoints.is_empty() {
4499        body["endpoints"] = serde_json::to_value(&our_endpoints).unwrap_or(json!([]));
4500    }
4501    let event = json!({
4502        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
4503        "timestamp": now,
4504        "from": our_did,
4505        "to": peer_did,
4506        "type": "pair_drop",
4507        "kind": 1100u32,
4508        "body": body,
4509    });
4510    let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
4511
4512    // 5. Deliver via /v1/handle/intro/<nick> (auth-free; relay validates kind).
4513    let client = crate::relay_client::RelayClient::new(&peer_relay);
4514    let resp = client.handle_intro(&parsed.nick, &signed)?;
4515    let event_id = signed
4516        .get("event_id")
4517        .and_then(Value::as_str)
4518        .unwrap_or("")
4519        .to_string();
4520
4521    if as_json {
4522        println!(
4523            "{}",
4524            serde_json::to_string(&json!({
4525                "handle": handle_arg,
4526                "paired_with": peer_did,
4527                "peer_handle": peer_handle,
4528                "event_id": event_id,
4529                "drop_response": resp,
4530                "status": "drop_sent",
4531            }))?
4532        );
4533    } else {
4534        println!(
4535            "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
4536        );
4537    }
4538    Ok(())
4539}
4540
4541/// v0.5.14 bilateral-completion path for `wire add`. Called when the peer's
4542/// pair_drop is already sitting in `pending-inbound`. Pin trust, write relay
4543/// coords + slot_token from the stored drop, ship our slot_token back via
4544/// `pair_drop_ack`, delete the pending record. Symmetric with the SPAKE2
4545/// invite-URL path (which is already bilateral by virtue of the pre-shared
4546/// nonce).
4547fn cmd_add_accept_pending(
4548    handle_arg: &str,
4549    peer_nick: &str,
4550    pending: &crate::pending_inbound_pair::PendingInboundPair,
4551    _our_relay: &str,
4552    _our_slot_id: &str,
4553    _our_slot_token: &str,
4554    as_json: bool,
4555) -> Result<()> {
4556    // 1. Pin peer in trust with VERIFIED — operator gestured consent by running
4557    //    `wire add` against this handle while a drop was waiting.
4558    let mut trust = config::read_trust()?;
4559    crate::trust::add_agent_card_pin(&mut trust, &pending.peer_card, Some("VERIFIED"));
4560    config::write_trust(&trust)?;
4561
4562    // 2. Record peer's relay coords + slot_token (already shipped to us in
4563    //    the original drop body; held back until now).
4564    // v0.5.17: pin all advertised endpoints (federation + optional local).
4565    // Falls back to a single federation entry when the record was written
4566    // by v0.5.16-era code that didn't carry endpoints[].
4567    let mut relay_state = config::read_relay_state()?;
4568    let endpoints_to_pin = if pending.peer_endpoints.is_empty() {
4569        vec![crate::endpoints::Endpoint::federation(
4570            pending.peer_relay_url.clone(),
4571            pending.peer_slot_id.clone(),
4572            pending.peer_slot_token.clone(),
4573        )]
4574    } else {
4575        pending.peer_endpoints.clone()
4576    };
4577    crate::endpoints::pin_peer_endpoints(
4578        &mut relay_state,
4579        &pending.peer_handle,
4580        &endpoints_to_pin,
4581    )?;
4582    config::write_relay_state(&relay_state)?;
4583
4584    // 3. Ship our slot_token to peer via pair_drop_ack so they can write back.
4585    crate::pair_invite::send_pair_drop_ack(
4586        &pending.peer_handle,
4587        &pending.peer_relay_url,
4588        &pending.peer_slot_id,
4589        &pending.peer_slot_token,
4590    )
4591    .with_context(|| {
4592        format!(
4593            "pair_drop_ack send to {} @ {} slot {} failed",
4594            pending.peer_handle, pending.peer_relay_url, pending.peer_slot_id
4595        )
4596    })?;
4597
4598    // 4. Delete the pending-inbound record now that bilateral is complete.
4599    crate::pending_inbound_pair::consume_pending_inbound(peer_nick)?;
4600
4601    if as_json {
4602        println!(
4603            "{}",
4604            serde_json::to_string(&json!({
4605                "handle": handle_arg,
4606                "paired_with": pending.peer_did,
4607                "peer_handle": pending.peer_handle,
4608                "status": "bilateral_accepted",
4609                "via": "pending_inbound",
4610            }))?
4611        );
4612    } else {
4613        println!(
4614            "→ accepted pending pair from {peer}\n→ pinned VERIFIED, slot_token recorded\n→ shipped our slot_token back via pair_drop_ack\nbilateral pair complete. Send with `wire send {peer} \"...\"`.",
4615            peer = pending.peer_handle,
4616        );
4617    }
4618    Ok(())
4619}
4620
4621/// v0.5.14: explicit `wire pair-accept <peer>` — bilateral-completion path
4622/// for a pending-inbound pair request. Pin trust, write relay_state from the
4623/// stored pair_drop, send `pair_drop_ack` with our slot_token, delete the
4624/// pending record. Equivalent to running `wire add <peer>@<their-relay>`
4625/// when a pending-inbound record exists, but without needing to remember
4626/// the peer's relay domain.
4627fn cmd_pair_accept(peer_nick: &str, as_json: bool) -> Result<()> {
4628    let nick = crate::agent_card::bare_handle(peer_nick);
4629    let pending = crate::pending_inbound_pair::read_pending_inbound(nick)?.ok_or_else(|| {
4630        anyhow!(
4631            "no pending pair request from {nick}. Run `wire pair-list-inbound` to see who is waiting, \
4632             or use `wire add <peer>@<relay>` to send a fresh outbound pair request."
4633        )
4634    })?;
4635    let (_our_did, our_relay, our_slot_id, our_slot_token) =
4636        crate::pair_invite::ensure_self_with_relay(None)?;
4637    let handle_arg = format!("{}@{}", pending.peer_handle, pending.peer_relay_url);
4638    cmd_add_accept_pending(
4639        &handle_arg,
4640        nick,
4641        &pending,
4642        &our_relay,
4643        &our_slot_id,
4644        &our_slot_token,
4645        as_json,
4646    )
4647}
4648
4649/// v0.5.14: programmatic access to pending-inbound for scripts.
4650/// `wire pair-list-inbound --json` returns a flat array of records.
4651fn cmd_pair_list_inbound(as_json: bool) -> Result<()> {
4652    let items = crate::pending_inbound_pair::list_pending_inbound()?;
4653    if as_json {
4654        println!("{}", serde_json::to_string(&items)?);
4655        return Ok(());
4656    }
4657    if items.is_empty() {
4658        println!("no pending inbound pair requests.");
4659        return Ok(());
4660    }
4661    println!("{:<20} {:<35} {:<25} DID", "PEER", "RELAY", "RECEIVED");
4662    for p in items {
4663        println!(
4664            "{:<20} {:<35} {:<25} {}",
4665            p.peer_handle, p.peer_relay_url, p.received_at, p.peer_did,
4666        );
4667    }
4668    println!(
4669        "→ accept with `wire pair-accept <peer>`; refuse with `wire pair-reject <peer>`."
4670    );
4671    Ok(())
4672}
4673
4674/// v0.5.14: `wire pair-reject <peer>` — drop a pending-inbound record
4675/// without pairing. No event is sent back to the peer; their side stays
4676/// pending until they time out or the operator-side data ages out.
4677fn cmd_pair_reject(peer_nick: &str, as_json: bool) -> Result<()> {
4678    let nick = crate::agent_card::bare_handle(peer_nick);
4679    let existed = crate::pending_inbound_pair::read_pending_inbound(nick)?;
4680    crate::pending_inbound_pair::consume_pending_inbound(nick)?;
4681
4682    if as_json {
4683        println!(
4684            "{}",
4685            serde_json::to_string(&json!({
4686                "peer": nick,
4687                "rejected": existed.is_some(),
4688                "had_pending": existed.is_some(),
4689            }))?
4690        );
4691    } else if existed.is_some() {
4692        println!("→ rejected pending pair from {nick}\n→ pending-inbound record deleted; no ack sent.");
4693    } else {
4694        println!("no pending pair from {nick} — nothing to reject");
4695    }
4696    Ok(())
4697}
4698
4699// ---------- session (v0.5.16) ----------
4700//
4701// Multi-session wire on one machine. See src/session.rs for the storage
4702// layout + naming rules. The CLI dispatcher here orchestrates child
4703// `wire` invocations with `WIRE_HOME` overridden to the session's dir;
4704// each session-local `init` / `claim` / `daemon` runs in its own world
4705// without cross-contamination via env vars in this process.
4706
4707fn cmd_session(cmd: SessionCommand) -> Result<()> {
4708    match cmd {
4709        SessionCommand::New {
4710            name,
4711            relay,
4712            with_local,
4713            local_relay,
4714            no_daemon,
4715            json,
4716        } => cmd_session_new(
4717            name.as_deref(),
4718            &relay,
4719            with_local,
4720            &local_relay,
4721            no_daemon,
4722            json,
4723        ),
4724        SessionCommand::List { json } => cmd_session_list(json),
4725        SessionCommand::ListLocal { json } => cmd_session_list_local(json),
4726        SessionCommand::PairAllLocal {
4727            settle_secs,
4728            federation_relay,
4729            json,
4730        } => cmd_session_pair_all_local(settle_secs, &federation_relay, json),
4731        SessionCommand::Env { name, json } => cmd_session_env(name.as_deref(), json),
4732        SessionCommand::Current { json } => cmd_session_current(json),
4733        SessionCommand::Destroy { name, force, json } => cmd_session_destroy(&name, force, json),
4734    }
4735}
4736
4737fn resolve_session_name(name: Option<&str>) -> Result<String> {
4738    if let Some(n) = name {
4739        return Ok(crate::session::sanitize_name(n));
4740    }
4741    let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4742    let registry = crate::session::read_registry().unwrap_or_default();
4743    Ok(crate::session::derive_name_from_cwd(&cwd, &registry))
4744}
4745
4746fn cmd_session_new(
4747    name_arg: Option<&str>,
4748    relay: &str,
4749    with_local: bool,
4750    local_relay: &str,
4751    no_daemon: bool,
4752    as_json: bool,
4753) -> Result<()> {
4754    let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4755    let mut registry = crate::session::read_registry().unwrap_or_default();
4756    let name = match name_arg {
4757        Some(n) => crate::session::sanitize_name(n),
4758        None => crate::session::derive_name_from_cwd(&cwd, &registry),
4759    };
4760    let session_home = crate::session::session_dir(&name)?;
4761
4762    let already_exists = session_home.exists()
4763        && session_home
4764            .join("config")
4765            .join("wire")
4766            .join("agent-card.json")
4767            .exists();
4768    if already_exists {
4769        // Idempotent: re-register the cwd (if not already), refresh the
4770        // daemon if requested, surface the env-var line. Do not re-init
4771        // identity — that would clobber the keypair.
4772        registry
4773            .by_cwd
4774            .insert(cwd.to_string_lossy().into_owned(), name.clone());
4775        crate::session::write_registry(&registry)?;
4776        let info = render_session_info(&name, &session_home, &cwd)?;
4777        emit_session_new_result(&info, "already_exists", as_json)?;
4778        if !no_daemon {
4779            ensure_session_daemon(&session_home)?;
4780        }
4781        return Ok(());
4782    }
4783
4784    std::fs::create_dir_all(&session_home)
4785        .with_context(|| format!("creating session dir {session_home:?}"))?;
4786
4787    // Phase 1: init identity in the new session's WIRE_HOME.
4788    let init_status = run_wire_with_home(
4789        &session_home,
4790        &["init", &name, "--relay", relay],
4791    )?;
4792    if !init_status.success() {
4793        bail!(
4794            "`wire init {name} --relay {relay}` failed inside session dir {session_home:?}"
4795        );
4796    }
4797
4798    // Phase 2: claim the handle on the relay. If FCFS rejects the name
4799    // (another machine has it), fall back to `<name>-<2hex>` until success
4800    // or 5 attempts exhausted. Failure here is fatal — the session is
4801    // unreachable without a claim.
4802    let mut claim_attempt = 0u32;
4803    let mut effective_handle = name.clone();
4804    loop {
4805        claim_attempt += 1;
4806        let status = run_wire_with_home(
4807            &session_home,
4808            &["claim", &effective_handle, "--relay", relay],
4809        )?;
4810        if status.success() {
4811            break;
4812        }
4813        if claim_attempt >= 5 {
4814            bail!(
4815                "5 failed attempts to claim a handle on {relay} for session {name}. \
4816                 Try `wire session destroy {name} --force` and re-run with a different name."
4817            );
4818        }
4819        // Use a fresh random-ish suffix on each retry. We piggyback on the
4820        // path-hash logic but mix in the attempt counter to avoid getting
4821        // stuck on the same colliding suffix.
4822        let attempt_path = cwd.join(format!("__attempt_{claim_attempt}"));
4823        let suffix = crate::session::derive_name_from_cwd(&attempt_path, &registry);
4824        // suffix here is the full derived name for attempt_path; we just
4825        // want a short token, so take the trailing hash if it has one,
4826        // else hash the attempt-path ourselves.
4827        let token = suffix
4828            .rsplit('-')
4829            .next()
4830            .filter(|t| t.len() == 4)
4831            .map(str::to_string)
4832            .unwrap_or_else(|| format!("{claim_attempt}"));
4833        effective_handle = format!("{name}-{token}");
4834    }
4835
4836    // Persist the cwd → name mapping NOW so subsequent invocations from
4837    // this directory short-circuit to the "already_exists" branch.
4838    registry
4839        .by_cwd
4840        .insert(cwd.to_string_lossy().into_owned(), name.clone());
4841    crate::session::write_registry(&registry)?;
4842
4843    // v0.5.17: --with-local probes the local relay and, if it's
4844    // reachable, allocates a second slot there. The session's
4845    // relay_state.json grows a `self.endpoints[]` array carrying both
4846    // endpoints; routing layer (cmd_push) prefers local for sister-
4847    // session peers that also have a local slot.
4848    if with_local {
4849        try_allocate_local_slot(&session_home, &effective_handle, relay, local_relay);
4850    }
4851
4852    if !no_daemon {
4853        ensure_session_daemon(&session_home)?;
4854    }
4855
4856    let info = render_session_info(&name, &session_home, &cwd)?;
4857    emit_session_new_result(&info, "created", as_json)
4858}
4859
4860/// v0.5.17: probe the named local relay; if `/healthz` returns ok within
4861/// a short timeout, allocate a slot there and update the session's
4862/// `relay_state.json` `self.endpoints[]` to advertise both endpoints.
4863///
4864/// Failure to reach the local relay is NOT fatal — the session stays
4865/// federation-only. Logs to stderr on failure so operators can tell
4866/// the local relay isn't running, but doesn't abort the bootstrap.
4867fn try_allocate_local_slot(
4868    session_home: &std::path::Path,
4869    handle: &str,
4870    federation_relay: &str,
4871    local_relay: &str,
4872) {
4873    // Probe healthz with a tight timeout. Use a fresh client (don't
4874    // share the daemon-wide one) so the timeout is local to this call.
4875    let probe = match crate::relay_client::build_blocking_client(Some(
4876        std::time::Duration::from_millis(500),
4877    )) {
4878        Ok(c) => c,
4879        Err(e) => {
4880            eprintln!("wire session new: cannot build probe client for {local_relay}: {e:#}");
4881            return;
4882        }
4883    };
4884    let healthz_url = format!("{}/healthz", local_relay.trim_end_matches('/'));
4885    match probe.get(&healthz_url).send() {
4886        Ok(resp) if resp.status().is_success() => {}
4887        Ok(resp) => {
4888            eprintln!(
4889                "wire session new: local relay probe at {healthz_url} returned {} — staying federation-only",
4890                resp.status()
4891            );
4892            return;
4893        }
4894        Err(e) => {
4895            eprintln!(
4896                "wire session new: local relay at {local_relay} unreachable ({}) — staying federation-only. \
4897                 Start one with `wire relay-server --bind 127.0.0.1:8771 --local-only`.",
4898                crate::relay_client::format_transport_error(&anyhow::Error::new(e))
4899            );
4900            return;
4901        }
4902    };
4903
4904    // Allocate a slot on the local relay.
4905    let local_client = crate::relay_client::RelayClient::new(local_relay);
4906    let alloc = match local_client.allocate_slot(Some(handle)) {
4907        Ok(a) => a,
4908        Err(e) => {
4909            eprintln!(
4910                "wire session new: local relay slot allocation failed: {e:#} — staying federation-only"
4911            );
4912            return;
4913        }
4914    };
4915
4916    // Merge into the session's relay.json. We invoke wire via
4917    // run_wire_with_home for federation calls (subprocess isolation),
4918    // but relay.json is a simple file we can edit directly
4919    // — and need to, because there's no `wire bind-relay --add-local`
4920    // command yet (could add later; out of scope for v0.5.17 MVP).
4921    //
4922    // v0.5.20 BUG FIX: previously joined `relay-state.json` here, which
4923    // does not exist (canonical filename is `relay.json` per
4924    // `config::relay_state_path`). The mis-named file write succeeded
4925    // but landed in a sibling path nothing else reads. Every
4926    // `wire session new --with-local` invocation silently degraded to
4927    // federation-only despite the "local slot allocated" stderr line.
4928    // Caught by deploying v0.5.19 on the dev laptop and inspecting the
4929    // session's relay.json — it had only the federation endpoint.
4930    let state_path = session_home
4931        .join("config")
4932        .join("wire")
4933        .join("relay.json");
4934    let mut state: serde_json::Value = std::fs::read(&state_path)
4935        .ok()
4936        .and_then(|b| serde_json::from_slice(&b).ok())
4937        .unwrap_or_else(|| serde_json::json!({}));
4938    // Read the existing federation self info (already written by
4939    // `wire init` + `wire bind-relay` path during session bootstrap).
4940    let fed_endpoint = state
4941        .get("self")
4942        .and_then(|s| {
4943            let url = s.get("relay_url").and_then(serde_json::Value::as_str)?;
4944            let slot_id = s.get("slot_id").and_then(serde_json::Value::as_str)?;
4945            let slot_token = s.get("slot_token").and_then(serde_json::Value::as_str)?;
4946            Some(crate::endpoints::Endpoint::federation(
4947                url.to_string(),
4948                slot_id.to_string(),
4949                slot_token.to_string(),
4950            ))
4951        });
4952
4953    let local_endpoint = crate::endpoints::Endpoint::local(
4954        local_relay.trim_end_matches('/').to_string(),
4955        alloc.slot_id.clone(),
4956        alloc.slot_token.clone(),
4957    );
4958
4959    let mut endpoints: Vec<crate::endpoints::Endpoint> = Vec::new();
4960    if let Some(f) = fed_endpoint.clone() {
4961        endpoints.push(f);
4962    }
4963    endpoints.push(local_endpoint);
4964
4965    let self_obj = state
4966        .as_object_mut()
4967        .expect("relay_state root is an object")
4968        .entry("self")
4969        .or_insert_with(|| {
4970            serde_json::json!({
4971                "relay_url": federation_relay,
4972            })
4973        });
4974    if let Some(obj) = self_obj.as_object_mut() {
4975        obj.insert(
4976            "endpoints".into(),
4977            serde_json::to_value(&endpoints).unwrap_or(serde_json::Value::Null),
4978        );
4979    }
4980
4981    if let Err(e) = std::fs::write(
4982        &state_path,
4983        serde_json::to_vec_pretty(&state).unwrap_or_default(),
4984    ) {
4985        eprintln!(
4986            "wire session new: persisting dual-slot relay_state at {state_path:?} failed: {e}"
4987        );
4988        return;
4989    }
4990    eprintln!(
4991        "wire session new: local slot allocated on {local_relay} (slot_id={})",
4992        alloc.slot_id
4993    );
4994}
4995
4996fn render_session_info(
4997    name: &str,
4998    session_home: &std::path::Path,
4999    cwd: &std::path::Path,
5000) -> Result<serde_json::Value> {
5001    let card_path = session_home.join("config").join("wire").join("agent-card.json");
5002    let (did, handle) = if card_path.exists() {
5003        let card: Value = serde_json::from_slice(&std::fs::read(&card_path)?)?;
5004        let did = card
5005            .get("did")
5006            .and_then(Value::as_str)
5007            .unwrap_or("")
5008            .to_string();
5009        let handle = card
5010            .get("handle")
5011            .and_then(Value::as_str)
5012            .map(str::to_string)
5013            .unwrap_or_else(|| {
5014                crate::agent_card::display_handle_from_did(&did).to_string()
5015            });
5016        (did, handle)
5017    } else {
5018        (String::new(), String::new())
5019    };
5020    Ok(json!({
5021        "name": name,
5022        "home_dir": session_home.to_string_lossy(),
5023        "cwd": cwd.to_string_lossy(),
5024        "did": did,
5025        "handle": handle,
5026        "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
5027    }))
5028}
5029
5030fn emit_session_new_result(
5031    info: &serde_json::Value,
5032    status: &str,
5033    as_json: bool,
5034) -> Result<()> {
5035    if as_json {
5036        let mut obj = info.clone();
5037        obj["status"] = json!(status);
5038        println!("{}", serde_json::to_string(&obj)?);
5039    } else {
5040        let name = info["name"].as_str().unwrap_or("?");
5041        let handle = info["handle"].as_str().unwrap_or("?");
5042        let home = info["home_dir"].as_str().unwrap_or("?");
5043        let did = info["did"].as_str().unwrap_or("?");
5044        let export = info["export"].as_str().unwrap_or("?");
5045        let prefix = if status == "already_exists" {
5046            "session already exists (re-registered cwd)"
5047        } else {
5048            "session created"
5049        };
5050        println!(
5051            "{prefix}\n  name:   {name}\n  handle: {handle}\n  did:    {did}\n  home:   {home}\n\nactivate with:\n  {export}"
5052        );
5053    }
5054    Ok(())
5055}
5056
5057fn run_wire_with_home(
5058    session_home: &std::path::Path,
5059    args: &[&str],
5060) -> Result<std::process::ExitStatus> {
5061    let bin = std::env::current_exe().with_context(|| "locating self exe")?;
5062    let status = std::process::Command::new(&bin)
5063        .env("WIRE_HOME", session_home)
5064        .env_remove("RUST_LOG")
5065        .args(args)
5066        .status()
5067        .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
5068    Ok(status)
5069}
5070
5071fn ensure_session_daemon(session_home: &std::path::Path) -> Result<()> {
5072    // Check if a daemon is already alive in this session's WIRE_HOME.
5073    // If so, no-op (let the existing process keep running).
5074    let pidfile = session_home
5075        .join("state")
5076        .join("wire")
5077        .join("daemon.pid");
5078    if pidfile.exists() {
5079        let bytes = std::fs::read(&pidfile).unwrap_or_default();
5080        let pid: Option<u32> =
5081            if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5082                v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5083            } else {
5084                String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5085            };
5086        if let Some(p) = pid {
5087            let alive = {
5088                #[cfg(target_os = "linux")]
5089                {
5090                    std::path::Path::new(&format!("/proc/{p}")).exists()
5091                }
5092                #[cfg(not(target_os = "linux"))]
5093                {
5094                    std::process::Command::new("kill")
5095                        .args(["-0", &p.to_string()])
5096                        .output()
5097                        .map(|o| o.status.success())
5098                        .unwrap_or(false)
5099                }
5100            };
5101            if alive {
5102                return Ok(());
5103            }
5104        }
5105    }
5106
5107    // Spawn `wire daemon` detached. The existing `cmd_daemon` writes the
5108    // versioned pidfile; we just kick it off and return.
5109    let bin = std::env::current_exe().with_context(|| "locating self exe")?;
5110    let log_path = session_home.join("state").join("wire").join("daemon.log");
5111    if let Some(parent) = log_path.parent() {
5112        std::fs::create_dir_all(parent).ok();
5113    }
5114    let log_file = std::fs::OpenOptions::new()
5115        .create(true)
5116        .append(true)
5117        .open(&log_path)
5118        .with_context(|| format!("opening daemon log {log_path:?}"))?;
5119    let log_err = log_file.try_clone()?;
5120    std::process::Command::new(&bin)
5121        .env("WIRE_HOME", session_home)
5122        .env_remove("RUST_LOG")
5123        .args(["daemon", "--interval", "5"])
5124        .stdout(log_file)
5125        .stderr(log_err)
5126        .stdin(std::process::Stdio::null())
5127        .spawn()
5128        .with_context(|| "spawning session-local `wire daemon`")?;
5129    Ok(())
5130}
5131
5132fn cmd_session_list(as_json: bool) -> Result<()> {
5133    let items = crate::session::list_sessions()?;
5134    if as_json {
5135        println!("{}", serde_json::to_string(&items)?);
5136        return Ok(());
5137    }
5138    if items.is_empty() {
5139        println!("no sessions on this machine. `wire session new` to create one.");
5140        return Ok(());
5141    }
5142    println!(
5143        "{:<24} {:<24} {:<10} CWD",
5144        "NAME", "HANDLE", "DAEMON"
5145    );
5146    for s in items {
5147        println!(
5148            "{:<24} {:<24} {:<10} {}",
5149            s.name,
5150            s.handle.as_deref().unwrap_or("?"),
5151            if s.daemon_running { "running" } else { "down" },
5152            s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5153        );
5154    }
5155    Ok(())
5156}
5157
5158/// v0.5.19: `wire session list-local` — sister-session discovery.
5159///
5160/// For each on-disk session, read its `relay-state.json` and surface
5161/// the ones that have a Local-scope endpoint (allocated via
5162/// `wire session new --with-local`). Group by the local-relay URL so
5163/// the operator can see at a glance which sessions are mutually
5164/// reachable over the same loopback relay.
5165///
5166/// Read-only, no daemon contact. Useful as the prelude to teaming /
5167/// pairing same-box sister claudes (see also `wire session
5168/// pair-all-local` once implemented).
5169fn cmd_session_list_local(as_json: bool) -> Result<()> {
5170    let listing = crate::session::list_local_sessions()?;
5171    if as_json {
5172        println!("{}", serde_json::to_string(&listing)?);
5173        return Ok(());
5174    }
5175
5176    if listing.local.is_empty() && listing.federation_only.is_empty() {
5177        println!(
5178            "no sessions on this machine. `wire session new --with-local` to create one \
5179             with a local-relay endpoint (start the relay first: \
5180             `wire relay-server --bind 127.0.0.1:8771 --local-only`)."
5181        );
5182        return Ok(());
5183    }
5184
5185    if listing.local.is_empty() {
5186        println!(
5187            "no sister sessions reachable via a local relay. \
5188             Re-run `wire session new --with-local` to add a Local endpoint, or \
5189             start a local relay with `wire relay-server --bind 127.0.0.1:8771 --local-only`."
5190        );
5191    } else {
5192        // Stable iteration order: sort the relay URLs.
5193        let mut keys: Vec<&String> = listing.local.keys().collect();
5194        keys.sort();
5195        for relay_url in keys {
5196            let group = &listing.local[relay_url];
5197            println!("LOCAL RELAY: {relay_url}");
5198            println!(
5199                "  {:<24} {:<32} {:<10} CWD",
5200                "NAME", "HANDLE", "DAEMON"
5201            );
5202            for s in group {
5203                println!(
5204                    "  {:<24} {:<32} {:<10} {}",
5205                    s.name,
5206                    s.handle.as_deref().unwrap_or("?"),
5207                    if s.daemon_running { "running" } else { "down" },
5208                    s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5209                );
5210            }
5211            println!();
5212        }
5213    }
5214
5215    if !listing.federation_only.is_empty() {
5216        println!("federation-only (no local endpoint):");
5217        for s in &listing.federation_only {
5218            println!(
5219                "  {:<24} {:<32} {}",
5220                s.name,
5221                s.handle.as_deref().unwrap_or("?"),
5222                s.cwd.as_deref().unwrap_or("(no cwd registered)"),
5223            );
5224        }
5225    }
5226    Ok(())
5227}
5228
5229/// v0.6.0 (issue #12): orchestrate bilateral pair across every sister
5230/// session that has a Local-scope endpoint. Skips already-paired
5231/// pairs; reports a per-pair outcome JSON suitable for scripting.
5232///
5233/// Same-uid trust anchor: the caller owns every session enumerated by
5234/// `list_local_sessions`, so the operator running this command IS the
5235/// consent for both sides. The bilateral SAS / network-level handshake
5236/// assumes strangers; same-uid sister sessions are not strangers.
5237///
5238/// Per-pair flow (sequential to keep relay-side load + log clarity):
5239///   1. WIRE_HOME=A wire add <B-handle>@<host>  (writes pending-inbound on B)
5240///   2. WIRE_HOME=A wire push --json            (sends pair_drop to relay)
5241///   3. sleep settle_secs                       (pair_drop reaches B)
5242///   4. WIRE_HOME=B wire pull --json            (B receives pair_drop)
5243///   5. WIRE_HOME=B wire pair-accept <A-bare>   (B pins A, sends ack)
5244///   6. WIRE_HOME=B wire push --json            (sends pair_drop_ack)
5245///   7. sleep settle_secs                       (ack reaches A)
5246///   8. WIRE_HOME=A wire pull --json            (A pins B)
5247fn cmd_session_pair_all_local(
5248    settle_secs: u64,
5249    federation_relay: &str,
5250    as_json: bool,
5251) -> Result<()> {
5252    use std::collections::BTreeSet;
5253    use std::time::Duration;
5254
5255    let listing = crate::session::list_local_sessions()?;
5256    // Flatten + dedup by session NAME (same session can appear under
5257    // multiple local-relay URLs if it advertises two local endpoints;
5258    // rare, but pair each pair exactly once).
5259    let mut by_name: std::collections::BTreeMap<String, crate::session::LocalSessionView> =
5260        Default::default();
5261    for group in listing.local.into_values() {
5262        for s in group {
5263            by_name.entry(s.name.clone()).or_insert(s);
5264        }
5265    }
5266    let sessions: Vec<crate::session::LocalSessionView> = by_name.into_values().collect();
5267
5268    if sessions.len() < 2 {
5269        let msg = format!(
5270            "{} sister session(s) with a local endpoint — need at least 2 to pair.",
5271            sessions.len()
5272        );
5273        if as_json {
5274            println!(
5275                "{}",
5276                serde_json::to_string(&json!({
5277                    "sessions": sessions.iter().map(|s| &s.name).collect::<Vec<_>>(),
5278                    "pairs_attempted": 0,
5279                    "pairs_succeeded": 0,
5280                    "pairs_skipped_already_paired": 0,
5281                    "pairs_failed": 0,
5282                    "note": msg,
5283                }))?
5284            );
5285        } else {
5286            println!("{msg}");
5287            if let Some(s) = sessions.first() {
5288                println!("  - {} ({})", s.name, s.cwd.as_deref().unwrap_or("?"));
5289            }
5290            println!("Use `wire session new --with-local` to add more.");
5291        }
5292        return Ok(());
5293    }
5294
5295    let fed_host = host_of_url(federation_relay);
5296    if fed_host.is_empty() {
5297        bail!(
5298            "federation_relay `{federation_relay}` has no parseable host — \
5299             pass a full URL like `https://wireup.net`."
5300        );
5301    }
5302
5303    // Enumerate unordered pairs deterministically by session name.
5304    let mut attempted = 0u32;
5305    let mut succeeded = 0u32;
5306    let mut skipped_already = 0u32;
5307    let mut failed = 0u32;
5308    let mut per_pair: Vec<Value> = Vec::new();
5309
5310    for i in 0..sessions.len() {
5311        for j in (i + 1)..sessions.len() {
5312            let a = &sessions[i];
5313            let b = &sessions[j];
5314            attempted += 1;
5315
5316            // Already-paired check: if A's relay-state has B's nick in
5317            // peers AND vice versa, skip.
5318            let a_pinned_b = session_has_peer(&a.home_dir, &b.name);
5319            let b_pinned_a = session_has_peer(&b.home_dir, &a.name);
5320            if a_pinned_b && b_pinned_a {
5321                skipped_already += 1;
5322                per_pair.push(json!({
5323                    "from": a.name,
5324                    "to": b.name,
5325                    "status": "already_paired",
5326                }));
5327                continue;
5328            }
5329
5330            let pair_result = drive_bilateral_pair(
5331                &a.home_dir,
5332                &a.name,
5333                &b.home_dir,
5334                &b.name,
5335                &fed_host,
5336                federation_relay,
5337                settle_secs,
5338            );
5339
5340            match pair_result {
5341                Ok(()) => {
5342                    succeeded += 1;
5343                    per_pair.push(json!({
5344                        "from": a.name,
5345                        "to": b.name,
5346                        "status": "paired",
5347                    }));
5348                }
5349                Err(e) => {
5350                    failed += 1;
5351                    let detail = format!("{e:#}");
5352                    per_pair.push(json!({
5353                        "from": a.name,
5354                        "to": b.name,
5355                        "status": "failed",
5356                        "error": detail,
5357                    }));
5358                }
5359            }
5360
5361            // Brief settle between pairs so we don't slam the relay
5362            // with N(N-1) parallel requests.
5363            std::thread::sleep(Duration::from_millis(200));
5364        }
5365    }
5366
5367    let _ = BTreeSet::<String>::new(); // silence unused-import lint if any
5368    let summary = json!({
5369        "sessions": sessions.iter().map(|s| s.name.clone()).collect::<Vec<_>>(),
5370        "pairs_attempted": attempted,
5371        "pairs_succeeded": succeeded,
5372        "pairs_skipped_already_paired": skipped_already,
5373        "pairs_failed": failed,
5374        "results": per_pair,
5375    });
5376    if as_json {
5377        println!("{}", serde_json::to_string(&summary)?);
5378    } else {
5379        println!(
5380            "wire session pair-all-local: {} session(s), {} pair(s) attempted",
5381            sessions.len(),
5382            attempted
5383        );
5384        println!("  paired:                 {succeeded}");
5385        println!("  skipped (already pinned): {skipped_already}");
5386        println!("  failed:                 {failed}");
5387        for entry in summary["results"].as_array().unwrap_or(&vec![]) {
5388            let from = entry["from"].as_str().unwrap_or("?");
5389            let to = entry["to"].as_str().unwrap_or("?");
5390            let status = entry["status"].as_str().unwrap_or("?");
5391            let err = entry.get("error").and_then(Value::as_str).unwrap_or("");
5392            if err.is_empty() {
5393                println!("  {from:<24} ↔ {to:<24} {status}");
5394            } else {
5395                println!("  {from:<24} ↔ {to:<24} {status} — {err}");
5396            }
5397        }
5398    }
5399    Ok(())
5400}
5401
5402/// Check whether `session_home`'s `relay.json` already lists `peer_name`
5403/// under `state.peers`. Best-effort — any read/parse error → false.
5404fn session_has_peer(session_home: &std::path::Path, peer_name: &str) -> bool {
5405    let path = session_home.join("config").join("wire").join("relay.json");
5406    let bytes = match std::fs::read(&path) {
5407        Ok(b) => b,
5408        Err(_) => return false,
5409    };
5410    let val: Value = match serde_json::from_slice(&bytes) {
5411        Ok(v) => v,
5412        Err(_) => return false,
5413    };
5414    val.get("peers")
5415        .and_then(|p| p.get(peer_name))
5416        .is_some()
5417}
5418
5419/// Drive one bilateral pair handshake between two sister sessions
5420/// using their session home dirs as `WIRE_HOME`. Sequential 8-step
5421/// flow so failures bubble up at the offending step, not buried in
5422/// a parallel race. See `cmd_session_pair_all_local` docstring.
5423fn drive_bilateral_pair(
5424    a_home: &std::path::Path,
5425    a_name: &str,
5426    b_home: &std::path::Path,
5427    b_name: &str,
5428    fed_host: &str,
5429    federation_relay: &str,
5430    settle_secs: u64,
5431) -> Result<()> {
5432    use std::time::Duration;
5433    let bin = std::env::current_exe().context("locating self exe")?;
5434
5435    let run = |home: &std::path::Path, args: &[&str]| -> Result<()> {
5436        let out = std::process::Command::new(&bin)
5437            .env("WIRE_HOME", home)
5438            .env_remove("RUST_LOG")
5439            .args(args)
5440            .output()
5441            .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
5442        if !out.status.success() {
5443            bail!(
5444                "`wire {}` failed: stderr={}",
5445                args.join(" "),
5446                String::from_utf8_lossy(&out.stderr).trim()
5447            );
5448        }
5449        Ok(())
5450    };
5451
5452    let b_handle = format!("{b_name}@{fed_host}");
5453
5454    // 1. A initiates → 2. A push pair_drop
5455    run(a_home, &["add", &b_handle, "--relay", federation_relay, "--json"])
5456        .with_context(|| format!("step 1/8: {a_name} `wire add {b_handle}`"))?;
5457    run(a_home, &["push", "--json"])
5458        .with_context(|| format!("step 2/8: {a_name} `wire push`"))?;
5459
5460    // 3. settle so pair_drop reaches B's slot
5461    std::thread::sleep(Duration::from_secs(settle_secs));
5462
5463    // 4. B pulls pair_drop → 5. B pair-accept (pins A) → 6. B push pair_drop_ack
5464    run(b_home, &["pull", "--json"])
5465        .with_context(|| format!("step 4/8: {b_name} `wire pull`"))?;
5466    run(b_home, &["pair-accept", a_name, "--json"])
5467        .with_context(|| format!("step 5/8: {b_name} `wire pair-accept {a_name}`"))?;
5468    run(b_home, &["push", "--json"])
5469        .with_context(|| format!("step 6/8: {b_name} `wire push`"))?;
5470
5471    // 7. settle so ack reaches A's slot
5472    std::thread::sleep(Duration::from_secs(settle_secs));
5473
5474    // 8. A pulls ack (pins B with the slot_token + endpoints[] from the ack)
5475    run(a_home, &["pull", "--json"])
5476        .with_context(|| format!("step 8/8: {a_name} `wire pull`"))?;
5477
5478    Ok(())
5479}
5480
5481fn cmd_session_env(name_arg: Option<&str>, as_json: bool) -> Result<()> {
5482    let name = resolve_session_name(name_arg)?;
5483    let session_home = crate::session::session_dir(&name)?;
5484    if !session_home.exists() {
5485        bail!(
5486            "no session named {name:?} on this machine. `wire session list` to enumerate, \
5487             `wire session new {name}` to create."
5488        );
5489    }
5490    if as_json {
5491        println!(
5492            "{}",
5493            serde_json::to_string(&json!({
5494                "name": name,
5495                "home_dir": session_home.to_string_lossy(),
5496                "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
5497            }))?
5498        );
5499    } else {
5500        println!("export WIRE_HOME={}", session_home.to_string_lossy());
5501    }
5502    Ok(())
5503}
5504
5505fn cmd_session_current(as_json: bool) -> Result<()> {
5506    let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
5507    let registry = crate::session::read_registry().unwrap_or_default();
5508    let cwd_key = cwd.to_string_lossy().into_owned();
5509    let name = registry.by_cwd.get(&cwd_key).cloned();
5510    if as_json {
5511        println!(
5512            "{}",
5513            serde_json::to_string(&json!({
5514                "cwd": cwd_key,
5515                "session": name,
5516            }))?
5517        );
5518    } else if let Some(n) = name {
5519        println!("{n}");
5520    } else {
5521        println!("(no session registered for this cwd)");
5522    }
5523    Ok(())
5524}
5525
5526fn cmd_session_destroy(name_arg: &str, force: bool, as_json: bool) -> Result<()> {
5527    let name = crate::session::sanitize_name(name_arg);
5528    let session_home = crate::session::session_dir(&name)?;
5529    if !session_home.exists() {
5530        if as_json {
5531            println!(
5532                "{}",
5533                serde_json::to_string(&json!({
5534                    "name": name,
5535                    "destroyed": false,
5536                    "reason": "no such session",
5537                }))?
5538            );
5539        } else {
5540            println!("no session named {name:?} — nothing to destroy.");
5541        }
5542        return Ok(());
5543    }
5544    if !force {
5545        bail!(
5546            "destroying session {name:?} would delete its keypair + state irrecoverably. \
5547             Pass --force to confirm."
5548        );
5549    }
5550
5551    // Kill the session-local daemon if alive.
5552    let pidfile = session_home
5553        .join("state")
5554        .join("wire")
5555        .join("daemon.pid");
5556    if let Ok(bytes) = std::fs::read(&pidfile) {
5557        let pid: Option<u32> =
5558            if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5559                v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5560            } else {
5561                String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5562            };
5563        if let Some(p) = pid {
5564            let _ = std::process::Command::new("kill")
5565                .args(["-TERM", &p.to_string()])
5566                .output();
5567        }
5568    }
5569
5570    std::fs::remove_dir_all(&session_home)
5571        .with_context(|| format!("removing session dir {session_home:?}"))?;
5572
5573    // Strip from registry.
5574    let mut registry = crate::session::read_registry().unwrap_or_default();
5575    registry.by_cwd.retain(|_, v| v != &name);
5576    crate::session::write_registry(&registry)?;
5577
5578    if as_json {
5579        println!(
5580            "{}",
5581            serde_json::to_string(&json!({
5582                "name": name,
5583                "destroyed": true,
5584            }))?
5585        );
5586    } else {
5587        println!("destroyed session {name:?}.");
5588    }
5589    Ok(())
5590}
5591
5592// ---------- diag (structured trace) ----------
5593
5594fn cmd_diag(action: DiagAction) -> Result<()> {
5595    let state = config::state_dir()?;
5596    let knob = state.join("diag.enabled");
5597    let log_path = state.join("diag.jsonl");
5598    match action {
5599        DiagAction::Tail { limit, json } => {
5600            let entries = crate::diag::tail(limit);
5601            if json {
5602                for e in entries {
5603                    println!("{}", serde_json::to_string(&e)?);
5604                }
5605            } else if entries.is_empty() {
5606                println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
5607            } else {
5608                for e in entries {
5609                    let ts = e["ts"].as_u64().unwrap_or(0);
5610                    let ty = e["type"].as_str().unwrap_or("?");
5611                    let pid = e["pid"].as_u64().unwrap_or(0);
5612                    let payload = e["payload"].to_string();
5613                    println!("[{ts}] pid={pid} {ty} {payload}");
5614                }
5615            }
5616        }
5617        DiagAction::Enable => {
5618            config::ensure_dirs()?;
5619            std::fs::write(&knob, "1")?;
5620            println!("wire diag: enabled at {knob:?}");
5621        }
5622        DiagAction::Disable => {
5623            if knob.exists() {
5624                std::fs::remove_file(&knob)?;
5625            }
5626            println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
5627        }
5628        DiagAction::Status { json } => {
5629            let enabled = crate::diag::is_enabled();
5630            let size = std::fs::metadata(&log_path)
5631                .map(|m| m.len())
5632                .unwrap_or(0);
5633            if json {
5634                println!(
5635                    "{}",
5636                    serde_json::to_string(&serde_json::json!({
5637                        "enabled": enabled,
5638                        "log_path": log_path,
5639                        "log_size_bytes": size,
5640                    }))?
5641                );
5642            } else {
5643                println!("wire diag status");
5644                println!("  enabled:    {enabled}");
5645                println!("  log:        {log_path:?}");
5646                println!("  log size:   {size} bytes");
5647            }
5648        }
5649    }
5650    Ok(())
5651}
5652
5653// ---------- service (install / uninstall / status) ----------
5654
5655fn cmd_service(action: ServiceAction) -> Result<()> {
5656    let kind = |local_relay: bool| {
5657        if local_relay {
5658            crate::service::ServiceKind::LocalRelay
5659        } else {
5660            crate::service::ServiceKind::Daemon
5661        }
5662    };
5663    let (report, as_json) = match action {
5664        ServiceAction::Install { local_relay, json } => {
5665            (crate::service::install_kind(kind(local_relay))?, json)
5666        }
5667        ServiceAction::Uninstall { local_relay, json } => {
5668            (crate::service::uninstall_kind(kind(local_relay))?, json)
5669        }
5670        ServiceAction::Status { local_relay, json } => {
5671            (crate::service::status_kind(kind(local_relay))?, json)
5672        }
5673    };
5674    if as_json {
5675        println!("{}", serde_json::to_string(&report)?);
5676    } else {
5677        println!("wire service {}", report.action);
5678        println!("  platform:  {}", report.platform);
5679        println!("  unit:      {}", report.unit_path);
5680        println!("  status:    {}", report.status);
5681        println!("  detail:    {}", report.detail);
5682    }
5683    Ok(())
5684}
5685
5686// ---------- upgrade (atomic daemon swap) ----------
5687
5688/// `wire upgrade` — kill all running `wire daemon` processes, spawn a
5689/// fresh one from the currently-installed binary, write a new versioned
5690/// pidfile. The fix for today's exact failure mode: a daemon process that
5691/// kept running OLD binary text in memory under a symlink that had since
5692/// been repointed at a NEW binary on disk.
5693///
5694/// Idempotent. If no stale daemon is running, just starts a fresh one
5695/// (same as `wire daemon &` but with the wait-until-alive guard from
5696/// ensure_up::ensure_daemon_running).
5697///
5698/// `--check` mode reports drift without acting — lists the processes
5699/// that WOULD be killed and the binary version of each.
5700fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
5701    // 1. Identify all `wire daemon` processes.
5702    let pgrep_out = std::process::Command::new("pgrep")
5703        .args(["-f", "wire daemon"])
5704        .output();
5705    let running_pids: Vec<u32> = match pgrep_out {
5706        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
5707            .split_whitespace()
5708            .filter_map(|s| s.parse::<u32>().ok())
5709            .collect(),
5710        _ => Vec::new(),
5711    };
5712
5713    // 2. Read pidfile to surface what the daemon THINKS it is.
5714    let record = crate::ensure_up::read_pid_record("daemon");
5715    let recorded_version: Option<String> = match &record {
5716        crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
5717        crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
5718        _ => None,
5719    };
5720    let cli_version = env!("CARGO_PKG_VERSION").to_string();
5721
5722    if check_only {
5723        let report = json!({
5724            "running_pids": running_pids,
5725            "pidfile_version": recorded_version,
5726            "cli_version": cli_version,
5727            "would_kill": running_pids,
5728        });
5729        if as_json {
5730            println!("{}", serde_json::to_string(&report)?);
5731        } else {
5732            println!("wire upgrade --check");
5733            println!("  cli version:      {cli_version}");
5734            println!("  pidfile version:  {}", recorded_version.as_deref().unwrap_or("(missing)"));
5735            if running_pids.is_empty() {
5736                println!("  running daemons:  none");
5737            } else {
5738                let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
5739                println!("  running daemons:  pids {}", pids.join(", "));
5740                println!("  would kill all + spawn fresh");
5741            }
5742        }
5743        return Ok(());
5744    }
5745
5746    // 3. Kill every running wire daemon. Use SIGTERM first, then SIGKILL
5747    // after a brief grace period.
5748    let mut killed: Vec<u32> = Vec::new();
5749    for pid in &running_pids {
5750        // SIGTERM (15).
5751        let _ = std::process::Command::new("kill")
5752            .args(["-15", &pid.to_string()])
5753            .status();
5754        killed.push(*pid);
5755    }
5756    // Wait up to ~2s for graceful exit.
5757    if !killed.is_empty() {
5758        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
5759        loop {
5760            let still_alive: Vec<u32> = killed
5761                .iter()
5762                .copied()
5763                .filter(|p| process_alive_pid(*p))
5764                .collect();
5765            if still_alive.is_empty() {
5766                break;
5767            }
5768            if std::time::Instant::now() >= deadline {
5769                // SIGKILL hold-outs.
5770                for pid in still_alive {
5771                    let _ = std::process::Command::new("kill")
5772                        .args(["-9", &pid.to_string()])
5773                        .status();
5774                }
5775                break;
5776            }
5777            std::thread::sleep(std::time::Duration::from_millis(50));
5778        }
5779    }
5780
5781    // 4. Remove stale pidfile so ensure_daemon_running doesn't think the
5782    //    old daemon is still owning it.
5783    let pidfile = config::state_dir()?.join("daemon.pid");
5784    if pidfile.exists() {
5785        let _ = std::fs::remove_file(&pidfile);
5786    }
5787
5788    // 5. Spawn fresh daemon via ensure_up — atomically waits for
5789    //    process_alive + writes the versioned pidfile.
5790    let spawned = crate::ensure_up::ensure_daemon_running()?;
5791
5792    let new_record = crate::ensure_up::read_pid_record("daemon");
5793    let new_pid = new_record.pid();
5794    let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
5795        Some(d.version.clone())
5796    } else {
5797        None
5798    };
5799
5800    if as_json {
5801        println!(
5802            "{}",
5803            serde_json::to_string(&json!({
5804                "killed": killed,
5805                "spawned_fresh_daemon": spawned,
5806                "new_pid": new_pid,
5807                "new_version": new_version,
5808                "cli_version": cli_version,
5809            }))?
5810        );
5811    } else {
5812        if killed.is_empty() {
5813            println!("wire upgrade: no stale daemons running");
5814        } else {
5815            println!("wire upgrade: killed {} daemon(s) (pids {})",
5816                killed.len(),
5817                killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
5818        }
5819        if spawned {
5820            println!(
5821                "wire upgrade: spawned fresh daemon (pid {} v{})",
5822                new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
5823                new_version.as_deref().unwrap_or(&cli_version),
5824            );
5825        } else {
5826            println!("wire upgrade: daemon was already running on current binary");
5827        }
5828    }
5829    Ok(())
5830}
5831
/// Report whether a process with the given pid currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry. On every other
/// target it runs `kill -0 <pid>` with all standard streams detached and
/// treats a successful exit as "alive"; failing to run `kill` at all
/// counts as "not alive".
fn process_alive_pid(pid: u32) -> bool {
    if cfg!(target_os = "linux") {
        return std::fs::metadata(format!("/proc/{pid}")).is_ok();
    }

    let probe = std::process::Command::new("kill")
        .args(["-0", &pid.to_string()])
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(exit) if exit.success())
}
5849
5850// ---------- doctor (single-command diagnostic) ----------
5851
5852/// One DoctorCheck = one verdict on one health dimension.
5853#[derive(Clone, Debug, serde::Serialize)]
5854pub struct DoctorCheck {
5855    /// Short stable identifier (`daemon`, `relay`, `pair_rejections`, ...).
5856    /// Stable across versions for tooling consumption.
5857    pub id: String,
5858    /// PASS / WARN / FAIL.
5859    pub status: String,
5860    /// One-line human summary.
5861    pub detail: String,
5862    /// Optional remediation hint shown after the failing line.
5863    #[serde(skip_serializing_if = "Option::is_none")]
5864    pub fix: Option<String>,
5865}
5866
5867impl DoctorCheck {
5868    fn pass(id: &str, detail: impl Into<String>) -> Self {
5869        Self {
5870            id: id.into(),
5871            status: "PASS".into(),
5872            detail: detail.into(),
5873            fix: None,
5874        }
5875    }
5876    fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5877        Self {
5878            id: id.into(),
5879            status: "WARN".into(),
5880            detail: detail.into(),
5881            fix: Some(fix.into()),
5882        }
5883    }
5884    fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5885        Self {
5886            id: id.into(),
5887            status: "FAIL".into(),
5888            detail: detail.into(),
5889            fix: Some(fix.into()),
5890        }
5891    }
5892}
5893
5894/// `wire doctor` — single-command diagnostic for the silent-fail classes
5895/// 0.5.11 ships fixes for. Surfaces what each fix produces (P0.1 cursor
5896/// blocks, P0.2 pair-rejection logs, P0.4 daemon version mismatch, etc.)
5897/// so operators don't have to know where each lives.
5898fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
5899    let mut checks: Vec<DoctorCheck> = Vec::new();
5900
5901    checks.push(check_daemon_health());
5902    checks.push(check_daemon_pid_consistency());
5903    checks.push(check_relay_reachable());
5904    checks.push(check_pair_rejections(recent_rejections));
5905    checks.push(check_cursor_progress());
5906
5907    let fails = checks.iter().filter(|c| c.status == "FAIL").count();
5908    let warns = checks.iter().filter(|c| c.status == "WARN").count();
5909
5910    if as_json {
5911        println!(
5912            "{}",
5913            serde_json::to_string(&json!({
5914                "checks": checks,
5915                "fail_count": fails,
5916                "warn_count": warns,
5917                "ok": fails == 0,
5918            }))?
5919        );
5920    } else {
5921        println!("wire doctor — {} checks", checks.len());
5922        for c in &checks {
5923            let bullet = match c.status.as_str() {
5924                "PASS" => "✓",
5925                "WARN" => "!",
5926                "FAIL" => "✗",
5927                _ => "?",
5928            };
5929            println!("  {bullet} [{}] {}: {}", c.status, c.id, c.detail);
5930            if let Some(fix) = &c.fix {
5931                println!("      fix: {fix}");
5932            }
5933        }
5934        println!();
5935        if fails == 0 && warns == 0 {
5936            println!("ALL GREEN");
5937        } else {
5938            println!("{fails} FAIL, {warns} WARN");
5939        }
5940    }
5941
5942    if fails > 0 {
5943        std::process::exit(1);
5944    }
5945    Ok(())
5946}
5947
5948/// Check: daemon running, exactly one instance, no orphans.
5949///
5950/// Today's debug surfaced PID 54017 (old-binary wire daemon running for 4
5951/// days, advancing cursor without pinning). `wire status` lied about it.
5952/// `wire doctor` must catch THIS class: multiple daemons running, OR
5953/// pid-file claims daemon down while a process is actually up.
5954fn check_daemon_health() -> DoctorCheck {
5955    // v0.5.13 (issue #2 bug A): doctor PASSed on orphan-only state while
5956    // `wire status` reported DOWN, disagreeing for 25 min. v0.5.19 (#2
5957    // hardening): every surface routes through ensure_up::daemon_liveness
5958    // so they share one view of the world. No more parallel liveness
5959    // logic to drift out of sync.
5960    let snap = crate::ensure_up::daemon_liveness();
5961    let pgrep_pids = &snap.pgrep_pids;
5962    let pidfile_pid = snap.pidfile_pid;
5963    let pidfile_alive = snap.pidfile_alive;
5964    let orphan_pids = &snap.orphan_pids;
5965
5966    let fmt_pids = |xs: &[u32]| -> String {
5967        xs.iter()
5968            .map(|p| p.to_string())
5969            .collect::<Vec<_>>()
5970            .join(", ")
5971    };
5972
5973    match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
5974        (0, _, _) => DoctorCheck::fail(
5975            "daemon",
5976            "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
5977            "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
5978        ),
5979        // Single daemon AND it matches the pidfile → healthy.
5980        (1, true, true) => DoctorCheck::pass(
5981            "daemon",
5982            format!(
5983                "one daemon running (pid {}, matches pidfile)",
5984                pgrep_pids[0]
5985            ),
5986        ),
5987        // Pidfile is alive but pgrep ALSO sees orphan processes.
5988        (n, true, false) => DoctorCheck::fail(
5989            "daemon",
5990            format!(
5991                "{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
5992                 The orphans race the relay cursor — they advance past events your current binary can't process. \
5993                 (Issue #2 exact class.)",
5994                fmt_pids(&pgrep_pids),
5995                pidfile_pid.unwrap(),
5996                fmt_pids(&orphan_pids),
5997            ),
5998            "`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
5999        ),
6000        // Pidfile is dead but processes ARE running → all are orphans.
6001        (n, false, _) => DoctorCheck::fail(
6002            "daemon",
6003            format!(
6004                "{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
6005                 every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
6006                 (Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
6007                fmt_pids(&pgrep_pids),
6008                match pidfile_pid {
6009                    Some(p) => format!("claims pid {p} which is dead"),
6010                    None => "is missing".to_string(),
6011                },
6012            ),
6013            "`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
6014        ),
6015        // Multiple daemons all matching … impossible by construction; fall back to warn.
6016        (n, true, true) => DoctorCheck::warn(
6017            "daemon",
6018            format!(
6019                "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
6020                fmt_pids(&pgrep_pids)
6021            ),
6022            "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
6023        ),
6024    }
6025}
6026
6027/// Check: structured pidfile matches running daemon. Spark's P0.4 5th
6028/// check. Surfaces version mismatch (daemon running old binary text in
6029/// memory under a current symlink — today's exact bug class), schema
6030/// drift (future format bumps), and identity contamination (daemon's
6031/// recorded DID doesn't match this box's configured DID).
6032///
6033/// v0.5.19 (#2 hardening): also surfaces stale pidfiles — a well-formed
6034/// JSON pid record whose recorded `pid` is no longer a live OS process.
6035/// Pre-hardening this check PASSed in that state (it only validated
6036/// content, not liveness), letting `wire status: DOWN` and
6037/// `wire doctor: PASS` disagree for 25 min in incident #2.
6038fn check_daemon_pid_consistency() -> DoctorCheck {
6039    let snap = crate::ensure_up::daemon_liveness();
6040    match &snap.record {
6041        crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
6042            "daemon_pid_consistency",
6043            "no daemon.pid yet — fresh box or daemon never started",
6044        ),
6045        crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
6046            "daemon_pid_consistency",
6047            format!("daemon.pid is corrupt: {reason}"),
6048            "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
6049        ),
6050        crate::ensure_up::PidRecord::LegacyInt(pid) => {
6051            // Legacy pidfile: still surface liveness so a dead legacy pid
6052            // doesn't quietly PASS this check while status says DOWN.
6053            let pid = *pid;
6054            if !crate::ensure_up::pid_is_alive(pid) {
6055                return DoctorCheck::warn(
6056                    "daemon_pid_consistency",
6057                    format!(
6058                        "daemon.pid (legacy-int) points at pid {pid} which is not running. \
6059                         Stale pidfile from a crashed pre-0.5.11 daemon. \
6060                         (Issue #2: this surface used to PASS while `wire status` said DOWN.)"
6061                    ),
6062                    "`wire upgrade` (kills any orphan + spawns a fresh daemon with JSON pidfile)",
6063                );
6064            }
6065            DoctorCheck::warn(
6066                "daemon_pid_consistency",
6067                format!(
6068                    "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
6069                     Daemon was started by a pre-0.5.11 binary."
6070                ),
6071                "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
6072            )
6073        }
6074        crate::ensure_up::PidRecord::Json(d) => {
6075            // v0.5.19 liveness gate: if the recorded pid is dead, the
6076            // pidfile is stale and the rest of the content drift checks
6077            // are moot — `wire upgrade` is the answer regardless.
6078            if !snap.pidfile_alive {
6079                return DoctorCheck::warn(
6080                    "daemon_pid_consistency",
6081                    format!(
6082                        "daemon.pid records pid {pid} (v{version}) but that process is not running — \
6083                         pidfile is stale. `wire status` will report DOWN, but pre-v0.5.19 doctor \
6084                         silently PASSed this state and ignored any live orphan daemons (#2 root cause).",
6085                        pid = d.pid,
6086                        version = d.version,
6087                    ),
6088                    "`wire upgrade` to clean up the stale pidfile + spawn a fresh daemon \
6089                     (kills any orphan daemon advancing the cursor without coordination)",
6090                );
6091            }
6092            let mut issues: Vec<String> = Vec::new();
6093            if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
6094                issues.push(format!(
6095                    "schema={} (expected {})",
6096                    d.schema,
6097                    crate::ensure_up::DAEMON_PID_SCHEMA
6098                ));
6099            }
6100            let cli_version = env!("CARGO_PKG_VERSION");
6101            if d.version != cli_version {
6102                issues.push(format!(
6103                    "version daemon={} cli={cli_version}",
6104                    d.version
6105                ));
6106            }
6107            if !std::path::Path::new(&d.bin_path).exists() {
6108                issues.push(format!("bin_path {} missing on disk", d.bin_path));
6109            }
6110            // Cross-check DID + relay against current config (best-effort).
6111            if let Ok(card) = config::read_agent_card()
6112                && let Some(current_did) = card.get("did").and_then(Value::as_str)
6113                && let Some(recorded_did) = &d.did
6114                && recorded_did != current_did
6115            {
6116                issues.push(format!(
6117                    "did daemon={recorded_did} config={current_did} — identity drift"
6118                ));
6119            }
6120            if let Ok(state) = config::read_relay_state()
6121                && let Some(current_relay) = state
6122                    .get("self")
6123                    .and_then(|s| s.get("relay_url"))
6124                    .and_then(Value::as_str)
6125                && let Some(recorded_relay) = &d.relay_url
6126                && recorded_relay != current_relay
6127            {
6128                issues.push(format!(
6129                    "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
6130                ));
6131            }
6132            if issues.is_empty() {
6133                DoctorCheck::pass(
6134                    "daemon_pid_consistency",
6135                    format!(
6136                        "daemon v{} bound to {} as {}",
6137                        d.version,
6138                        d.relay_url.as_deref().unwrap_or("?"),
6139                        d.did.as_deref().unwrap_or("?")
6140                    ),
6141                )
6142            } else {
6143                DoctorCheck::warn(
6144                    "daemon_pid_consistency",
6145                    format!("daemon pidfile drift: {}", issues.join("; ")),
6146                    "`wire upgrade` to atomically restart daemon with current config".to_string(),
6147                )
6148            }
6149        }
6150    }
6151}
6152
6153/// Check: bound relay's /healthz returns 200.
6154fn check_relay_reachable() -> DoctorCheck {
6155    let state = match config::read_relay_state() {
6156        Ok(s) => s,
6157        Err(e) => return DoctorCheck::fail(
6158            "relay",
6159            format!("could not read relay state: {e}"),
6160            "run `wire up <handle>@<relay>` to bootstrap",
6161        ),
6162    };
6163    let url = state
6164        .get("self")
6165        .and_then(|s| s.get("relay_url"))
6166        .and_then(Value::as_str)
6167        .unwrap_or("");
6168    if url.is_empty() {
6169        return DoctorCheck::warn(
6170            "relay",
6171            "no relay bound — wire send/pull will not work",
6172            "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
6173        );
6174    }
6175    let client = crate::relay_client::RelayClient::new(url);
6176    match client.check_healthz() {
6177        Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
6178        Err(e) => DoctorCheck::fail(
6179            "relay",
6180            format!("{url} unreachable: {e}"),
6181            format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
6182        ),
6183    }
6184}
6185
6186/// Check: count recent entries in pair-rejected.jsonl (P0.2 output). Every
6187/// entry there is a silent failure that, pre-0.5.11, would have left the
6188/// operator wondering why pairing didn't complete.
6189fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
6190    let path = match config::state_dir() {
6191        Ok(d) => d.join("pair-rejected.jsonl"),
6192        Err(e) => return DoctorCheck::warn(
6193            "pair_rejections",
6194            format!("could not resolve state dir: {e}"),
6195            "set WIRE_HOME or fix XDG_STATE_HOME",
6196        ),
6197    };
6198    if !path.exists() {
6199        return DoctorCheck::pass(
6200            "pair_rejections",
6201            "no pair-rejected.jsonl — no recorded pair failures",
6202        );
6203    }
6204    let body = match std::fs::read_to_string(&path) {
6205        Ok(b) => b,
6206        Err(e) => return DoctorCheck::warn(
6207            "pair_rejections",
6208            format!("could not read {path:?}: {e}"),
6209            "check file permissions",
6210        ),
6211    };
6212    let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
6213    if lines.is_empty() {
6214        return DoctorCheck::pass(
6215            "pair_rejections",
6216            "pair-rejected.jsonl present but empty",
6217        );
6218    }
6219    let total = lines.len();
6220    let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
6221    let mut summary: Vec<String> = Vec::new();
6222    for line in &recent {
6223        if let Ok(rec) = serde_json::from_str::<Value>(line) {
6224            let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
6225            let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
6226            summary.push(format!("{peer}/{code}"));
6227        }
6228    }
6229    DoctorCheck::warn(
6230        "pair_rejections",
6231        format!(
6232            "{total} pair failures recorded. recent: [{}]",
6233            summary.join(", ")
6234        ),
6235        format!(
6236            "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
6237        ),
6238    )
6239}
6240
6241/// Check: cursor isn't stuck. We can't tell without polling — but we can
6242/// report the current cursor position so operators see if it changes.
6243/// Real "stuck" detection needs two pulls separated in time; defer that
6244/// behaviour to a `wire doctor --watch` mode.
6245fn check_cursor_progress() -> DoctorCheck {
6246    let state = match config::read_relay_state() {
6247        Ok(s) => s,
6248        Err(e) => return DoctorCheck::warn(
6249            "cursor",
6250            format!("could not read relay state: {e}"),
6251            "check ~/Library/Application Support/wire/relay.json",
6252        ),
6253    };
6254    let cursor = state
6255        .get("self")
6256        .and_then(|s| s.get("last_pulled_event_id"))
6257        .and_then(Value::as_str)
6258        .map(|s| s.chars().take(16).collect::<String>())
6259        .unwrap_or_else(|| "<none>".to_string());
6260    DoctorCheck::pass(
6261        "cursor",
6262        format!(
6263            "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
6264        ),
6265    )
6266}
6267
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// Each `DoctorCheck` constructor must stamp its own status string, and
    /// only `warn`/`fail` carry a fix hint. If a constructor reported the
    /// wrong status, `wire doctor` output would mislead the operator.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passed = DoctorCheck::pass("x", "ok");
        assert_eq!(passed.status, "PASS");
        assert!(passed.fix.is_none());

        let warned = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warned.status, "WARN");
        assert_eq!(warned.fix.as_deref(), Some("do this"));

        let failed = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failed.status, "FAIL");
        assert_eq!(failed.fix.as_deref(), Some("fix it"));
    }

    /// A fresh home without `pair-rejected.jsonl` is not a problem and must
    /// be reported as PASS.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let c = check_pair_rejections(5);
            assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
        });
    }

    /// Any recorded rejection is worth surfacing: one entry must yield a
    /// WARN whose detail names both the count and the `peer/code` pair.
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let outcome = check_pair_rejections(5);
            assert_eq!(outcome.status, "WARN");
            for needle in ["1 pair failures", "willard/pair_drop_ack_send_failed"] {
                assert!(outcome.detail.contains(needle));
            }
        });
    }
}
6321
6322// ---------- up megacommand (full bootstrap) ----------
6323
6324/// `wire up <nick@relay-host>` — single command from fresh box to ready-to-
6325/// pair. Composes the steps that today's onboarding walks operators through
6326/// one by one (init / bind-relay / claim / background daemon / arm monitor
6327/// recipe). Idempotent: every step checks current state and skips if done.
6328///
6329/// Argument parsing accepts:
6330///   - `<nick>@<relay-host>` — explicit relay
6331///   - `<nick>`              — defaults to wireup.net (the configured public
6332///                             relay)
6333fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
6334    let (nick, relay_url) = match handle_arg.split_once('@') {
6335        Some((n, host)) => {
6336            let url = if host.starts_with("http://") || host.starts_with("https://") {
6337                host.to_string()
6338            } else {
6339                format!("https://{host}")
6340            };
6341            (n.to_string(), url)
6342        }
6343        None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
6344    };
6345
6346    let mut report: Vec<(String, String)> = Vec::new();
6347    let mut step = |stage: &str, detail: String| {
6348        report.push((stage.to_string(), detail.clone()));
6349        if !as_json {
6350            eprintln!("wire up: {stage} — {detail}");
6351        }
6352    };
6353
6354    // 1. init (or verify existing identity matches the requested nick).
6355    if config::is_initialized()? {
6356        let card = config::read_agent_card()?;
6357        let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
6358        let existing_handle =
6359            crate::agent_card::display_handle_from_did(existing_did).to_string();
6360        if existing_handle != nick {
6361            bail!(
6362                "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
6363                 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
6364                 delete `{:?}` to start fresh.",
6365                config::config_dir()?
6366            );
6367        }
6368        step("init", format!("already initialized as {existing_handle}"));
6369    } else {
6370        cmd_init(&nick, name, Some(&relay_url), /* as_json */ false)?;
6371        step("init", format!("created identity {nick} bound to {relay_url}"));
6372    }
6373
6374    // 2. Ensure relay binding matches. cmd_init with --relay binds it; if
6375    // already initialized we may need to bind to the requested relay
6376    // separately (operator switched relays).
6377    let relay_state = config::read_relay_state()?;
6378    let bound_relay = relay_state
6379        .get("self")
6380        .and_then(|s| s.get("relay_url"))
6381        .and_then(Value::as_str)
6382        .unwrap_or("")
6383        .to_string();
6384    if bound_relay.is_empty() {
6385        // Identity exists but never bound to a relay — bind now.
6386        // Fresh box (no pinned peers yet) — migrate_pinned irrelevant.
6387        // Pass `false` so the safety check kicks in if state was non-empty.
6388        cmd_bind_relay(&relay_url, /* migrate_pinned */ false, /* as_json */ false)?;
6389        step("bind-relay", format!("bound to {relay_url}"));
6390    } else if bound_relay != relay_url {
6391        step(
6392            "bind-relay",
6393            format!(
6394                "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
6395                 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
6396            ),
6397        );
6398    } else {
6399        step("bind-relay", format!("already bound to {bound_relay}"));
6400    }
6401
6402    // 3. Claim nick on the relay's handle directory. Idempotent — same-DID
6403    // re-claims are accepted by the relay.
6404    match cmd_claim(&nick, Some(&relay_url), None, /* hidden */ false, /* as_json */ false) {
6405        Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
6406        Err(e) => step(
6407            "claim",
6408            format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
6409        ),
6410    }
6411
6412    // 4. Background daemon — must be running for pull/push/ack to flow.
6413    match crate::ensure_up::ensure_daemon_running() {
6414        Ok(true) => step("daemon", "started fresh background daemon".to_string()),
6415        Ok(false) => step("daemon", "already running".to_string()),
6416        Err(e) => step(
6417            "daemon",
6418            format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
6419        ),
6420    }
6421
6422    // 5. Final summary — point operator at the next commands.
6423    let summary = format!(
6424        "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
6425         `wire monitor` to watch incoming events."
6426    );
6427    step("ready", summary.clone());
6428
6429    if as_json {
6430        let steps_json: Vec<_> = report
6431            .iter()
6432            .map(|(k, v)| json!({"stage": k, "detail": v}))
6433            .collect();
6434        println!(
6435            "{}",
6436            serde_json::to_string(&json!({
6437                "nick": nick,
6438                "relay": relay_url,
6439                "steps": steps_json,
6440            }))?
6441        );
6442    }
6443    Ok(())
6444}
6445
/// Strip one leading `https://` or `http://` scheme for display in
/// `wire up` step output.
///
/// Exactly one scheme prefix is removed. `trim_start_matches` would strip
/// the pattern repeatedly and, chained, would eat a second scheme as well
/// (`https://http://x` → `x`), hiding a malformed URL from the operator.
/// Input without a recognised scheme is returned unchanged.
fn strip_proto(url: &str) -> String {
    url.strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
        .unwrap_or(url)
        .to_string()
}
6452
6453// ---------- pair megacommand (zero-paste handle-based) ----------
6454
6455/// `wire pair <nick@domain>` zero-shot. Dispatched from Command::Pair when
6456/// the handle is in `nick@domain` form. Wraps:
6457///
6458///   1. cmd_add — resolve, pin, drop intro
6459///   2. Wait up to `timeout_secs` for the peer's `pair_drop_ack` to arrive
6460///      (signalled by `peers.<handle>.slot_token` populating in relay state)
6461///   3. Verify bilateral pin: trust contains peer + relay state has token
6462///   4. Print final state — both sides VERIFIED + can `wire send`
6463///
6464/// On timeout: hard-errors with the specific stuck step so the operator
6465/// knows which side to chase. No silent partial success.
6466fn cmd_pair_megacommand(
6467    handle_arg: &str,
6468    relay_override: Option<&str>,
6469    timeout_secs: u64,
6470    _as_json: bool,
6471) -> Result<()> {
6472    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
6473    let peer_handle = parsed.nick.clone();
6474
6475    eprintln!("wire pair: resolving {handle_arg}...");
6476    cmd_add(handle_arg, relay_override, /* as_json */ false)?;
6477
6478    eprintln!(
6479        "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
6480         to ack (their daemon must be running + pulling)..."
6481    );
6482
6483    // Trigger an immediate daemon-style pull so we don't wait the full daemon
6484    // interval. Best-effort — if it fails, we still fall through to the
6485    // polling loop.
6486    let _ = run_sync_pull();
6487
6488    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
6489    let poll_interval = std::time::Duration::from_millis(500);
6490
6491    loop {
6492        // Drain anything new from the relay (e.g. our pair_drop_ack landing).
6493        let _ = run_sync_pull();
6494        let relay_state = config::read_relay_state()?;
6495        let peer_entry = relay_state
6496            .get("peers")
6497            .and_then(|p| p.get(&peer_handle))
6498            .cloned();
6499        let token = peer_entry
6500            .as_ref()
6501            .and_then(|e| e.get("slot_token"))
6502            .and_then(Value::as_str)
6503            .unwrap_or("");
6504
6505        if !token.is_empty() {
6506            // Bilateral pin complete — we have their slot_token, we can send.
6507            let trust = config::read_trust()?;
6508            let pinned_in_trust = trust
6509                .get("agents")
6510                .and_then(|a| a.get(&peer_handle))
6511                .is_some();
6512            println!(
6513                "wire pair: paired with {peer_handle}.\n  trust: {}  bilateral: yes (slot_token recorded)\n  next: `wire send {peer_handle} \"<msg>\"`",
6514                if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
6515            );
6516            return Ok(());
6517        }
6518
6519        if std::time::Instant::now() >= deadline {
6520            // Timeout — surface the EXACT stuck step. Likely culprits:
6521            //   - peer daemon not running on their box
6522            //   - peer's relay slot is offline
6523            //   - their daemon is on an older binary that doesn't know
6524            //     pair_drop kind=1100 (the P0.1 class — now visible via
6525            //     wire pull --json on their side as a blocking rejection)
6526            bail!(
6527                "wire pair: timed out after {timeout_secs}s. \
6528                 peer {peer_handle} never sent pair_drop_ack. \
6529                 likely causes: (a) their daemon is down — ask them to run \
6530                 `wire status` and `wire daemon &`; (b) their binary is older \
6531                 than 0.5.x and doesn't understand pair_drop events — ask \
6532                 them to `wire upgrade`; (c) network / relay blip — re-run \
6533                 `wire pair {handle_arg}` to retry."
6534            );
6535        }
6536
6537        std::thread::sleep(poll_interval);
6538    }
6539}
6540
6541fn cmd_claim(
6542    nick: &str,
6543    relay_override: Option<&str>,
6544    public_url: Option<&str>,
6545    hidden: bool,
6546    as_json: bool,
6547) -> Result<()> {
6548    if !crate::pair_profile::is_valid_nick(nick) {
6549        bail!(
6550            "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
6551        );
6552    }
6553    // `wire claim` is the one-step bootstrap: auto-init + auto-allocate slot
6554    // + claim handle. Operator should never have to run init/bind-relay first.
6555    let (_did, relay_url, slot_id, slot_token) =
6556        crate::pair_invite::ensure_self_with_relay(relay_override)?;
6557    let card = config::read_agent_card()?;
6558
6559    let client = crate::relay_client::RelayClient::new(&relay_url);
6560    // v0.5.19 (#9.1): forward the `discoverable` flag. None for default
6561    // (back-compat); Some(false) for `--hidden`. Relays older than
6562    // v0.5.19 ignore the field, so this is safe to always send.
6563    let discoverable = if hidden { Some(false) } else { None };
6564    let resp = client.handle_claim_v2(
6565        nick,
6566        &slot_id,
6567        &slot_token,
6568        public_url,
6569        &card,
6570        discoverable,
6571    )?;
6572
6573    if as_json {
6574        println!(
6575            "{}",
6576            serde_json::to_string(&json!({
6577                "nick": nick,
6578                "relay": relay_url,
6579                "response": resp,
6580            }))?
6581        );
6582    } else {
6583        // Best-effort: derive the public domain from the relay URL. If
6584        // operator passed --public-url that's the canonical address; else
6585        // the relay URL itself. Falls back to a placeholder if both miss.
6586        let domain = public_url
6587            .unwrap_or(&relay_url)
6588            .trim_start_matches("https://")
6589            .trim_start_matches("http://")
6590            .trim_end_matches('/')
6591            .split('/')
6592            .next()
6593            .unwrap_or("<this-relay-domain>")
6594            .to_string();
6595        println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
6596        println!("verify with: wire whois {nick}@{domain}");
6597    }
6598    Ok(())
6599}
6600
6601fn cmd_profile(action: ProfileAction) -> Result<()> {
6602    match action {
6603        ProfileAction::Set { field, value, json } => {
6604            // Try parsing the value as JSON; if that fails, treat it as a
6605            // bare string. Lets operators pass either `42` or `"hello"` or
6606            // `["rust","late-night"]` without quoting hell.
6607            let parsed: Value =
6608                serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
6609            let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
6610            if json {
6611                println!(
6612                    "{}",
6613                    serde_json::to_string(&json!({
6614                        "field": field,
6615                        "profile": new_profile,
6616                    }))?
6617                );
6618            } else {
6619                println!("profile.{field} set");
6620            }
6621        }
6622        ProfileAction::Get { json } => return cmd_whois(None, json, None),
6623        ProfileAction::Clear { field, json } => {
6624            let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
6625            if json {
6626                println!(
6627                    "{}",
6628                    serde_json::to_string(&json!({
6629                        "field": field,
6630                        "cleared": true,
6631                        "profile": new_profile,
6632                    }))?
6633                );
6634            } else {
6635                println!("profile.{field} cleared");
6636            }
6637        }
6638    }
6639    Ok(())
6640}
6641
6642// ---------- setup — one-shot MCP host registration ----------
6643
6644fn cmd_setup(apply: bool) -> Result<()> {
6645    use std::path::PathBuf;
6646
6647    let entry = json!({"command": "wire", "args": ["mcp"]});
6648    let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
6649
6650    // Detect probable MCP host config locations. Cross-platform — we only
6651    // touch the file if it already exists OR --apply was passed.
6652    let mut targets: Vec<(&str, PathBuf)> = Vec::new();
6653    if let Some(home) = dirs::home_dir() {
6654        // Claude Code (CLI) — real config path is ~/.claude.json on all platforms (Linux/macOS/Windows).
6655        // The mcpServers map lives at the top level of that file.
6656        targets.push(("Claude Code", home.join(".claude.json")));
6657        // Legacy / alternate Claude Code XDG path — still try, harmless if absent.
6658        targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
6659        // Claude Desktop macOS
6660        #[cfg(target_os = "macos")]
6661        targets.push((
6662            "Claude Desktop (macOS)",
6663            home.join("Library/Application Support/Claude/claude_desktop_config.json"),
6664        ));
6665        // Claude Desktop Windows
6666        #[cfg(target_os = "windows")]
6667        if let Ok(appdata) = std::env::var("APPDATA") {
6668            targets.push((
6669                "Claude Desktop (Windows)",
6670                PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
6671            ));
6672        }
6673        // Cursor
6674        targets.push(("Cursor", home.join(".cursor/mcp.json")));
6675    }
6676    // Project-local — works for several MCP-aware tools
6677    targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
6678
6679    println!("wire setup\n");
6680    println!("MCP server snippet (add this to your client's mcpServers):");
6681    println!();
6682    println!("{entry_pretty}");
6683    println!();
6684
6685    if !apply {
6686        println!("Probable MCP host config locations on this machine:");
6687        for (name, path) in &targets {
6688            let marker = if path.exists() {
6689                "✓ found"
6690            } else {
6691                "  (would create)"
6692            };
6693            println!("  {marker:14}  {name}: {}", path.display());
6694        }
6695        println!();
6696        println!("Run `wire setup --apply` to merge wire into each config above.");
6697        println!(
6698            "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
6699        );
6700        return Ok(());
6701    }
6702
6703    let mut modified: Vec<String> = Vec::new();
6704    let mut skipped: Vec<String> = Vec::new();
6705    for (name, path) in &targets {
6706        match upsert_mcp_entry(path, "wire", &entry) {
6707            Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
6708            Ok(false) => skipped.push(format!("  {name} ({}): already configured", path.display())),
6709            Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
6710        }
6711    }
6712    if !modified.is_empty() {
6713        println!("Modified:");
6714        for line in &modified {
6715            println!("  {line}");
6716        }
6717        println!();
6718        println!("Restart the app(s) above to load wire MCP.");
6719    }
6720    if !skipped.is_empty() {
6721        println!();
6722        println!("Skipped:");
6723        for line in &skipped {
6724            println!("  {line}");
6725        }
6726    }
6727    Ok(())
6728}
6729
6730/// Idempotent merge of an `mcpServers.<name>` entry into a JSON config file.
6731/// Returns Ok(true) if file was changed, Ok(false) if entry already matched.
6732fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
6733    let mut cfg: Value = if path.exists() {
6734        let body = std::fs::read_to_string(path).context("reading config")?;
6735        serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
6736    } else {
6737        json!({})
6738    };
6739    if !cfg.is_object() {
6740        cfg = json!({});
6741    }
6742    let root = cfg.as_object_mut().unwrap();
6743    let servers = root
6744        .entry("mcpServers".to_string())
6745        .or_insert_with(|| json!({}));
6746    if !servers.is_object() {
6747        *servers = json!({});
6748    }
6749    let map = servers.as_object_mut().unwrap();
6750    if map.get(server_name) == Some(entry) {
6751        return Ok(false);
6752    }
6753    map.insert(server_name.to_string(), entry.clone());
6754    if let Some(parent) = path.parent()
6755        && !parent.as_os_str().is_empty()
6756    {
6757        std::fs::create_dir_all(parent).context("creating parent dir")?;
6758    }
6759    let out = serde_json::to_string_pretty(&cfg)? + "\n";
6760    std::fs::write(path, out).context("writing config")?;
6761    Ok(true)
6762}
6763
6764// ---------- reactor — event-handler dispatch loop ----------
6765
6766#[allow(clippy::too_many_arguments)]
6767fn cmd_reactor(
6768    on_event: &str,
6769    peer_filter: Option<&str>,
6770    kind_filter: Option<&str>,
6771    verified_only: bool,
6772    interval_secs: u64,
6773    once: bool,
6774    dry_run: bool,
6775    max_per_minute: u32,
6776    max_chain_depth: u32,
6777) -> Result<()> {
6778    use crate::inbox_watch::{InboxEvent, InboxWatcher};
6779    use std::collections::{HashMap, HashSet, VecDeque};
6780    use std::io::Write;
6781    use std::process::{Command, Stdio};
6782    use std::time::{Duration, Instant};
6783
6784    let cursor_path = config::state_dir()?.join("reactor.cursor");
6785    // event_ids THIS reactor's handler has caused to be sent (via wire send).
6786    // Used by chain-depth check — an incoming `(re:X)` where X is in this set
6787    // means peer is replying to something we just said → don't reply back.
6788    //
6789    // Persisted across restarts so a reactor that crashes mid-conversation
6790    // doesn't re-enter the loop. Reads on startup, writes after each
6791    // outbox-grow detection. Capped at 500 entries (LRU-ish — old entries
6792    // dropped from front of file).
6793    let emitted_path = config::state_dir()?.join("reactor-emitted.log");
6794    let mut emitted_ids: HashSet<String> = HashSet::new();
6795    if emitted_path.exists()
6796        && let Ok(body) = std::fs::read_to_string(&emitted_path)
6797    {
6798        for line in body.lines() {
6799            let t = line.trim();
6800            if !t.is_empty() {
6801                emitted_ids.insert(t.to_string());
6802            }
6803        }
6804    }
6805    // Outbox file paths the reactor watches for new sent-event_ids.
6806    let outbox_dir = config::outbox_dir()?;
6807    // (peer → file size we've already scanned). Lets us notice new outbox
6808    // appends without re-reading the whole file each sweep.
6809    let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
6810
6811    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6812
6813    let kind_num: Option<u32> = match kind_filter {
6814        Some(k) => Some(parse_kind(k)?),
6815        None => None,
6816    };
6817
6818    // Per-peer sliding window of dispatch instants for rate-limit check.
6819    let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
6820
6821    let dispatch = |ev: &InboxEvent,
6822                    peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
6823                    emitted_ids: &HashSet<String>|
6824     -> Result<bool> {
6825        if let Some(p) = peer_filter
6826            && ev.peer != p
6827        {
6828            return Ok(false);
6829        }
6830        if verified_only && !ev.verified {
6831            return Ok(false);
6832        }
6833        if let Some(want) = kind_num {
6834            let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
6835            if ev_kind != Some(want) {
6836                return Ok(false);
6837            }
6838        }
6839
6840        // Chain-depth check: if the body contains `(re:<event_id>)` and that
6841        // event_id is in our emitted set, this is a reply to one of our
6842        // replies → loop suspected, skip.
6843        if max_chain_depth > 0 {
6844            let body_str = match &ev.raw["body"] {
6845                Value::String(s) => s.clone(),
6846                other => serde_json::to_string(other).unwrap_or_default(),
6847            };
6848            if let Some(referenced) = parse_re_marker(&body_str) {
6849                // Handler scripts usually truncate event_id (e.g. ${ID:0:12}).
6850                // Match emitted set by prefix to catch both full + truncated.
6851                let matched = emitted_ids.contains(&referenced)
6852                    || emitted_ids.iter().any(|full| full.starts_with(&referenced));
6853                if matched {
6854                    eprintln!(
6855                        "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
6856                        ev.event_id, ev.peer, referenced
6857                    );
6858                    return Ok(false);
6859                }
6860            }
6861        }
6862
6863        // Per-peer rate-limit check (sliding 60s window).
6864        if max_per_minute > 0 {
6865            let now = Instant::now();
6866            let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
6867            while let Some(&front) = win.front() {
6868                if now.duration_since(front) > Duration::from_secs(60) {
6869                    win.pop_front();
6870                } else {
6871                    break;
6872                }
6873            }
6874            if win.len() as u32 >= max_per_minute {
6875                eprintln!(
6876                    "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
6877                    ev.event_id, ev.peer, max_per_minute
6878                );
6879                return Ok(false);
6880            }
6881            win.push_back(now);
6882        }
6883
6884        if dry_run {
6885            println!("{}", serde_json::to_string(&ev.raw)?);
6886            return Ok(true);
6887        }
6888
6889        let mut child = Command::new("sh")
6890            .arg("-c")
6891            .arg(on_event)
6892            .stdin(Stdio::piped())
6893            .stdout(Stdio::inherit())
6894            .stderr(Stdio::inherit())
6895            .env("WIRE_EVENT_PEER", &ev.peer)
6896            .env("WIRE_EVENT_ID", &ev.event_id)
6897            .env("WIRE_EVENT_KIND", &ev.kind)
6898            .spawn()
6899            .with_context(|| format!("spawning reactor handler: {on_event}"))?;
6900        if let Some(mut stdin) = child.stdin.take() {
6901            let body = serde_json::to_vec(&ev.raw)?;
6902            let _ = stdin.write_all(&body);
6903            let _ = stdin.write_all(b"\n");
6904        }
6905        std::mem::drop(child);
6906        Ok(true)
6907    };
6908
6909    // Scan outbox files for newly-appended event_ids and add to emitted set.
6910    let scan_outbox = |emitted_ids: &mut HashSet<String>,
6911                       outbox_cursors: &mut HashMap<String, u64>|
6912     -> Result<usize> {
6913        if !outbox_dir.exists() {
6914            return Ok(0);
6915        }
6916        let mut added = 0;
6917        let mut new_ids: Vec<String> = Vec::new();
6918        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
6919            let path = entry.path();
6920            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
6921                continue;
6922            }
6923            let peer = match path.file_stem().and_then(|s| s.to_str()) {
6924                Some(s) => s.to_string(),
6925                None => continue,
6926            };
6927            let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
6928            let start = *outbox_cursors.get(&peer).unwrap_or(&0);
6929            if cur_len <= start {
6930                outbox_cursors.insert(peer, start);
6931                continue;
6932            }
6933            let body = std::fs::read_to_string(&path).unwrap_or_default();
6934            let tail = &body[start as usize..];
6935            for line in tail.lines() {
6936                if let Ok(v) = serde_json::from_str::<Value>(line)
6937                    && let Some(eid) = v.get("event_id").and_then(Value::as_str)
6938                    && emitted_ids.insert(eid.to_string())
6939                {
6940                    new_ids.push(eid.to_string());
6941                    added += 1;
6942                }
6943            }
6944            outbox_cursors.insert(peer, cur_len);
6945        }
6946        if !new_ids.is_empty() {
6947            // Append new ids to disk, cap on-disk file at 500 entries.
6948            let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
6949            if all.len() > 500 {
6950                all.sort();
6951                let drop_n = all.len() - 500;
6952                let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
6953                emitted_ids.retain(|x| !dropped.contains(x));
6954                all = emitted_ids.iter().cloned().collect();
6955            }
6956            let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
6957        }
6958        Ok(added)
6959    };
6960
6961    let sweep = |watcher: &mut InboxWatcher,
6962                 emitted_ids: &mut HashSet<String>,
6963                 outbox_cursors: &mut HashMap<String, u64>,
6964                 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
6965     -> Result<usize> {
6966        // Pick up any event_ids we sent since last sweep.
6967        let _ = scan_outbox(emitted_ids, outbox_cursors);
6968
6969        let events = watcher.poll()?;
6970        let mut fired = 0usize;
6971        for ev in &events {
6972            match dispatch(ev, peer_dispatch_log, emitted_ids) {
6973                Ok(true) => fired += 1,
6974                Ok(false) => {}
6975                Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
6976            }
6977        }
6978        watcher.save_cursors(&cursor_path)?;
6979        Ok(fired)
6980    };
6981
6982    if once {
6983        sweep(
6984            &mut watcher,
6985            &mut emitted_ids,
6986            &mut outbox_cursors,
6987            &mut peer_dispatch_log,
6988        )?;
6989        return Ok(());
6990    }
6991    let interval = std::time::Duration::from_secs(interval_secs.max(1));
6992    loop {
6993        if let Err(e) = sweep(
6994            &mut watcher,
6995            &mut emitted_ids,
6996            &mut outbox_cursors,
6997            &mut peer_dispatch_log,
6998        ) {
6999            eprintln!("wire reactor: sweep error: {e}");
7000        }
7001        std::thread::sleep(interval);
7002    }
7003}
7004
/// Extract the id from the first `(re:<event_id>)` marker in `body`.
///
/// Only the first `(re:` occurrence is considered; the id is everything up
/// to the next `)`, with surrounding whitespace trimmed. Returns `None` when
/// there is no marker, the marker is never closed, or the id is blank. The
/// returned id may be a full event_id or a truncated prefix.
fn parse_re_marker(body: &str) -> Option<String> {
    const OPEN: &str = "(re:";
    let (_, after_open) = body.split_once(OPEN)?;
    let (inner, _) = after_open.split_once(')')?;
    let id = inner.trim();
    (!id.is_empty()).then(|| id.to_string())
}
7018
7019// ---------- notify (Goal 2) ----------
7020
7021fn cmd_notify(
7022    interval_secs: u64,
7023    peer_filter: Option<&str>,
7024    once: bool,
7025    as_json: bool,
7026) -> Result<()> {
7027    use crate::inbox_watch::InboxWatcher;
7028    let cursor_path = config::state_dir()?.join("notify.cursor");
7029    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
7030
7031    let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
7032        let events = watcher.poll()?;
7033        for ev in events {
7034            if let Some(p) = peer_filter
7035                && ev.peer != p
7036            {
7037                continue;
7038            }
7039            if as_json {
7040                println!("{}", serde_json::to_string(&ev)?);
7041            } else {
7042                os_notify_inbox_event(&ev);
7043            }
7044        }
7045        watcher.save_cursors(&cursor_path)?;
7046        Ok(())
7047    };
7048
7049    if once {
7050        return sweep(&mut watcher);
7051    }
7052
7053    let interval = std::time::Duration::from_secs(interval_secs.max(1));
7054    loop {
7055        if let Err(e) = sweep(&mut watcher) {
7056            eprintln!("wire notify: sweep error: {e}");
7057        }
7058        std::thread::sleep(interval);
7059    }
7060}
7061
7062fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
7063    let title = if ev.verified {
7064        format!("wire ← {}", ev.peer)
7065    } else {
7066        format!("wire ← {} (UNVERIFIED)", ev.peer)
7067    };
7068    let body = format!("{}: {}", ev.kind, ev.body_preview);
7069    crate::os_notify::toast(&title, &body);
7070}
7071
/// Fallback "toast" for targets other than Linux, macOS and Windows: writes
/// the title and the indented body to stderr.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    eprintln!("[wire notify] {title}\n  {body}");
}
7076
7077// Integration tests for the CLI live in `tests/cli.rs` (cargo's tests/ dir).