Skip to main content

wire/
cli.rs

1//! `wire` CLI surface.
2//!
3//! Every subcommand emits human-readable text by default and structured JSON
4//! when `--json` is passed. Stable JSON shape is part of the API contract —
5//! see `docs/AGENT_INTEGRATION.md`.
6//!
7//! Subcommand split:
8//!   - **agent-safe**: `whoami`, `peers`, `verify`, `send`, `tail` — pure
9//!     message-layer ops, no trust establishment.
10//!   - **trust-establishing**: `init`, `pair-host`, `pair-join`. The CLI
11//!     uses interactive `y/N` prompts here. The MCP equivalents
12//!     (`wire_init`, `wire_pair_initiate`, `wire_pair_join`, `wire_pair_check`,
13//!     `wire_pair_confirm`) preserve the human gate by requiring the user to
14//!     type the 6 SAS digits back into chat — see `docs/THREAT_MODEL.md` T10/T14.
15
16use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21    agent_card::{build_agent_card, sign_agent_card},
22    config,
23    signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24    trust::{add_self_to_trust, empty_trust},
25};
26
27/// Top-level CLI.
28#[derive(Parser, Debug)]
29#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
30pub struct Cli {
31    #[command(subcommand)]
32    pub command: Command,
33}
34
35#[derive(Subcommand, Debug)]
36pub enum Command {
37    /// Generate a keypair, write self-card, and prepare to pair. (HUMAN-ONLY — DO NOT exec from agents.)
38    Init {
39        /// Short handle for this agent (becomes did:wire:<handle>).
40        handle: String,
41        /// Optional display name (defaults to capitalized handle).
42        #[arg(long)]
43        name: Option<String>,
44        /// Optional relay URL — if set, also allocates a relay slot in one step
45        /// (equivalent to running `wire init` then `wire bind-relay <url>`).
46        #[arg(long)]
47        relay: Option<String>,
48        /// Emit JSON.
49        #[arg(long)]
50        json: bool,
51    },
52    // (Old `Join` stub removed in iter 11 — superseded by `pair-join` with
53    // `join` alias. See PairJoin below.)
54    /// Print this agent's identity (DID, fingerprint, mailbox slot).
55    Whoami {
56        #[arg(long)]
57        json: bool,
58    },
59    /// List pinned peers with their tiers and capabilities.
60    Peers {
61        #[arg(long)]
62        json: bool,
63    },
64    /// Sign and queue an event to a peer.
65    ///
66    /// Forms (P0.S 0.5.11):
67    ///   wire send <peer> <body>              # kind defaults to "claim"
68    ///   wire send <peer> <kind> <body>       # explicit kind (back-compat)
69    ///   wire send <peer> -                   # body from stdin (kind=claim)
70    ///   wire send <peer> @/path/to/body.json # body from file
71    Send {
72        /// Peer handle (without `did:wire:` prefix).
73        peer: String,
74        /// When `<body>` is omitted, this is the event body (kind defaults
75        /// to `claim`). When both this and `<body>` are given, this is the
76        /// event kind (`decision`, `claim`, etc., or numeric kind id) and
77        /// the next positional is the body.
78        kind_or_body: String,
79        /// Event body — free-form text, `@/path/to/body.json` to load from
80        /// a file, or `-` to read from stdin. Optional; omit to use
81        /// `<kind_or_body>` as the body with kind=`claim`.
82        body: Option<String>,
83        /// Advisory deadline: duration (`30m`, `2h`, `1d`) or RFC3339 timestamp.
84        #[arg(long)]
85        deadline: Option<String>,
86        /// Emit JSON.
87        #[arg(long)]
88        json: bool,
89    },
90    /// Stream signed events from peers.
91    Tail {
92        /// Optional peer filter; if omitted, tails all peers.
93        peer: Option<String>,
94        /// Emit JSONL (one event per line).
95        #[arg(long)]
96        json: bool,
97        /// Maximum events to read before exiting (0 = stream until SIGINT).
98        #[arg(long, default_value_t = 0)]
99        limit: usize,
100    },
101    /// Live tail of new inbox events across all pinned peers — one line per
102    /// new event, handshake (pair_drop / pair_drop_ack / heartbeat) filtered
103    /// by default.
104    ///
105    /// Designed to be left running in an agent harness's stream-watcher
106    /// (Claude Code Monitor tool, etc.) so peer messages surface in the
107    /// session as they arrive, not on next manual `wire pull`.
108    ///
109    /// See docs/AGENT_INTEGRATION.md for the recommended Monitor invocation
110    /// template.
111    Monitor {
112        /// Only show events from this peer.
113        #[arg(long)]
114        peer: Option<String>,
115        /// Emit JSONL (one InboxEvent per line) for tooling consumption.
116        #[arg(long)]
117        json: bool,
118        /// Include handshake events (pair_drop, pair_drop_ack, heartbeat).
119        /// Default filters them out as noise.
120        #[arg(long)]
121        include_handshake: bool,
122        /// Poll interval in milliseconds. Lower = lower latency, higher CPU.
123        #[arg(long, default_value_t = 500)]
124        interval_ms: u64,
125        /// Replay last N events from history before going live (0 = none).
126        #[arg(long, default_value_t = 0)]
127        replay: usize,
128    },
129    /// Verify a signed event from a JSON file or stdin (`-`).
130    Verify {
131        /// Path to event JSON, or `-` for stdin.
132        path: String,
133        /// Emit JSON.
134        #[arg(long)]
135        json: bool,
136    },
137    /// Run the MCP (Model Context Protocol) server over stdio.
138    /// This is how Claude Desktop / Claude Code / Cursor / etc. expose
139    /// `wire_send`, `wire_tail`, etc. as native tools.
140    Mcp,
141    /// Run a relay server on this host.
142    RelayServer {
143        /// Bind address (e.g. `127.0.0.1:8770`).
144        #[arg(long, default_value = "127.0.0.1:8770")]
145        bind: String,
146    },
147    /// Allocate a slot on a relay; bind it to this agent's identity.
148    BindRelay {
149        /// Relay base URL, e.g. `http://127.0.0.1:8770`.
150        url: String,
151        #[arg(long)]
152        json: bool,
153    },
154    /// Manually pin a peer's relay slot. (Replaces SAS pairing for v0.1 bootstrap;
155    /// real `wire join` lands in the SPAKE2 iter.)
156    AddPeerSlot {
157        /// Peer handle (becomes did:wire:<handle>).
158        handle: String,
159        /// Peer's relay base URL.
160        url: String,
161        /// Peer's slot id.
162        slot_id: String,
163        /// Slot bearer token (shared between paired peers in v0.1).
164        slot_token: String,
165        #[arg(long)]
166        json: bool,
167    },
168    /// Drain outbox JSONL files to peers' relay slots.
169    Push {
170        /// Optional peer filter; default = all peers with outbox entries.
171        peer: Option<String>,
172        #[arg(long)]
173        json: bool,
174    },
175    /// Pull events from our relay slot, verify, write to inbox.
176    Pull {
177        #[arg(long)]
178        json: bool,
179    },
180    /// Print a summary of identity, relay binding, peers, inbox/outbox queue depth.
181    /// Useful as a single "where am I" check.
182    Status {
183        /// Inspect a paired peer's transport / attention / responder health.
184        #[arg(long)]
185        peer: Option<String>,
186        #[arg(long)]
187        json: bool,
188    },
189    /// Publish or inspect auto-responder health for this slot.
190    Responder {
191        #[command(subcommand)]
192        command: ResponderCommand,
193    },
194    /// Pin a peer's signed agent-card from a file. (Manual out-of-band pairing
195    /// — fallback path; the magic-wormhole flow is `pair-host` / `pair-join`.)
196    Pin {
197        /// Path to peer's signed agent-card JSON.
198        card_file: String,
199        #[arg(long)]
200        json: bool,
201    },
202    /// Allocate a NEW slot on the same relay and abandon the old one.
203    /// Sends a kind=1201 wire_close event to every paired peer over the OLD
204    /// slot announcing the new mailbox before swapping. After rotation,
205    /// peers must re-pair (or operator runs `add-peer-slot` with the new
206    /// coords) — auto-update via wire_close is a v0.2 daemon feature.
207    ///
208    /// Use case: a paired peer turned hostile (T11 in THREAT_MODEL.md —
209    /// abusive bearer-holder spamming your slot). Rotate → old slot is
210    /// orphaned → attacker's leverage gone. Operator pairs again with
211    /// peers they still want.
212    RotateSlot {
213        /// Skip the wire_close announcement to peers (faster but they won't know
214        /// where you went).
215        #[arg(long)]
216        no_announce: bool,
217        #[arg(long)]
218        json: bool,
219    },
220    /// Remove a peer from trust + relay state. Inbox/outbox files for that
221    /// peer are NOT deleted (operator can grep history); pass --purge to
222    /// also wipe the JSONL files.
223    ForgetPeer {
224        /// Peer handle to forget.
225        handle: String,
226        /// Also delete inbox/<handle>.jsonl and outbox/<handle>.jsonl.
227        #[arg(long)]
228        purge: bool,
229        #[arg(long)]
230        json: bool,
231    },
232    /// Run a long-lived sync loop: every <interval> seconds, push outbox to
233    /// peers' relay slots and pull inbox from our own slot. Foreground process;
234    /// background it with systemd / `&` / tmux as you prefer.
235    Daemon {
236        /// Sync interval in seconds. Default 5.
237        #[arg(long, default_value_t = 5)]
238        interval: u64,
239        /// Run a single sync cycle and exit (useful for cron-driven setups).
240        #[arg(long)]
241        once: bool,
242        #[arg(long)]
243        json: bool,
244    },
245    /// Host a SAS-confirmed pairing. Generates a code phrase, prints it, waits
246    /// for a peer to `pair-join`, exchanges signed agent-cards via SPAKE2 +
247    /// ChaCha20-Poly1305. Auto-pins on success. (HUMAN-ONLY — operator must
248    /// read the SAS digits aloud and confirm.)
249    PairHost {
250        /// Relay base URL.
251        #[arg(long)]
252        relay: String,
253        /// Skip the SAS confirmation prompt. ONLY use when piping under
254        /// automated tests or when the SAS has already been verified by
255        /// another channel. Documented as test-only.
256        #[arg(long)]
257        yes: bool,
258        /// How long (seconds) to wait for the peer to join before timing out.
259        #[arg(long, default_value_t = 300)]
260        timeout: u64,
261        /// Detach: write a pending-pair file, print the code phrase, and exit
262        /// immediately. The running `wire daemon` does the handshake in the
263        /// background; confirm SAS later via `wire pair-confirm <code> <digits>`.
264        /// `wire pair-list` shows pending sessions. Default is foreground
265        /// blocking behavior for backward compat.
266        #[arg(long)]
267        detach: bool,
268        /// Emit JSON instead of text. Currently only meaningful with --detach.
269        #[arg(long)]
270        json: bool,
271    },
272    /// Join a pair-slot using a code phrase from the host. (HUMAN-ONLY.)
273    ///
274    /// Aliased as `wire join <code>` for magic-wormhole muscle-memory.
275    #[command(alias = "join")]
276    PairJoin {
277        /// Code phrase from the host's `pair-host` output (e.g. `73-2QXC4P`).
278        code_phrase: String,
279        /// Relay base URL (must match the host's relay).
280        #[arg(long)]
281        relay: String,
282        #[arg(long)]
283        yes: bool,
284        #[arg(long, default_value_t = 300)]
285        timeout: u64,
286        /// Detach: see `pair-host --detach`.
287        #[arg(long)]
288        detach: bool,
289        /// Emit JSON instead of text. Currently only meaningful with --detach.
290        #[arg(long)]
291        json: bool,
292    },
293    /// Confirm SAS digits for a detached pending pair. The daemon must be
294    /// running for this to do anything — it picks up the confirmation on its
295    /// next tick. Mismatch aborts the pair.
296    PairConfirm {
297        /// The code phrase the original `wire pair-host --detach` printed.
298        code_phrase: String,
299        /// 6 digits as displayed by `wire pair-list` (dashes/spaces stripped).
300        digits: String,
301        /// Emit JSON instead of human-readable text.
302        #[arg(long)]
303        json: bool,
304    },
305    /// List all pending detached pair sessions and their state.
306    PairList {
307        /// Emit JSON instead of the table.
308        #[arg(long)]
309        json: bool,
310        /// Stream mode: never exit; print one JSON line per status transition
311        /// (creation, status change, deletion) across all pending pairs.
312        /// Compose with bash `while read` to react in shell. Implies --json.
313        #[arg(long)]
314        watch: bool,
315        /// Poll interval in seconds for --watch.
316        #[arg(long, default_value_t = 1)]
317        watch_interval: u64,
318    },
319    /// Cancel a pending pair. Releases the relay slot and removes the pending file.
320    PairCancel {
321        code_phrase: String,
322        #[arg(long)]
323        json: bool,
324    },
325    /// Block until a pending pair reaches a target status (default sas_ready),
326    /// or terminates (finalized = file removed, aborted, aborted_restart), or
327    /// the timeout expires. Useful for shell scripts that want to drive the
328    /// detached flow without polling pair-list themselves.
329    ///
330    /// Exit codes:
331    ///   0 — reached target status (or finalized, if target was sas_ready)
332    ///   1 — terminated abnormally (aborted, aborted_restart, no such code)
333    ///   2 — timeout
334    PairWatch {
335        code_phrase: String,
336        /// Target status to wait for. Default: sas_ready.
337        #[arg(long, default_value = "sas_ready")]
338        status: String,
339        /// Max seconds to wait.
340        #[arg(long, default_value_t = 300)]
341        timeout: u64,
342        /// Emit JSON on each status change (one per line) instead of just on exit.
343        #[arg(long)]
344        json: bool,
345    },
346    /// One-shot bootstrap. Inits identity (idempotent), opens pair-host or
347    /// pair-join, then registers wire as an MCP server. Single command from
348    /// nothing to paired and ready — no separate init/pair-host/setup steps.
349    /// Operator still must confirm SAS digits.
350    ///
351    /// Examples:
352    ///   wire pair paul                          # host a new pair on default relay
353    ///   wire pair willard --code 58-NMTY7A      # join paul's pair
354    Pair {
355        /// Short handle for this agent (becomes did:wire:<handle>). Used by init
356        /// step if no identity exists; ignored if already initialized.
357        handle: String,
358        /// Code phrase from peer's pair-host output. Omit to be the host
359        /// (this command will print one for you to share).
360        #[arg(long)]
361        code: Option<String>,
362        /// Relay base URL. Defaults to the laulpogan public-good relay.
363        #[arg(long, default_value = "https://wireup.net")]
364        relay: String,
365        /// Skip SAS prompt. Test-only.
366        #[arg(long)]
367        yes: bool,
368        /// Pair-step timeout in seconds.
369        #[arg(long, default_value_t = 300)]
370        timeout: u64,
371        /// Skip the post-pair `setup --apply` step (don't register wire as
372        /// an MCP server in detected client configs).
373        #[arg(long)]
374        no_setup: bool,
375        /// Run via the daemon-orchestrated detached path (auto-starts daemon,
376        /// exits immediately, daemon does the handshake). Confirm via
377        /// `wire pair-confirm <code> <digits>` from any terminal. See
378        /// `pair-host --detach` for details.
379        #[arg(long)]
380        detach: bool,
381    },
382    /// Forget a half-finished pair-slot on the relay. Use this if `pair-host`
383    /// or `pair-join` crashed (process killed, network blip, OOM) before SAS
384    /// confirmation, leaving the relay-side slot stuck with "guest already
385    /// registered" or "host already registered" until the 5-minute TTL expires.
386    /// Either side can call. Idempotent.
387    PairAbandon {
388        /// The code phrase from the original pair-host (e.g. `58-NMTY7A`).
389        code_phrase: String,
390        /// Relay base URL.
391        #[arg(long, default_value = "https://wireup.net")]
392        relay: String,
393    },
394    /// Detect known MCP host config locations (Claude Desktop, Claude Code,
395    /// Cursor, project-local) and either print or auto-merge the wire MCP
396    /// server entry. Default prints; pass `--apply` to actually modify config
397    /// files. Idempotent — re-running is safe.
398    Setup {
399        /// Actually write the changes (default = print only).
400        #[arg(long)]
401        apply: bool,
402    },
403    /// Show an agent's profile. With no arg, prints local self. With a
404    /// `nick@domain` arg, resolves via that domain's `.well-known/wire/agent`
405    /// endpoint and verifies the returned signed card before display.
406    Whois {
407        /// Optional handle (`nick@domain`). Omit to show self.
408        handle: Option<String>,
409        #[arg(long)]
410        json: bool,
411        /// Override the relay base URL used for resolution (default:
412        /// `https://<domain>` from the handle).
413        #[arg(long)]
414        relay: Option<String>,
415    },
416    /// Zero-paste pair with a known handle. Resolves `nick@domain` via that
417    /// domain's `.well-known/wire/agent`, then delivers a signed pair-intro
418    /// to the peer's slot via `/v1/handle/intro`. Peer's daemon completes
419    /// the bilateral pin on its next pull (sends back pair_drop_ack carrying
420    /// their slot_token so we can `wire send` to them).
421    Add {
422        /// Peer handle (`nick@domain`).
423        handle: String,
424        /// Override the relay base URL used for resolution.
425        #[arg(long)]
426        relay: Option<String>,
427        #[arg(long)]
428        json: bool,
429    },
430    /// One-shot full bootstrap — `wire up <nick@relay-host>` does in one
431    /// command what 0.5.10 took five (init + bind-relay + claim + daemon-
432    /// background + remember-to-restart-on-login). Idempotent: re-run on
433    /// an already-set-up box prints state without churn.
434    ///
435    /// Examples:
436    ///   wire up paul@wireup.net           # full bootstrap
437    ///   wire up paul-mac@wireup.net       # ditto, nick = paul-mac
438    ///   wire up paul                      # bootstrap, default relay
439    Up {
440        /// Full handle in `nick@relay-host` form, or just `nick` (defaults
441        /// to the configured public relay wireup.net).
442        handle: String,
443        /// Optional display name (defaults to capitalized nick).
444        #[arg(long)]
445        name: Option<String>,
446        #[arg(long)]
447        json: bool,
448    },
449    /// Diagnose wire setup health. Single command that surfaces every
450    /// silent-fail class — daemon down or duplicated, relay unreachable,
451    /// cursor stuck, pair rejections piling up, trust ↔ directory drift.
452    /// Replaces today's 30-minute manual debug.
453    ///
454    /// Exit code non-zero if any FAIL findings.
455    Doctor {
456        /// Emit JSON.
457        #[arg(long)]
458        json: bool,
459        /// Show last N entries from pair-rejected.jsonl in the report.
460        #[arg(long, default_value_t = 5)]
461        recent_rejections: usize,
462    },
463    /// Atomic upgrade: kill every `wire daemon` process, spawn a fresh
464    /// one from the current binary, write a new pidfile. Eliminates the
465    /// "stale binary text in memory under a fresh symlink" bug class that
466    /// burned 30 minutes today.
467    Upgrade {
468        /// Report drift without taking action (lists processes that would
469        /// be killed + the version of each).
470        #[arg(long)]
471        check: bool,
472        #[arg(long)]
473        json: bool,
474    },
475    /// Install / inspect / remove a launchd plist (macOS) or systemd
476    /// user unit (linux) that runs `wire daemon` on login + restarts
477    /// on crash. Replaces today's "background it with tmux/&/systemd
478    /// as you prefer" footgun.
479    Service {
480        #[command(subcommand)]
481        action: ServiceAction,
482    },
483    /// Inspect or toggle the structured diagnostic trace
484    /// (`$WIRE_HOME/state/wire/diag.jsonl`). Off by default. Enable per
485    /// process via `WIRE_DIAG=1`, or per-machine via `wire diag enable`
486    /// (writes the file knob a running daemon picks up automatically).
487    Diag {
488        #[command(subcommand)]
489        action: DiagAction,
490    },
491    /// Claim a nick on a relay's handle directory. Anyone can then reach
492    /// this agent by `<nick>@<relay-domain>` via the relay's
493    /// `.well-known/wire/agent` endpoint. FCFS; same-DID re-claims allowed.
494    Claim {
495        nick: String,
496        /// Relay to claim the nick on. Default = relay our slot is on.
497        #[arg(long)]
498        relay: Option<String>,
499        /// Public URL the relay should advertise to resolvers (default = relay).
500        #[arg(long)]
501        public_url: Option<String>,
502        #[arg(long)]
503        json: bool,
504    },
505    /// Edit profile fields (display_name, emoji, motto, vibe, pronouns,
506    /// avatar_url, handle, now). Re-signs the agent-card atomically.
507    ///
508    /// Examples:
509    ///   wire profile set motto "compiles or dies trying"
510    ///   wire profile set emoji "🦀"
511    ///   wire profile set vibe '["rust","late-night","no-async-please"]'
512    ///   wire profile set handle "coffee-ghost@anthropic.dev"
513    ///   wire profile get
514    Profile {
515        #[command(subcommand)]
516        action: ProfileAction,
517    },
518    /// Mint a one-paste invite URL. Anyone with this URL can pair to us in a
519    /// single step (no SAS digits, no code typing). Auto-inits + auto-allocates
520    /// a relay slot on first use. Default TTL 24h, single-use.
521    Invite {
522        /// Override the relay URL for first-time auto-allocation.
523        #[arg(long, default_value = "https://wireup.net")]
524        relay: String,
525        /// Invite lifetime in seconds (default 86400 = 24h).
526        #[arg(long, default_value_t = 86_400)]
527        ttl: u64,
528        /// Number of distinct peers that can accept this invite before it's
529        /// consumed (default 1).
530        #[arg(long, default_value_t = 1)]
531        uses: u32,
532        /// Register the invite at the relay's short-URL endpoint and print
533        /// a `curl ... | sh` one-liner the peer can run on a fresh machine.
534        /// Installs wire if missing, then accepts the invite, then pairs.
535        #[arg(long)]
536        share: bool,
537        /// Emit JSON.
538        #[arg(long)]
539        json: bool,
540    },
541    /// Accept a wire invite URL. Single-step pair — pins issuer, sends our
542    /// signed card to issuer's slot. Auto-inits + auto-allocates if needed.
543    Accept {
544        /// The full invite URL (starts with `wire://pair?v=1&inv=...`).
545        url: String,
546        /// Emit JSON.
547        #[arg(long)]
548        json: bool,
549    },
550    /// Long-running event dispatcher. Watches inbox for new verified events
551    /// and spawns the given shell command per event, passing the event JSON
552    /// on stdin. Use to wire up autonomous reply loops:
553    ///   wire reactor --on-event 'claude -p "respond via wire send"'
554    /// Cursor persisted to `$WIRE_HOME/state/wire/reactor.cursor`.
555    Reactor {
556        /// Shell command to spawn per event. Event JSON written to its stdin.
557        #[arg(long)]
558        on_event: String,
559        /// Only fire for events from this peer.
560        #[arg(long)]
561        peer: Option<String>,
562        /// Only fire for events of this kind (numeric or name, e.g. 1 / decision).
563        #[arg(long)]
564        kind: Option<String>,
565        /// Skip events whose verified flag is false (default true).
566        #[arg(long, default_value_t = true)]
567        verified_only: bool,
568        /// Poll interval in seconds.
569        #[arg(long, default_value_t = 2)]
570        interval: u64,
571        /// Process one sweep and exit.
572        #[arg(long)]
573        once: bool,
574        /// Don't actually spawn — print one JSONL line per event for smoke-testing.
575        #[arg(long)]
576        dry_run: bool,
577        /// Hard rate-limit: max events handler is fired for per peer per minute.
578        /// 0 = unlimited. Default 6 — covers normal conversational tempo, kills
579        /// LLM-vs-LLM feedback loops (which fire 10+/sec).
580        #[arg(long, default_value_t = 6)]
581        max_per_minute: u32,
582        /// Anti-loop chain depth. Track event_ids this reactor emitted; if an
583        /// incoming event body contains `(re:X)` where X is in our emitted log,
584        /// skip — that's a reply-to-our-reply, depth ≥ 2. Disable with 0.
585        #[arg(long, default_value_t = 1)]
586        max_chain_depth: u32,
587    },
588    /// Watch the inbox for new verified events and fire an OS notification per
589    /// event. Long-running; background under systemd / `&` / tmux. Cursor is
590    /// persisted to `$WIRE_HOME/state/wire/notify.cursor` so restarts don't
591    /// re-emit history.
592    Notify {
593        /// Poll interval in seconds.
594        #[arg(long, default_value_t = 2)]
595        interval: u64,
596        /// Only notify for events from this peer (handle, no did: prefix).
597        #[arg(long)]
598        peer: Option<String>,
599        /// Run a single sweep and exit (useful for cron / tests).
600        #[arg(long)]
601        once: bool,
602        /// Suppress the OS notification call; print one JSON line per event to
603        /// stdout instead (for piping into other tooling or smoke-testing
604        /// without a desktop session).
605        #[arg(long)]
606        json: bool,
607    },
608}
609
610#[derive(Subcommand, Debug)]
611pub enum DiagAction {
612    /// Tail the last N entries from diag.jsonl.
613    Tail {
614        #[arg(long, default_value_t = 20)]
615        limit: usize,
616        #[arg(long)]
617        json: bool,
618    },
619    /// Flip the file-based knob ON. Running daemons pick this up on
620    /// the next emit call without restart.
621    Enable,
622    /// Flip the file-based knob OFF.
623    Disable,
624    /// Report whether diag is currently enabled + the file's size.
625    Status {
626        #[arg(long)]
627        json: bool,
628    },
629}
630
631#[derive(Subcommand, Debug)]
632pub enum ServiceAction {
633    /// Write the launchd plist (macOS) or systemd user unit (linux) and
634    /// load it. Idempotent — re-running re-bootstraps an existing service.
635    Install {
636        #[arg(long)]
637        json: bool,
638    },
639    /// Unload + delete the service unit. Daemon keeps running until the
640    /// next reboot or `wire upgrade`; this only changes the boot-time
641    /// behaviour.
642    Uninstall {
643        #[arg(long)]
644        json: bool,
645    },
646    /// Report whether the unit is installed + active.
647    Status {
648        #[arg(long)]
649        json: bool,
650    },
651}
652
653#[derive(Subcommand, Debug)]
654pub enum ResponderCommand {
655    /// Publish this agent's auto-responder health.
656    Set {
657        /// One of: online, offline, oauth_locked, rate_limited, degraded.
658        status: String,
659        /// Optional operator-facing reason.
660        #[arg(long)]
661        reason: Option<String>,
662        /// Emit JSON.
663        #[arg(long)]
664        json: bool,
665    },
666    /// Read responder health for self, or for a paired peer.
667    Get {
668        /// Optional peer handle; omitted means this agent's own slot.
669        peer: Option<String>,
670        /// Emit JSON.
671        #[arg(long)]
672        json: bool,
673    },
674}
675
676#[derive(Subcommand, Debug)]
677pub enum ProfileAction {
678    /// Set a profile field. Field names: display_name, emoji, motto, vibe,
679    /// pronouns, avatar_url, handle, now. Values are strings except `vibe`
680    /// (JSON array) and `now` (JSON object).
681    Set {
682        field: String,
683        value: String,
684        #[arg(long)]
685        json: bool,
686    },
687    /// Show all profile fields. Equivalent to `wire whois`.
688    Get {
689        #[arg(long)]
690        json: bool,
691    },
692    /// Clear a profile field.
693    Clear {
694        field: String,
695        #[arg(long)]
696        json: bool,
697    },
698}
699
700/// Entry point — parse and dispatch.
701pub fn run() -> Result<()> {
702    let cli = Cli::parse();
703    match cli.command {
704        Command::Init {
705            handle,
706            name,
707            relay,
708            json,
709        } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
710        Command::Status { peer, json } => {
711            if let Some(peer) = peer {
712                cmd_status_peer(&peer, json)
713            } else {
714                cmd_status(json)
715            }
716        }
717        Command::Whoami { json } => cmd_whoami(json),
718        Command::Peers { json } => cmd_peers(json),
719        Command::Send {
720            peer,
721            kind_or_body,
722            body,
723            deadline,
724            json,
725        } => {
726            // P0.S: smart-positional API. `wire send peer body` =
727            // kind=claim. `wire send peer kind body` = explicit kind.
728            let (kind, body) = match body {
729                Some(real_body) => (kind_or_body, real_body),
730                None => ("claim".to_string(), kind_or_body),
731            };
732            cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
733        }
734        Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
735        Command::Monitor {
736            peer,
737            json,
738            include_handshake,
739            interval_ms,
740            replay,
741        } => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
742        Command::Verify { path, json } => cmd_verify(&path, json),
743        Command::Responder { command } => match command {
744            ResponderCommand::Set {
745                status,
746                reason,
747                json,
748            } => cmd_responder_set(&status, reason.as_deref(), json),
749            ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
750        },
751        Command::Mcp => cmd_mcp(),
752        Command::RelayServer { bind } => cmd_relay_server(&bind),
753        Command::BindRelay { url, json } => cmd_bind_relay(&url, json),
754        Command::AddPeerSlot {
755            handle,
756            url,
757            slot_id,
758            slot_token,
759            json,
760        } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
761        Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
762        Command::Pull { json } => cmd_pull(json),
763        Command::Pin { card_file, json } => cmd_pin(&card_file, json),
764        Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
765        Command::ForgetPeer {
766            handle,
767            purge,
768            json,
769        } => cmd_forget_peer(&handle, purge, json),
770        Command::Daemon {
771            interval,
772            once,
773            json,
774        } => cmd_daemon(interval, once, json),
775        Command::PairHost {
776            relay,
777            yes,
778            timeout,
779            detach,
780            json,
781        } => {
782            if detach {
783                cmd_pair_host_detach(&relay, json)
784            } else {
785                cmd_pair_host(&relay, yes, timeout)
786            }
787        }
788        Command::PairJoin {
789            code_phrase,
790            relay,
791            yes,
792            timeout,
793            detach,
794            json,
795        } => {
796            if detach {
797                cmd_pair_join_detach(&code_phrase, &relay, json)
798            } else {
799                cmd_pair_join(&code_phrase, &relay, yes, timeout)
800            }
801        }
802        Command::PairConfirm {
803            code_phrase,
804            digits,
805            json,
806        } => cmd_pair_confirm(&code_phrase, &digits, json),
807        Command::PairList {
808            json,
809            watch,
810            watch_interval,
811        } => cmd_pair_list(json, watch, watch_interval),
812        Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
813        Command::PairWatch {
814            code_phrase,
815            status,
816            timeout,
817            json,
818        } => cmd_pair_watch(&code_phrase, &status, timeout, json),
819        Command::Pair {
820            handle,
821            code,
822            relay,
823            yes,
824            timeout,
825            no_setup,
826            detach,
827        } => {
828            // P0.P (0.5.11): if the handle is in `nick@domain` form, route to
829            // the zero-paste megacommand path — `wire pair slancha-spark@
830            // wireup.net` does add + poll-for-ack + verify in one shot. The
831            // SAS / code-based pair flow stays available for handles without
832            // `@` (bootstrap pairing between two boxes that don't yet share a
833            // relay directory).
834            if handle.contains('@') && code.is_none() {
835                cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
836            } else if detach {
837                cmd_pair_detach(&handle, code.as_deref(), &relay)
838            } else {
839                cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
840            }
841        }
842        Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
843        Command::Invite {
844            relay,
845            ttl,
846            uses,
847            share,
848            json,
849        } => cmd_invite(&relay, ttl, uses, share, json),
850        Command::Accept { url, json } => cmd_accept(&url, json),
851        Command::Whois {
852            handle,
853            json,
854            relay,
855        } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
856        Command::Add {
857            handle,
858            relay,
859            json,
860        } => cmd_add(&handle, relay.as_deref(), json),
861        Command::Up {
862            handle,
863            name,
864            json,
865        } => cmd_up(&handle, name.as_deref(), json),
866        Command::Doctor {
867            json,
868            recent_rejections,
869        } => cmd_doctor(json, recent_rejections),
870        Command::Upgrade { check, json } => cmd_upgrade(check, json),
871        Command::Service { action } => cmd_service(action),
872        Command::Diag { action } => cmd_diag(action),
873        Command::Claim {
874            nick,
875            relay,
876            public_url,
877            json,
878        } => cmd_claim(&nick, relay.as_deref(), public_url.as_deref(), json),
879        Command::Profile { action } => cmd_profile(action),
880        Command::Setup { apply } => cmd_setup(apply),
881        Command::Reactor {
882            on_event,
883            peer,
884            kind,
885            verified_only,
886            interval,
887            once,
888            dry_run,
889            max_per_minute,
890            max_chain_depth,
891        } => cmd_reactor(
892            &on_event,
893            peer.as_deref(),
894            kind.as_deref(),
895            verified_only,
896            interval,
897            once,
898            dry_run,
899            max_per_minute,
900            max_chain_depth,
901        ),
902        Command::Notify {
903            interval,
904            peer,
905            once,
906            json,
907        } => cmd_notify(interval, peer.as_deref(), once, json),
908    }
909}
910
911// ---------- init ----------
912
913fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
914    if !handle
915        .chars()
916        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
917    {
918        bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
919    }
920    if config::is_initialized()? {
921        bail!(
922            "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
923            config::config_dir()?
924        );
925    }
926
927    config::ensure_dirs()?;
928    let (sk_seed, pk_bytes) = generate_keypair();
929    config::write_private_key(&sk_seed)?;
930
931    let card = build_agent_card(handle, &pk_bytes, name, None, None);
932    let signed = sign_agent_card(&card, &sk_seed);
933    config::write_agent_card(&signed)?;
934
935    let mut trust = empty_trust();
936    add_self_to_trust(&mut trust, handle, &pk_bytes);
937    config::write_trust(&trust)?;
938
939    let fp = fingerprint(&pk_bytes);
940    let key_id = make_key_id(handle, &pk_bytes);
941
942    // If --relay was passed, also bind a slot inline so init+bind happen in one step.
943    let mut relay_info: Option<(String, String)> = None;
944    if let Some(url) = relay {
945        let normalized = url.trim_end_matches('/');
946        let client = crate::relay_client::RelayClient::new(normalized);
947        client.check_healthz()?;
948        let alloc = client.allocate_slot(Some(handle))?;
949        let mut state = config::read_relay_state()?;
950        state["self"] = json!({
951            "relay_url": normalized,
952            "slot_id": alloc.slot_id.clone(),
953            "slot_token": alloc.slot_token,
954        });
955        config::write_relay_state(&state)?;
956        relay_info = Some((normalized.to_string(), alloc.slot_id));
957    }
958
959    let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
960    if as_json {
961        let mut out = json!({
962            "did": did_str.clone(),
963            "fingerprint": fp,
964            "key_id": key_id,
965            "config_dir": config::config_dir()?.to_string_lossy(),
966        });
967        if let Some((url, slot_id)) = &relay_info {
968            out["relay_url"] = json!(url);
969            out["slot_id"] = json!(slot_id);
970        }
971        println!("{}", serde_json::to_string(&out)?);
972    } else {
973        println!("generated {did_str} (ed25519:{key_id})");
974        println!(
975            "config written to {}",
976            config::config_dir()?.to_string_lossy()
977        );
978        if let Some((url, slot_id)) = &relay_info {
979            println!("bound to relay {url} (slot {slot_id})");
980            println!();
981            println!(
982                "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
983            );
984        } else {
985            println!();
986            println!(
987                "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
988            );
989        }
990    }
991    Ok(())
992}
993
994// ---------- status ----------
995
996fn cmd_status(as_json: bool) -> Result<()> {
997    let initialized = config::is_initialized()?;
998
999    let mut summary = json!({
1000        "initialized": initialized,
1001    });
1002
1003    if initialized {
1004        let card = config::read_agent_card()?;
1005        let did = card
1006            .get("did")
1007            .and_then(Value::as_str)
1008            .unwrap_or("")
1009            .to_string();
1010        // Prefer the explicit `handle` field added in v0.5.7. Fall back to
1011        // stripping the DID prefix (and the v0.5.7+ pubkey suffix) for
1012        // legacy cards.
1013        let handle = card
1014            .get("handle")
1015            .and_then(Value::as_str)
1016            .map(str::to_string)
1017            .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1018        let pk_b64 = card
1019            .get("verify_keys")
1020            .and_then(Value::as_object)
1021            .and_then(|m| m.values().next())
1022            .and_then(|v| v.get("key"))
1023            .and_then(Value::as_str)
1024            .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1025        let pk_bytes = crate::signing::b64decode(pk_b64)?;
1026        summary["did"] = json!(did);
1027        summary["handle"] = json!(handle);
1028        summary["fingerprint"] = json!(fingerprint(&pk_bytes));
1029        summary["capabilities"] = card
1030            .get("capabilities")
1031            .cloned()
1032            .unwrap_or_else(|| json!([]));
1033
1034        let trust = config::read_trust()?;
1035        let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1036        let mut peers = Vec::new();
1037        if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
1038            for (peer_handle, _agent) in agents {
1039                if peer_handle == &handle {
1040                    continue; // self
1041                }
1042                // P0.Y (0.5.11): use effective tier — surfaces PENDING_ACK
1043                // for peers we've pinned but never received a pair_drop_ack
1044                // from, so the operator sees the "we can't send to them yet"
1045                // state instead of seeing a misleading VERIFIED.
1046                peers.push(json!({
1047                    "handle": peer_handle,
1048                    "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
1049                }));
1050            }
1051        }
1052        summary["peers"] = json!(peers);
1053
1054        let relay_state = config::read_relay_state()?;
1055        summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
1056        if !summary["self_relay"].is_null() {
1057            // Hide slot_token from default view.
1058            if let Some(obj) = summary["self_relay"].as_object_mut() {
1059                obj.remove("slot_token");
1060            }
1061        }
1062        summary["peer_slots_count"] = json!(
1063            relay_state
1064                .get("peers")
1065                .and_then(Value::as_object)
1066                .map(|m| m.len())
1067                .unwrap_or(0)
1068        );
1069
1070        // Outbox / inbox queue depth (file count + total events)
1071        let outbox = config::outbox_dir()?;
1072        let inbox = config::inbox_dir()?;
1073        summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
1074        summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
1075
1076        // P1.7 (0.5.11): daemon liveness now consults the structured
1077        // pidfile (P0.4) AND `pgrep -f "wire daemon"` to detect orphans
1078        // that the pidfile didn't record. Today's debug had a 4-day-old
1079        // 0.2.4 daemon (PID 54017) running while the pidfile pointed at
1080        // an unrelated dead PID — wire status said `daemon: DOWN` while
1081        // the box was actually full of stale-daemon-eating-events
1082        // behaviour. Catch THAT class here.
1083        let record = crate::ensure_up::read_pid_record("daemon");
1084        let pidfile_pid = record.pid();
1085        let pidfile_alive = pidfile_pid
1086            .map(|pid| {
1087                #[cfg(target_os = "linux")]
1088                {
1089                    std::path::Path::new(&format!("/proc/{pid}")).exists()
1090                }
1091                #[cfg(not(target_os = "linux"))]
1092                {
1093                    std::process::Command::new("kill")
1094                        .args(["-0", &pid.to_string()])
1095                        .output()
1096                        .map(|o| o.status.success())
1097                        .unwrap_or(false)
1098                }
1099            })
1100            .unwrap_or(false);
1101
1102        // Cross-check with pgrep — surfaces orphan daemons not in pidfile.
1103        let pgrep_pids: Vec<u32> = std::process::Command::new("pgrep")
1104            .args(["-f", "wire daemon"])
1105            .output()
1106            .ok()
1107            .filter(|o| o.status.success())
1108            .map(|o| {
1109                String::from_utf8_lossy(&o.stdout)
1110                    .split_whitespace()
1111                    .filter_map(|s| s.parse::<u32>().ok())
1112                    .collect()
1113            })
1114            .unwrap_or_default();
1115        let orphan_pids: Vec<u32> = pgrep_pids
1116            .iter()
1117            .filter(|p| Some(**p) != pidfile_pid)
1118            .copied()
1119            .collect();
1120
1121        let mut daemon = json!({
1122            "running": pidfile_alive,
1123            "pid": pidfile_pid,
1124            "all_running_pids": pgrep_pids,
1125            "orphans": orphan_pids,
1126        });
1127        if let crate::ensure_up::PidRecord::Json(d) = &record {
1128            daemon["version"] = json!(d.version);
1129            daemon["bin_path"] = json!(d.bin_path);
1130            daemon["did"] = json!(d.did);
1131            daemon["relay_url"] = json!(d.relay_url);
1132            daemon["started_at"] = json!(d.started_at);
1133            daemon["schema"] = json!(d.schema);
1134            if d.version != env!("CARGO_PKG_VERSION") {
1135                daemon["version_mismatch"] = json!({
1136                    "daemon": d.version.clone(),
1137                    "cli": env!("CARGO_PKG_VERSION"),
1138                });
1139            }
1140        } else if matches!(record, crate::ensure_up::PidRecord::LegacyInt(_)) {
1141            daemon["pidfile_form"] = json!("legacy-int");
1142            daemon["version_mismatch"] = json!({
1143                "daemon": "<pre-0.5.11>",
1144                "cli": env!("CARGO_PKG_VERSION"),
1145            });
1146        }
1147        summary["daemon"] = daemon;
1148
1149        // Pending pair sessions — counts by status.
1150        let pending = crate::pending_pair::list_pending().unwrap_or_default();
1151        let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
1152        for p in &pending {
1153            *counts.entry(p.status.clone()).or_default() += 1;
1154        }
1155        summary["pending_pairs"] = json!({
1156            "total": pending.len(),
1157            "by_status": counts,
1158        });
1159    }
1160
1161    if as_json {
1162        println!("{}", serde_json::to_string(&summary)?);
1163    } else if !initialized {
1164        println!("not initialized — run `wire init <handle>` first");
1165    } else {
1166        println!("did:           {}", summary["did"].as_str().unwrap_or("?"));
1167        println!(
1168            "fingerprint:   {}",
1169            summary["fingerprint"].as_str().unwrap_or("?")
1170        );
1171        println!("capabilities:  {}", summary["capabilities"]);
1172        if !summary["self_relay"].is_null() {
1173            println!(
1174                "self relay:    {} (slot {})",
1175                summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
1176                summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
1177            );
1178        } else {
1179            println!("self relay:    (not bound — run `wire pair-host --relay <url>` to bind)");
1180        }
1181        println!(
1182            "peers:         {}",
1183            summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
1184        );
1185        for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
1186            println!(
1187                "  - {:<20} tier={}",
1188                p["handle"].as_str().unwrap_or(""),
1189                p["tier"].as_str().unwrap_or("?")
1190            );
1191        }
1192        println!(
1193            "outbox:        {} file(s), {} event(s) queued",
1194            summary["outbox"]["files"].as_u64().unwrap_or(0),
1195            summary["outbox"]["events"].as_u64().unwrap_or(0)
1196        );
1197        println!(
1198            "inbox:         {} file(s), {} event(s) received",
1199            summary["inbox"]["files"].as_u64().unwrap_or(0),
1200            summary["inbox"]["events"].as_u64().unwrap_or(0)
1201        );
1202        let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
1203        let daemon_pid = summary["daemon"]["pid"]
1204            .as_u64()
1205            .map(|p| p.to_string())
1206            .unwrap_or_else(|| "—".to_string());
1207        let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
1208        let version_suffix = if !daemon_version.is_empty() {
1209            format!(" v{daemon_version}")
1210        } else {
1211            String::new()
1212        };
1213        println!(
1214            "daemon:        {} (pid {}{})",
1215            if daemon_running { "running" } else { "DOWN" },
1216            daemon_pid,
1217            version_suffix,
1218        );
1219        // P1.7: surface version mismatch + orphan procs loudly.
1220        if let Some(mm) = summary["daemon"].get("version_mismatch") {
1221            println!(
1222                "               !! version mismatch: daemon={} CLI={}. \
1223                 run `wire upgrade` to swap atomically.",
1224                mm["daemon"].as_str().unwrap_or("?"),
1225                mm["cli"].as_str().unwrap_or("?"),
1226            );
1227        }
1228        if let Some(orphans) = summary["daemon"]["orphans"].as_array()
1229            && !orphans.is_empty()
1230        {
1231            let pids: Vec<String> = orphans
1232                .iter()
1233                .filter_map(|v| v.as_u64().map(|p| p.to_string()))
1234                .collect();
1235            println!(
1236                "               !! orphan daemon process(es): pids {}. \
1237                 pgrep saw them but pidfile didn't — likely stale process from \
1238                 prior install. Multiple daemons race the relay cursor.",
1239                pids.join(", ")
1240            );
1241        }
1242        let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
1243        if pending_total > 0 {
1244            print!("pending pairs: {pending_total}");
1245            if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
1246                let parts: Vec<String> = obj
1247                    .iter()
1248                    .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
1249                    .collect();
1250                if !parts.is_empty() {
1251                    print!(" ({})", parts.join(", "));
1252                }
1253            }
1254            println!();
1255        } else {
1256            println!("pending pairs: none");
1257        }
1258    }
1259    Ok(())
1260}
1261
1262fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1263    if !dir.exists() {
1264        return Ok(json!({"files": 0, "events": 0}));
1265    }
1266    let mut files = 0usize;
1267    let mut events = 0usize;
1268    for entry in std::fs::read_dir(dir)? {
1269        let path = entry?.path();
1270        if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1271            files += 1;
1272            if let Ok(body) = std::fs::read_to_string(&path) {
1273                events += body.lines().filter(|l| !l.trim().is_empty()).count();
1274            }
1275        }
1276    }
1277    Ok(json!({"files": files, "events": events}))
1278}
1279
1280// ---------- responder health ----------
1281
/// Whether `status` is one of the responder-health states this CLI accepts.
/// The comparison is exact: case-sensitive, no trimming.
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.contains(&status)
}
1288
1289fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1290    let state = config::read_relay_state()?;
1291    let (label, slot_info) = match peer {
1292        Some(peer) => (
1293            peer.to_string(),
1294            state
1295                .get("peers")
1296                .and_then(|p| p.get(peer))
1297                .ok_or_else(|| {
1298                    anyhow!(
1299                        "unknown peer {peer:?} in relay state — pair with them first:\n  \
1300                         wire add {peer}@wireup.net   (or {peer}@<their-relay>)\n\
1301                         (`wire peers` lists who you've already paired with.)"
1302                    )
1303                })?,
1304        ),
1305        None => (
1306            "self".to_string(),
1307            state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1308                anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1309            })?,
1310        ),
1311    };
1312    let relay_url = slot_info["relay_url"]
1313        .as_str()
1314        .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1315        .to_string();
1316    let slot_id = slot_info["slot_id"]
1317        .as_str()
1318        .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1319        .to_string();
1320    let slot_token = slot_info["slot_token"]
1321        .as_str()
1322        .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1323        .to_string();
1324    Ok((label, relay_url, slot_id, slot_token))
1325}
1326
1327fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1328    if !responder_status_allowed(status) {
1329        bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1330    }
1331    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1332    let now = time::OffsetDateTime::now_utc()
1333        .format(&time::format_description::well_known::Rfc3339)
1334        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1335    let mut record = json!({
1336        "status": status,
1337        "set_at": now,
1338    });
1339    if let Some(reason) = reason {
1340        record["reason"] = json!(reason);
1341    }
1342    if status == "online" {
1343        record["last_success_at"] = json!(now);
1344    }
1345    let client = crate::relay_client::RelayClient::new(&relay_url);
1346    let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1347    if as_json {
1348        println!("{}", serde_json::to_string(&saved)?);
1349    } else {
1350        let reason = saved
1351            .get("reason")
1352            .and_then(Value::as_str)
1353            .map(|r| format!(" — {r}"))
1354            .unwrap_or_default();
1355        println!(
1356            "responder {}{}",
1357            saved
1358                .get("status")
1359                .and_then(Value::as_str)
1360                .unwrap_or(status),
1361            reason
1362        );
1363    }
1364    Ok(())
1365}
1366
1367fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1368    let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1369    let client = crate::relay_client::RelayClient::new(&relay_url);
1370    let health = client.responder_health_get(&slot_id, &slot_token)?;
1371    if as_json {
1372        println!(
1373            "{}",
1374            serde_json::to_string(&json!({
1375                "target": label,
1376                "responder_health": health,
1377            }))?
1378        );
1379    } else if health.is_null() {
1380        println!("{label}: responder health not reported");
1381    } else {
1382        let status = health
1383            .get("status")
1384            .and_then(Value::as_str)
1385            .unwrap_or("unknown");
1386        let reason = health
1387            .get("reason")
1388            .and_then(Value::as_str)
1389            .map(|r| format!(" — {r}"))
1390            .unwrap_or_default();
1391        let last_success = health
1392            .get("last_success_at")
1393            .and_then(Value::as_str)
1394            .map(|t| format!(" (last_success: {t})"))
1395            .unwrap_or_default();
1396        println!("{label}: {status}{reason}{last_success}");
1397    }
1398    Ok(())
1399}
1400
1401fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1402    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1403    let client = crate::relay_client::RelayClient::new(&relay_url);
1404
1405    let started = std::time::Instant::now();
1406    let transport_ok = client.healthz().unwrap_or(false);
1407    let latency_ms = started.elapsed().as_millis() as u64;
1408
1409    let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1410    let now = std::time::SystemTime::now()
1411        .duration_since(std::time::UNIX_EPOCH)
1412        .map(|d| d.as_secs())
1413        .unwrap_or(0);
1414    let attention = match last_pull_at_unix {
1415        Some(last) if now.saturating_sub(last) <= 300 => json!({
1416            "status": "ok",
1417            "last_pull_at_unix": last,
1418            "age_seconds": now.saturating_sub(last),
1419            "event_count": event_count,
1420        }),
1421        Some(last) => json!({
1422            "status": "stale",
1423            "last_pull_at_unix": last,
1424            "age_seconds": now.saturating_sub(last),
1425            "event_count": event_count,
1426        }),
1427        None => json!({
1428            "status": "never_pulled",
1429            "last_pull_at_unix": Value::Null,
1430            "event_count": event_count,
1431        }),
1432    };
1433
1434    let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1435    let responder = if responder_health.is_null() {
1436        json!({"status": "not_reported", "record": Value::Null})
1437    } else {
1438        json!({
1439            "status": responder_health
1440                .get("status")
1441                .and_then(Value::as_str)
1442                .unwrap_or("unknown"),
1443            "record": responder_health,
1444        })
1445    };
1446
1447    let report = json!({
1448        "peer": peer,
1449        "transport": {
1450            "status": if transport_ok { "ok" } else { "error" },
1451            "relay_url": relay_url,
1452            "latency_ms": latency_ms,
1453        },
1454        "attention": attention,
1455        "responder": responder,
1456    });
1457
1458    if as_json {
1459        println!("{}", serde_json::to_string(&report)?);
1460    } else {
1461        let transport_line = if transport_ok {
1462            format!("ok relay reachable ({latency_ms}ms)")
1463        } else {
1464            "error relay unreachable".to_string()
1465        };
1466        println!("transport      {transport_line}");
1467        match report["attention"]["status"].as_str().unwrap_or("unknown") {
1468            "ok" => println!(
1469                "attention      ok last pull {}s ago",
1470                report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1471            ),
1472            "stale" => println!(
1473                "attention      stale last pull {}m ago",
1474                report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1475            ),
1476            "never_pulled" => println!("attention      never pulled since relay reset"),
1477            other => println!("attention      {other}"),
1478        }
1479        if report["responder"]["status"] == "not_reported" {
1480            println!("auto-responder not reported");
1481        } else {
1482            let record = &report["responder"]["record"];
1483            let status = record
1484                .get("status")
1485                .and_then(Value::as_str)
1486                .unwrap_or("unknown");
1487            let reason = record
1488                .get("reason")
1489                .and_then(Value::as_str)
1490                .map(|r| format!(" — {r}"))
1491                .unwrap_or_default();
1492            println!("auto-responder {status}{reason}");
1493        }
1494    }
1495    Ok(())
1496}
1497
1498// (Old cmd_join stub removed — superseded by cmd_pair_join below.)
1499
1500// ---------- whoami ----------
1501
1502fn cmd_whoami(as_json: bool) -> Result<()> {
1503    if !config::is_initialized()? {
1504        bail!("not initialized — run `wire init <handle>` first");
1505    }
1506    let card = config::read_agent_card()?;
1507    let did = card
1508        .get("did")
1509        .and_then(Value::as_str)
1510        .unwrap_or("")
1511        .to_string();
1512    let handle = card
1513        .get("handle")
1514        .and_then(Value::as_str)
1515        .map(str::to_string)
1516        .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1517    let pk_b64 = card
1518        .get("verify_keys")
1519        .and_then(Value::as_object)
1520        .and_then(|m| m.values().next())
1521        .and_then(|v| v.get("key"))
1522        .and_then(Value::as_str)
1523        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1524    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1525    let fp = fingerprint(&pk_bytes);
1526    let key_id = make_key_id(&handle, &pk_bytes);
1527    let capabilities = card
1528        .get("capabilities")
1529        .cloned()
1530        .unwrap_or_else(|| json!(["wire/v3.1"]));
1531
1532    if as_json {
1533        println!(
1534            "{}",
1535            serde_json::to_string(&json!({
1536                "did": did,
1537                "handle": handle,
1538                "fingerprint": fp,
1539                "key_id": key_id,
1540                "public_key_b64": pk_b64,
1541                "capabilities": capabilities,
1542                "config_dir": config::config_dir()?.to_string_lossy(),
1543            }))?
1544        );
1545    } else {
1546        println!("{did} (ed25519:{key_id})");
1547        println!("fingerprint: {fp}");
1548        println!("capabilities: {capabilities}");
1549    }
1550    Ok(())
1551}
1552
1553// ---------- peers ----------
1554
1555/// P0.Y (0.5.11): effective tier shown to operators. `wire add` pins a
1556/// peer's card into trust at VERIFIED immediately, but the bilateral pin
1557/// isn't complete until that peer's `pair_drop_ack` arrives carrying their
1558/// slot_token. Until then we CAN'T send to them. Displaying VERIFIED is
1559/// misleading — spark observed this in real usage.
1560///
1561/// Effective rules:
1562///   trust.tier == VERIFIED + relay_state.peers[h].slot_token empty -> "PENDING_ACK"
1563///   otherwise -> raw trust tier (UNTRUSTED / VERIFIED / etc.)
1564///
1565/// Strictly a display concern — trust state machine itself is untouched
1566/// so existing promote/demote logic still works.
1567fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1568    let raw = crate::trust::get_tier(trust, handle);
1569    if raw != "VERIFIED" {
1570        return raw.to_string();
1571    }
1572    let token = relay_state
1573        .get("peers")
1574        .and_then(|p| p.get(handle))
1575        .and_then(|p| p.get("slot_token"))
1576        .and_then(Value::as_str)
1577        .unwrap_or("");
1578    if token.is_empty() {
1579        "PENDING_ACK".to_string()
1580    } else {
1581        raw.to_string()
1582    }
1583}
1584
1585fn cmd_peers(as_json: bool) -> Result<()> {
1586    let trust = config::read_trust()?;
1587    let agents = trust
1588        .get("agents")
1589        .and_then(Value::as_object)
1590        .cloned()
1591        .unwrap_or_default();
1592    let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1593
1594    let mut self_did: Option<String> = None;
1595    if let Ok(card) = config::read_agent_card() {
1596        self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1597    }
1598
1599    let mut peers = Vec::new();
1600    for (handle, agent) in agents.iter() {
1601        let did = agent
1602            .get("did")
1603            .and_then(Value::as_str)
1604            .unwrap_or("")
1605            .to_string();
1606        if Some(did.as_str()) == self_did.as_deref() {
1607            continue; // skip self-attestation
1608        }
1609        let tier = effective_peer_tier(&trust, &relay_state, handle);
1610        let capabilities = agent
1611            .get("card")
1612            .and_then(|c| c.get("capabilities"))
1613            .cloned()
1614            .unwrap_or_else(|| json!([]));
1615        peers.push(json!({
1616            "handle": handle,
1617            "did": did,
1618            "tier": tier,
1619            "capabilities": capabilities,
1620        }));
1621    }
1622
1623    if as_json {
1624        println!("{}", serde_json::to_string(&peers)?);
1625    } else if peers.is_empty() {
1626        println!("no peers pinned (run `wire join <code>` to pair)");
1627    } else {
1628        for p in &peers {
1629            println!(
1630                "{:<20} {:<10} {}",
1631                p["handle"].as_str().unwrap_or(""),
1632                p["tier"].as_str().unwrap_or(""),
1633                p["did"].as_str().unwrap_or(""),
1634            );
1635        }
1636    }
1637    Ok(())
1638}
1639
1640// ---------- send ----------
1641
1642/// R4 attentiveness pre-flight. Best-effort: any failure is silent.
1643///
1644/// Looks up `peer` in relay-state for slot_id + slot_token + relay_url, asks
1645/// the relay for the slot's `last_pull_at_unix`, and prints a warning to
/// stderr if the peer hasn't polled in > 5min (or never has). The 300s
/// threshold follows the wire daemon polling-cadence rule of thumb: a peer
/// that has missed two heartbeats is probably degraded.
1649fn maybe_warn_peer_attentiveness(peer: &str) {
1650    let state = match config::read_relay_state() {
1651        Ok(s) => s,
1652        Err(_) => return,
1653    };
1654    let p = state.get("peers").and_then(|p| p.get(peer));
1655    let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
1656        Some(s) if !s.is_empty() => s,
1657        _ => return,
1658    };
1659    let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
1660        Some(s) if !s.is_empty() => s,
1661        _ => return,
1662    };
1663    let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
1664        Some(s) if !s.is_empty() => s.to_string(),
1665        _ => match state
1666            .get("self")
1667            .and_then(|s| s.get("relay_url"))
1668            .and_then(Value::as_str)
1669        {
1670            Some(s) if !s.is_empty() => s.to_string(),
1671            _ => return,
1672        },
1673    };
1674    let client = crate::relay_client::RelayClient::new(&relay_url);
1675    let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
1676        Ok(t) => t,
1677        Err(_) => return,
1678    };
1679    let now = std::time::SystemTime::now()
1680        .duration_since(std::time::UNIX_EPOCH)
1681        .map(|d| d.as_secs())
1682        .unwrap_or(0);
1683    match last_pull {
1684        None => {
1685            eprintln!(
1686                "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
1687            );
1688        }
1689        Some(t) if now.saturating_sub(t) > 300 => {
1690            let mins = now.saturating_sub(t) / 60;
1691            eprintln!(
1692                "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
1693            );
1694        }
1695        _ => {}
1696    }
1697}
1698
1699pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
1700    let trimmed = input.trim();
1701    if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
1702    {
1703        return Ok(trimmed.to_string());
1704    }
1705    let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
1706    let n: i64 = amount
1707        .parse()
1708        .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
1709    if n <= 0 {
1710        bail!("deadline duration must be positive: {input:?}");
1711    }
1712    let duration = match unit {
1713        "m" => time::Duration::minutes(n),
1714        "h" => time::Duration::hours(n),
1715        "d" => time::Duration::days(n),
1716        _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
1717    };
1718    Ok((time::OffsetDateTime::now_utc() + duration)
1719        .format(&time::format_description::well_known::Rfc3339)
1720        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
1721}
1722
1723fn cmd_send(
1724    peer: &str,
1725    kind: &str,
1726    body_arg: &str,
1727    deadline: Option<&str>,
1728    as_json: bool,
1729) -> Result<()> {
1730    if !config::is_initialized()? {
1731        bail!("not initialized — run `wire init <handle>` first");
1732    }
1733    let peer = crate::agent_card::bare_handle(peer);
1734    let sk_seed = config::read_private_key()?;
1735    let card = config::read_agent_card()?;
1736    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
1737    let handle = crate::agent_card::display_handle_from_did(did).to_string();
1738    let pk_b64 = card
1739        .get("verify_keys")
1740        .and_then(Value::as_object)
1741        .and_then(|m| m.values().next())
1742        .and_then(|v| v.get("key"))
1743        .and_then(Value::as_str)
1744        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1745    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1746
1747    // Body: literal string, `@/path/to/body.json`, or `-` for stdin.
1748    // P0.S (0.5.11): stdin support lets shells pipe in long content
1749    // without quoting/escaping ceremony, and supports heredocs naturally:
1750    //   wire send peer - <<EOF ... EOF
1751    let body_value: Value = if body_arg == "-" {
1752        use std::io::Read;
1753        let mut raw = String::new();
1754        std::io::stdin()
1755            .read_to_string(&mut raw)
1756            .with_context(|| "reading body from stdin")?;
1757        // Try parsing as JSON first; fall back to string literal for
1758        // plain-text bodies.
1759        serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
1760    } else if let Some(path) = body_arg.strip_prefix('@') {
1761        let raw =
1762            std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
1763        serde_json::from_str(&raw).unwrap_or(Value::String(raw))
1764    } else {
1765        Value::String(body_arg.to_string())
1766    };
1767
1768    let kind_id = parse_kind(kind)?;
1769
1770    let now = time::OffsetDateTime::now_utc()
1771        .format(&time::format_description::well_known::Rfc3339)
1772        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1773
1774    let mut event = json!({
1775        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
1776        "timestamp": now,
1777        "from": did,
1778        "to": format!("did:wire:{peer}"),
1779        "type": kind,
1780        "kind": kind_id,
1781        "body": body_value,
1782    });
1783    if let Some(deadline) = deadline {
1784        event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
1785    }
1786    let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
1787    let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
1788
1789    // R4: best-effort attentiveness pre-flight. Look up the peer's slot
1790    // coords in relay-state and ask the relay how recently the peer pulled.
1791    // Warn on stderr if the peer hasn't pulled in >5min OR has never pulled.
1792    // Never blocks the send — the event still queues to outbox.
1793    maybe_warn_peer_attentiveness(peer);
1794
1795    // For now we append to outbox JSONL and rely on a future daemon to push
1796    // to the relay. That's the file-system contract from AGENT_INTEGRATION.md.
1797    // Append goes through `config::append_outbox_record` which holds a per-
1798    // path mutex so concurrent senders cannot interleave bytes mid-line.
1799    let line = serde_json::to_vec(&signed)?;
1800    let outbox = config::append_outbox_record(peer, &line)?;
1801
1802    if as_json {
1803        println!(
1804            "{}",
1805            serde_json::to_string(&json!({
1806                "event_id": event_id,
1807                "status": "queued",
1808                "peer": peer,
1809                "outbox": outbox.to_string_lossy(),
1810            }))?
1811        );
1812    } else {
1813        println!(
1814            "queued event {event_id} → {peer} (outbox: {})",
1815            outbox.display()
1816        );
1817    }
1818    Ok(())
1819}
1820
1821fn parse_kind(s: &str) -> Result<u32> {
1822    if let Ok(n) = s.parse::<u32>() {
1823        return Ok(n);
1824    }
1825    for (id, name) in crate::signing::kinds() {
1826        if *name == s {
1827            return Ok(*id);
1828        }
1829    }
1830    // Unknown name — default to kind 1 (decision) for v0.1.
1831    Ok(1)
1832}
1833
1834// ---------- tail ----------
1835
1836fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
1837    let inbox = config::inbox_dir()?;
1838    if !inbox.exists() {
1839        if !as_json {
1840            eprintln!("no inbox yet — daemon hasn't run, or no events received");
1841        }
1842        return Ok(());
1843    }
1844    let trust = config::read_trust()?;
1845    let mut count = 0usize;
1846
1847    let entries: Vec<_> = std::fs::read_dir(&inbox)?
1848        .filter_map(|e| e.ok())
1849        .map(|e| e.path())
1850        .filter(|p| {
1851            p.extension().map(|x| x == "jsonl").unwrap_or(false)
1852                && match peer {
1853                    Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
1854                    None => true,
1855                }
1856        })
1857        .collect();
1858
1859    for path in entries {
1860        let body = std::fs::read_to_string(&path)?;
1861        for line in body.lines() {
1862            let event: Value = match serde_json::from_str(line) {
1863                Ok(v) => v,
1864                Err(_) => continue,
1865            };
1866            let verified = verify_message_v31(&event, &trust).is_ok();
1867            if as_json {
1868                let mut event_with_meta = event.clone();
1869                if let Some(obj) = event_with_meta.as_object_mut() {
1870                    obj.insert("verified".into(), json!(verified));
1871                }
1872                println!("{}", serde_json::to_string(&event_with_meta)?);
1873            } else {
1874                let ts = event
1875                    .get("timestamp")
1876                    .and_then(Value::as_str)
1877                    .unwrap_or("?");
1878                let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
1879                let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1880                let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
1881                let summary = event
1882                    .get("body")
1883                    .map(|b| match b {
1884                        Value::String(s) => s.clone(),
1885                        _ => b.to_string(),
1886                    })
1887                    .unwrap_or_default();
1888                let mark = if verified { "✓" } else { "✗" };
1889                let deadline = event
1890                    .get("time_sensitive_until")
1891                    .and_then(Value::as_str)
1892                    .map(|d| format!(" deadline: {d}"))
1893                    .unwrap_or_default();
1894                println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
1895            }
1896            count += 1;
1897            if limit > 0 && count >= limit {
1898                return Ok(());
1899            }
1900        }
1901    }
1902    Ok(())
1903}
1904
1905// ---------- monitor (live-tail across all peers, harness-friendly) ----------
1906
1907/// Events filtered out of `wire monitor` by default — pair handshake +
1908/// liveness pings. Operators almost never want these surfaced; an explicit
1909/// `--include-handshake` brings them back.
fn monitor_is_noise_kind(kind: &str) -> bool {
    // Exact, case-sensitive comparison against the handshake/liveness
    // kinds; anything else — including kinds this build has never heard
    // of — is treated as signal and is not filtered.
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.contains(&kind)
}
1913
1914/// Render a single InboxEvent for `wire monitor` output. JSON form emits the
1915/// full structured event for tooling consumption; the plain form is a tight
1916/// one-line summary suitable as a harness stream-watcher notification.
1917fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
1918    if as_json {
1919        Ok(serde_json::to_string(e)?)
1920    } else {
1921        let eid_short: String = e.event_id.chars().take(12).collect();
1922        let body = e.body_preview.replace('\n', " ");
1923        let ts: String = e.timestamp.chars().take(19).collect();
1924        Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
1925    }
1926}
1927
1928/// `wire monitor` — long-running line-per-event stream of new inbox events.
1929///
1930/// Built for agent harnesses that have an "every stdout line is a chat
1931/// notification" stream watcher (Claude Code Monitor tool, etc.). One
1932/// command, persistent, filtered. Replaces the manual `tail -F inbox/*.jsonl
1933/// | python parse | grep -v pair_drop` pipeline operators improvise on day
1934/// one of every wire session.
1935///
1936/// Default filter strips `pair_drop`, `pair_drop_ack`, and `heartbeat` —
1937/// pure handshake / liveness noise that operators almost never want
1938/// surfaced. Pass `--include-handshake` if you do.
1939///
1940/// Cursor: in-memory only. Starts from EOF (so a fresh `wire monitor`
1941/// doesn't drown the operator in replay), with optional `--replay N` to
1942/// emit the last N events first.
1943fn cmd_monitor(
1944    peer_filter: Option<&str>,
1945    as_json: bool,
1946    include_handshake: bool,
1947    interval_ms: u64,
1948    replay: usize,
1949) -> Result<()> {
1950    let inbox_dir = config::inbox_dir()?;
1951    if !inbox_dir.exists() {
1952        if !as_json {
1953            eprintln!(
1954                "wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
1955            );
1956        }
1957        // Still proceed — InboxWatcher::from_dir_head handles missing dir.
1958    }
1959
1960    // Optional replay — read existing files and emit the last `replay` events
1961    // (post-filter) before going live. Useful when the harness restarts and
1962    // wants recent context.
1963    if replay > 0 && inbox_dir.exists() {
1964        let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
1965        for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
1966            let path = entry.path();
1967            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
1968                continue;
1969            }
1970            let peer = match path.file_stem().and_then(|s| s.to_str()) {
1971                Some(s) => s.to_string(),
1972                None => continue,
1973            };
1974            if let Some(filter) = peer_filter {
1975                if peer != filter {
1976                    continue;
1977                }
1978            }
1979            let body = std::fs::read_to_string(&path).unwrap_or_default();
1980            for line in body.lines() {
1981                let line = line.trim();
1982                if line.is_empty() {
1983                    continue;
1984                }
1985                let signed: Value = match serde_json::from_str(line) {
1986                    Ok(v) => v,
1987                    Err(_) => continue,
1988                };
1989                let ev = crate::inbox_watch::InboxEvent::from_signed(
1990                    &peer,
1991                    signed,
1992                    /* verified */ true,
1993                );
1994                if !include_handshake && monitor_is_noise_kind(&ev.kind) {
1995                    continue;
1996                }
1997                all.push(ev);
1998            }
1999        }
2000        // Sort by timestamp string (RFC3339-ish — lexicographic order matches
2001        // chronological for same-zoned timestamps).
2002        all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2003        let start = all.len().saturating_sub(replay);
2004        for ev in &all[start..] {
2005            println!("{}", monitor_render(ev, as_json)?);
2006        }
2007        use std::io::Write;
2008        std::io::stdout().flush().ok();
2009    }
2010
2011    // Live loop. InboxWatcher::from_head() seeds cursors at current EOF, so
2012    // the first poll only returns events that arrived AFTER startup.
2013    let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2014    let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2015
2016    loop {
2017        let events = w.poll()?;
2018        let mut wrote = false;
2019        for ev in events {
2020            if let Some(filter) = peer_filter {
2021                if ev.peer != filter {
2022                    continue;
2023                }
2024            }
2025            if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2026                continue;
2027            }
2028            println!("{}", monitor_render(&ev, as_json)?);
2029            wrote = true;
2030        }
2031        if wrote {
2032            use std::io::Write;
2033            std::io::stdout().flush().ok();
2034        }
2035        std::thread::sleep(sleep_dur);
2036    }
2037}
2038
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Handle of the single peer every test below is about.
    const PEER: &str = "willard";

    /// Trust store containing exactly one agent, pinned at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        let did = format!("did:wire:{handle}");
        json!({
            "version": 1,
            "agents": {
                handle: {
                    "tier": tier,
                    "did": did,
                    "card": {"capabilities": ["wire/v3.1"]}
                }
            }
        })
    }

    /// Relay state with a full slot entry for `PEER` carrying `slot_token`.
    fn relay_state_with_token(slot_token: &str) -> Value {
        json!({
            "peers": {
                PEER: {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": slot_token,
                }
            }
        })
    }

    /// Tier displayed for `PEER` given its raw trust tier and a relay state.
    fn displayed_tier(raw_tier: &str, relay_state: &Value) -> String {
        effective_peer_tier(&trust_with(PEER, raw_tier), relay_state, PEER)
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // P0.Y: right after `wire add` the trust store already says
        // VERIFIED while the peer's slot_token is still empty; the display
        // must say PENDING_ACK so the operator knows sending won't work.
        let relay_state = relay_state_with_token("");
        assert_eq!(displayed_tier("VERIFIED", &relay_state), "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let relay_state = relay_state_with_token("tok123");
        assert_eq!(displayed_tier("VERIFIED", &relay_state), "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // PENDING_ACK decorates VERIFIED only: UNTRUSTED stays UNTRUSTED
        // whatever the slot_token looks like.
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        assert_eq!(displayed_tier("UNTRUSTED", &relay_state), "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // Trust is updated before relay_state.peers during `wire add`, so
        // a relay state with no entry at all for the peer also means the
        // bilateral pin is incomplete.
        let relay_state = json!({"peers": {}});
        assert_eq!(displayed_tier("VERIFIED", &relay_state), "PENDING_ACK");
    }
}
2123
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Fixture event with a fixed id and timestamp; only peer, kind and
    /// body preview vary between tests.
    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: peer.to_string(),
            event_id: "abcd1234567890ef".to_string(),
            kind: kind.to_string(),
            body_preview: body.to_string(),
            verified: true,
            timestamp: "2026-05-15T23:14:07.123456Z".to_string(),
            raw: Value::Null,
        }
    }

    /// Plain-text (non-JSON) rendering of `event`.
    fn plain(event: &InboxEvent) -> String {
        monitor_render(event, false).unwrap()
    }

    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        // Handshake and liveness kinds are protocol noise; if they reached
        // the operator's chat stream by default the monitor would simply
        // get switched off. Pin the rule down.
        for noisy in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(monitor_is_noise_kind(noisy));
        }

        // Real-payload kinds must always get through.
        for payload in ["claim", "decision", "ack", "request", "note"] {
            assert!(!monitor_is_noise_kind(payload));
        }

        // So must kinds we have never heard of: showing the operator
        // something unfamiliar beats dropping it silently (the P0.1
        // lesson, applied at the UX layer).
        assert!(!monitor_is_noise_kind("future_kind_we_dont_know"));
    }

    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let line = plain(&ev("willard", "claim", "real v8 train shipped 1350 steps"));
        // Exactly one line.
        assert!(!line.contains('\n'), "render must be one line: {line}");
        // Peer, kind and a body fragment are all present.
        for fragment in ["willard", "claim", "real v8 train"] {
            assert!(line.contains(fragment));
        }
        // Event id is shortened to its first 12 characters.
        assert!(line.contains("abcd12345678"));
        assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
        // Timestamp is kept to second precision.
        assert!(line.contains("2026-05-15T23:14:07"));
    }

    #[test]
    fn monitor_render_strips_newlines_from_body() {
        // A multi-line body (markdown list, code, …) has to collapse onto
        // one line, otherwise one message turns into several harness
        // notifications and the "one event = one line" contract breaks.
        let line = plain(&ev("spark", "claim", "line one\nline two\nline three"));
        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let line = monitor_render(&ev("spark", "claim", "hi"), true).unwrap();
        assert!(!line.contains('\n'));
        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        assert_eq!(parsed["peer"], "spark");
        assert_eq!(parsed["kind"], "claim");
        assert_eq!(parsed["body_preview"], "hi");
    }

    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        // Background (Spark, 2026-05-15): a hand-rolled pipeline filtered
        // inbox JSONL with `select(.verified == true)`. The daemon writes
        // events with verified=null because verification happens at
        // tail-time, so that filter rejected everything and four events
        // stayed invisible for about half an hour — the P0.1 anti-pattern
        // again, this time at the jq level.
        //
        // Rendering and noise filtering must therefore never look at
        // `.verified`. This test exists so that a later "only emit
        // verified events" patch cannot slip in unnoticed.
        let mut unverified = ev("spark", "claim", "from disk with verified=null");
        unverified.verified = false; // worst case: still has to be emitted
        let line = plain(&unverified);
        assert!(line.contains("from disk with verified=null"));
        // The noise filter is a function of the kind alone.
        assert!(!monitor_is_noise_kind("claim"));
    }
}
2224
2225// ---------- verify ----------
2226
2227fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2228    let body = if path == "-" {
2229        let mut buf = String::new();
2230        use std::io::Read;
2231        std::io::stdin().read_to_string(&mut buf)?;
2232        buf
2233    } else {
2234        std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2235    };
2236    let event: Value = serde_json::from_str(&body)?;
2237    let trust = config::read_trust()?;
2238    match verify_message_v31(&event, &trust) {
2239        Ok(()) => {
2240            if as_json {
2241                println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2242            } else {
2243                println!("verified ✓");
2244            }
2245            Ok(())
2246        }
2247        Err(e) => {
2248            let reason = e.to_string();
2249            if as_json {
2250                println!(
2251                    "{}",
2252                    serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2253                );
2254            } else {
2255                eprintln!("FAILED: {reason}");
2256            }
2257            std::process::exit(1);
2258        }
2259    }
2260}
2261
// ---------- mcp / relay-server ----------
2263
/// `wire mcp` — thin entry point that hands control to `crate::mcp::run`
/// and returns whatever it returns.
fn cmd_mcp() -> Result<()> {
    crate::mcp::run()
}
2267
2268fn cmd_relay_server(bind: &str) -> Result<()> {
2269    // Default state dir for the relay process: $WIRE_HOME/state/wire-relay
2270    // (or `dirs::state_dir()/wire-relay`). Distinct from the CLI's state dir
2271    // so a single user can run both client and server on one machine.
2272    let state_dir = if let Ok(home) = std::env::var("WIRE_HOME") {
2273        std::path::PathBuf::from(home)
2274            .join("state")
2275            .join("wire-relay")
2276    } else {
2277        dirs::state_dir()
2278            .or_else(dirs::data_local_dir)
2279            .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2280            .join("wire-relay")
2281    };
2282    let runtime = tokio::runtime::Builder::new_multi_thread()
2283        .enable_all()
2284        .build()?;
2285    runtime.block_on(crate::relay_server::serve(bind, state_dir))
2286}
2287
2288// ---------- bind-relay ----------
2289
2290fn cmd_bind_relay(url: &str, as_json: bool) -> Result<()> {
2291    if !config::is_initialized()? {
2292        bail!("not initialized — run `wire init <handle>` first");
2293    }
2294    let card = config::read_agent_card()?;
2295    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2296    let handle = crate::agent_card::display_handle_from_did(did).to_string();
2297
2298    let normalized = url.trim_end_matches('/');
2299    let client = crate::relay_client::RelayClient::new(normalized);
2300    client.check_healthz()?;
2301    let alloc = client.allocate_slot(Some(&handle))?;
2302    let mut state = config::read_relay_state()?;
2303    state["self"] = json!({
2304        "relay_url": url,
2305        "slot_id": alloc.slot_id,
2306        "slot_token": alloc.slot_token,
2307    });
2308    config::write_relay_state(&state)?;
2309
2310    if as_json {
2311        println!(
2312            "{}",
2313            serde_json::to_string(&json!({
2314                "relay_url": url,
2315                "slot_id": alloc.slot_id,
2316                "slot_token_present": true,
2317            }))?
2318        );
2319    } else {
2320        println!("bound to relay {url}");
2321        println!("slot_id: {}", alloc.slot_id);
2322        println!(
2323            "(slot_token written to {} mode 0600)",
2324            config::relay_state_path()?.display()
2325        );
2326    }
2327    Ok(())
2328}
2329
2330// ---------- add-peer-slot ----------
2331
2332fn cmd_add_peer_slot(
2333    handle: &str,
2334    url: &str,
2335    slot_id: &str,
2336    slot_token: &str,
2337    as_json: bool,
2338) -> Result<()> {
2339    let mut state = config::read_relay_state()?;
2340    let peers = state["peers"]
2341        .as_object_mut()
2342        .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2343    peers.insert(
2344        handle.to_string(),
2345        json!({
2346            "relay_url": url,
2347            "slot_id": slot_id,
2348            "slot_token": slot_token,
2349        }),
2350    );
2351    config::write_relay_state(&state)?;
2352    if as_json {
2353        println!(
2354            "{}",
2355            serde_json::to_string(&json!({
2356                "handle": handle,
2357                "relay_url": url,
2358                "slot_id": slot_id,
2359                "added": true,
2360            }))?
2361        );
2362    } else {
2363        println!("pinned peer slot for {handle} at {url} ({slot_id})");
2364    }
2365    Ok(())
2366}
2367
2368// ---------- push ----------
2369
2370fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
2371    let state = config::read_relay_state()?;
2372    let peers = state["peers"].as_object().cloned().unwrap_or_default();
2373    if peers.is_empty() {
2374        bail!(
2375            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
2376        );
2377    }
2378    let outbox_dir = config::outbox_dir()?;
2379    // v0.5.13 loud-fail: warn on outbox files that don't match a pinned peer.
2380    // Pre-v0.5.13 `wire send peer@relay` wrote to `peer@relay.jsonl` while
2381    // push only enumerated bare-handle files. After upgrade, stale FQDN-named
2382    // files sit on disk forever; warn so operator can `cat fqdn.jsonl >> handle.jsonl`.
2383    if outbox_dir.exists() {
2384        let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
2385        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
2386            let path = entry.path();
2387            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2388                continue;
2389            }
2390            let stem = match path.file_stem().and_then(|s| s.to_str()) {
2391                Some(s) => s.to_string(),
2392                None => continue,
2393            };
2394            if pinned.contains(&stem) {
2395                continue;
2396            }
2397            // Try the bare-handle of the orphaned stem — if THAT matches a
2398            // pinned peer, the stem is a stale FQDN-suffixed file.
2399            let bare = crate::agent_card::bare_handle(&stem);
2400            if pinned.contains(bare) {
2401                eprintln!(
2402                    "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
2403                     Merge with: `cat {} >> {}` then delete the FQDN file.",
2404                    stem,
2405                    path.display(),
2406                    outbox_dir.join(format!("{bare}.jsonl")).display(),
2407                );
2408            }
2409        }
2410    }
2411    if !outbox_dir.exists() {
2412        if as_json {
2413            println!(
2414                "{}",
2415                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
2416            );
2417        } else {
2418            println!("phyllis: nothing to dial out — write a message first with `wire send`");
2419        }
2420        return Ok(());
2421    }
2422
2423    let mut pushed = Vec::new();
2424    let mut skipped = Vec::new();
2425
2426    for (peer_handle, slot_info) in peers.iter() {
2427        if let Some(want) = peer_filter
2428            && peer_handle != want
2429        {
2430            continue;
2431        }
2432        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2433        if !outbox.exists() {
2434            continue;
2435        }
2436        let url = slot_info["relay_url"]
2437            .as_str()
2438            .ok_or_else(|| anyhow!("peer {peer_handle} missing relay_url"))?;
2439        let slot_id = slot_info["slot_id"]
2440            .as_str()
2441            .ok_or_else(|| anyhow!("peer {peer_handle} missing slot_id"))?;
2442        let slot_token = slot_info["slot_token"]
2443            .as_str()
2444            .ok_or_else(|| anyhow!("peer {peer_handle} missing slot_token"))?;
2445        let client = crate::relay_client::RelayClient::new(url);
2446        let body = std::fs::read_to_string(&outbox)?;
2447        for line in body.lines() {
2448            let event: Value = match serde_json::from_str(line) {
2449                Ok(v) => v,
2450                Err(_) => continue,
2451            };
2452            let event_id = event
2453                .get("event_id")
2454                .and_then(Value::as_str)
2455                .unwrap_or("")
2456                .to_string();
2457            match client.post_event(slot_id, slot_token, &event) {
2458                Ok(resp) => {
2459                    if resp.status == "duplicate" {
2460                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
2461                    } else {
2462                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
2463                    }
2464                }
2465                Err(e) => {
2466                    // v0.5.13: flatten the anyhow chain so TLS / DNS / timeout
2467                    // errors aren't hidden behind the topmost-context URL string.
2468                    // Issue #6 highest-impact silent-fail fix.
2469                    let reason = crate::relay_client::format_transport_error(&e);
2470                    skipped.push(
2471                        json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
2472                    );
2473                }
2474            }
2475        }
2476    }
2477
2478    if as_json {
2479        println!(
2480            "{}",
2481            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
2482        );
2483    } else {
2484        println!(
2485            "pushed {} event(s); skipped {} ({})",
2486            pushed.len(),
2487            skipped.len(),
2488            if skipped.is_empty() {
2489                "none"
2490            } else {
2491                "see --json for detail"
2492            }
2493        );
2494    }
2495    Ok(())
2496}
2497
2498// ---------- pull ----------
2499
2500fn cmd_pull(as_json: bool) -> Result<()> {
2501    let state = config::read_relay_state()?;
2502    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2503    if self_state.is_null() {
2504        bail!("self slot not bound — run `wire bind-relay <url>` first");
2505    }
2506    let url = self_state["relay_url"]
2507        .as_str()
2508        .ok_or_else(|| anyhow!("self.relay_url missing"))?;
2509    let slot_id = self_state["slot_id"]
2510        .as_str()
2511        .ok_or_else(|| anyhow!("self.slot_id missing"))?;
2512    let slot_token = self_state["slot_token"]
2513        .as_str()
2514        .ok_or_else(|| anyhow!("self.slot_token missing"))?;
2515    let last_event_id = self_state
2516        .get("last_pulled_event_id")
2517        .and_then(Value::as_str)
2518        .map(str::to_string);
2519
2520    let client = crate::relay_client::RelayClient::new(url);
2521    let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
2522
2523    let inbox_dir = config::inbox_dir()?;
2524    config::ensure_dirs()?;
2525
2526    // P0.1 (0.5.11): cursor advancement now blocks on unknown kinds and
2527    // transient verify errors. See `pull::process_events`.
2528    let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
2529
2530    // P0.3 (0.5.11): cursor write goes through update_relay_state, which
2531    // takes an exclusive flock on relay.lock for the whole RMW. Concurrent
2532    // wire processes (multi-daemon, CLI vs daemon, CLI vs MCP) serialise
2533    // through the lock — no more racing the cursor like today's debug.
2534    if let Some(eid) = &result.advance_cursor_to {
2535        let eid = eid.clone();
2536        config::update_relay_state(|state| {
2537            if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
2538                self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
2539            }
2540            Ok(())
2541        })?;
2542    }
2543
2544    if as_json {
2545        println!(
2546            "{}",
2547            serde_json::to_string(&json!({
2548                "written": result.written,
2549                "rejected": result.rejected,
2550                "total_seen": events.len(),
2551                "cursor_blocked": result.blocked,
2552                "cursor_advanced_to": result.advance_cursor_to,
2553            }))?
2554        );
2555    } else {
2556        let blocking = result
2557            .rejected
2558            .iter()
2559            .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
2560            .count();
2561        if blocking > 0 {
2562            println!(
2563                "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
2564                events.len(),
2565                result.written.len(),
2566                result.rejected.len(),
2567                blocking,
2568            );
2569        } else {
2570            println!(
2571                "pulled {} event(s); wrote {}; rejected {}",
2572                events.len(),
2573                result.written.len(),
2574                result.rejected.len(),
2575            );
2576        }
2577    }
2578    Ok(())
2579}
2580
2581// ---------- rotate-slot ----------
2582
2583fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
2584    if !config::is_initialized()? {
2585        bail!("not initialized — run `wire init <handle>` first");
2586    }
2587    let mut state = config::read_relay_state()?;
2588    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2589    if self_state.is_null() {
2590        bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
2591    }
2592    let url = self_state["relay_url"]
2593        .as_str()
2594        .ok_or_else(|| anyhow!("self.relay_url missing"))?
2595        .to_string();
2596    let old_slot_id = self_state["slot_id"]
2597        .as_str()
2598        .ok_or_else(|| anyhow!("self.slot_id missing"))?
2599        .to_string();
2600    let old_slot_token = self_state["slot_token"]
2601        .as_str()
2602        .ok_or_else(|| anyhow!("self.slot_token missing"))?
2603        .to_string();
2604
2605    // Read identity to sign the announcement.
2606    let card = config::read_agent_card()?;
2607    let did = card
2608        .get("did")
2609        .and_then(Value::as_str)
2610        .unwrap_or("")
2611        .to_string();
2612    let handle = crate::agent_card::display_handle_from_did(&did).to_string();
2613    let pk_b64 = card
2614        .get("verify_keys")
2615        .and_then(Value::as_object)
2616        .and_then(|m| m.values().next())
2617        .and_then(|v| v.get("key"))
2618        .and_then(Value::as_str)
2619        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
2620        .to_string();
2621    let pk_bytes = crate::signing::b64decode(&pk_b64)?;
2622    let sk_seed = config::read_private_key()?;
2623
2624    // Allocate new slot on the same relay.
2625    let normalized = url.trim_end_matches('/').to_string();
2626    let client = crate::relay_client::RelayClient::new(&normalized);
2627    client
2628        .check_healthz()
2629        .context("aborting rotation; old slot still valid")?;
2630    let alloc = client.allocate_slot(Some(&handle))?;
2631    let new_slot_id = alloc.slot_id.clone();
2632    let new_slot_token = alloc.slot_token.clone();
2633
2634    // Optionally announce the rotation to every paired peer via the OLD slot.
2635    // Each peer's recipient-side `wire pull` will pick up this event before
2636    // their daemon next polls the new slot — but auto-update of peer's
2637    // relay.json from a wire_close event is a v0.2 daemon feature; for now
2638    // peers see the event and an operator must manually `add-peer-slot` the
2639    // new coords, OR re-pair via SAS.
2640    let mut announced: Vec<String> = Vec::new();
2641    if !no_announce {
2642        let now = time::OffsetDateTime::now_utc()
2643            .format(&time::format_description::well_known::Rfc3339)
2644            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
2645        let body = json!({
2646            "reason": "operator-initiated slot rotation",
2647            "new_relay_url": url,
2648            "new_slot_id": new_slot_id,
2649            // NOTE: new_slot_token deliberately NOT shared in the broadcast.
2650            // In v0.1 slot tokens are bilateral-shared, so peer can post via
2651            // existing add-peer-slot flow if operator chooses to re-issue.
2652        });
2653        let peers = state["peers"].as_object().cloned().unwrap_or_default();
2654        for (peer_handle, _peer_info) in peers.iter() {
2655            let event = json!({
2656                "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
2657                "timestamp": now.clone(),
2658                "from": did,
2659                "to": format!("did:wire:{peer_handle}"),
2660                "type": "wire_close",
2661                "kind": 1201,
2662                "body": body.clone(),
2663            });
2664            let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
2665                Ok(s) => s,
2666                Err(e) => {
2667                    eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
2668                    continue;
2669                }
2670            };
2671            // Post to OUR old slot (we're announcing on our own slot, NOT
2672            // peer's slot — peer reads from us). Wait, this is wrong: peers
2673            // read from THEIR OWN slot via wire pull. To reach peer A, we
2674            // post to peer A's slot. Use the existing per-peer slot mapping.
2675            let peer_info = match state["peers"].get(peer_handle) {
2676                Some(p) => p.clone(),
2677                None => continue,
2678            };
2679            let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
2680            let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
2681            let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
2682            if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
2683                continue;
2684            }
2685            let peer_client = if peer_url == url {
2686                client.clone()
2687            } else {
2688                crate::relay_client::RelayClient::new(peer_url)
2689            };
2690            match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
2691                Ok(_) => announced.push(peer_handle.clone()),
2692                Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
2693            }
2694        }
2695    }
2696
2697    // Swap the self-slot to the new one.
2698    state["self"] = json!({
2699        "relay_url": url,
2700        "slot_id": new_slot_id,
2701        "slot_token": new_slot_token,
2702    });
2703    config::write_relay_state(&state)?;
2704
2705    if as_json {
2706        println!(
2707            "{}",
2708            serde_json::to_string(&json!({
2709                "rotated": true,
2710                "old_slot_id": old_slot_id,
2711                "new_slot_id": new_slot_id,
2712                "relay_url": url,
2713                "announced_to": announced,
2714            }))?
2715        );
2716    } else {
2717        println!("rotated slot on {url}");
2718        println!(
2719            "  old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
2720        );
2721        println!("  new slot_id: {new_slot_id}");
2722        if !announced.is_empty() {
2723            println!(
2724                "  announced wire_close (kind=1201) to: {}",
2725                announced.join(", ")
2726            );
2727        }
2728        println!();
2729        println!("next steps:");
2730        println!("  - peers see the wire_close event in their next `wire pull`");
2731        println!(
2732            "  - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
2733        );
2734        println!("    (or full re-pair via `wire pair-host`/`wire join`)");
2735        println!("  - until they do, you'll receive but they won't be able to reach you");
2736        // Suppress unused warning
2737        let _ = old_slot_token;
2738    }
2739    Ok(())
2740}
2741
2742// ---------- forget-peer ----------
2743
2744fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
2745    let mut trust = config::read_trust()?;
2746    let mut removed_from_trust = false;
2747    if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
2748        && agents.remove(handle).is_some()
2749    {
2750        removed_from_trust = true;
2751    }
2752    config::write_trust(&trust)?;
2753
2754    let mut state = config::read_relay_state()?;
2755    let mut removed_from_relay = false;
2756    if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
2757        && peers.remove(handle).is_some()
2758    {
2759        removed_from_relay = true;
2760    }
2761    config::write_relay_state(&state)?;
2762
2763    let mut purged: Vec<String> = Vec::new();
2764    if purge {
2765        for dir in [config::inbox_dir()?, config::outbox_dir()?] {
2766            let path = dir.join(format!("{handle}.jsonl"));
2767            if path.exists() {
2768                std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
2769                purged.push(path.to_string_lossy().into());
2770            }
2771        }
2772    }
2773
2774    if !removed_from_trust && !removed_from_relay {
2775        if as_json {
2776            println!(
2777                "{}",
2778                serde_json::to_string(&json!({
2779                    "removed": false,
2780                    "reason": format!("peer {handle:?} not pinned"),
2781                }))?
2782            );
2783        } else {
2784            eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
2785        }
2786        return Ok(());
2787    }
2788
2789    if as_json {
2790        println!(
2791            "{}",
2792            serde_json::to_string(&json!({
2793                "handle": handle,
2794                "removed_from_trust": removed_from_trust,
2795                "removed_from_relay_state": removed_from_relay,
2796                "purged_files": purged,
2797            }))?
2798        );
2799    } else {
2800        println!("forgot peer {handle:?}");
2801        if removed_from_trust {
2802            println!("  - removed from trust.json");
2803        }
2804        if removed_from_relay {
2805            println!("  - removed from relay.json");
2806        }
2807        if !purged.is_empty() {
2808            for p in &purged {
2809                println!("  - deleted {p}");
2810            }
2811        } else if !purge {
2812            println!("  (inbox/outbox files preserved; pass --purge to delete them)");
2813        }
2814    }
2815    Ok(())
2816}
2817
2818// ---------- daemon (long-lived push+pull sync) ----------
2819
2820fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
2821    if !config::is_initialized()? {
2822        bail!("not initialized — run `wire init <handle>` first");
2823    }
2824    let interval = std::time::Duration::from_secs(interval_secs.max(1));
2825
2826    if !as_json {
2827        if once {
2828            eprintln!("wire daemon: single sync cycle, then exit");
2829        } else {
2830            eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
2831        }
2832    }
2833
2834    // Recover from prior crash: any pending pair in transient state had its
2835    // in-memory SPAKE2 secret lost when the previous daemon exited. Release
2836    // the relay slots and mark the files so the operator can re-issue.
2837    if let Err(e) = crate::pending_pair::cleanup_on_startup() {
2838        eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
2839    }
2840
2841    // R1 phase 2: spawn the SSE stream subscriber. On every event pushed
2842    // to our slot, the subscriber signals `wake_rx`; we use it as the
2843    // sleep-or-wake gate of the polling loop. Polling stays as the
2844    // safety net — stream errors fall back transparently to the existing
2845    // interval-based cadence.
2846    let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
2847    if !once {
2848        crate::daemon_stream::spawn_stream_subscriber(wake_tx);
2849    }
2850
2851    loop {
2852        let pushed = run_sync_push().unwrap_or_else(|e| {
2853            eprintln!("daemon: push error: {e:#}");
2854            json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
2855        });
2856        let pulled = run_sync_pull().unwrap_or_else(|e| {
2857            eprintln!("daemon: pull error: {e:#}");
2858            json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
2859        });
2860        let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
2861            eprintln!("daemon: pending-pair tick error: {e:#}");
2862            json!({"transitions": []})
2863        });
2864
2865        if as_json {
2866            println!(
2867                "{}",
2868                serde_json::to_string(&json!({
2869                    "ts": time::OffsetDateTime::now_utc()
2870                        .format(&time::format_description::well_known::Rfc3339)
2871                        .unwrap_or_default(),
2872                    "push": pushed,
2873                    "pull": pulled,
2874                    "pairs": pairs,
2875                }))?
2876            );
2877        } else {
2878            let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
2879            let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
2880            let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
2881            let pair_transitions = pairs["transitions"]
2882                .as_array()
2883                .map(|a| a.len())
2884                .unwrap_or(0);
2885            if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
2886                eprintln!(
2887                    "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
2888                );
2889            }
2890            // Loud per-transition logging so operator sees pair progress live.
2891            if let Some(arr) = pairs["transitions"].as_array() {
2892                for t in arr {
2893                    eprintln!(
2894                        "  pair {} : {} → {}",
2895                        t.get("code").and_then(Value::as_str).unwrap_or("?"),
2896                        t.get("from").and_then(Value::as_str).unwrap_or("?"),
2897                        t.get("to").and_then(Value::as_str).unwrap_or("?")
2898                    );
2899                    if let Some(sas) = t.get("sas").and_then(Value::as_str)
2900                        && t.get("to").and_then(Value::as_str) == Some("sas_ready")
2901                    {
2902                        eprintln!("    SAS digits: {}-{}", &sas[..3], &sas[3..]);
2903                        eprintln!(
2904                            "    Run: wire pair-confirm {} {}",
2905                            t.get("code").and_then(Value::as_str).unwrap_or("?"),
2906                            sas
2907                        );
2908                    }
2909                }
2910            }
2911        }
2912
2913        if once {
2914            return Ok(());
2915        }
2916        // Wait either for the next poll-interval tick OR for a stream
2917        // wake signal — whichever comes first. Drain any additional
2918        // wake-ups that accumulated during the previous cycle since one
2919        // pull catches up everything.
2920        let _ = wake_rx.recv_timeout(interval);
2921        while wake_rx.try_recv().is_ok() {}
2922    }
2923}
2924
2925/// Programmatic push (no stdout, no exit on errors). Returns the same JSON
2926/// shape `wire push --json` emits.
2927fn run_sync_push() -> Result<Value> {
2928    let state = config::read_relay_state()?;
2929    let peers = state["peers"].as_object().cloned().unwrap_or_default();
2930    if peers.is_empty() {
2931        return Ok(json!({"pushed": [], "skipped": []}));
2932    }
2933    let outbox_dir = config::outbox_dir()?;
2934    if !outbox_dir.exists() {
2935        return Ok(json!({"pushed": [], "skipped": []}));
2936    }
2937    let mut pushed = Vec::new();
2938    let mut skipped = Vec::new();
2939    for (peer_handle, slot_info) in peers.iter() {
2940        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2941        if !outbox.exists() {
2942            continue;
2943        }
2944        let url = slot_info["relay_url"].as_str().unwrap_or("");
2945        let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
2946        let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
2947        if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
2948            continue;
2949        }
2950        let client = crate::relay_client::RelayClient::new(url);
2951        let body = std::fs::read_to_string(&outbox)?;
2952        for line in body.lines() {
2953            let event: Value = match serde_json::from_str(line) {
2954                Ok(v) => v,
2955                Err(_) => continue,
2956            };
2957            let event_id = event
2958                .get("event_id")
2959                .and_then(Value::as_str)
2960                .unwrap_or("")
2961                .to_string();
2962            match client.post_event(slot_id, slot_token, &event) {
2963                Ok(resp) => {
2964                    if resp.status == "duplicate" {
2965                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
2966                    } else {
2967                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
2968                    }
2969                }
2970                Err(e) => {
2971                    // v0.5.13: flatten the anyhow chain so TLS / DNS / timeout
2972                    // errors aren't hidden behind the topmost-context URL string.
2973                    // Issue #6 highest-impact silent-fail fix.
2974                    let reason = crate::relay_client::format_transport_error(&e);
2975                    skipped.push(
2976                        json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
2977                    );
2978                }
2979            }
2980        }
2981    }
2982    Ok(json!({"pushed": pushed, "skipped": skipped}))
2983}
2984
2985/// Programmatic pull. Same shape as `wire pull --json`.
2986fn run_sync_pull() -> Result<Value> {
2987    let state = config::read_relay_state()?;
2988    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2989    if self_state.is_null() {
2990        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
2991    }
2992    let url = self_state["relay_url"].as_str().unwrap_or("");
2993    let slot_id = self_state["slot_id"].as_str().unwrap_or("");
2994    let slot_token = self_state["slot_token"].as_str().unwrap_or("");
2995    let last_event_id = self_state
2996        .get("last_pulled_event_id")
2997        .and_then(Value::as_str)
2998        .map(str::to_string);
2999    if url.is_empty() {
3000        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3001    }
3002    let client = crate::relay_client::RelayClient::new(url);
3003    let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
3004    let inbox_dir = config::inbox_dir()?;
3005    config::ensure_dirs()?;
3006
3007    // P0.1 (0.5.11): shared cursor-blocking logic. Daemon's --once path
3008    // must match the CLI's `wire pull` semantics or version-skew bugs
3009    // re-emerge by another route.
3010    let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
3011
3012    // P0.3 (0.5.11): same flock-protected RMW as cmd_pull.
3013    if let Some(eid) = &result.advance_cursor_to {
3014        let eid = eid.clone();
3015        config::update_relay_state(|state| {
3016            if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3017                self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
3018            }
3019            Ok(())
3020        })?;
3021    }
3022
3023    Ok(json!({
3024        "written": result.written,
3025        "rejected": result.rejected,
3026        "total_seen": events.len(),
3027        "cursor_blocked": result.blocked,
3028        "cursor_advanced_to": result.advance_cursor_to,
3029    }))
3030}
3031
3032// ---------- pin (manual out-of-band peer pairing) ----------
3033
3034fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
3035    let body =
3036        std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
3037    let card: Value =
3038        serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
3039    crate::agent_card::verify_agent_card(&card)
3040        .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3041
3042    let mut trust = config::read_trust()?;
3043    crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3044
3045    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3046    let handle = crate::agent_card::display_handle_from_did(did).to_string();
3047    config::write_trust(&trust)?;
3048
3049    if as_json {
3050        println!(
3051            "{}",
3052            serde_json::to_string(&json!({
3053                "handle": handle,
3054                "did": did,
3055                "tier": "VERIFIED",
3056                "pinned": true,
3057            }))?
3058        );
3059    } else {
3060        println!("pinned {handle} ({did}) at tier VERIFIED");
3061    }
3062    Ok(())
3063}
3064
3065// ---------- pair-host / pair-join (the magic-wormhole flow) ----------
3066
3067fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3068    pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3069}
3070
3071fn cmd_pair_join(
3072    code_phrase: &str,
3073    relay_url: &str,
3074    auto_yes: bool,
3075    timeout_secs: u64,
3076) -> Result<()> {
3077    pair_orchestrate(
3078        relay_url,
3079        Some(code_phrase),
3080        "guest",
3081        auto_yes,
3082        timeout_secs,
3083    )
3084}
3085
3086/// Shared orchestration for both sides of the SAS pairing.
3087///
3088/// Now thin: delegates to `pair_session::pair_session_open` / `_try_sas` /
3089/// `_finalize`. CLI keeps its interactive y/N prompt; MCP uses
3090/// `pair_session_confirm_sas` instead.
3091fn pair_orchestrate(
3092    relay_url: &str,
3093    code_in: Option<&str>,
3094    role: &str,
3095    auto_yes: bool,
3096    timeout_secs: u64,
3097) -> Result<()> {
3098    use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3099
3100    let mut s = pair_session_open(role, relay_url, code_in)?;
3101
3102    if role == "host" {
3103        eprintln!();
3104        eprintln!("share this code phrase with your peer:");
3105        eprintln!();
3106        eprintln!("    {}", s.code);
3107        eprintln!();
3108        eprintln!(
3109            "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3110            s.code
3111        );
3112    } else {
3113        eprintln!();
3114        eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3115    }
3116
3117    // Stage 2 — poll for SAS-ready with periodic progress heartbeat. The bare
3118    // pair_session_wait_for_sas helper is silent; the CLI wraps it in a loop
3119    // that emits a "waiting (Ns / Ts)" line every HEARTBEAT_SECS so operators
3120    // see the process is alive while the other side connects.
3121    const HEARTBEAT_SECS: u64 = 10;
3122    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3123    let started = std::time::Instant::now();
3124    let mut last_heartbeat = started;
3125    let formatted = loop {
3126        if let Some(sas) = pair_session_try_sas(&mut s)? {
3127            break sas;
3128        }
3129        let now = std::time::Instant::now();
3130        if now >= deadline {
3131            return Err(anyhow!(
3132                "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3133            ));
3134        }
3135        if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3136            let elapsed = now.duration_since(started).as_secs();
3137            eprintln!("  ... still waiting ({elapsed}s / {timeout_secs}s)");
3138            last_heartbeat = now;
3139        }
3140        std::thread::sleep(std::time::Duration::from_millis(250));
3141    };
3142
3143    eprintln!();
3144    eprintln!("SAS digits (must match peer's terminal):");
3145    eprintln!();
3146    eprintln!("    {formatted}");
3147    eprintln!();
3148
3149    // Stage 3 — operator confirmation. CLI uses interactive y/N for backward
3150    // compatibility; MCP uses pair_session_confirm_sas with the typed digits.
3151    if !auto_yes {
3152        eprint!("does this match your peer's terminal? [y/N]: ");
3153        use std::io::Write;
3154        std::io::stderr().flush().ok();
3155        let mut input = String::new();
3156        std::io::stdin().read_line(&mut input)?;
3157        let trimmed = input.trim().to_lowercase();
3158        if trimmed != "y" && trimmed != "yes" {
3159            bail!("SAS confirmation declined — aborting pairing");
3160        }
3161    }
3162    s.sas_confirmed = true;
3163
3164    // Stage 4 — seal+exchange bootstrap, pin peer.
3165    let result = pair_session_finalize(&mut s, timeout_secs)?;
3166
3167    let peer_did = result["paired_with"].as_str().unwrap_or("");
3168    let peer_role = if role == "host" { "guest" } else { "host" };
3169    eprintln!("paired with {peer_did} (peer role: {peer_role})");
3170    eprintln!("peer card pinned at tier VERIFIED");
3171    eprintln!(
3172        "peer relay slot saved to {}",
3173        config::relay_state_path()?.display()
3174    );
3175
3176    println!("{}", serde_json::to_string(&result)?);
3177    Ok(())
3178}
3179
3180// (poll_until helper removed — pair flow now uses pair_session::pair_session_wait_for_sas
3181// and pair_session_finalize, both of which inline their own deadline loops.)
3182
3183// ---------- pair — single-shot init + pair-* + setup ----------
3184
3185fn cmd_pair(
3186    handle: &str,
3187    code: Option<&str>,
3188    relay: &str,
3189    auto_yes: bool,
3190    timeout_secs: u64,
3191    no_setup: bool,
3192) -> Result<()> {
3193    // Step 1 — idempotent identity. Safe if already initialized with the SAME handle;
3194    // bails loudly if a different handle is already set (operator must explicitly delete).
3195    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3196    let did = init_result
3197        .get("did")
3198        .and_then(|v| v.as_str())
3199        .unwrap_or("(unknown)")
3200        .to_string();
3201    let already = init_result
3202        .get("already_initialized")
3203        .and_then(|v| v.as_bool())
3204        .unwrap_or(false);
3205    if already {
3206        println!("(identity {did} already initialized — reusing)");
3207    } else {
3208        println!("initialized {did}");
3209    }
3210    println!();
3211
3212    // Step 2 — pair-host or pair-join based on code presence.
3213    match code {
3214        None => {
3215            println!("hosting pair on {relay} (no code = host) ...");
3216            cmd_pair_host(relay, auto_yes, timeout_secs)?;
3217        }
3218        Some(c) => {
3219            println!("joining pair with code {c} on {relay} ...");
3220            cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3221        }
3222    }
3223
3224    // Step 3 — register wire as MCP server in detected client configs (idempotent).
3225    if !no_setup {
3226        println!();
3227        println!("registering wire as MCP server in detected client configs ...");
3228        if let Err(e) = cmd_setup(true) {
3229            // Non-fatal — pair succeeded, just print the warning.
3230            eprintln!("warn: setup --apply failed: {e}");
3231            eprintln!("      pair succeeded; you can re-run `wire setup --apply` manually.");
3232        }
3233    }
3234
3235    println!();
3236    println!("pair complete. Next steps:");
3237    println!("  wire daemon start              # background sync of inbox/outbox vs relay");
3238    println!("  wire send <peer> claim <msg>   # send your peer something");
3239    println!("  wire tail                      # watch incoming events");
3240    Ok(())
3241}
3242
3243// ---------- detached pair (daemon-orchestrated) ----------
3244
3245/// `wire pair <handle> [--code <phrase>] --detach` — wraps init + detach
3246/// pair-host/-join into a single command. The non-detached variant lives in
3247/// `cmd_pair`; this one short-circuits to the daemon-orchestrated path.
3248fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3249    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3250    let did = init_result
3251        .get("did")
3252        .and_then(|v| v.as_str())
3253        .unwrap_or("(unknown)")
3254        .to_string();
3255    let already = init_result
3256        .get("already_initialized")
3257        .and_then(|v| v.as_bool())
3258        .unwrap_or(false);
3259    if already {
3260        println!("(identity {did} already initialized — reusing)");
3261    } else {
3262        println!("initialized {did}");
3263    }
3264    println!();
3265    match code {
3266        None => cmd_pair_host_detach(relay, false),
3267        Some(c) => cmd_pair_join_detach(c, relay, false),
3268    }
3269}
3270
3271fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3272    if !config::is_initialized()? {
3273        bail!("not initialized — run `wire init <handle>` first");
3274    }
3275    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3276        Ok(b) => b,
3277        Err(e) => {
3278            if !as_json {
3279                eprintln!(
3280                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3281                );
3282            }
3283            false
3284        }
3285    };
3286    let code = crate::sas::generate_code_phrase();
3287    let code_hash = crate::pair_session::derive_code_hash(&code);
3288    let now = time::OffsetDateTime::now_utc()
3289        .format(&time::format_description::well_known::Rfc3339)
3290        .unwrap_or_default();
3291    let p = crate::pending_pair::PendingPair {
3292        code: code.clone(),
3293        code_hash,
3294        role: "host".to_string(),
3295        relay_url: relay_url.to_string(),
3296        status: "request_host".to_string(),
3297        sas: None,
3298        peer_did: None,
3299        created_at: now,
3300        last_error: None,
3301        pair_id: None,
3302        our_slot_id: None,
3303        our_slot_token: None,
3304        spake2_seed_b64: None,
3305    };
3306    crate::pending_pair::write_pending(&p)?;
3307    if as_json {
3308        println!(
3309            "{}",
3310            serde_json::to_string(&json!({
3311                "state": "queued",
3312                "code_phrase": code,
3313                "relay_url": relay_url,
3314                "role": "host",
3315                "daemon_spawned": daemon_spawned,
3316            }))?
3317        );
3318    } else {
3319        if daemon_spawned {
3320            println!("(started wire daemon in background)");
3321        }
3322        println!("detached pair-host queued. Share this code with your peer:\n");
3323        println!("    {code}\n");
3324        println!("Next steps:");
3325        println!("  wire pair-list                                # check status");
3326        println!("  wire pair-confirm {code} <digits>   # when SAS shows up");
3327        println!("  wire pair-cancel  {code}            # to abort");
3328    }
3329    Ok(())
3330}
3331
3332fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3333    if !config::is_initialized()? {
3334        bail!("not initialized — run `wire init <handle>` first");
3335    }
3336    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3337        Ok(b) => b,
3338        Err(e) => {
3339            if !as_json {
3340                eprintln!(
3341                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3342                );
3343            }
3344            false
3345        }
3346    };
3347    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3348    let code_hash = crate::pair_session::derive_code_hash(&code);
3349    let now = time::OffsetDateTime::now_utc()
3350        .format(&time::format_description::well_known::Rfc3339)
3351        .unwrap_or_default();
3352    let p = crate::pending_pair::PendingPair {
3353        code: code.clone(),
3354        code_hash,
3355        role: "guest".to_string(),
3356        relay_url: relay_url.to_string(),
3357        status: "request_guest".to_string(),
3358        sas: None,
3359        peer_did: None,
3360        created_at: now,
3361        last_error: None,
3362        pair_id: None,
3363        our_slot_id: None,
3364        our_slot_token: None,
3365        spake2_seed_b64: None,
3366    };
3367    crate::pending_pair::write_pending(&p)?;
3368    if as_json {
3369        println!(
3370            "{}",
3371            serde_json::to_string(&json!({
3372                "state": "queued",
3373                "code_phrase": code,
3374                "relay_url": relay_url,
3375                "role": "guest",
3376                "daemon_spawned": daemon_spawned,
3377            }))?
3378        );
3379    } else {
3380        if daemon_spawned {
3381            println!("(started wire daemon in background)");
3382        }
3383        println!("detached pair-join queued for code {code}.");
3384        println!(
3385            "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3386        );
3387    }
3388    Ok(())
3389}
3390
3391fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3392    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3393    let typed: String = typed_digits
3394        .chars()
3395        .filter(|c| c.is_ascii_digit())
3396        .collect();
3397    if typed.len() != 6 {
3398        bail!(
3399            "expected 6 digits (got {} after stripping non-digits)",
3400            typed.len()
3401        );
3402    }
3403    let mut p = crate::pending_pair::read_pending(&code)?
3404        .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
3405    if p.status != "sas_ready" {
3406        bail!(
3407            "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
3408            p.status
3409        );
3410    }
3411    let stored = p
3412        .sas
3413        .as_ref()
3414        .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
3415        .clone();
3416    if stored == typed {
3417        p.status = "confirmed".to_string();
3418        crate::pending_pair::write_pending(&p)?;
3419        if as_json {
3420            println!(
3421                "{}",
3422                serde_json::to_string(&json!({
3423                    "state": "confirmed",
3424                    "code_phrase": code,
3425                }))?
3426            );
3427        } else {
3428            println!("digits match. Daemon will finalize the handshake on its next tick.");
3429            println!("Run `wire peers` after a few seconds to confirm.");
3430        }
3431    } else {
3432        p.status = "aborted".to_string();
3433        p.last_error = Some(format!(
3434            "SAS digit mismatch (typed {typed}, expected {stored})"
3435        ));
3436        let client = crate::relay_client::RelayClient::new(&p.relay_url);
3437        let _ = client.pair_abandon(&p.code_hash);
3438        crate::pending_pair::write_pending(&p)?;
3439        crate::os_notify::toast(
3440            &format!("wire — pair aborted ({})", p.code),
3441            p.last_error.as_deref().unwrap_or("digits mismatch"),
3442        );
3443        if as_json {
3444            println!(
3445                "{}",
3446                serde_json::to_string(&json!({
3447                    "state": "aborted",
3448                    "code_phrase": code,
3449                    "error": "digits mismatch",
3450                }))?
3451            );
3452        }
3453        bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
3454    }
3455    Ok(())
3456}
3457
3458fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
3459    if watch {
3460        return cmd_pair_list_watch(watch_interval_secs);
3461    }
3462    let items = crate::pending_pair::list_pending()?;
3463    if as_json {
3464        println!("{}", serde_json::to_string(&items)?);
3465        return Ok(());
3466    }
3467    if items.is_empty() {
3468        println!("no pending pair sessions.");
3469        return Ok(());
3470    }
3471    println!(
3472        "{:<15} {:<8} {:<18} {:<10} NOTE",
3473        "CODE", "ROLE", "STATUS", "SAS"
3474    );
3475    for p in items {
3476        let sas = p
3477            .sas
3478            .as_ref()
3479            .map(|d| format!("{}-{}", &d[..3], &d[3..]))
3480            .unwrap_or_else(|| "—".to_string());
3481        let note = p
3482            .last_error
3483            .as_deref()
3484            .or(p.peer_did.as_deref())
3485            .unwrap_or("");
3486        println!(
3487            "{:<15} {:<8} {:<18} {:<10} {}",
3488            p.code, p.role, p.status, sas, note
3489        );
3490    }
3491    Ok(())
3492}
3493
3494/// Stream-mode pair-list: never exits. Diffs per-code state every
3495/// `interval_secs` and prints one JSON line per transition (creation,
3496/// status flip, deletion). Useful for shell pipelines:
3497///
3498/// ```text
3499/// wire pair-list --watch | while read line; do
3500///     CODE=$(echo "$line" | jq -r .code)
3501///     STATUS=$(echo "$line" | jq -r .status)
3502///     ...
3503/// done
3504/// ```
3505fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
3506    use std::collections::HashMap;
3507    use std::io::Write;
3508    let interval = std::time::Duration::from_secs(interval_secs.max(1));
3509    // Emit a snapshot synthetic event for every currently-pending pair on
3510    // startup so a consumer that arrives mid-flight sees the current state.
3511    let mut prev: HashMap<String, String> = HashMap::new();
3512    {
3513        let items = crate::pending_pair::list_pending()?;
3514        for p in &items {
3515            println!("{}", serde_json::to_string(&p)?);
3516            prev.insert(p.code.clone(), p.status.clone());
3517        }
3518        // Flush so the consumer's `while read` gets the snapshot promptly.
3519        let _ = std::io::stdout().flush();
3520    }
3521    loop {
3522        std::thread::sleep(interval);
3523        let items = match crate::pending_pair::list_pending() {
3524            Ok(v) => v,
3525            Err(_) => continue,
3526        };
3527        let mut cur: HashMap<String, String> = HashMap::new();
3528        for p in &items {
3529            cur.insert(p.code.clone(), p.status.clone());
3530            match prev.get(&p.code) {
3531                None => {
3532                    // New code appeared.
3533                    println!("{}", serde_json::to_string(&p)?);
3534                }
3535                Some(prev_status) if prev_status != &p.status => {
3536                    // Status flipped.
3537                    println!("{}", serde_json::to_string(&p)?);
3538                }
3539                _ => {}
3540            }
3541        }
3542        for code in prev.keys() {
3543            if !cur.contains_key(code) {
3544                // File disappeared → finalized or cancelled. Emit a synthetic
3545                // "removed" marker so the consumer sees the terminal event.
3546                println!(
3547                    "{}",
3548                    serde_json::to_string(&json!({
3549                        "code": code,
3550                        "status": "removed",
3551                        "_synthetic": true,
3552                    }))?
3553                );
3554            }
3555        }
3556        let _ = std::io::stdout().flush();
3557        prev = cur;
3558    }
3559}
3560
3561/// Block until a pending pair reaches `target_status` or terminates. Process
3562/// exit code carries the outcome (0 success, 1 terminated abnormally, 2
3563/// timeout) so shell scripts can branch directly.
3564fn cmd_pair_watch(
3565    code_phrase: &str,
3566    target_status: &str,
3567    timeout_secs: u64,
3568    as_json: bool,
3569) -> Result<()> {
3570    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3571    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3572    let mut last_seen_status: Option<String> = None;
3573    loop {
3574        let p_opt = crate::pending_pair::read_pending(&code)?;
3575        let now = std::time::Instant::now();
3576        match p_opt {
3577            None => {
3578                // File gone — either finalized (success if target=sas_ready
3579                // since finalization implies it passed sas_ready) or never
3580                // existed. Distinguish by whether we ever saw it.
3581                if last_seen_status.is_some() {
3582                    if as_json {
3583                        println!(
3584                            "{}",
3585                            serde_json::to_string(&json!({"state": "finalized", "code": code}))?
3586                        );
3587                    } else {
3588                        println!("pair {code} finalized (file removed)");
3589                    }
3590                    return Ok(());
3591                } else {
3592                    if as_json {
3593                        println!(
3594                            "{}",
3595                            serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
3596                        );
3597                    }
3598                    std::process::exit(1);
3599                }
3600            }
3601            Some(p) => {
3602                let cur = p.status.clone();
3603                if Some(cur.clone()) != last_seen_status {
3604                    if as_json {
3605                        // Emit per-transition line so scripts can stream.
3606                        println!("{}", serde_json::to_string(&p)?);
3607                    }
3608                    last_seen_status = Some(cur.clone());
3609                }
3610                if cur == target_status {
3611                    if !as_json {
3612                        let sas_str = p
3613                            .sas
3614                            .as_ref()
3615                            .map(|s| format!("{}-{}", &s[..3], &s[3..]))
3616                            .unwrap_or_else(|| "—".to_string());
3617                        println!("pair {code} reached {target_status} (SAS: {sas_str})");
3618                    }
3619                    return Ok(());
3620                }
3621                if cur == "aborted" || cur == "aborted_restart" {
3622                    if !as_json {
3623                        let err = p.last_error.as_deref().unwrap_or("(no detail)");
3624                        eprintln!("pair {code} {cur}: {err}");
3625                    }
3626                    std::process::exit(1);
3627                }
3628            }
3629        }
3630        if now >= deadline {
3631            if !as_json {
3632                eprintln!(
3633                    "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
3634                );
3635            }
3636            std::process::exit(2);
3637        }
3638        std::thread::sleep(std::time::Duration::from_millis(250));
3639    }
3640}
3641
3642fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
3643    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3644    let p = crate::pending_pair::read_pending(&code)?
3645        .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
3646    let client = crate::relay_client::RelayClient::new(&p.relay_url);
3647    let _ = client.pair_abandon(&p.code_hash);
3648    crate::pending_pair::delete_pending(&code)?;
3649    if as_json {
3650        println!(
3651            "{}",
3652            serde_json::to_string(&json!({
3653                "state": "cancelled",
3654                "code_phrase": code,
3655            }))?
3656        );
3657    } else {
3658        println!("cancelled pending pair {code} (relay slot released, file removed).");
3659    }
3660    Ok(())
3661}
3662
3663// ---------- pair-abandon — release stuck pair-slot ----------
3664
3665fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
3666    // Accept either the raw phrase (e.g. "53-CKWIA5") or whatever the user
3667    // typed — normalize via the existing parser.
3668    let code = crate::sas::parse_code_phrase(code_phrase)?;
3669    let code_hash = crate::pair_session::derive_code_hash(code);
3670    let client = crate::relay_client::RelayClient::new(relay_url);
3671    client.pair_abandon(&code_hash)?;
3672    println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
3673    println!("host can now issue a fresh code; guest can re-join.");
3674    Ok(())
3675}
3676
3677// ---------- invite / accept — one-paste pair (v0.4.0) ----------
3678
3679fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
3680    let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
3681
3682    // If --share, register the invite at the relay's short-URL endpoint and
3683    // build the one-curl onboarding line for the peer to paste.
3684    let share_payload: Option<Value> = if share {
3685        let client = reqwest::blocking::Client::new();
3686        let single_use = if uses == 1 { Some(1u32) } else { None };
3687        let body = json!({
3688            "invite_url": url,
3689            "ttl_seconds": ttl,
3690            "uses": single_use,
3691        });
3692        let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
3693        let resp = client.post(&endpoint).json(&body).send()?;
3694        if !resp.status().is_success() {
3695            let code = resp.status();
3696            let txt = resp.text().unwrap_or_default();
3697            bail!("relay {code} on /v1/invite/register: {txt}");
3698        }
3699        let parsed: Value = resp.json()?;
3700        let token = parsed
3701            .get("token")
3702            .and_then(Value::as_str)
3703            .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
3704            .to_string();
3705        let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
3706        let curl_line = format!("curl -fsSL {share_url} | sh");
3707        Some(json!({
3708            "token": token,
3709            "share_url": share_url,
3710            "curl": curl_line,
3711            "expires_unix": parsed.get("expires_unix"),
3712        }))
3713    } else {
3714        None
3715    };
3716
3717    if as_json {
3718        let mut out = json!({
3719            "invite_url": url,
3720            "ttl_secs": ttl,
3721            "uses": uses,
3722            "relay": relay,
3723        });
3724        if let Some(s) = &share_payload {
3725            out["share"] = s.clone();
3726        }
3727        println!("{}", serde_json::to_string(&out)?);
3728    } else if let Some(s) = share_payload {
3729        let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
3730        eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
3731        eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
3732        println!("{curl}");
3733    } else {
3734        eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
3735        eprintln!("# TTL: {ttl}s. Uses: {uses}.");
3736        println!("{url}");
3737    }
3738    Ok(())
3739}
3740
3741fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
3742    // If the user pasted an HTTP(S) short URL (e.g. https://wireup.net/i/AB12),
3743    // resolve it to the underlying wire://pair?... URL via ?format=url before
3744    // accepting. Saves them from having to know which URL shape goes where.
3745    let resolved = if url.starts_with("http://") || url.starts_with("https://") {
3746        let sep = if url.contains('?') { '&' } else { '?' };
3747        let resolve_url = format!("{url}{sep}format=url");
3748        let client = reqwest::blocking::Client::new();
3749        let resp = client
3750            .get(&resolve_url)
3751            .send()
3752            .with_context(|| format!("GET {resolve_url}"))?;
3753        if !resp.status().is_success() {
3754            bail!("could not resolve short URL {url} (HTTP {})", resp.status());
3755        }
3756        let body = resp.text().unwrap_or_default().trim().to_string();
3757        if !body.starts_with("wire://pair?") {
3758            bail!(
3759                "short URL {url} did not resolve to a wire:// invite. \
3760                 (got: {}{})",
3761                body.chars().take(80).collect::<String>(),
3762                if body.chars().count() > 80 { "…" } else { "" }
3763            );
3764        }
3765        body
3766    } else {
3767        url.to_string()
3768    };
3769
3770    let result = crate::pair_invite::accept_invite(&resolved)?;
3771    if as_json {
3772        println!("{}", serde_json::to_string(&result)?);
3773    } else {
3774        let did = result
3775            .get("paired_with")
3776            .and_then(Value::as_str)
3777            .unwrap_or("?");
3778        println!("paired with {did}");
3779        println!(
3780            "you can now: wire send {} <kind> <body>",
3781            crate::agent_card::display_handle_from_did(did)
3782        );
3783    }
3784    Ok(())
3785}
3786
3787// ---------- whois / profile (v0.5) ----------
3788
3789fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
3790    if let Some(h) = handle {
3791        let parsed = crate::pair_profile::parse_handle(h)?;
3792        // Special-case: if the supplied handle matches our own, skip the
3793        // network round-trip and print local.
3794        if config::is_initialized()? {
3795            let card = config::read_agent_card()?;
3796            let local_handle = card
3797                .get("profile")
3798                .and_then(|p| p.get("handle"))
3799                .and_then(Value::as_str)
3800                .map(str::to_string);
3801            if local_handle.as_deref() == Some(h) {
3802                return cmd_whois(None, as_json, None);
3803            }
3804        }
3805        // Remote resolution via .well-known/wire/agent on the handle's domain.
3806        let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
3807        if as_json {
3808            println!("{}", serde_json::to_string(&resolved)?);
3809        } else {
3810            print_resolved_profile(&resolved);
3811        }
3812        return Ok(());
3813    }
3814    let card = config::read_agent_card()?;
3815    if as_json {
3816        let profile = card.get("profile").cloned().unwrap_or(Value::Null);
3817        println!(
3818            "{}",
3819            serde_json::to_string(&json!({
3820                "did": card.get("did").cloned().unwrap_or(Value::Null),
3821                "profile": profile,
3822            }))?
3823        );
3824    } else {
3825        print!("{}", crate::pair_profile::render_self_summary()?);
3826    }
3827    Ok(())
3828}
3829
3830fn print_resolved_profile(resolved: &Value) {
3831    let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
3832    let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
3833    let relay = resolved
3834        .get("relay_url")
3835        .and_then(Value::as_str)
3836        .unwrap_or("");
3837    let slot = resolved
3838        .get("slot_id")
3839        .and_then(Value::as_str)
3840        .unwrap_or("");
3841    let profile = resolved
3842        .get("card")
3843        .and_then(|c| c.get("profile"))
3844        .cloned()
3845        .unwrap_or(Value::Null);
3846    println!("{did}");
3847    println!("  nick:         {nick}");
3848    if !relay.is_empty() {
3849        println!("  relay_url:    {relay}");
3850    }
3851    if !slot.is_empty() {
3852        println!("  slot_id:      {slot}");
3853    }
3854    let pick =
3855        |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
3856    if let Some(s) = pick("display_name") {
3857        println!("  display_name: {s}");
3858    }
3859    if let Some(s) = pick("emoji") {
3860        println!("  emoji:        {s}");
3861    }
3862    if let Some(s) = pick("motto") {
3863        println!("  motto:        {s}");
3864    }
3865    if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
3866        let joined: Vec<String> = arr
3867            .iter()
3868            .filter_map(|v| v.as_str().map(str::to_string))
3869            .collect();
3870        println!("  vibe:         {}", joined.join(", "));
3871    }
3872    if let Some(s) = pick("pronouns") {
3873        println!("  pronouns:     {s}");
3874    }
3875}
3876
3877/// `wire add <nick@domain>` — zero-paste pair. Resolve handle, build a
3878/// signed pair_drop event with our card + slot coords, deliver via the
3879/// peer relay's `/v1/handle/intro/<nick>` endpoint (no slot_token needed).
3880/// Peer's daemon completes the bilateral pin on its next pull and emits a
3881/// pair_drop_ack carrying their slot_token so we can send back.
3882fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
3883    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
3884
3885    // 1. Auto-init self if needed + ensure a relay slot.
3886    let (our_did, our_relay, our_slot_id, our_slot_token) =
3887        crate::pair_invite::ensure_self_with_relay(relay_override)?;
3888    if our_did == format!("did:wire:{}", parsed.nick) {
3889        // Lazy guard — actual self-add would also be caught by FCFS later.
3890        bail!("refusing to add self (handle matches own DID)");
3891    }
3892
3893    // 2. Resolve peer via .well-known on their relay.
3894    let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
3895    let peer_card = resolved
3896        .get("card")
3897        .cloned()
3898        .ok_or_else(|| anyhow!("resolved missing card"))?;
3899    let peer_did = resolved
3900        .get("did")
3901        .and_then(Value::as_str)
3902        .ok_or_else(|| anyhow!("resolved missing did"))?
3903        .to_string();
3904    let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
3905    let peer_slot_id = resolved
3906        .get("slot_id")
3907        .and_then(Value::as_str)
3908        .ok_or_else(|| anyhow!("resolved missing slot_id"))?
3909        .to_string();
3910    let peer_relay = resolved
3911        .get("relay_url")
3912        .and_then(Value::as_str)
3913        .map(str::to_string)
3914        .or_else(|| relay_override.map(str::to_string))
3915        .unwrap_or_else(|| format!("https://{}", parsed.domain));
3916
3917    // 3. Pin peer in trust + relay-state. slot_token will arrive via ack.
3918    let mut trust = config::read_trust()?;
3919    crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
3920    config::write_trust(&trust)?;
3921    let mut relay_state = config::read_relay_state()?;
3922    let existing_token = relay_state
3923        .get("peers")
3924        .and_then(|p| p.get(&peer_handle))
3925        .and_then(|p| p.get("slot_token"))
3926        .and_then(Value::as_str)
3927        .map(str::to_string)
3928        .unwrap_or_default();
3929    relay_state["peers"][&peer_handle] = json!({
3930        "relay_url": peer_relay,
3931        "slot_id": peer_slot_id,
3932        "slot_token": existing_token, // empty until pair_drop_ack lands
3933    });
3934    config::write_relay_state(&relay_state)?;
3935
3936    // 4. Build signed pair_drop with our card + coords (no pair_nonce — this
3937    // is the v0.5 zero-paste open-mode path).
3938    let our_card = config::read_agent_card()?;
3939    let sk_seed = config::read_private_key()?;
3940    let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
3941    let pk_b64 = our_card
3942        .get("verify_keys")
3943        .and_then(Value::as_object)
3944        .and_then(|m| m.values().next())
3945        .and_then(|v| v.get("key"))
3946        .and_then(Value::as_str)
3947        .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
3948    let pk_bytes = crate::signing::b64decode(pk_b64)?;
3949    let now = time::OffsetDateTime::now_utc()
3950        .format(&time::format_description::well_known::Rfc3339)
3951        .unwrap_or_default();
3952    let event = json!({
3953        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
3954        "timestamp": now,
3955        "from": our_did,
3956        "to": peer_did,
3957        "type": "pair_drop",
3958        "kind": 1100u32,
3959        "body": {
3960            "card": our_card,
3961            "relay_url": our_relay,
3962            "slot_id": our_slot_id,
3963            "slot_token": our_slot_token,
3964        },
3965    });
3966    let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
3967
3968    // 5. Deliver via /v1/handle/intro/<nick> (auth-free; relay validates kind).
3969    let client = crate::relay_client::RelayClient::new(&peer_relay);
3970    let resp = client.handle_intro(&parsed.nick, &signed)?;
3971    let event_id = signed
3972        .get("event_id")
3973        .and_then(Value::as_str)
3974        .unwrap_or("")
3975        .to_string();
3976
3977    if as_json {
3978        println!(
3979            "{}",
3980            serde_json::to_string(&json!({
3981                "handle": handle_arg,
3982                "paired_with": peer_did,
3983                "peer_handle": peer_handle,
3984                "event_id": event_id,
3985                "drop_response": resp,
3986                "status": "drop_sent",
3987            }))?
3988        );
3989    } else {
3990        println!(
3991            "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
3992        );
3993    }
3994    Ok(())
3995}
3996
// ---------- diag (structured trace) ----------
3998
3999fn cmd_diag(action: DiagAction) -> Result<()> {
4000    let state = config::state_dir()?;
4001    let knob = state.join("diag.enabled");
4002    let log_path = state.join("diag.jsonl");
4003    match action {
4004        DiagAction::Tail { limit, json } => {
4005            let entries = crate::diag::tail(limit);
4006            if json {
4007                for e in entries {
4008                    println!("{}", serde_json::to_string(&e)?);
4009                }
4010            } else if entries.is_empty() {
4011                println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
4012            } else {
4013                for e in entries {
4014                    let ts = e["ts"].as_u64().unwrap_or(0);
4015                    let ty = e["type"].as_str().unwrap_or("?");
4016                    let pid = e["pid"].as_u64().unwrap_or(0);
4017                    let payload = e["payload"].to_string();
4018                    println!("[{ts}] pid={pid} {ty} {payload}");
4019                }
4020            }
4021        }
4022        DiagAction::Enable => {
4023            config::ensure_dirs()?;
4024            std::fs::write(&knob, "1")?;
4025            println!("wire diag: enabled at {knob:?}");
4026        }
4027        DiagAction::Disable => {
4028            if knob.exists() {
4029                std::fs::remove_file(&knob)?;
4030            }
4031            println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
4032        }
4033        DiagAction::Status { json } => {
4034            let enabled = crate::diag::is_enabled();
4035            let size = std::fs::metadata(&log_path)
4036                .map(|m| m.len())
4037                .unwrap_or(0);
4038            if json {
4039                println!(
4040                    "{}",
4041                    serde_json::to_string(&serde_json::json!({
4042                        "enabled": enabled,
4043                        "log_path": log_path,
4044                        "log_size_bytes": size,
4045                    }))?
4046                );
4047            } else {
4048                println!("wire diag status");
4049                println!("  enabled:    {enabled}");
4050                println!("  log:        {log_path:?}");
4051                println!("  log size:   {size} bytes");
4052            }
4053        }
4054    }
4055    Ok(())
4056}
4057
// ---------- service (install / uninstall / status) ----------
4059
4060fn cmd_service(action: ServiceAction) -> Result<()> {
4061    let (report, as_json) = match action {
4062        ServiceAction::Install { json } => (crate::service::install()?, json),
4063        ServiceAction::Uninstall { json } => (crate::service::uninstall()?, json),
4064        ServiceAction::Status { json } => (crate::service::status()?, json),
4065    };
4066    if as_json {
4067        println!("{}", serde_json::to_string(&report)?);
4068    } else {
4069        println!("wire service {}", report.action);
4070        println!("  platform:  {}", report.platform);
4071        println!("  unit:      {}", report.unit_path);
4072        println!("  status:    {}", report.status);
4073        println!("  detail:    {}", report.detail);
4074    }
4075    Ok(())
4076}
4077
// ---------- upgrade (atomic daemon swap) ----------
4079
4080/// `wire upgrade` — kill all running `wire daemon` processes, spawn a
4081/// fresh one from the currently-installed binary, write a new versioned
4082/// pidfile. The fix for today's exact failure mode: a daemon process that
4083/// kept running OLD binary text in memory under a symlink that had since
4084/// been repointed at a NEW binary on disk.
4085///
4086/// Idempotent. If no stale daemon is running, just starts a fresh one
4087/// (same as `wire daemon &` but with the wait-until-alive guard from
4088/// ensure_up::ensure_daemon_running).
4089///
4090/// `--check` mode reports drift without acting — lists the processes
4091/// that WOULD be killed and the binary version of each.
4092fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
4093    // 1. Identify all `wire daemon` processes.
4094    let pgrep_out = std::process::Command::new("pgrep")
4095        .args(["-f", "wire daemon"])
4096        .output();
4097    let running_pids: Vec<u32> = match pgrep_out {
4098        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
4099            .split_whitespace()
4100            .filter_map(|s| s.parse::<u32>().ok())
4101            .collect(),
4102        _ => Vec::new(),
4103    };
4104
4105    // 2. Read pidfile to surface what the daemon THINKS it is.
4106    let record = crate::ensure_up::read_pid_record("daemon");
4107    let recorded_version: Option<String> = match &record {
4108        crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
4109        crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
4110        _ => None,
4111    };
4112    let cli_version = env!("CARGO_PKG_VERSION").to_string();
4113
4114    if check_only {
4115        let report = json!({
4116            "running_pids": running_pids,
4117            "pidfile_version": recorded_version,
4118            "cli_version": cli_version,
4119            "would_kill": running_pids,
4120        });
4121        if as_json {
4122            println!("{}", serde_json::to_string(&report)?);
4123        } else {
4124            println!("wire upgrade --check");
4125            println!("  cli version:      {cli_version}");
4126            println!("  pidfile version:  {}", recorded_version.as_deref().unwrap_or("(missing)"));
4127            if running_pids.is_empty() {
4128                println!("  running daemons:  none");
4129            } else {
4130                let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
4131                println!("  running daemons:  pids {}", pids.join(", "));
4132                println!("  would kill all + spawn fresh");
4133            }
4134        }
4135        return Ok(());
4136    }
4137
4138    // 3. Kill every running wire daemon. Use SIGTERM first, then SIGKILL
4139    // after a brief grace period.
4140    let mut killed: Vec<u32> = Vec::new();
4141    for pid in &running_pids {
4142        // SIGTERM (15).
4143        let _ = std::process::Command::new("kill")
4144            .args(["-15", &pid.to_string()])
4145            .status();
4146        killed.push(*pid);
4147    }
4148    // Wait up to ~2s for graceful exit.
4149    if !killed.is_empty() {
4150        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
4151        loop {
4152            let still_alive: Vec<u32> = killed
4153                .iter()
4154                .copied()
4155                .filter(|p| process_alive_pid(*p))
4156                .collect();
4157            if still_alive.is_empty() {
4158                break;
4159            }
4160            if std::time::Instant::now() >= deadline {
4161                // SIGKILL hold-outs.
4162                for pid in still_alive {
4163                    let _ = std::process::Command::new("kill")
4164                        .args(["-9", &pid.to_string()])
4165                        .status();
4166                }
4167                break;
4168            }
4169            std::thread::sleep(std::time::Duration::from_millis(50));
4170        }
4171    }
4172
4173    // 4. Remove stale pidfile so ensure_daemon_running doesn't think the
4174    //    old daemon is still owning it.
4175    let pidfile = config::state_dir()?.join("daemon.pid");
4176    if pidfile.exists() {
4177        let _ = std::fs::remove_file(&pidfile);
4178    }
4179
4180    // 5. Spawn fresh daemon via ensure_up — atomically waits for
4181    //    process_alive + writes the versioned pidfile.
4182    let spawned = crate::ensure_up::ensure_daemon_running()?;
4183
4184    let new_record = crate::ensure_up::read_pid_record("daemon");
4185    let new_pid = new_record.pid();
4186    let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
4187        Some(d.version.clone())
4188    } else {
4189        None
4190    };
4191
4192    if as_json {
4193        println!(
4194            "{}",
4195            serde_json::to_string(&json!({
4196                "killed": killed,
4197                "spawned_fresh_daemon": spawned,
4198                "new_pid": new_pid,
4199                "new_version": new_version,
4200                "cli_version": cli_version,
4201            }))?
4202        );
4203    } else {
4204        if killed.is_empty() {
4205            println!("wire upgrade: no stale daemons running");
4206        } else {
4207            println!("wire upgrade: killed {} daemon(s) (pids {})",
4208                killed.len(),
4209                killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
4210        }
4211        if spawned {
4212            println!(
4213                "wire upgrade: spawned fresh daemon (pid {} v{})",
4214                new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
4215                new_version.as_deref().unwrap_or(&cli_version),
4216            );
4217        } else {
4218            println!("wire upgrade: daemon was already running on current binary");
4219        }
4220    }
4221    Ok(())
4222}
4223
/// Reports whether a process with the given pid currently exists.
///
/// On Linux this is a check for the `/proc/<pid>` directory. On every other
/// target it runs `kill -0 <pid>` with all stdio silenced and treats a
/// successful exit as "alive"; failing to run `kill` at all counts as "not
/// alive".
fn process_alive_pid(pid: u32) -> bool {
    if cfg!(target_os = "linux") {
        return std::path::Path::new("/proc").join(pid.to_string()).exists();
    }

    // Signal 0 performs the existence/permission check without delivering
    // anything to the target.
    let probe = std::process::Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(exit) if exit.success())
}
4241
// ---------- doctor (single-command diagnostic) ----------
4243
4244/// One DoctorCheck = one verdict on one health dimension.
4245#[derive(Clone, Debug, serde::Serialize)]
4246pub struct DoctorCheck {
4247    /// Short stable identifier (`daemon`, `relay`, `pair_rejections`, ...).
4248    /// Stable across versions for tooling consumption.
4249    pub id: String,
4250    /// PASS / WARN / FAIL.
4251    pub status: String,
4252    /// One-line human summary.
4253    pub detail: String,
4254    /// Optional remediation hint shown after the failing line.
4255    #[serde(skip_serializing_if = "Option::is_none")]
4256    pub fix: Option<String>,
4257}
4258
4259impl DoctorCheck {
4260    fn pass(id: &str, detail: impl Into<String>) -> Self {
4261        Self {
4262            id: id.into(),
4263            status: "PASS".into(),
4264            detail: detail.into(),
4265            fix: None,
4266        }
4267    }
4268    fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
4269        Self {
4270            id: id.into(),
4271            status: "WARN".into(),
4272            detail: detail.into(),
4273            fix: Some(fix.into()),
4274        }
4275    }
4276    fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
4277        Self {
4278            id: id.into(),
4279            status: "FAIL".into(),
4280            detail: detail.into(),
4281            fix: Some(fix.into()),
4282        }
4283    }
4284}
4285
4286/// `wire doctor` — single-command diagnostic for the silent-fail classes
4287/// 0.5.11 ships fixes for. Surfaces what each fix produces (P0.1 cursor
4288/// blocks, P0.2 pair-rejection logs, P0.4 daemon version mismatch, etc.)
4289/// so operators don't have to know where each lives.
4290fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
4291    let mut checks: Vec<DoctorCheck> = Vec::new();
4292
4293    checks.push(check_daemon_health());
4294    checks.push(check_daemon_pid_consistency());
4295    checks.push(check_relay_reachable());
4296    checks.push(check_pair_rejections(recent_rejections));
4297    checks.push(check_cursor_progress());
4298
4299    let fails = checks.iter().filter(|c| c.status == "FAIL").count();
4300    let warns = checks.iter().filter(|c| c.status == "WARN").count();
4301
4302    if as_json {
4303        println!(
4304            "{}",
4305            serde_json::to_string(&json!({
4306                "checks": checks,
4307                "fail_count": fails,
4308                "warn_count": warns,
4309                "ok": fails == 0,
4310            }))?
4311        );
4312    } else {
4313        println!("wire doctor — {} checks", checks.len());
4314        for c in &checks {
4315            let bullet = match c.status.as_str() {
4316                "PASS" => "✓",
4317                "WARN" => "!",
4318                "FAIL" => "✗",
4319                _ => "?",
4320            };
4321            println!("  {bullet} [{}] {}: {}", c.status, c.id, c.detail);
4322            if let Some(fix) = &c.fix {
4323                println!("      fix: {fix}");
4324            }
4325        }
4326        println!();
4327        if fails == 0 && warns == 0 {
4328            println!("ALL GREEN");
4329        } else {
4330            println!("{fails} FAIL, {warns} WARN");
4331        }
4332    }
4333
4334    if fails > 0 {
4335        std::process::exit(1);
4336    }
4337    Ok(())
4338}
4339
4340/// Check: daemon running, exactly one instance, no orphans.
4341///
4342/// Today's debug surfaced PID 54017 (old-binary wire daemon running for 4
4343/// days, advancing cursor without pinning). `wire status` lied about it.
4344/// `wire doctor` must catch THIS class: multiple daemons running, OR
4345/// pid-file claims daemon down while a process is actually up.
4346fn check_daemon_health() -> DoctorCheck {
4347    // v0.5.13 (issue #2 bug A): doctor PASSed on orphan-only state while
4348    // `wire status` reported DOWN, disagreeing for 25 min. Doctor used
4349    // pgrep alone; status cross-checked the pidfile. Doctor now consults
4350    // BOTH so the two surfaces never disagree.
4351    let output = std::process::Command::new("pgrep")
4352        .args(["-f", "wire daemon"])
4353        .output();
4354    let pgrep_pids: Vec<u32> = match output {
4355        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
4356            .split_whitespace()
4357            .filter_map(|s| s.parse::<u32>().ok())
4358            .collect(),
4359        _ => Vec::new(),
4360    };
4361    let pidfile_pid = crate::ensure_up::read_pid_record("daemon").pid();
4362    // Is the pidfile-claimed daemon actually alive?
4363    let pidfile_alive = pidfile_pid
4364        .map(|pid| {
4365            #[cfg(target_os = "linux")]
4366            {
4367                std::path::Path::new(&format!("/proc/{pid}")).exists()
4368            }
4369            #[cfg(not(target_os = "linux"))]
4370            {
4371                std::process::Command::new("kill")
4372                    .args(["-0", &pid.to_string()])
4373                    .output()
4374                    .map(|o| o.status.success())
4375                    .unwrap_or(false)
4376            }
4377        })
4378        .unwrap_or(false);
4379    let orphan_pids: Vec<u32> = pgrep_pids
4380        .iter()
4381        .filter(|p| Some(**p) != pidfile_pid)
4382        .copied()
4383        .collect();
4384
4385    let fmt_pids = |xs: &[u32]| -> String {
4386        xs.iter()
4387            .map(|p| p.to_string())
4388            .collect::<Vec<_>>()
4389            .join(", ")
4390    };
4391
4392    match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
4393        (0, _, _) => DoctorCheck::fail(
4394            "daemon",
4395            "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
4396            "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
4397        ),
4398        // Single daemon AND it matches the pidfile → healthy.
4399        (1, true, true) => DoctorCheck::pass(
4400            "daemon",
4401            format!(
4402                "one daemon running (pid {}, matches pidfile)",
4403                pgrep_pids[0]
4404            ),
4405        ),
4406        // Pidfile is alive but pgrep ALSO sees orphan processes.
4407        (n, true, false) => DoctorCheck::fail(
4408            "daemon",
4409            format!(
4410                "{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
4411                 The orphans race the relay cursor — they advance past events your current binary can't process. \
4412                 (Issue #2 exact class.)",
4413                fmt_pids(&pgrep_pids),
4414                pidfile_pid.unwrap(),
4415                fmt_pids(&orphan_pids),
4416            ),
4417            "`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
4418        ),
4419        // Pidfile is dead but processes ARE running → all are orphans.
4420        (n, false, _) => DoctorCheck::fail(
4421            "daemon",
4422            format!(
4423                "{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
4424                 every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
4425                 (Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
4426                fmt_pids(&pgrep_pids),
4427                match pidfile_pid {
4428                    Some(p) => format!("claims pid {p} which is dead"),
4429                    None => "is missing".to_string(),
4430                },
4431            ),
4432            "`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
4433        ),
4434        // Multiple daemons all matching … impossible by construction; fall back to warn.
4435        (n, true, true) => DoctorCheck::warn(
4436            "daemon",
4437            format!(
4438                "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
4439                fmt_pids(&pgrep_pids)
4440            ),
4441            "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
4442        ),
4443    }
4444}
4445
4446/// Check: structured pidfile matches running daemon. Spark's P0.4 5th
4447/// check. Surfaces version mismatch (daemon running old binary text in
4448/// memory under a current symlink — today's exact bug class), schema
4449/// drift (future format bumps), and identity contamination (daemon's
4450/// recorded DID doesn't match this box's configured DID).
4451fn check_daemon_pid_consistency() -> DoctorCheck {
4452    let record = crate::ensure_up::read_pid_record("daemon");
4453    match record {
4454        crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
4455            "daemon_pid_consistency",
4456            "no daemon.pid yet — fresh box or daemon never started",
4457        ),
4458        crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
4459            "daemon_pid_consistency",
4460            format!("daemon.pid is corrupt: {reason}"),
4461            "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
4462        ),
4463        crate::ensure_up::PidRecord::LegacyInt(pid) => DoctorCheck::warn(
4464            "daemon_pid_consistency",
4465            format!(
4466                "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
4467                 Daemon was started by a pre-0.5.11 binary."
4468            ),
4469            "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
4470        ),
4471        crate::ensure_up::PidRecord::Json(d) => {
4472            let mut issues: Vec<String> = Vec::new();
4473            if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
4474                issues.push(format!(
4475                    "schema={} (expected {})",
4476                    d.schema,
4477                    crate::ensure_up::DAEMON_PID_SCHEMA
4478                ));
4479            }
4480            let cli_version = env!("CARGO_PKG_VERSION");
4481            if d.version != cli_version {
4482                issues.push(format!(
4483                    "version daemon={} cli={cli_version}",
4484                    d.version
4485                ));
4486            }
4487            if !std::path::Path::new(&d.bin_path).exists() {
4488                issues.push(format!("bin_path {} missing on disk", d.bin_path));
4489            }
4490            // Cross-check DID + relay against current config (best-effort).
4491            if let Ok(card) = config::read_agent_card()
4492                && let Some(current_did) = card.get("did").and_then(Value::as_str)
4493                && let Some(recorded_did) = &d.did
4494                && recorded_did != current_did
4495            {
4496                issues.push(format!(
4497                    "did daemon={recorded_did} config={current_did} — identity drift"
4498                ));
4499            }
4500            if let Ok(state) = config::read_relay_state()
4501                && let Some(current_relay) = state
4502                    .get("self")
4503                    .and_then(|s| s.get("relay_url"))
4504                    .and_then(Value::as_str)
4505                && let Some(recorded_relay) = &d.relay_url
4506                && recorded_relay != current_relay
4507            {
4508                issues.push(format!(
4509                    "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
4510                ));
4511            }
4512            if issues.is_empty() {
4513                DoctorCheck::pass(
4514                    "daemon_pid_consistency",
4515                    format!(
4516                        "daemon v{} bound to {} as {}",
4517                        d.version,
4518                        d.relay_url.as_deref().unwrap_or("?"),
4519                        d.did.as_deref().unwrap_or("?")
4520                    ),
4521                )
4522            } else {
4523                DoctorCheck::warn(
4524                    "daemon_pid_consistency",
4525                    format!("daemon pidfile drift: {}", issues.join("; ")),
4526                    "`wire upgrade` to atomically restart daemon with current config".to_string(),
4527                )
4528            }
4529        }
4530    }
4531}
4532
4533/// Check: bound relay's /healthz returns 200.
4534fn check_relay_reachable() -> DoctorCheck {
4535    let state = match config::read_relay_state() {
4536        Ok(s) => s,
4537        Err(e) => return DoctorCheck::fail(
4538            "relay",
4539            format!("could not read relay state: {e}"),
4540            "run `wire up <handle>@<relay>` to bootstrap",
4541        ),
4542    };
4543    let url = state
4544        .get("self")
4545        .and_then(|s| s.get("relay_url"))
4546        .and_then(Value::as_str)
4547        .unwrap_or("");
4548    if url.is_empty() {
4549        return DoctorCheck::warn(
4550            "relay",
4551            "no relay bound — wire send/pull will not work",
4552            "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
4553        );
4554    }
4555    let client = crate::relay_client::RelayClient::new(url);
4556    match client.check_healthz() {
4557        Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
4558        Err(e) => DoctorCheck::fail(
4559            "relay",
4560            format!("{url} unreachable: {e}"),
4561            format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
4562        ),
4563    }
4564}
4565
4566/// Check: count recent entries in pair-rejected.jsonl (P0.2 output). Every
4567/// entry there is a silent failure that, pre-0.5.11, would have left the
4568/// operator wondering why pairing didn't complete.
4569fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
4570    let path = match config::state_dir() {
4571        Ok(d) => d.join("pair-rejected.jsonl"),
4572        Err(e) => return DoctorCheck::warn(
4573            "pair_rejections",
4574            format!("could not resolve state dir: {e}"),
4575            "set WIRE_HOME or fix XDG_STATE_HOME",
4576        ),
4577    };
4578    if !path.exists() {
4579        return DoctorCheck::pass(
4580            "pair_rejections",
4581            "no pair-rejected.jsonl — no recorded pair failures",
4582        );
4583    }
4584    let body = match std::fs::read_to_string(&path) {
4585        Ok(b) => b,
4586        Err(e) => return DoctorCheck::warn(
4587            "pair_rejections",
4588            format!("could not read {path:?}: {e}"),
4589            "check file permissions",
4590        ),
4591    };
4592    let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
4593    if lines.is_empty() {
4594        return DoctorCheck::pass(
4595            "pair_rejections",
4596            "pair-rejected.jsonl present but empty",
4597        );
4598    }
4599    let total = lines.len();
4600    let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
4601    let mut summary: Vec<String> = Vec::new();
4602    for line in &recent {
4603        if let Ok(rec) = serde_json::from_str::<Value>(line) {
4604            let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
4605            let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
4606            summary.push(format!("{peer}/{code}"));
4607        }
4608    }
4609    DoctorCheck::warn(
4610        "pair_rejections",
4611        format!(
4612            "{total} pair failures recorded. recent: [{}]",
4613            summary.join(", ")
4614        ),
4615        format!(
4616            "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
4617        ),
4618    )
4619}
4620
4621/// Check: cursor isn't stuck. We can't tell without polling — but we can
4622/// report the current cursor position so operators see if it changes.
4623/// Real "stuck" detection needs two pulls separated in time; defer that
4624/// behaviour to a `wire doctor --watch` mode.
4625fn check_cursor_progress() -> DoctorCheck {
4626    let state = match config::read_relay_state() {
4627        Ok(s) => s,
4628        Err(e) => return DoctorCheck::warn(
4629            "cursor",
4630            format!("could not read relay state: {e}"),
4631            "check ~/Library/Application Support/wire/relay.json",
4632        ),
4633    };
4634    let cursor = state
4635        .get("self")
4636        .and_then(|s| s.get("last_pulled_event_id"))
4637        .and_then(Value::as_str)
4638        .map(|s| s.chars().take(16).collect::<String>())
4639        .unwrap_or_else(|| "<none>".to_string());
4640    DoctorCheck::pass(
4641        "cursor",
4642        format!(
4643            "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
4644        ),
4645    )
4646}
4647
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// Each constructor must stamp its own severity and carry a fix hint
    /// only for WARN / FAIL; otherwise `wire doctor` output would mislead
    /// operators about which checks need attention.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passed = DoctorCheck::pass("x", "ok");
        assert_eq!(passed.status, "PASS");
        assert_eq!(passed.fix, None);

        let warned = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warned.status, "WARN");
        assert_eq!(warned.fix, Some("do this".to_string()));

        let failed = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failed.status, "FAIL");
        assert_eq!(failed.fix, Some("fix it".to_string()));
    }

    /// Fresh box: with no pair-rejected.jsonl on disk the check must not
    /// report a problem.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let verdict = check_pair_rejections(5);
            assert_eq!(verdict.status, "PASS", "no file should be PASS, got {verdict:?}");
        });
    }

    /// Any recorded rejection is worth surfacing, even a "known good
    /// failure": the operator wants to know it happened.
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let verdict = check_pair_rejections(5);
            assert_eq!(verdict.status, "WARN");
            for expected in ["1 pair failures", "willard/pair_drop_ack_send_failed"] {
                assert!(verdict.detail.contains(expected));
            }
        });
    }
}
4701
4702// ---------- up megacommand (full bootstrap) ----------
4703
4704/// `wire up <nick@relay-host>` — single command from fresh box to ready-to-
4705/// pair. Composes the steps that today's onboarding walks operators through
4706/// one by one (init / bind-relay / claim / background daemon / arm monitor
4707/// recipe). Idempotent: every step checks current state and skips if done.
4708///
4709/// Argument parsing accepts:
4710///   - `<nick>@<relay-host>` — explicit relay
4711///   - `<nick>`              — defaults to wireup.net (the configured public
4712///                             relay)
4713fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
4714    let (nick, relay_url) = match handle_arg.split_once('@') {
4715        Some((n, host)) => {
4716            let url = if host.starts_with("http://") || host.starts_with("https://") {
4717                host.to_string()
4718            } else {
4719                format!("https://{host}")
4720            };
4721            (n.to_string(), url)
4722        }
4723        None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
4724    };
4725
4726    let mut report: Vec<(String, String)> = Vec::new();
4727    let mut step = |stage: &str, detail: String| {
4728        report.push((stage.to_string(), detail.clone()));
4729        if !as_json {
4730            eprintln!("wire up: {stage} — {detail}");
4731        }
4732    };
4733
4734    // 1. init (or verify existing identity matches the requested nick).
4735    if config::is_initialized()? {
4736        let card = config::read_agent_card()?;
4737        let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
4738        let existing_handle =
4739            crate::agent_card::display_handle_from_did(existing_did).to_string();
4740        if existing_handle != nick {
4741            bail!(
4742                "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
4743                 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
4744                 delete `{:?}` to start fresh.",
4745                config::config_dir()?
4746            );
4747        }
4748        step("init", format!("already initialized as {existing_handle}"));
4749    } else {
4750        cmd_init(&nick, name, Some(&relay_url), /* as_json */ false)?;
4751        step("init", format!("created identity {nick} bound to {relay_url}"));
4752    }
4753
4754    // 2. Ensure relay binding matches. cmd_init with --relay binds it; if
4755    // already initialized we may need to bind to the requested relay
4756    // separately (operator switched relays).
4757    let relay_state = config::read_relay_state()?;
4758    let bound_relay = relay_state
4759        .get("self")
4760        .and_then(|s| s.get("relay_url"))
4761        .and_then(Value::as_str)
4762        .unwrap_or("")
4763        .to_string();
4764    if bound_relay.is_empty() {
4765        // Identity exists but never bound to a relay — bind now.
4766        cmd_bind_relay(&relay_url, /* as_json */ false)?;
4767        step("bind-relay", format!("bound to {relay_url}"));
4768    } else if bound_relay != relay_url {
4769        step(
4770            "bind-relay",
4771            format!(
4772                "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
4773                 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
4774            ),
4775        );
4776    } else {
4777        step("bind-relay", format!("already bound to {bound_relay}"));
4778    }
4779
4780    // 3. Claim nick on the relay's handle directory. Idempotent — same-DID
4781    // re-claims are accepted by the relay.
4782    match cmd_claim(&nick, Some(&relay_url), None, /* as_json */ false) {
4783        Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
4784        Err(e) => step(
4785            "claim",
4786            format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
4787        ),
4788    }
4789
4790    // 4. Background daemon — must be running for pull/push/ack to flow.
4791    match crate::ensure_up::ensure_daemon_running() {
4792        Ok(true) => step("daemon", "started fresh background daemon".to_string()),
4793        Ok(false) => step("daemon", "already running".to_string()),
4794        Err(e) => step(
4795            "daemon",
4796            format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
4797        ),
4798    }
4799
4800    // 5. Final summary — point operator at the next commands.
4801    let summary = format!(
4802        "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
4803         `wire monitor` to watch incoming events."
4804    );
4805    step("ready", summary.clone());
4806
4807    if as_json {
4808        let steps_json: Vec<_> = report
4809            .iter()
4810            .map(|(k, v)| json!({"stage": k, "detail": v}))
4811            .collect();
4812        println!(
4813            "{}",
4814            serde_json::to_string(&json!({
4815                "nick": nick,
4816                "relay": relay_url,
4817                "steps": steps_json,
4818            }))?
4819        );
4820    }
4821    Ok(())
4822}
4823
/// Strip a single leading `https://` or `http://` scheme for display in
/// `wire up` step output.
///
/// Exactly one scheme prefix is removed. `trim_start_matches` would strip
/// the pattern repeatedly (and chaining both calls would eat
/// `https://http://…` entirely), mangling anything after the first scheme.
/// Input without a recognised scheme is returned unchanged.
fn strip_proto(url: &str) -> String {
    url.strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
        .unwrap_or(url)
        .to_string()
}
4830
4831// ---------- pair megacommand (zero-paste handle-based) ----------
4832
4833/// `wire pair <nick@domain>` zero-shot. Dispatched from Command::Pair when
4834/// the handle is in `nick@domain` form. Wraps:
4835///
4836///   1. cmd_add — resolve, pin, drop intro
4837///   2. Wait up to `timeout_secs` for the peer's `pair_drop_ack` to arrive
4838///      (signalled by `peers.<handle>.slot_token` populating in relay state)
4839///   3. Verify bilateral pin: trust contains peer + relay state has token
4840///   4. Print final state — both sides VERIFIED + can `wire send`
4841///
4842/// On timeout: hard-errors with the specific stuck step so the operator
4843/// knows which side to chase. No silent partial success.
4844fn cmd_pair_megacommand(
4845    handle_arg: &str,
4846    relay_override: Option<&str>,
4847    timeout_secs: u64,
4848    _as_json: bool,
4849) -> Result<()> {
4850    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
4851    let peer_handle = parsed.nick.clone();
4852
4853    eprintln!("wire pair: resolving {handle_arg}...");
4854    cmd_add(handle_arg, relay_override, /* as_json */ false)?;
4855
4856    eprintln!(
4857        "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
4858         to ack (their daemon must be running + pulling)..."
4859    );
4860
4861    // Trigger an immediate daemon-style pull so we don't wait the full daemon
4862    // interval. Best-effort — if it fails, we still fall through to the
4863    // polling loop.
4864    let _ = run_sync_pull();
4865
4866    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
4867    let poll_interval = std::time::Duration::from_millis(500);
4868
4869    loop {
4870        // Drain anything new from the relay (e.g. our pair_drop_ack landing).
4871        let _ = run_sync_pull();
4872        let relay_state = config::read_relay_state()?;
4873        let peer_entry = relay_state
4874            .get("peers")
4875            .and_then(|p| p.get(&peer_handle))
4876            .cloned();
4877        let token = peer_entry
4878            .as_ref()
4879            .and_then(|e| e.get("slot_token"))
4880            .and_then(Value::as_str)
4881            .unwrap_or("");
4882
4883        if !token.is_empty() {
4884            // Bilateral pin complete — we have their slot_token, we can send.
4885            let trust = config::read_trust()?;
4886            let pinned_in_trust = trust
4887                .get("agents")
4888                .and_then(|a| a.get(&peer_handle))
4889                .is_some();
4890            println!(
4891                "wire pair: paired with {peer_handle}.\n  trust: {}  bilateral: yes (slot_token recorded)\n  next: `wire send {peer_handle} \"<msg>\"`",
4892                if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
4893            );
4894            return Ok(());
4895        }
4896
4897        if std::time::Instant::now() >= deadline {
4898            // Timeout — surface the EXACT stuck step. Likely culprits:
4899            //   - peer daemon not running on their box
4900            //   - peer's relay slot is offline
4901            //   - their daemon is on an older binary that doesn't know
4902            //     pair_drop kind=1100 (the P0.1 class — now visible via
4903            //     wire pull --json on their side as a blocking rejection)
4904            bail!(
4905                "wire pair: timed out after {timeout_secs}s. \
4906                 peer {peer_handle} never sent pair_drop_ack. \
4907                 likely causes: (a) their daemon is down — ask them to run \
4908                 `wire status` and `wire daemon &`; (b) their binary is older \
4909                 than 0.5.x and doesn't understand pair_drop events — ask \
4910                 them to `wire upgrade`; (c) network / relay blip — re-run \
4911                 `wire pair {handle_arg}` to retry."
4912            );
4913        }
4914
4915        std::thread::sleep(poll_interval);
4916    }
4917}
4918
4919fn cmd_claim(
4920    nick: &str,
4921    relay_override: Option<&str>,
4922    public_url: Option<&str>,
4923    as_json: bool,
4924) -> Result<()> {
4925    if !crate::pair_profile::is_valid_nick(nick) {
4926        bail!(
4927            "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
4928        );
4929    }
4930    // `wire claim` is the one-step bootstrap: auto-init + auto-allocate slot
4931    // + claim handle. Operator should never have to run init/bind-relay first.
4932    let (_did, relay_url, slot_id, slot_token) =
4933        crate::pair_invite::ensure_self_with_relay(relay_override)?;
4934    let card = config::read_agent_card()?;
4935
4936    let client = crate::relay_client::RelayClient::new(&relay_url);
4937    let resp = client.handle_claim(nick, &slot_id, &slot_token, public_url, &card)?;
4938
4939    if as_json {
4940        println!(
4941            "{}",
4942            serde_json::to_string(&json!({
4943                "nick": nick,
4944                "relay": relay_url,
4945                "response": resp,
4946            }))?
4947        );
4948    } else {
4949        // Best-effort: derive the public domain from the relay URL. If
4950        // operator passed --public-url that's the canonical address; else
4951        // the relay URL itself. Falls back to a placeholder if both miss.
4952        let domain = public_url
4953            .unwrap_or(&relay_url)
4954            .trim_start_matches("https://")
4955            .trim_start_matches("http://")
4956            .trim_end_matches('/')
4957            .split('/')
4958            .next()
4959            .unwrap_or("<this-relay-domain>")
4960            .to_string();
4961        println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
4962        println!("verify with: wire whois {nick}@{domain}");
4963    }
4964    Ok(())
4965}
4966
4967fn cmd_profile(action: ProfileAction) -> Result<()> {
4968    match action {
4969        ProfileAction::Set { field, value, json } => {
4970            // Try parsing the value as JSON; if that fails, treat it as a
4971            // bare string. Lets operators pass either `42` or `"hello"` or
4972            // `["rust","late-night"]` without quoting hell.
4973            let parsed: Value =
4974                serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
4975            let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
4976            if json {
4977                println!(
4978                    "{}",
4979                    serde_json::to_string(&json!({
4980                        "field": field,
4981                        "profile": new_profile,
4982                    }))?
4983                );
4984            } else {
4985                println!("profile.{field} set");
4986            }
4987        }
4988        ProfileAction::Get { json } => return cmd_whois(None, json, None),
4989        ProfileAction::Clear { field, json } => {
4990            let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
4991            if json {
4992                println!(
4993                    "{}",
4994                    serde_json::to_string(&json!({
4995                        "field": field,
4996                        "cleared": true,
4997                        "profile": new_profile,
4998                    }))?
4999                );
5000            } else {
5001                println!("profile.{field} cleared");
5002            }
5003        }
5004    }
5005    Ok(())
5006}
5007
5008// ---------- setup — one-shot MCP host registration ----------
5009
5010fn cmd_setup(apply: bool) -> Result<()> {
5011    use std::path::PathBuf;
5012
5013    let entry = json!({"command": "wire", "args": ["mcp"]});
5014    let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
5015
5016    // Detect probable MCP host config locations. Cross-platform — we only
5017    // touch the file if it already exists OR --apply was passed.
5018    let mut targets: Vec<(&str, PathBuf)> = Vec::new();
5019    if let Some(home) = dirs::home_dir() {
5020        // Claude Code (CLI) — real config path is ~/.claude.json on all platforms (Linux/macOS/Windows).
5021        // The mcpServers map lives at the top level of that file.
5022        targets.push(("Claude Code", home.join(".claude.json")));
5023        // Legacy / alternate Claude Code XDG path — still try, harmless if absent.
5024        targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
5025        // Claude Desktop macOS
5026        #[cfg(target_os = "macos")]
5027        targets.push((
5028            "Claude Desktop (macOS)",
5029            home.join("Library/Application Support/Claude/claude_desktop_config.json"),
5030        ));
5031        // Claude Desktop Windows
5032        #[cfg(target_os = "windows")]
5033        if let Ok(appdata) = std::env::var("APPDATA") {
5034            targets.push((
5035                "Claude Desktop (Windows)",
5036                PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
5037            ));
5038        }
5039        // Cursor
5040        targets.push(("Cursor", home.join(".cursor/mcp.json")));
5041    }
5042    // Project-local — works for several MCP-aware tools
5043    targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
5044
5045    println!("wire setup\n");
5046    println!("MCP server snippet (add this to your client's mcpServers):");
5047    println!();
5048    println!("{entry_pretty}");
5049    println!();
5050
5051    if !apply {
5052        println!("Probable MCP host config locations on this machine:");
5053        for (name, path) in &targets {
5054            let marker = if path.exists() {
5055                "✓ found"
5056            } else {
5057                "  (would create)"
5058            };
5059            println!("  {marker:14}  {name}: {}", path.display());
5060        }
5061        println!();
5062        println!("Run `wire setup --apply` to merge wire into each config above.");
5063        println!(
5064            "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
5065        );
5066        return Ok(());
5067    }
5068
5069    let mut modified: Vec<String> = Vec::new();
5070    let mut skipped: Vec<String> = Vec::new();
5071    for (name, path) in &targets {
5072        match upsert_mcp_entry(path, "wire", &entry) {
5073            Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
5074            Ok(false) => skipped.push(format!("  {name} ({}): already configured", path.display())),
5075            Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
5076        }
5077    }
5078    if !modified.is_empty() {
5079        println!("Modified:");
5080        for line in &modified {
5081            println!("  {line}");
5082        }
5083        println!();
5084        println!("Restart the app(s) above to load wire MCP.");
5085    }
5086    if !skipped.is_empty() {
5087        println!();
5088        println!("Skipped:");
5089        for line in &skipped {
5090            println!("  {line}");
5091        }
5092    }
5093    Ok(())
5094}
5095
5096/// Idempotent merge of an `mcpServers.<name>` entry into a JSON config file.
5097/// Returns Ok(true) if file was changed, Ok(false) if entry already matched.
5098fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
5099    let mut cfg: Value = if path.exists() {
5100        let body = std::fs::read_to_string(path).context("reading config")?;
5101        serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
5102    } else {
5103        json!({})
5104    };
5105    if !cfg.is_object() {
5106        cfg = json!({});
5107    }
5108    let root = cfg.as_object_mut().unwrap();
5109    let servers = root
5110        .entry("mcpServers".to_string())
5111        .or_insert_with(|| json!({}));
5112    if !servers.is_object() {
5113        *servers = json!({});
5114    }
5115    let map = servers.as_object_mut().unwrap();
5116    if map.get(server_name) == Some(entry) {
5117        return Ok(false);
5118    }
5119    map.insert(server_name.to_string(), entry.clone());
5120    if let Some(parent) = path.parent()
5121        && !parent.as_os_str().is_empty()
5122    {
5123        std::fs::create_dir_all(parent).context("creating parent dir")?;
5124    }
5125    let out = serde_json::to_string_pretty(&cfg)? + "\n";
5126    std::fs::write(path, out).context("writing config")?;
5127    Ok(true)
5128}
5129
5130// ---------- reactor — event-handler dispatch loop ----------
5131
5132#[allow(clippy::too_many_arguments)]
5133fn cmd_reactor(
5134    on_event: &str,
5135    peer_filter: Option<&str>,
5136    kind_filter: Option<&str>,
5137    verified_only: bool,
5138    interval_secs: u64,
5139    once: bool,
5140    dry_run: bool,
5141    max_per_minute: u32,
5142    max_chain_depth: u32,
5143) -> Result<()> {
5144    use crate::inbox_watch::{InboxEvent, InboxWatcher};
5145    use std::collections::{HashMap, HashSet, VecDeque};
5146    use std::io::Write;
5147    use std::process::{Command, Stdio};
5148    use std::time::{Duration, Instant};
5149
5150    let cursor_path = config::state_dir()?.join("reactor.cursor");
5151    // event_ids THIS reactor's handler has caused to be sent (via wire send).
5152    // Used by chain-depth check — an incoming `(re:X)` where X is in this set
5153    // means peer is replying to something we just said → don't reply back.
5154    //
5155    // Persisted across restarts so a reactor that crashes mid-conversation
5156    // doesn't re-enter the loop. Reads on startup, writes after each
5157    // outbox-grow detection. Capped at 500 entries (LRU-ish — old entries
5158    // dropped from front of file).
5159    let emitted_path = config::state_dir()?.join("reactor-emitted.log");
5160    let mut emitted_ids: HashSet<String> = HashSet::new();
5161    if emitted_path.exists()
5162        && let Ok(body) = std::fs::read_to_string(&emitted_path)
5163    {
5164        for line in body.lines() {
5165            let t = line.trim();
5166            if !t.is_empty() {
5167                emitted_ids.insert(t.to_string());
5168            }
5169        }
5170    }
5171    // Outbox file paths the reactor watches for new sent-event_ids.
5172    let outbox_dir = config::outbox_dir()?;
5173    // (peer → file size we've already scanned). Lets us notice new outbox
5174    // appends without re-reading the whole file each sweep.
5175    let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
5176
5177    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
5178
5179    let kind_num: Option<u32> = match kind_filter {
5180        Some(k) => Some(parse_kind(k)?),
5181        None => None,
5182    };
5183
5184    // Per-peer sliding window of dispatch instants for rate-limit check.
5185    let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
5186
5187    let dispatch = |ev: &InboxEvent,
5188                    peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
5189                    emitted_ids: &HashSet<String>|
5190     -> Result<bool> {
5191        if let Some(p) = peer_filter
5192            && ev.peer != p
5193        {
5194            return Ok(false);
5195        }
5196        if verified_only && !ev.verified {
5197            return Ok(false);
5198        }
5199        if let Some(want) = kind_num {
5200            let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
5201            if ev_kind != Some(want) {
5202                return Ok(false);
5203            }
5204        }
5205
5206        // Chain-depth check: if the body contains `(re:<event_id>)` and that
5207        // event_id is in our emitted set, this is a reply to one of our
5208        // replies → loop suspected, skip.
5209        if max_chain_depth > 0 {
5210            let body_str = match &ev.raw["body"] {
5211                Value::String(s) => s.clone(),
5212                other => serde_json::to_string(other).unwrap_or_default(),
5213            };
5214            if let Some(referenced) = parse_re_marker(&body_str) {
5215                // Handler scripts usually truncate event_id (e.g. ${ID:0:12}).
5216                // Match emitted set by prefix to catch both full + truncated.
5217                let matched = emitted_ids.contains(&referenced)
5218                    || emitted_ids.iter().any(|full| full.starts_with(&referenced));
5219                if matched {
5220                    eprintln!(
5221                        "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
5222                        ev.event_id, ev.peer, referenced
5223                    );
5224                    return Ok(false);
5225                }
5226            }
5227        }
5228
5229        // Per-peer rate-limit check (sliding 60s window).
5230        if max_per_minute > 0 {
5231            let now = Instant::now();
5232            let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
5233            while let Some(&front) = win.front() {
5234                if now.duration_since(front) > Duration::from_secs(60) {
5235                    win.pop_front();
5236                } else {
5237                    break;
5238                }
5239            }
5240            if win.len() as u32 >= max_per_minute {
5241                eprintln!(
5242                    "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
5243                    ev.event_id, ev.peer, max_per_minute
5244                );
5245                return Ok(false);
5246            }
5247            win.push_back(now);
5248        }
5249
5250        if dry_run {
5251            println!("{}", serde_json::to_string(&ev.raw)?);
5252            return Ok(true);
5253        }
5254
5255        let mut child = Command::new("sh")
5256            .arg("-c")
5257            .arg(on_event)
5258            .stdin(Stdio::piped())
5259            .stdout(Stdio::inherit())
5260            .stderr(Stdio::inherit())
5261            .env("WIRE_EVENT_PEER", &ev.peer)
5262            .env("WIRE_EVENT_ID", &ev.event_id)
5263            .env("WIRE_EVENT_KIND", &ev.kind)
5264            .spawn()
5265            .with_context(|| format!("spawning reactor handler: {on_event}"))?;
5266        if let Some(mut stdin) = child.stdin.take() {
5267            let body = serde_json::to_vec(&ev.raw)?;
5268            let _ = stdin.write_all(&body);
5269            let _ = stdin.write_all(b"\n");
5270        }
5271        std::mem::drop(child);
5272        Ok(true)
5273    };
5274
5275    // Scan outbox files for newly-appended event_ids and add to emitted set.
5276    let scan_outbox = |emitted_ids: &mut HashSet<String>,
5277                       outbox_cursors: &mut HashMap<String, u64>|
5278     -> Result<usize> {
5279        if !outbox_dir.exists() {
5280            return Ok(0);
5281        }
5282        let mut added = 0;
5283        let mut new_ids: Vec<String> = Vec::new();
5284        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
5285            let path = entry.path();
5286            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
5287                continue;
5288            }
5289            let peer = match path.file_stem().and_then(|s| s.to_str()) {
5290                Some(s) => s.to_string(),
5291                None => continue,
5292            };
5293            let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
5294            let start = *outbox_cursors.get(&peer).unwrap_or(&0);
5295            if cur_len <= start {
5296                outbox_cursors.insert(peer, start);
5297                continue;
5298            }
5299            let body = std::fs::read_to_string(&path).unwrap_or_default();
5300            let tail = &body[start as usize..];
5301            for line in tail.lines() {
5302                if let Ok(v) = serde_json::from_str::<Value>(line)
5303                    && let Some(eid) = v.get("event_id").and_then(Value::as_str)
5304                    && emitted_ids.insert(eid.to_string())
5305                {
5306                    new_ids.push(eid.to_string());
5307                    added += 1;
5308                }
5309            }
5310            outbox_cursors.insert(peer, cur_len);
5311        }
5312        if !new_ids.is_empty() {
5313            // Append new ids to disk, cap on-disk file at 500 entries.
5314            let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
5315            if all.len() > 500 {
5316                all.sort();
5317                let drop_n = all.len() - 500;
5318                let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
5319                emitted_ids.retain(|x| !dropped.contains(x));
5320                all = emitted_ids.iter().cloned().collect();
5321            }
5322            let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
5323        }
5324        Ok(added)
5325    };
5326
5327    let sweep = |watcher: &mut InboxWatcher,
5328                 emitted_ids: &mut HashSet<String>,
5329                 outbox_cursors: &mut HashMap<String, u64>,
5330                 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
5331     -> Result<usize> {
5332        // Pick up any event_ids we sent since last sweep.
5333        let _ = scan_outbox(emitted_ids, outbox_cursors);
5334
5335        let events = watcher.poll()?;
5336        let mut fired = 0usize;
5337        for ev in &events {
5338            match dispatch(ev, peer_dispatch_log, emitted_ids) {
5339                Ok(true) => fired += 1,
5340                Ok(false) => {}
5341                Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
5342            }
5343        }
5344        watcher.save_cursors(&cursor_path)?;
5345        Ok(fired)
5346    };
5347
5348    if once {
5349        sweep(
5350            &mut watcher,
5351            &mut emitted_ids,
5352            &mut outbox_cursors,
5353            &mut peer_dispatch_log,
5354        )?;
5355        return Ok(());
5356    }
5357    let interval = std::time::Duration::from_secs(interval_secs.max(1));
5358    loop {
5359        if let Err(e) = sweep(
5360            &mut watcher,
5361            &mut emitted_ids,
5362            &mut outbox_cursors,
5363            &mut peer_dispatch_log,
5364        ) {
5365            eprintln!("wire reactor: sweep error: {e}");
5366        }
5367        std::thread::sleep(interval);
5368    }
5369}
5370
/// Extract the id from the first `(re:<event_id>)` marker in `body`.
///
/// Only the first `(re:` occurrence is considered, paired with the first
/// `)` after it. Whitespace around the id is trimmed. Returns `None` when
/// there is no marker, no closing parenthesis, or the id is empty.
fn parse_re_marker(body: &str) -> Option<String> {
    let (_, after_open) = body.split_once("(re:")?;
    let (inner, _) = after_open.split_once(')')?;
    let id = inner.trim();
    (!id.is_empty()).then(|| id.to_string())
}
5384
5385// ---------- notify (Goal 2) ----------
5386
5387fn cmd_notify(
5388    interval_secs: u64,
5389    peer_filter: Option<&str>,
5390    once: bool,
5391    as_json: bool,
5392) -> Result<()> {
5393    use crate::inbox_watch::InboxWatcher;
5394    let cursor_path = config::state_dir()?.join("notify.cursor");
5395    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
5396
5397    let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
5398        let events = watcher.poll()?;
5399        for ev in events {
5400            if let Some(p) = peer_filter
5401                && ev.peer != p
5402            {
5403                continue;
5404            }
5405            if as_json {
5406                println!("{}", serde_json::to_string(&ev)?);
5407            } else {
5408                os_notify_inbox_event(&ev);
5409            }
5410        }
5411        watcher.save_cursors(&cursor_path)?;
5412        Ok(())
5413    };
5414
5415    if once {
5416        return sweep(&mut watcher);
5417    }
5418
5419    let interval = std::time::Duration::from_secs(interval_secs.max(1));
5420    loop {
5421        if let Err(e) = sweep(&mut watcher) {
5422            eprintln!("wire notify: sweep error: {e}");
5423        }
5424        std::thread::sleep(interval);
5425    }
5426}
5427
5428fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
5429    let title = if ev.verified {
5430        format!("wire ← {}", ev.peer)
5431    } else {
5432        format!("wire ← {} (UNVERIFIED)", ev.peer)
5433    };
5434    let body = format!("{}: {}", ev.kind, ev.body_preview);
5435    crate::os_notify::toast(&title, &body);
5436}
5437
/// Fallback toast, compiled only on targets other than Linux, macOS and
/// Windows: writes the title and the indented body to stderr.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    let message = format!("[wire notify] {title}\n  {body}");
    eprintln!("{message}");
}
5442
5443// Integration tests for the CLI live in `tests/cli.rs` (cargo's tests/ dir).