Skip to main content

wire/
cli.rs

//! `wire` CLI surface.
//!
//! Every subcommand emits human-readable text by default and structured JSON
//! when `--json` is passed. Stable JSON shape is part of the API contract —
//! see `docs/AGENT_INTEGRATION.md`.
//!
//! Subcommand split:
//!   - **agent-safe**: `whoami`, `peers`, `verify`, `send`, `tail` — pure
//!     message-layer ops, no trust establishment.
//!   - **trust-establishing**: `init`, `pair-host`, `pair-join`. The CLI
//!     uses interactive `y/N` prompts here. The MCP equivalents
//!     (`wire_init`, `wire_pair_initiate`, `wire_pair_join`, `wire_pair_check`,
//!     `wire_pair_confirm`) preserve the human gate by requiring the user to
//!     type the 6 SAS digits back into chat — see `docs/THREAT_MODEL.md` T10/T14.
15
16use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21    agent_card::{build_agent_card, sign_agent_card},
22    config,
23    signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24    trust::{add_self_to_trust, empty_trust},
25};
26
27/// Top-level CLI.
28#[derive(Parser, Debug)]
29#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
30pub struct Cli {
31    #[command(subcommand)]
32    pub command: Command,
33}
34
35#[derive(Subcommand, Debug)]
36pub enum Command {
37    /// Generate a keypair, write self-card, and prepare to pair. (HUMAN-ONLY — DO NOT exec from agents.)
38    Init {
39        /// Short handle for this agent (becomes did:wire:<handle>).
40        handle: String,
41        /// Optional display name (defaults to capitalized handle).
42        #[arg(long)]
43        name: Option<String>,
44        /// Optional relay URL — if set, also allocates a relay slot in one step
45        /// (equivalent to running `wire init` then `wire bind-relay <url>`).
46        #[arg(long)]
47        relay: Option<String>,
48        /// Emit JSON.
49        #[arg(long)]
50        json: bool,
51    },
52    // (Old `Join` stub removed in iter 11 — superseded by `pair-join` with
53    // `join` alias. See PairJoin below.)
54    /// Print this agent's identity (DID, fingerprint, mailbox slot).
55    Whoami {
56        #[arg(long)]
57        json: bool,
58    },
59    /// List pinned peers with their tiers and capabilities.
60    Peers {
61        #[arg(long)]
62        json: bool,
63    },
64    /// Sign and queue an event to a peer.
65    ///
66    /// Forms (P0.S 0.5.11):
67    ///   wire send <peer> <body>              # kind defaults to "claim"
68    ///   wire send <peer> <kind> <body>       # explicit kind (back-compat)
69    ///   wire send <peer> -                   # body from stdin (kind=claim)
70    ///   wire send <peer> @/path/to/body.json # body from file
71    Send {
72        /// Peer handle (without `did:wire:` prefix).
73        peer: String,
74        /// When `<body>` is omitted, this is the event body (kind defaults
75        /// to `claim`). When both this and `<body>` are given, this is the
76        /// event kind (`decision`, `claim`, etc., or numeric kind id) and
77        /// the next positional is the body.
78        kind_or_body: String,
79        /// Event body — free-form text, `@/path/to/body.json` to load from
80        /// a file, or `-` to read from stdin. Optional; omit to use
81        /// `<kind_or_body>` as the body with kind=`claim`.
82        body: Option<String>,
83        /// Advisory deadline: duration (`30m`, `2h`, `1d`) or RFC3339 timestamp.
84        #[arg(long)]
85        deadline: Option<String>,
86        /// Emit JSON.
87        #[arg(long)]
88        json: bool,
89    },
90    /// Stream signed events from peers.
91    Tail {
92        /// Optional peer filter; if omitted, tails all peers.
93        peer: Option<String>,
94        /// Emit JSONL (one event per line).
95        #[arg(long)]
96        json: bool,
97        /// Maximum events to read before exiting (0 = stream until SIGINT).
98        #[arg(long, default_value_t = 0)]
99        limit: usize,
100    },
101    /// Live tail of new inbox events across all pinned peers — one line per
102    /// new event, handshake (pair_drop / pair_drop_ack / heartbeat) filtered
103    /// by default.
104    ///
105    /// Designed to be left running in an agent harness's stream-watcher
106    /// (Claude Code Monitor tool, etc.) so peer messages surface in the
107    /// session as they arrive, not on next manual `wire pull`.
108    ///
109    /// See docs/AGENT_INTEGRATION.md for the recommended Monitor invocation
110    /// template.
111    Monitor {
112        /// Only show events from this peer.
113        #[arg(long)]
114        peer: Option<String>,
115        /// Emit JSONL (one InboxEvent per line) for tooling consumption.
116        #[arg(long)]
117        json: bool,
118        /// Include handshake events (pair_drop, pair_drop_ack, heartbeat).
119        /// Default filters them out as noise.
120        #[arg(long)]
121        include_handshake: bool,
122        /// Poll interval in milliseconds. Lower = lower latency, higher CPU.
123        #[arg(long, default_value_t = 500)]
124        interval_ms: u64,
125        /// Replay last N events from history before going live (0 = none).
126        #[arg(long, default_value_t = 0)]
127        replay: usize,
128    },
129    /// Verify a signed event from a JSON file or stdin (`-`).
130    Verify {
131        /// Path to event JSON, or `-` for stdin.
132        path: String,
133        /// Emit JSON.
134        #[arg(long)]
135        json: bool,
136    },
137    /// Run the MCP (Model Context Protocol) server over stdio.
138    /// This is how Claude Desktop / Claude Code / Cursor / etc. expose
139    /// `wire_send`, `wire_tail`, etc. as native tools.
140    Mcp,
141    /// Run a relay server on this host.
142    RelayServer {
143        /// Bind address (e.g. `127.0.0.1:8770`).
144        #[arg(long, default_value = "127.0.0.1:8770")]
145        bind: String,
146    },
147    /// Allocate a slot on a relay; bind it to this agent's identity.
148    BindRelay {
149        /// Relay base URL, e.g. `http://127.0.0.1:8770`.
150        url: String,
151        #[arg(long)]
152        json: bool,
153    },
154    /// Manually pin a peer's relay slot. (Replaces SAS pairing for v0.1 bootstrap;
155    /// real `wire join` lands in the SPAKE2 iter.)
156    AddPeerSlot {
157        /// Peer handle (becomes did:wire:<handle>).
158        handle: String,
159        /// Peer's relay base URL.
160        url: String,
161        /// Peer's slot id.
162        slot_id: String,
163        /// Slot bearer token (shared between paired peers in v0.1).
164        slot_token: String,
165        #[arg(long)]
166        json: bool,
167    },
168    /// Drain outbox JSONL files to peers' relay slots.
169    Push {
170        /// Optional peer filter; default = all peers with outbox entries.
171        peer: Option<String>,
172        #[arg(long)]
173        json: bool,
174    },
175    /// Pull events from our relay slot, verify, write to inbox.
176    Pull {
177        #[arg(long)]
178        json: bool,
179    },
180    /// Print a summary of identity, relay binding, peers, inbox/outbox queue depth.
181    /// Useful as a single "where am I" check.
182    Status {
183        /// Inspect a paired peer's transport / attention / responder health.
184        #[arg(long)]
185        peer: Option<String>,
186        #[arg(long)]
187        json: bool,
188    },
189    /// Publish or inspect auto-responder health for this slot.
190    Responder {
191        #[command(subcommand)]
192        command: ResponderCommand,
193    },
194    /// Pin a peer's signed agent-card from a file. (Manual out-of-band pairing
195    /// — fallback path; the magic-wormhole flow is `pair-host` / `pair-join`.)
196    Pin {
197        /// Path to peer's signed agent-card JSON.
198        card_file: String,
199        #[arg(long)]
200        json: bool,
201    },
202    /// Allocate a NEW slot on the same relay and abandon the old one.
203    /// Sends a kind=1201 wire_close event to every paired peer over the OLD
204    /// slot announcing the new mailbox before swapping. After rotation,
205    /// peers must re-pair (or operator runs `add-peer-slot` with the new
206    /// coords) — auto-update via wire_close is a v0.2 daemon feature.
207    ///
208    /// Use case: a paired peer turned hostile (T11 in THREAT_MODEL.md —
209    /// abusive bearer-holder spamming your slot). Rotate → old slot is
210    /// orphaned → attacker's leverage gone. Operator pairs again with
211    /// peers they still want.
212    RotateSlot {
213        /// Skip the wire_close announcement to peers (faster but they won't know
214        /// where you went).
215        #[arg(long)]
216        no_announce: bool,
217        #[arg(long)]
218        json: bool,
219    },
220    /// Remove a peer from trust + relay state. Inbox/outbox files for that
221    /// peer are NOT deleted (operator can grep history); pass --purge to
222    /// also wipe the JSONL files.
223    ForgetPeer {
224        /// Peer handle to forget.
225        handle: String,
226        /// Also delete inbox/<handle>.jsonl and outbox/<handle>.jsonl.
227        #[arg(long)]
228        purge: bool,
229        #[arg(long)]
230        json: bool,
231    },
232    /// Run a long-lived sync loop: every <interval> seconds, push outbox to
233    /// peers' relay slots and pull inbox from our own slot. Foreground process;
234    /// background it with systemd / `&` / tmux as you prefer.
235    Daemon {
236        /// Sync interval in seconds. Default 5.
237        #[arg(long, default_value_t = 5)]
238        interval: u64,
239        /// Run a single sync cycle and exit (useful for cron-driven setups).
240        #[arg(long)]
241        once: bool,
242        #[arg(long)]
243        json: bool,
244    },
245    /// Host a SAS-confirmed pairing. Generates a code phrase, prints it, waits
246    /// for a peer to `pair-join`, exchanges signed agent-cards via SPAKE2 +
247    /// ChaCha20-Poly1305. Auto-pins on success. (HUMAN-ONLY — operator must
248    /// read the SAS digits aloud and confirm.)
249    PairHost {
250        /// Relay base URL.
251        #[arg(long)]
252        relay: String,
253        /// Skip the SAS confirmation prompt. ONLY use when piping under
254        /// automated tests or when the SAS has already been verified by
255        /// another channel. Documented as test-only.
256        #[arg(long)]
257        yes: bool,
258        /// How long (seconds) to wait for the peer to join before timing out.
259        #[arg(long, default_value_t = 300)]
260        timeout: u64,
261        /// Detach: write a pending-pair file, print the code phrase, and exit
262        /// immediately. The running `wire daemon` does the handshake in the
263        /// background; confirm SAS later via `wire pair-confirm <code> <digits>`.
264        /// `wire pair-list` shows pending sessions. Default is foreground
265        /// blocking behavior for backward compat.
266        #[arg(long)]
267        detach: bool,
268        /// Emit JSON instead of text. Currently only meaningful with --detach.
269        #[arg(long)]
270        json: bool,
271    },
272    /// Join a pair-slot using a code phrase from the host. (HUMAN-ONLY.)
273    ///
274    /// Aliased as `wire join <code>` for magic-wormhole muscle-memory.
275    #[command(alias = "join")]
276    PairJoin {
277        /// Code phrase from the host's `pair-host` output (e.g. `73-2QXC4P`).
278        code_phrase: String,
279        /// Relay base URL (must match the host's relay).
280        #[arg(long)]
281        relay: String,
282        #[arg(long)]
283        yes: bool,
284        #[arg(long, default_value_t = 300)]
285        timeout: u64,
286        /// Detach: see `pair-host --detach`.
287        #[arg(long)]
288        detach: bool,
289        /// Emit JSON instead of text. Currently only meaningful with --detach.
290        #[arg(long)]
291        json: bool,
292    },
293    /// Confirm SAS digits for a detached pending pair. The daemon must be
294    /// running for this to do anything — it picks up the confirmation on its
295    /// next tick. Mismatch aborts the pair.
296    PairConfirm {
297        /// The code phrase the original `wire pair-host --detach` printed.
298        code_phrase: String,
299        /// 6 digits as displayed by `wire pair-list` (dashes/spaces stripped).
300        digits: String,
301        /// Emit JSON instead of human-readable text.
302        #[arg(long)]
303        json: bool,
304    },
305    /// List all pending detached pair sessions and their state.
306    PairList {
307        /// Emit JSON instead of the table.
308        #[arg(long)]
309        json: bool,
310        /// Stream mode: never exit; print one JSON line per status transition
311        /// (creation, status change, deletion) across all pending pairs.
312        /// Compose with bash `while read` to react in shell. Implies --json.
313        #[arg(long)]
314        watch: bool,
315        /// Poll interval in seconds for --watch.
316        #[arg(long, default_value_t = 1)]
317        watch_interval: u64,
318    },
319    /// Cancel a pending pair. Releases the relay slot and removes the pending file.
320    PairCancel {
321        code_phrase: String,
322        #[arg(long)]
323        json: bool,
324    },
325    /// Block until a pending pair reaches a target status (default sas_ready),
326    /// or terminates (finalized = file removed, aborted, aborted_restart), or
327    /// the timeout expires. Useful for shell scripts that want to drive the
328    /// detached flow without polling pair-list themselves.
329    ///
330    /// Exit codes:
331    ///   0 — reached target status (or finalized, if target was sas_ready)
332    ///   1 — terminated abnormally (aborted, aborted_restart, no such code)
333    ///   2 — timeout
334    PairWatch {
335        code_phrase: String,
336        /// Target status to wait for. Default: sas_ready.
337        #[arg(long, default_value = "sas_ready")]
338        status: String,
339        /// Max seconds to wait.
340        #[arg(long, default_value_t = 300)]
341        timeout: u64,
342        /// Emit JSON on each status change (one per line) instead of just on exit.
343        #[arg(long)]
344        json: bool,
345    },
346    /// One-shot bootstrap. Inits identity (idempotent), opens pair-host or
347    /// pair-join, then registers wire as an MCP server. Single command from
348    /// nothing to paired and ready — no separate init/pair-host/setup steps.
349    /// Operator still must confirm SAS digits.
350    ///
351    /// Examples:
352    ///   wire pair paul                          # host a new pair on default relay
353    ///   wire pair willard --code 58-NMTY7A      # join paul's pair
354    Pair {
355        /// Short handle for this agent (becomes did:wire:<handle>). Used by init
356        /// step if no identity exists; ignored if already initialized.
357        handle: String,
358        /// Code phrase from peer's pair-host output. Omit to be the host
359        /// (this command will print one for you to share).
360        #[arg(long)]
361        code: Option<String>,
362        /// Relay base URL. Defaults to the laulpogan public-good relay.
363        #[arg(long, default_value = "https://wireup.net")]
364        relay: String,
365        /// Skip SAS prompt. Test-only.
366        #[arg(long)]
367        yes: bool,
368        /// Pair-step timeout in seconds.
369        #[arg(long, default_value_t = 300)]
370        timeout: u64,
371        /// Skip the post-pair `setup --apply` step (don't register wire as
372        /// an MCP server in detected client configs).
373        #[arg(long)]
374        no_setup: bool,
375        /// Run via the daemon-orchestrated detached path (auto-starts daemon,
376        /// exits immediately, daemon does the handshake). Confirm via
377        /// `wire pair-confirm <code> <digits>` from any terminal. See
378        /// `pair-host --detach` for details.
379        #[arg(long)]
380        detach: bool,
381    },
382    /// Forget a half-finished pair-slot on the relay. Use this if `pair-host`
383    /// or `pair-join` crashed (process killed, network blip, OOM) before SAS
384    /// confirmation, leaving the relay-side slot stuck with "guest already
385    /// registered" or "host already registered" until the 5-minute TTL expires.
386    /// Either side can call. Idempotent.
387    PairAbandon {
388        /// The code phrase from the original pair-host (e.g. `58-NMTY7A`).
389        code_phrase: String,
390        /// Relay base URL.
391        #[arg(long, default_value = "https://wireup.net")]
392        relay: String,
393    },
394    /// Detect known MCP host config locations (Claude Desktop, Claude Code,
395    /// Cursor, project-local) and either print or auto-merge the wire MCP
396    /// server entry. Default prints; pass `--apply` to actually modify config
397    /// files. Idempotent — re-running is safe.
398    Setup {
399        /// Actually write the changes (default = print only).
400        #[arg(long)]
401        apply: bool,
402    },
403    /// Show an agent's profile. With no arg, prints local self. With a
404    /// `nick@domain` arg, resolves via that domain's `.well-known/wire/agent`
405    /// endpoint and verifies the returned signed card before display.
406    Whois {
407        /// Optional handle (`nick@domain`). Omit to show self.
408        handle: Option<String>,
409        #[arg(long)]
410        json: bool,
411        /// Override the relay base URL used for resolution (default:
412        /// `https://<domain>` from the handle).
413        #[arg(long)]
414        relay: Option<String>,
415    },
416    /// Zero-paste pair with a known handle. Resolves `nick@domain` via that
417    /// domain's `.well-known/wire/agent`, then delivers a signed pair-intro
418    /// to the peer's slot via `/v1/handle/intro`. Peer's daemon completes
419    /// the bilateral pin on its next pull (sends back pair_drop_ack carrying
420    /// their slot_token so we can `wire send` to them).
421    Add {
422        /// Peer handle (`nick@domain`).
423        handle: String,
424        /// Override the relay base URL used for resolution.
425        #[arg(long)]
426        relay: Option<String>,
427        #[arg(long)]
428        json: bool,
429    },
430    /// One-shot full bootstrap — `wire up <nick@relay-host>` does in one
431    /// command what 0.5.10 took five (init + bind-relay + claim + daemon-
432    /// background + remember-to-restart-on-login). Idempotent: re-run on
433    /// an already-set-up box prints state without churn.
434    ///
435    /// Examples:
436    ///   wire up paul@wireup.net           # full bootstrap
437    ///   wire up paul-mac@wireup.net       # ditto, nick = paul-mac
438    ///   wire up paul                      # bootstrap, default relay
439    Up {
440        /// Full handle in `nick@relay-host` form, or just `nick` (defaults
441        /// to the configured public relay wireup.net).
442        handle: String,
443        /// Optional display name (defaults to capitalized nick).
444        #[arg(long)]
445        name: Option<String>,
446        #[arg(long)]
447        json: bool,
448    },
449    /// Diagnose wire setup health. Single command that surfaces every
450    /// silent-fail class — daemon down or duplicated, relay unreachable,
451    /// cursor stuck, pair rejections piling up, trust ↔ directory drift.
452    /// Replaces today's 30-minute manual debug.
453    ///
454    /// Exit code non-zero if any FAIL findings.
455    Doctor {
456        /// Emit JSON.
457        #[arg(long)]
458        json: bool,
459        /// Show last N entries from pair-rejected.jsonl in the report.
460        #[arg(long, default_value_t = 5)]
461        recent_rejections: usize,
462    },
463    /// Atomic upgrade: kill every `wire daemon` process, spawn a fresh
464    /// one from the current binary, write a new pidfile. Eliminates the
465    /// "stale binary text in memory under a fresh symlink" bug class that
466    /// burned 30 minutes today.
467    Upgrade {
468        /// Report drift without taking action (lists processes that would
469        /// be killed + the version of each).
470        #[arg(long)]
471        check: bool,
472        #[arg(long)]
473        json: bool,
474    },
475    /// Install / inspect / remove a launchd plist (macOS) or systemd
476    /// user unit (linux) that runs `wire daemon` on login + restarts
477    /// on crash. Replaces today's "background it with tmux/&/systemd
478    /// as you prefer" footgun.
479    Service {
480        #[command(subcommand)]
481        action: ServiceAction,
482    },
483    /// Inspect or toggle the structured diagnostic trace
484    /// (`$WIRE_HOME/state/wire/diag.jsonl`). Off by default. Enable per
485    /// process via `WIRE_DIAG=1`, or per-machine via `wire diag enable`
486    /// (writes the file knob a running daemon picks up automatically).
487    Diag {
488        #[command(subcommand)]
489        action: DiagAction,
490    },
491    /// Claim a nick on a relay's handle directory. Anyone can then reach
492    /// this agent by `<nick>@<relay-domain>` via the relay's
493    /// `.well-known/wire/agent` endpoint. FCFS; same-DID re-claims allowed.
494    Claim {
495        nick: String,
496        /// Relay to claim the nick on. Default = relay our slot is on.
497        #[arg(long)]
498        relay: Option<String>,
499        /// Public URL the relay should advertise to resolvers (default = relay).
500        #[arg(long)]
501        public_url: Option<String>,
502        #[arg(long)]
503        json: bool,
504    },
505    /// Edit profile fields (display_name, emoji, motto, vibe, pronouns,
506    /// avatar_url, handle, now). Re-signs the agent-card atomically.
507    ///
508    /// Examples:
509    ///   wire profile set motto "compiles or dies trying"
510    ///   wire profile set emoji "🦀"
511    ///   wire profile set vibe '["rust","late-night","no-async-please"]'
512    ///   wire profile set handle "coffee-ghost@anthropic.dev"
513    ///   wire profile get
514    Profile {
515        #[command(subcommand)]
516        action: ProfileAction,
517    },
518    /// Mint a one-paste invite URL. Anyone with this URL can pair to us in a
519    /// single step (no SAS digits, no code typing). Auto-inits + auto-allocates
520    /// a relay slot on first use. Default TTL 24h, single-use.
521    Invite {
522        /// Override the relay URL for first-time auto-allocation.
523        #[arg(long, default_value = "https://wireup.net")]
524        relay: String,
525        /// Invite lifetime in seconds (default 86400 = 24h).
526        #[arg(long, default_value_t = 86_400)]
527        ttl: u64,
528        /// Number of distinct peers that can accept this invite before it's
529        /// consumed (default 1).
530        #[arg(long, default_value_t = 1)]
531        uses: u32,
532        /// Register the invite at the relay's short-URL endpoint and print
533        /// a `curl ... | sh` one-liner the peer can run on a fresh machine.
534        /// Installs wire if missing, then accepts the invite, then pairs.
535        #[arg(long)]
536        share: bool,
537        /// Emit JSON.
538        #[arg(long)]
539        json: bool,
540    },
541    /// Accept a wire invite URL. Single-step pair — pins issuer, sends our
542    /// signed card to issuer's slot. Auto-inits + auto-allocates if needed.
543    Accept {
544        /// The full invite URL (starts with `wire://pair?v=1&inv=...`).
545        url: String,
546        /// Emit JSON.
547        #[arg(long)]
548        json: bool,
549    },
550    /// Long-running event dispatcher. Watches inbox for new verified events
551    /// and spawns the given shell command per event, passing the event JSON
552    /// on stdin. Use to wire up autonomous reply loops:
553    ///   wire reactor --on-event 'claude -p "respond via wire send"'
554    /// Cursor persisted to `$WIRE_HOME/state/wire/reactor.cursor`.
555    Reactor {
556        /// Shell command to spawn per event. Event JSON written to its stdin.
557        #[arg(long)]
558        on_event: String,
559        /// Only fire for events from this peer.
560        #[arg(long)]
561        peer: Option<String>,
562        /// Only fire for events of this kind (numeric or name, e.g. 1 / decision).
563        #[arg(long)]
564        kind: Option<String>,
565        /// Skip events whose verified flag is false (default true).
566        #[arg(long, default_value_t = true)]
567        verified_only: bool,
568        /// Poll interval in seconds.
569        #[arg(long, default_value_t = 2)]
570        interval: u64,
571        /// Process one sweep and exit.
572        #[arg(long)]
573        once: bool,
574        /// Don't actually spawn — print one JSONL line per event for smoke-testing.
575        #[arg(long)]
576        dry_run: bool,
577        /// Hard rate-limit: max events handler is fired for per peer per minute.
578        /// 0 = unlimited. Default 6 — covers normal conversational tempo, kills
579        /// LLM-vs-LLM feedback loops (which fire 10+/sec).
580        #[arg(long, default_value_t = 6)]
581        max_per_minute: u32,
582        /// Anti-loop chain depth. Track event_ids this reactor emitted; if an
583        /// incoming event body contains `(re:X)` where X is in our emitted log,
584        /// skip — that's a reply-to-our-reply, depth ≥ 2. Disable with 0.
585        #[arg(long, default_value_t = 1)]
586        max_chain_depth: u32,
587    },
588    /// Watch the inbox for new verified events and fire an OS notification per
589    /// event. Long-running; background under systemd / `&` / tmux. Cursor is
590    /// persisted to `$WIRE_HOME/state/wire/notify.cursor` so restarts don't
591    /// re-emit history.
592    Notify {
593        /// Poll interval in seconds.
594        #[arg(long, default_value_t = 2)]
595        interval: u64,
596        /// Only notify for events from this peer (handle, no did: prefix).
597        #[arg(long)]
598        peer: Option<String>,
599        /// Run a single sweep and exit (useful for cron / tests).
600        #[arg(long)]
601        once: bool,
602        /// Suppress the OS notification call; print one JSON line per event to
603        /// stdout instead (for piping into other tooling or smoke-testing
604        /// without a desktop session).
605        #[arg(long)]
606        json: bool,
607    },
608}
609
610#[derive(Subcommand, Debug)]
611pub enum DiagAction {
612    /// Tail the last N entries from diag.jsonl.
613    Tail {
614        #[arg(long, default_value_t = 20)]
615        limit: usize,
616        #[arg(long)]
617        json: bool,
618    },
619    /// Flip the file-based knob ON. Running daemons pick this up on
620    /// the next emit call without restart.
621    Enable,
622    /// Flip the file-based knob OFF.
623    Disable,
624    /// Report whether diag is currently enabled + the file's size.
625    Status {
626        #[arg(long)]
627        json: bool,
628    },
629}
630
631#[derive(Subcommand, Debug)]
632pub enum ServiceAction {
633    /// Write the launchd plist (macOS) or systemd user unit (linux) and
634    /// load it. Idempotent — re-running re-bootstraps an existing service.
635    Install {
636        #[arg(long)]
637        json: bool,
638    },
639    /// Unload + delete the service unit. Daemon keeps running until the
640    /// next reboot or `wire upgrade`; this only changes the boot-time
641    /// behaviour.
642    Uninstall {
643        #[arg(long)]
644        json: bool,
645    },
646    /// Report whether the unit is installed + active.
647    Status {
648        #[arg(long)]
649        json: bool,
650    },
651}
652
653#[derive(Subcommand, Debug)]
654pub enum ResponderCommand {
655    /// Publish this agent's auto-responder health.
656    Set {
657        /// One of: online, offline, oauth_locked, rate_limited, degraded.
658        status: String,
659        /// Optional operator-facing reason.
660        #[arg(long)]
661        reason: Option<String>,
662        /// Emit JSON.
663        #[arg(long)]
664        json: bool,
665    },
666    /// Read responder health for self, or for a paired peer.
667    Get {
668        /// Optional peer handle; omitted means this agent's own slot.
669        peer: Option<String>,
670        /// Emit JSON.
671        #[arg(long)]
672        json: bool,
673    },
674}
675
676#[derive(Subcommand, Debug)]
677pub enum ProfileAction {
678    /// Set a profile field. Field names: display_name, emoji, motto, vibe,
679    /// pronouns, avatar_url, handle, now. Values are strings except `vibe`
680    /// (JSON array) and `now` (JSON object).
681    Set {
682        field: String,
683        value: String,
684        #[arg(long)]
685        json: bool,
686    },
687    /// Show all profile fields. Equivalent to `wire whois`.
688    Get {
689        #[arg(long)]
690        json: bool,
691    },
692    /// Clear a profile field.
693    Clear {
694        field: String,
695        #[arg(long)]
696        json: bool,
697    },
698}
699
700/// Entry point — parse and dispatch.
701pub fn run() -> Result<()> {
702    let cli = Cli::parse();
703    match cli.command {
704        Command::Init {
705            handle,
706            name,
707            relay,
708            json,
709        } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
710        Command::Status { peer, json } => {
711            if let Some(peer) = peer {
712                cmd_status_peer(&peer, json)
713            } else {
714                cmd_status(json)
715            }
716        }
717        Command::Whoami { json } => cmd_whoami(json),
718        Command::Peers { json } => cmd_peers(json),
719        Command::Send {
720            peer,
721            kind_or_body,
722            body,
723            deadline,
724            json,
725        } => {
726            // P0.S: smart-positional API. `wire send peer body` =
727            // kind=claim. `wire send peer kind body` = explicit kind.
728            let (kind, body) = match body {
729                Some(real_body) => (kind_or_body, real_body),
730                None => ("claim".to_string(), kind_or_body),
731            };
732            cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
733        }
734        Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
735        Command::Monitor {
736            peer,
737            json,
738            include_handshake,
739            interval_ms,
740            replay,
741        } => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
742        Command::Verify { path, json } => cmd_verify(&path, json),
743        Command::Responder { command } => match command {
744            ResponderCommand::Set {
745                status,
746                reason,
747                json,
748            } => cmd_responder_set(&status, reason.as_deref(), json),
749            ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
750        },
751        Command::Mcp => cmd_mcp(),
752        Command::RelayServer { bind } => cmd_relay_server(&bind),
753        Command::BindRelay { url, json } => cmd_bind_relay(&url, json),
754        Command::AddPeerSlot {
755            handle,
756            url,
757            slot_id,
758            slot_token,
759            json,
760        } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
761        Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
762        Command::Pull { json } => cmd_pull(json),
763        Command::Pin { card_file, json } => cmd_pin(&card_file, json),
764        Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
765        Command::ForgetPeer {
766            handle,
767            purge,
768            json,
769        } => cmd_forget_peer(&handle, purge, json),
770        Command::Daemon {
771            interval,
772            once,
773            json,
774        } => cmd_daemon(interval, once, json),
775        Command::PairHost {
776            relay,
777            yes,
778            timeout,
779            detach,
780            json,
781        } => {
782            if detach {
783                cmd_pair_host_detach(&relay, json)
784            } else {
785                cmd_pair_host(&relay, yes, timeout)
786            }
787        }
788        Command::PairJoin {
789            code_phrase,
790            relay,
791            yes,
792            timeout,
793            detach,
794            json,
795        } => {
796            if detach {
797                cmd_pair_join_detach(&code_phrase, &relay, json)
798            } else {
799                cmd_pair_join(&code_phrase, &relay, yes, timeout)
800            }
801        }
802        Command::PairConfirm {
803            code_phrase,
804            digits,
805            json,
806        } => cmd_pair_confirm(&code_phrase, &digits, json),
807        Command::PairList {
808            json,
809            watch,
810            watch_interval,
811        } => cmd_pair_list(json, watch, watch_interval),
812        Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
813        Command::PairWatch {
814            code_phrase,
815            status,
816            timeout,
817            json,
818        } => cmd_pair_watch(&code_phrase, &status, timeout, json),
819        Command::Pair {
820            handle,
821            code,
822            relay,
823            yes,
824            timeout,
825            no_setup,
826            detach,
827        } => {
828            // P0.P (0.5.11): if the handle is in `nick@domain` form, route to
829            // the zero-paste megacommand path — `wire pair slancha-spark@
830            // wireup.net` does add + poll-for-ack + verify in one shot. The
831            // SAS / code-based pair flow stays available for handles without
832            // `@` (bootstrap pairing between two boxes that don't yet share a
833            // relay directory).
834            if handle.contains('@') && code.is_none() {
835                cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
836            } else if detach {
837                cmd_pair_detach(&handle, code.as_deref(), &relay)
838            } else {
839                cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
840            }
841        }
842        Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
843        Command::Invite {
844            relay,
845            ttl,
846            uses,
847            share,
848            json,
849        } => cmd_invite(&relay, ttl, uses, share, json),
850        Command::Accept { url, json } => cmd_accept(&url, json),
851        Command::Whois {
852            handle,
853            json,
854            relay,
855        } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
856        Command::Add {
857            handle,
858            relay,
859            json,
860        } => cmd_add(&handle, relay.as_deref(), json),
861        Command::Up {
862            handle,
863            name,
864            json,
865        } => cmd_up(&handle, name.as_deref(), json),
866        Command::Doctor {
867            json,
868            recent_rejections,
869        } => cmd_doctor(json, recent_rejections),
870        Command::Upgrade { check, json } => cmd_upgrade(check, json),
871        Command::Service { action } => cmd_service(action),
872        Command::Diag { action } => cmd_diag(action),
873        Command::Claim {
874            nick,
875            relay,
876            public_url,
877            json,
878        } => cmd_claim(&nick, relay.as_deref(), public_url.as_deref(), json),
879        Command::Profile { action } => cmd_profile(action),
880        Command::Setup { apply } => cmd_setup(apply),
881        Command::Reactor {
882            on_event,
883            peer,
884            kind,
885            verified_only,
886            interval,
887            once,
888            dry_run,
889            max_per_minute,
890            max_chain_depth,
891        } => cmd_reactor(
892            &on_event,
893            peer.as_deref(),
894            kind.as_deref(),
895            verified_only,
896            interval,
897            once,
898            dry_run,
899            max_per_minute,
900            max_chain_depth,
901        ),
902        Command::Notify {
903            interval,
904            peer,
905            once,
906            json,
907        } => cmd_notify(interval, peer.as_deref(), once, json),
908    }
909}
910
911// ---------- init ----------
912
913fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
914    if !handle
915        .chars()
916        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
917    {
918        bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
919    }
920    if config::is_initialized()? {
921        bail!(
922            "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
923            config::config_dir()?
924        );
925    }
926
927    config::ensure_dirs()?;
928    let (sk_seed, pk_bytes) = generate_keypair();
929    config::write_private_key(&sk_seed)?;
930
931    let card = build_agent_card(handle, &pk_bytes, name, None, None);
932    let signed = sign_agent_card(&card, &sk_seed);
933    config::write_agent_card(&signed)?;
934
935    let mut trust = empty_trust();
936    add_self_to_trust(&mut trust, handle, &pk_bytes);
937    config::write_trust(&trust)?;
938
939    let fp = fingerprint(&pk_bytes);
940    let key_id = make_key_id(handle, &pk_bytes);
941
942    // If --relay was passed, also bind a slot inline so init+bind happen in one step.
943    let mut relay_info: Option<(String, String)> = None;
944    if let Some(url) = relay {
945        let normalized = url.trim_end_matches('/');
946        let client = crate::relay_client::RelayClient::new(normalized);
947        client.check_healthz()?;
948        let alloc = client.allocate_slot(Some(handle))?;
949        let mut state = config::read_relay_state()?;
950        state["self"] = json!({
951            "relay_url": normalized,
952            "slot_id": alloc.slot_id.clone(),
953            "slot_token": alloc.slot_token,
954        });
955        config::write_relay_state(&state)?;
956        relay_info = Some((normalized.to_string(), alloc.slot_id));
957    }
958
959    let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
960    if as_json {
961        let mut out = json!({
962            "did": did_str.clone(),
963            "fingerprint": fp,
964            "key_id": key_id,
965            "config_dir": config::config_dir()?.to_string_lossy(),
966        });
967        if let Some((url, slot_id)) = &relay_info {
968            out["relay_url"] = json!(url);
969            out["slot_id"] = json!(slot_id);
970        }
971        println!("{}", serde_json::to_string(&out)?);
972    } else {
973        println!("generated {did_str} (ed25519:{key_id})");
974        println!(
975            "config written to {}",
976            config::config_dir()?.to_string_lossy()
977        );
978        if let Some((url, slot_id)) = &relay_info {
979            println!("bound to relay {url} (slot {slot_id})");
980            println!();
981            println!(
982                "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
983            );
984        } else {
985            println!();
986            println!(
987                "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
988            );
989        }
990    }
991    Ok(())
992}
993
994// ---------- status ----------
995
996fn cmd_status(as_json: bool) -> Result<()> {
997    let initialized = config::is_initialized()?;
998
999    let mut summary = json!({
1000        "initialized": initialized,
1001    });
1002
1003    if initialized {
1004        let card = config::read_agent_card()?;
1005        let did = card
1006            .get("did")
1007            .and_then(Value::as_str)
1008            .unwrap_or("")
1009            .to_string();
1010        // Prefer the explicit `handle` field added in v0.5.7. Fall back to
1011        // stripping the DID prefix (and the v0.5.7+ pubkey suffix) for
1012        // legacy cards.
1013        let handle = card
1014            .get("handle")
1015            .and_then(Value::as_str)
1016            .map(str::to_string)
1017            .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1018        let pk_b64 = card
1019            .get("verify_keys")
1020            .and_then(Value::as_object)
1021            .and_then(|m| m.values().next())
1022            .and_then(|v| v.get("key"))
1023            .and_then(Value::as_str)
1024            .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1025        let pk_bytes = crate::signing::b64decode(pk_b64)?;
1026        summary["did"] = json!(did);
1027        summary["handle"] = json!(handle);
1028        summary["fingerprint"] = json!(fingerprint(&pk_bytes));
1029        summary["capabilities"] = card
1030            .get("capabilities")
1031            .cloned()
1032            .unwrap_or_else(|| json!([]));
1033
1034        let trust = config::read_trust()?;
1035        let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1036        let mut peers = Vec::new();
1037        if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
1038            for (peer_handle, _agent) in agents {
1039                if peer_handle == &handle {
1040                    continue; // self
1041                }
1042                // P0.Y (0.5.11): use effective tier — surfaces PENDING_ACK
1043                // for peers we've pinned but never received a pair_drop_ack
1044                // from, so the operator sees the "we can't send to them yet"
1045                // state instead of seeing a misleading VERIFIED.
1046                peers.push(json!({
1047                    "handle": peer_handle,
1048                    "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
1049                }));
1050            }
1051        }
1052        summary["peers"] = json!(peers);
1053
1054        let relay_state = config::read_relay_state()?;
1055        summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
1056        if !summary["self_relay"].is_null() {
1057            // Hide slot_token from default view.
1058            if let Some(obj) = summary["self_relay"].as_object_mut() {
1059                obj.remove("slot_token");
1060            }
1061        }
1062        summary["peer_slots_count"] = json!(
1063            relay_state
1064                .get("peers")
1065                .and_then(Value::as_object)
1066                .map(|m| m.len())
1067                .unwrap_or(0)
1068        );
1069
1070        // Outbox / inbox queue depth (file count + total events)
1071        let outbox = config::outbox_dir()?;
1072        let inbox = config::inbox_dir()?;
1073        summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
1074        summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);
1075
1076        // P1.7 (0.5.11): daemon liveness now consults the structured
1077        // pidfile (P0.4) AND `pgrep -f "wire daemon"` to detect orphans
1078        // that the pidfile didn't record. Today's debug had a 4-day-old
1079        // 0.2.4 daemon (PID 54017) running while the pidfile pointed at
1080        // an unrelated dead PID — wire status said `daemon: DOWN` while
1081        // the box was actually full of stale-daemon-eating-events
1082        // behaviour. Catch THAT class here.
1083        let record = crate::ensure_up::read_pid_record("daemon");
1084        let pidfile_pid = record.pid();
1085        let pidfile_alive = pidfile_pid
1086            .map(|pid| {
1087                #[cfg(target_os = "linux")]
1088                {
1089                    std::path::Path::new(&format!("/proc/{pid}")).exists()
1090                }
1091                #[cfg(not(target_os = "linux"))]
1092                {
1093                    std::process::Command::new("kill")
1094                        .args(["-0", &pid.to_string()])
1095                        .output()
1096                        .map(|o| o.status.success())
1097                        .unwrap_or(false)
1098                }
1099            })
1100            .unwrap_or(false);
1101
1102        // Cross-check with pgrep — surfaces orphan daemons not in pidfile.
1103        let pgrep_pids: Vec<u32> = std::process::Command::new("pgrep")
1104            .args(["-f", "wire daemon"])
1105            .output()
1106            .ok()
1107            .filter(|o| o.status.success())
1108            .map(|o| {
1109                String::from_utf8_lossy(&o.stdout)
1110                    .split_whitespace()
1111                    .filter_map(|s| s.parse::<u32>().ok())
1112                    .collect()
1113            })
1114            .unwrap_or_default();
1115        let orphan_pids: Vec<u32> = pgrep_pids
1116            .iter()
1117            .filter(|p| Some(**p) != pidfile_pid)
1118            .copied()
1119            .collect();
1120
1121        let mut daemon = json!({
1122            "running": pidfile_alive,
1123            "pid": pidfile_pid,
1124            "all_running_pids": pgrep_pids,
1125            "orphans": orphan_pids,
1126        });
1127        if let crate::ensure_up::PidRecord::Json(d) = &record {
1128            daemon["version"] = json!(d.version);
1129            daemon["bin_path"] = json!(d.bin_path);
1130            daemon["did"] = json!(d.did);
1131            daemon["relay_url"] = json!(d.relay_url);
1132            daemon["started_at"] = json!(d.started_at);
1133            daemon["schema"] = json!(d.schema);
1134            if d.version != env!("CARGO_PKG_VERSION") {
1135                daemon["version_mismatch"] = json!({
1136                    "daemon": d.version.clone(),
1137                    "cli": env!("CARGO_PKG_VERSION"),
1138                });
1139            }
1140        } else if matches!(record, crate::ensure_up::PidRecord::LegacyInt(_)) {
1141            daemon["pidfile_form"] = json!("legacy-int");
1142            daemon["version_mismatch"] = json!({
1143                "daemon": "<pre-0.5.11>",
1144                "cli": env!("CARGO_PKG_VERSION"),
1145            });
1146        }
1147        summary["daemon"] = daemon;
1148
1149        // Pending pair sessions — counts by status.
1150        let pending = crate::pending_pair::list_pending().unwrap_or_default();
1151        let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
1152        for p in &pending {
1153            *counts.entry(p.status.clone()).or_default() += 1;
1154        }
1155        summary["pending_pairs"] = json!({
1156            "total": pending.len(),
1157            "by_status": counts,
1158        });
1159    }
1160
1161    if as_json {
1162        println!("{}", serde_json::to_string(&summary)?);
1163    } else if !initialized {
1164        println!("not initialized — run `wire init <handle>` first");
1165    } else {
1166        println!("did:           {}", summary["did"].as_str().unwrap_or("?"));
1167        println!(
1168            "fingerprint:   {}",
1169            summary["fingerprint"].as_str().unwrap_or("?")
1170        );
1171        println!("capabilities:  {}", summary["capabilities"]);
1172        if !summary["self_relay"].is_null() {
1173            println!(
1174                "self relay:    {} (slot {})",
1175                summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
1176                summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
1177            );
1178        } else {
1179            println!("self relay:    (not bound — run `wire pair-host --relay <url>` to bind)");
1180        }
1181        println!(
1182            "peers:         {}",
1183            summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
1184        );
1185        for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
1186            println!(
1187                "  - {:<20} tier={}",
1188                p["handle"].as_str().unwrap_or(""),
1189                p["tier"].as_str().unwrap_or("?")
1190            );
1191        }
1192        println!(
1193            "outbox:        {} file(s), {} event(s) queued",
1194            summary["outbox"]["files"].as_u64().unwrap_or(0),
1195            summary["outbox"]["events"].as_u64().unwrap_or(0)
1196        );
1197        println!(
1198            "inbox:         {} file(s), {} event(s) received",
1199            summary["inbox"]["files"].as_u64().unwrap_or(0),
1200            summary["inbox"]["events"].as_u64().unwrap_or(0)
1201        );
1202        let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
1203        let daemon_pid = summary["daemon"]["pid"]
1204            .as_u64()
1205            .map(|p| p.to_string())
1206            .unwrap_or_else(|| "—".to_string());
1207        let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
1208        let version_suffix = if !daemon_version.is_empty() {
1209            format!(" v{daemon_version}")
1210        } else {
1211            String::new()
1212        };
1213        println!(
1214            "daemon:        {} (pid {}{})",
1215            if daemon_running { "running" } else { "DOWN" },
1216            daemon_pid,
1217            version_suffix,
1218        );
1219        // P1.7: surface version mismatch + orphan procs loudly.
1220        if let Some(mm) = summary["daemon"].get("version_mismatch") {
1221            println!(
1222                "               !! version mismatch: daemon={} CLI={}. \
1223                 run `wire upgrade` to swap atomically.",
1224                mm["daemon"].as_str().unwrap_or("?"),
1225                mm["cli"].as_str().unwrap_or("?"),
1226            );
1227        }
1228        if let Some(orphans) = summary["daemon"]["orphans"].as_array()
1229            && !orphans.is_empty()
1230        {
1231            let pids: Vec<String> = orphans
1232                .iter()
1233                .filter_map(|v| v.as_u64().map(|p| p.to_string()))
1234                .collect();
1235            println!(
1236                "               !! orphan daemon process(es): pids {}. \
1237                 pgrep saw them but pidfile didn't — likely stale process from \
1238                 prior install. Multiple daemons race the relay cursor.",
1239                pids.join(", ")
1240            );
1241        }
1242        let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
1243        if pending_total > 0 {
1244            print!("pending pairs: {pending_total}");
1245            if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
1246                let parts: Vec<String> = obj
1247                    .iter()
1248                    .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
1249                    .collect();
1250                if !parts.is_empty() {
1251                    print!(" ({})", parts.join(", "));
1252                }
1253            }
1254            println!();
1255        } else {
1256            println!("pending pairs: none");
1257        }
1258    }
1259    Ok(())
1260}
1261
1262fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1263    if !dir.exists() {
1264        return Ok(json!({"files": 0, "events": 0}));
1265    }
1266    let mut files = 0usize;
1267    let mut events = 0usize;
1268    for entry in std::fs::read_dir(dir)? {
1269        let path = entry?.path();
1270        if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1271            files += 1;
1272            if let Ok(body) = std::fs::read_to_string(&path) {
1273                events += body.lines().filter(|l| !l.trim().is_empty()).count();
1274            }
1275        }
1276    }
1277    Ok(json!({"files": files, "events": events}))
1278}
1279
1280// ---------- responder health ----------
1281
/// Returns `true` when `status` is one of the responder-health states this
/// CLI accepts. The comparison is exact: case and whitespace matter.
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.contains(&status)
}
1288
1289fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1290    let state = config::read_relay_state()?;
1291    let (label, slot_info) = match peer {
1292        Some(peer) => (
1293            peer.to_string(),
1294            state
1295                .get("peers")
1296                .and_then(|p| p.get(peer))
1297                .ok_or_else(|| {
1298                    anyhow!(
1299                        "unknown peer {peer:?} in relay state — pair with them first:\n  \
1300                         wire add {peer}@wireup.net   (or {peer}@<their-relay>)\n\
1301                         (`wire peers` lists who you've already paired with.)"
1302                    )
1303                })?,
1304        ),
1305        None => (
1306            "self".to_string(),
1307            state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1308                anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1309            })?,
1310        ),
1311    };
1312    let relay_url = slot_info["relay_url"]
1313        .as_str()
1314        .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1315        .to_string();
1316    let slot_id = slot_info["slot_id"]
1317        .as_str()
1318        .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1319        .to_string();
1320    let slot_token = slot_info["slot_token"]
1321        .as_str()
1322        .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1323        .to_string();
1324    Ok((label, relay_url, slot_id, slot_token))
1325}
1326
1327fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1328    if !responder_status_allowed(status) {
1329        bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1330    }
1331    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1332    let now = time::OffsetDateTime::now_utc()
1333        .format(&time::format_description::well_known::Rfc3339)
1334        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1335    let mut record = json!({
1336        "status": status,
1337        "set_at": now,
1338    });
1339    if let Some(reason) = reason {
1340        record["reason"] = json!(reason);
1341    }
1342    if status == "online" {
1343        record["last_success_at"] = json!(now);
1344    }
1345    let client = crate::relay_client::RelayClient::new(&relay_url);
1346    let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1347    if as_json {
1348        println!("{}", serde_json::to_string(&saved)?);
1349    } else {
1350        let reason = saved
1351            .get("reason")
1352            .and_then(Value::as_str)
1353            .map(|r| format!(" — {r}"))
1354            .unwrap_or_default();
1355        println!(
1356            "responder {}{}",
1357            saved
1358                .get("status")
1359                .and_then(Value::as_str)
1360                .unwrap_or(status),
1361            reason
1362        );
1363    }
1364    Ok(())
1365}
1366
1367fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1368    let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1369    let client = crate::relay_client::RelayClient::new(&relay_url);
1370    let health = client.responder_health_get(&slot_id, &slot_token)?;
1371    if as_json {
1372        println!(
1373            "{}",
1374            serde_json::to_string(&json!({
1375                "target": label,
1376                "responder_health": health,
1377            }))?
1378        );
1379    } else if health.is_null() {
1380        println!("{label}: responder health not reported");
1381    } else {
1382        let status = health
1383            .get("status")
1384            .and_then(Value::as_str)
1385            .unwrap_or("unknown");
1386        let reason = health
1387            .get("reason")
1388            .and_then(Value::as_str)
1389            .map(|r| format!(" — {r}"))
1390            .unwrap_or_default();
1391        let last_success = health
1392            .get("last_success_at")
1393            .and_then(Value::as_str)
1394            .map(|t| format!(" (last_success: {t})"))
1395            .unwrap_or_default();
1396        println!("{label}: {status}{reason}{last_success}");
1397    }
1398    Ok(())
1399}
1400
1401fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1402    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1403    let client = crate::relay_client::RelayClient::new(&relay_url);
1404
1405    let started = std::time::Instant::now();
1406    let transport_ok = client.healthz().unwrap_or(false);
1407    let latency_ms = started.elapsed().as_millis() as u64;
1408
1409    let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1410    let now = std::time::SystemTime::now()
1411        .duration_since(std::time::UNIX_EPOCH)
1412        .map(|d| d.as_secs())
1413        .unwrap_or(0);
1414    let attention = match last_pull_at_unix {
1415        Some(last) if now.saturating_sub(last) <= 300 => json!({
1416            "status": "ok",
1417            "last_pull_at_unix": last,
1418            "age_seconds": now.saturating_sub(last),
1419            "event_count": event_count,
1420        }),
1421        Some(last) => json!({
1422            "status": "stale",
1423            "last_pull_at_unix": last,
1424            "age_seconds": now.saturating_sub(last),
1425            "event_count": event_count,
1426        }),
1427        None => json!({
1428            "status": "never_pulled",
1429            "last_pull_at_unix": Value::Null,
1430            "event_count": event_count,
1431        }),
1432    };
1433
1434    let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1435    let responder = if responder_health.is_null() {
1436        json!({"status": "not_reported", "record": Value::Null})
1437    } else {
1438        json!({
1439            "status": responder_health
1440                .get("status")
1441                .and_then(Value::as_str)
1442                .unwrap_or("unknown"),
1443            "record": responder_health,
1444        })
1445    };
1446
1447    let report = json!({
1448        "peer": peer,
1449        "transport": {
1450            "status": if transport_ok { "ok" } else { "error" },
1451            "relay_url": relay_url,
1452            "latency_ms": latency_ms,
1453        },
1454        "attention": attention,
1455        "responder": responder,
1456    });
1457
1458    if as_json {
1459        println!("{}", serde_json::to_string(&report)?);
1460    } else {
1461        let transport_line = if transport_ok {
1462            format!("ok relay reachable ({latency_ms}ms)")
1463        } else {
1464            "error relay unreachable".to_string()
1465        };
1466        println!("transport      {transport_line}");
1467        match report["attention"]["status"].as_str().unwrap_or("unknown") {
1468            "ok" => println!(
1469                "attention      ok last pull {}s ago",
1470                report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1471            ),
1472            "stale" => println!(
1473                "attention      stale last pull {}m ago",
1474                report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1475            ),
1476            "never_pulled" => println!("attention      never pulled since relay reset"),
1477            other => println!("attention      {other}"),
1478        }
1479        if report["responder"]["status"] == "not_reported" {
1480            println!("auto-responder not reported");
1481        } else {
1482            let record = &report["responder"]["record"];
1483            let status = record
1484                .get("status")
1485                .and_then(Value::as_str)
1486                .unwrap_or("unknown");
1487            let reason = record
1488                .get("reason")
1489                .and_then(Value::as_str)
1490                .map(|r| format!(" — {r}"))
1491                .unwrap_or_default();
1492            println!("auto-responder {status}{reason}");
1493        }
1494    }
1495    Ok(())
1496}
1497
1498// (Old cmd_join stub removed — superseded by cmd_pair_join below.)
1499
1500// ---------- whoami ----------
1501
1502fn cmd_whoami(as_json: bool) -> Result<()> {
1503    if !config::is_initialized()? {
1504        bail!("not initialized — run `wire init <handle>` first");
1505    }
1506    let card = config::read_agent_card()?;
1507    let did = card
1508        .get("did")
1509        .and_then(Value::as_str)
1510        .unwrap_or("")
1511        .to_string();
1512    let handle = card
1513        .get("handle")
1514        .and_then(Value::as_str)
1515        .map(str::to_string)
1516        .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1517    let pk_b64 = card
1518        .get("verify_keys")
1519        .and_then(Value::as_object)
1520        .and_then(|m| m.values().next())
1521        .and_then(|v| v.get("key"))
1522        .and_then(Value::as_str)
1523        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1524    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1525    let fp = fingerprint(&pk_bytes);
1526    let key_id = make_key_id(&handle, &pk_bytes);
1527    let capabilities = card
1528        .get("capabilities")
1529        .cloned()
1530        .unwrap_or_else(|| json!(["wire/v3.1"]));
1531
1532    if as_json {
1533        println!(
1534            "{}",
1535            serde_json::to_string(&json!({
1536                "did": did,
1537                "handle": handle,
1538                "fingerprint": fp,
1539                "key_id": key_id,
1540                "public_key_b64": pk_b64,
1541                "capabilities": capabilities,
1542                "config_dir": config::config_dir()?.to_string_lossy(),
1543            }))?
1544        );
1545    } else {
1546        println!("{did} (ed25519:{key_id})");
1547        println!("fingerprint: {fp}");
1548        println!("capabilities: {capabilities}");
1549    }
1550    Ok(())
1551}
1552
1553// ---------- peers ----------
1554
1555/// P0.Y (0.5.11): effective tier shown to operators. `wire add` pins a
1556/// peer's card into trust at VERIFIED immediately, but the bilateral pin
1557/// isn't complete until that peer's `pair_drop_ack` arrives carrying their
1558/// slot_token. Until then we CAN'T send to them. Displaying VERIFIED is
1559/// misleading — spark observed this in real usage.
1560///
1561/// Effective rules:
1562///   trust.tier == VERIFIED + relay_state.peers[h].slot_token empty -> "PENDING_ACK"
1563///   otherwise -> raw trust tier (UNTRUSTED / VERIFIED / etc.)
1564///
1565/// Strictly a display concern — trust state machine itself is untouched
1566/// so existing promote/demote logic still works.
1567fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1568    let raw = crate::trust::get_tier(trust, handle);
1569    if raw != "VERIFIED" {
1570        return raw.to_string();
1571    }
1572    let token = relay_state
1573        .get("peers")
1574        .and_then(|p| p.get(handle))
1575        .and_then(|p| p.get("slot_token"))
1576        .and_then(Value::as_str)
1577        .unwrap_or("");
1578    if token.is_empty() {
1579        "PENDING_ACK".to_string()
1580    } else {
1581        raw.to_string()
1582    }
1583}
1584
1585fn cmd_peers(as_json: bool) -> Result<()> {
1586    let trust = config::read_trust()?;
1587    let agents = trust
1588        .get("agents")
1589        .and_then(Value::as_object)
1590        .cloned()
1591        .unwrap_or_default();
1592    let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1593
1594    let mut self_did: Option<String> = None;
1595    if let Ok(card) = config::read_agent_card() {
1596        self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1597    }
1598
1599    let mut peers = Vec::new();
1600    for (handle, agent) in agents.iter() {
1601        let did = agent
1602            .get("did")
1603            .and_then(Value::as_str)
1604            .unwrap_or("")
1605            .to_string();
1606        if Some(did.as_str()) == self_did.as_deref() {
1607            continue; // skip self-attestation
1608        }
1609        let tier = effective_peer_tier(&trust, &relay_state, handle);
1610        let capabilities = agent
1611            .get("card")
1612            .and_then(|c| c.get("capabilities"))
1613            .cloned()
1614            .unwrap_or_else(|| json!([]));
1615        peers.push(json!({
1616            "handle": handle,
1617            "did": did,
1618            "tier": tier,
1619            "capabilities": capabilities,
1620        }));
1621    }
1622
1623    if as_json {
1624        println!("{}", serde_json::to_string(&peers)?);
1625    } else if peers.is_empty() {
1626        println!("no peers pinned (run `wire join <code>` to pair)");
1627    } else {
1628        for p in &peers {
1629            println!(
1630                "{:<20} {:<10} {}",
1631                p["handle"].as_str().unwrap_or(""),
1632                p["tier"].as_str().unwrap_or(""),
1633                p["did"].as_str().unwrap_or(""),
1634            );
1635        }
1636    }
1637    Ok(())
1638}
1639
1640// ---------- send ----------
1641
1642/// R4 attentiveness pre-flight. Best-effort: any failure is silent.
1643///
1644/// Looks up `peer` in relay-state for slot_id + slot_token + relay_url, asks
1645/// the relay for the slot's `last_pull_at_unix`, and prints a warning to
1646/// stderr if the peer hasn't polled in > 5min (or never has). Threshold of
1647/// 300s is the same wire daemon polling cadence rule-of-thumb — a peer
1648/// hasn't crossed two heartbeats means probably degraded.
1649fn maybe_warn_peer_attentiveness(peer: &str) {
1650    let state = match config::read_relay_state() {
1651        Ok(s) => s,
1652        Err(_) => return,
1653    };
1654    let p = state.get("peers").and_then(|p| p.get(peer));
1655    let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
1656        Some(s) if !s.is_empty() => s,
1657        _ => return,
1658    };
1659    let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
1660        Some(s) if !s.is_empty() => s,
1661        _ => return,
1662    };
1663    let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
1664        Some(s) if !s.is_empty() => s.to_string(),
1665        _ => match state
1666            .get("self")
1667            .and_then(|s| s.get("relay_url"))
1668            .and_then(Value::as_str)
1669        {
1670            Some(s) if !s.is_empty() => s.to_string(),
1671            _ => return,
1672        },
1673    };
1674    let client = crate::relay_client::RelayClient::new(&relay_url);
1675    let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
1676        Ok(t) => t,
1677        Err(_) => return,
1678    };
1679    let now = std::time::SystemTime::now()
1680        .duration_since(std::time::UNIX_EPOCH)
1681        .map(|d| d.as_secs())
1682        .unwrap_or(0);
1683    match last_pull {
1684        None => {
1685            eprintln!(
1686                "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
1687            );
1688        }
1689        Some(t) if now.saturating_sub(t) > 300 => {
1690            let mins = now.saturating_sub(t) / 60;
1691            eprintln!(
1692                "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
1693            );
1694        }
1695        _ => {}
1696    }
1697}
1698
1699pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
1700    let trimmed = input.trim();
1701    if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
1702    {
1703        return Ok(trimmed.to_string());
1704    }
1705    let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
1706    let n: i64 = amount
1707        .parse()
1708        .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
1709    if n <= 0 {
1710        bail!("deadline duration must be positive: {input:?}");
1711    }
1712    let duration = match unit {
1713        "m" => time::Duration::minutes(n),
1714        "h" => time::Duration::hours(n),
1715        "d" => time::Duration::days(n),
1716        _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
1717    };
1718    Ok((time::OffsetDateTime::now_utc() + duration)
1719        .format(&time::format_description::well_known::Rfc3339)
1720        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
1721}
1722
1723fn cmd_send(
1724    peer: &str,
1725    kind: &str,
1726    body_arg: &str,
1727    deadline: Option<&str>,
1728    as_json: bool,
1729) -> Result<()> {
1730    if !config::is_initialized()? {
1731        bail!("not initialized — run `wire init <handle>` first");
1732    }
1733    let sk_seed = config::read_private_key()?;
1734    let card = config::read_agent_card()?;
1735    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
1736    let handle = crate::agent_card::display_handle_from_did(did).to_string();
1737    let pk_b64 = card
1738        .get("verify_keys")
1739        .and_then(Value::as_object)
1740        .and_then(|m| m.values().next())
1741        .and_then(|v| v.get("key"))
1742        .and_then(Value::as_str)
1743        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1744    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1745
1746    // Body: literal string, `@/path/to/body.json`, or `-` for stdin.
1747    // P0.S (0.5.11): stdin support lets shells pipe in long content
1748    // without quoting/escaping ceremony, and supports heredocs naturally:
1749    //   wire send peer - <<EOF ... EOF
1750    let body_value: Value = if body_arg == "-" {
1751        use std::io::Read;
1752        let mut raw = String::new();
1753        std::io::stdin()
1754            .read_to_string(&mut raw)
1755            .with_context(|| "reading body from stdin")?;
1756        // Try parsing as JSON first; fall back to string literal for
1757        // plain-text bodies.
1758        serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
1759    } else if let Some(path) = body_arg.strip_prefix('@') {
1760        let raw =
1761            std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
1762        serde_json::from_str(&raw).unwrap_or(Value::String(raw))
1763    } else {
1764        Value::String(body_arg.to_string())
1765    };
1766
1767    let kind_id = parse_kind(kind)?;
1768
1769    let now = time::OffsetDateTime::now_utc()
1770        .format(&time::format_description::well_known::Rfc3339)
1771        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1772
1773    let mut event = json!({
1774        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
1775        "timestamp": now,
1776        "from": did,
1777        "to": format!("did:wire:{peer}"),
1778        "type": kind,
1779        "kind": kind_id,
1780        "body": body_value,
1781    });
1782    if let Some(deadline) = deadline {
1783        event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
1784    }
1785    let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
1786    let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
1787
1788    // R4: best-effort attentiveness pre-flight. Look up the peer's slot
1789    // coords in relay-state and ask the relay how recently the peer pulled.
1790    // Warn on stderr if the peer hasn't pulled in >5min OR has never pulled.
1791    // Never blocks the send — the event still queues to outbox.
1792    maybe_warn_peer_attentiveness(peer);
1793
1794    // For now we append to outbox JSONL and rely on a future daemon to push
1795    // to the relay. That's the file-system contract from AGENT_INTEGRATION.md.
1796    // Append goes through `config::append_outbox_record` which holds a per-
1797    // path mutex so concurrent senders cannot interleave bytes mid-line.
1798    let line = serde_json::to_vec(&signed)?;
1799    let outbox = config::append_outbox_record(peer, &line)?;
1800
1801    if as_json {
1802        println!(
1803            "{}",
1804            serde_json::to_string(&json!({
1805                "event_id": event_id,
1806                "status": "queued",
1807                "peer": peer,
1808                "outbox": outbox.to_string_lossy(),
1809            }))?
1810        );
1811    } else {
1812        println!(
1813            "queued event {event_id} → {peer} (outbox: {})",
1814            outbox.display()
1815        );
1816    }
1817    Ok(())
1818}
1819
1820fn parse_kind(s: &str) -> Result<u32> {
1821    if let Ok(n) = s.parse::<u32>() {
1822        return Ok(n);
1823    }
1824    for (id, name) in crate::signing::kinds() {
1825        if *name == s {
1826            return Ok(*id);
1827        }
1828    }
1829    // Unknown name — default to kind 1 (decision) for v0.1.
1830    Ok(1)
1831}
1832
1833// ---------- tail ----------
1834
1835fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
1836    let inbox = config::inbox_dir()?;
1837    if !inbox.exists() {
1838        if !as_json {
1839            eprintln!("no inbox yet — daemon hasn't run, or no events received");
1840        }
1841        return Ok(());
1842    }
1843    let trust = config::read_trust()?;
1844    let mut count = 0usize;
1845
1846    let entries: Vec<_> = std::fs::read_dir(&inbox)?
1847        .filter_map(|e| e.ok())
1848        .map(|e| e.path())
1849        .filter(|p| {
1850            p.extension().map(|x| x == "jsonl").unwrap_or(false)
1851                && match peer {
1852                    Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
1853                    None => true,
1854                }
1855        })
1856        .collect();
1857
1858    for path in entries {
1859        let body = std::fs::read_to_string(&path)?;
1860        for line in body.lines() {
1861            let event: Value = match serde_json::from_str(line) {
1862                Ok(v) => v,
1863                Err(_) => continue,
1864            };
1865            let verified = verify_message_v31(&event, &trust).is_ok();
1866            if as_json {
1867                let mut event_with_meta = event.clone();
1868                if let Some(obj) = event_with_meta.as_object_mut() {
1869                    obj.insert("verified".into(), json!(verified));
1870                }
1871                println!("{}", serde_json::to_string(&event_with_meta)?);
1872            } else {
1873                let ts = event
1874                    .get("timestamp")
1875                    .and_then(Value::as_str)
1876                    .unwrap_or("?");
1877                let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
1878                let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1879                let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
1880                let summary = event
1881                    .get("body")
1882                    .map(|b| match b {
1883                        Value::String(s) => s.clone(),
1884                        _ => b.to_string(),
1885                    })
1886                    .unwrap_or_default();
1887                let mark = if verified { "✓" } else { "✗" };
1888                let deadline = event
1889                    .get("time_sensitive_until")
1890                    .and_then(Value::as_str)
1891                    .map(|d| format!(" deadline: {d}"))
1892                    .unwrap_or_default();
1893                println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
1894            }
1895            count += 1;
1896            if limit > 0 && count >= limit {
1897                return Ok(());
1898            }
1899        }
1900    }
1901    Ok(())
1902}
1903
1904// ---------- monitor (live-tail across all peers, harness-friendly) ----------
1905
/// Whether `kind` is one of the event kinds `wire monitor` hides by default:
/// the pair handshake (`pair_drop`, `pair_drop_ack`) and liveness pings
/// (`heartbeat`). Operators almost never want these surfaced; passing
/// `--include-handshake` brings them back. Matching is exact and
/// case-sensitive, so unknown kinds are never treated as noise.
fn monitor_is_noise_kind(kind: &str) -> bool {
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.contains(&kind)
}
1912
1913/// Render a single InboxEvent for `wire monitor` output. JSON form emits the
1914/// full structured event for tooling consumption; the plain form is a tight
1915/// one-line summary suitable as a harness stream-watcher notification.
1916fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
1917    if as_json {
1918        Ok(serde_json::to_string(e)?)
1919    } else {
1920        let eid_short: String = e.event_id.chars().take(12).collect();
1921        let body = e.body_preview.replace('\n', " ");
1922        let ts: String = e.timestamp.chars().take(19).collect();
1923        Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
1924    }
1925}
1926
1927/// `wire monitor` — long-running line-per-event stream of new inbox events.
1928///
1929/// Built for agent harnesses that have an "every stdout line is a chat
1930/// notification" stream watcher (Claude Code Monitor tool, etc.). One
1931/// command, persistent, filtered. Replaces the manual `tail -F inbox/*.jsonl
1932/// | python parse | grep -v pair_drop` pipeline operators improvise on day
1933/// one of every wire session.
1934///
1935/// Default filter strips `pair_drop`, `pair_drop_ack`, and `heartbeat` —
1936/// pure handshake / liveness noise that operators almost never want
1937/// surfaced. Pass `--include-handshake` if you do.
1938///
1939/// Cursor: in-memory only. Starts from EOF (so a fresh `wire monitor`
1940/// doesn't drown the operator in replay), with optional `--replay N` to
1941/// emit the last N events first.
1942fn cmd_monitor(
1943    peer_filter: Option<&str>,
1944    as_json: bool,
1945    include_handshake: bool,
1946    interval_ms: u64,
1947    replay: usize,
1948) -> Result<()> {
1949    let inbox_dir = config::inbox_dir()?;
1950    if !inbox_dir.exists() {
1951        if !as_json {
1952            eprintln!(
1953                "wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
1954            );
1955        }
1956        // Still proceed — InboxWatcher::from_dir_head handles missing dir.
1957    }
1958
1959    // Optional replay — read existing files and emit the last `replay` events
1960    // (post-filter) before going live. Useful when the harness restarts and
1961    // wants recent context.
1962    if replay > 0 && inbox_dir.exists() {
1963        let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
1964        for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
1965            let path = entry.path();
1966            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
1967                continue;
1968            }
1969            let peer = match path.file_stem().and_then(|s| s.to_str()) {
1970                Some(s) => s.to_string(),
1971                None => continue,
1972            };
1973            if let Some(filter) = peer_filter {
1974                if peer != filter {
1975                    continue;
1976                }
1977            }
1978            let body = std::fs::read_to_string(&path).unwrap_or_default();
1979            for line in body.lines() {
1980                let line = line.trim();
1981                if line.is_empty() {
1982                    continue;
1983                }
1984                let signed: Value = match serde_json::from_str(line) {
1985                    Ok(v) => v,
1986                    Err(_) => continue,
1987                };
1988                let ev = crate::inbox_watch::InboxEvent::from_signed(
1989                    &peer,
1990                    signed,
1991                    /* verified */ true,
1992                );
1993                if !include_handshake && monitor_is_noise_kind(&ev.kind) {
1994                    continue;
1995                }
1996                all.push(ev);
1997            }
1998        }
1999        // Sort by timestamp string (RFC3339-ish — lexicographic order matches
2000        // chronological for same-zoned timestamps).
2001        all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2002        let start = all.len().saturating_sub(replay);
2003        for ev in &all[start..] {
2004            println!("{}", monitor_render(ev, as_json)?);
2005        }
2006        use std::io::Write;
2007        std::io::stdout().flush().ok();
2008    }
2009
2010    // Live loop. InboxWatcher::from_head() seeds cursors at current EOF, so
2011    // the first poll only returns events that arrived AFTER startup.
2012    let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2013    let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2014
2015    loop {
2016        let events = w.poll()?;
2017        let mut wrote = false;
2018        for ev in events {
2019            if let Some(filter) = peer_filter {
2020                if ev.peer != filter {
2021                    continue;
2022                }
2023            }
2024            if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2025                continue;
2026            }
2027            println!("{}", monitor_render(&ev, as_json)?);
2028            wrote = true;
2029        }
2030        if wrote {
2031            use std::io::Write;
2032            std::io::stdout().flush().ok();
2033        }
2034        std::thread::sleep(sleep_dur);
2035    }
2036}
2037
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Trust document holding a single agent `handle` pinned at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        let did = format!("did:wire:{handle}");
        json!({
            "version": 1,
            "agents": {
                handle: {
                    "tier": tier,
                    "did": did,
                    "card": {"capabilities": ["wire/v3.1"]}
                }
            }
        })
    }

    /// relay-state with complete slot coordinates for `willard`, using the
    /// given `slot_token`.
    fn willard_relay_state(slot_token: &str) -> Value {
        json!({
            "peers": {
                "willard": {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": slot_token,
                }
            }
        })
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // P0.Y rule: right after `wire add`, trust already says VERIFIED
        // while the peer's slot_token is still empty. The operator must see
        // PENDING_ACK so they know `wire send` won't work yet.
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = willard_relay_state("");
        let shown = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(shown, "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = willard_relay_state("tok123");
        let shown = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(shown, "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // PENDING_ACK decorates VERIFIED only: an UNTRUSTED peer stays
        // UNTRUSTED whatever the slot_token looks like.
        let trust = trust_with("willard", "UNTRUSTED");
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        let shown = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(shown, "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // `wire add` updates trust BEFORE relay_state.peers. With no
        // relay-state entry for the peer at all, the bilateral pin is still
        // incomplete, so PENDING_ACK is the right thing to show.
        let trust = trust_with("willard", "VERIFIED");
        let relay_state = json!({"peers": {}});
        let shown = effective_peer_tier(&trust, &relay_state, "willard");
        assert_eq!(shown, "PENDING_ACK");
    }
}
2122
#[cfg(test)]
mod monitor_tests {
    use super::*;
    use crate::inbox_watch::InboxEvent;
    use serde_json::Value;

    /// Fixture event: fixed id and timestamp, caller-chosen peer/kind/body.
    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
        InboxEvent {
            peer: String::from(peer),
            event_id: String::from("abcd1234567890ef"),
            kind: String::from(kind),
            body_preview: String::from(body),
            verified: true,
            timestamp: String::from("2026-05-15T23:14:07.123456Z"),
            raw: Value::Null,
        }
    }

    #[test]
    fn monitor_filter_drops_handshake_kinds_by_default() {
        // pair_drop / pair_drop_ack / heartbeat are protocol noise. If they
        // leaked into the operator's chat stream by default the recipe would
        // be useless ("wire monitor talks too much, disabled it").
        for noise in ["pair_drop", "pair_drop_ack", "heartbeat"] {
            assert!(monitor_is_noise_kind(noise));
        }

        // Real-payload kinds — the operator wants every one of them.
        for payload in ["claim", "decision", "ack", "request", "note"] {
            assert!(!monitor_is_noise_kind(payload));
        }

        // Unknown future kinds must not be treated as noise either: better
        // to show something unrecognised than to drop it silently (the P0.1
        // lesson, applied at the UX layer).
        assert!(!monitor_is_noise_kind("future_kind_we_dont_know"));
    }

    #[test]
    fn monitor_render_plain_is_one_short_line() {
        let event = ev("willard", "claim", "real v8 train shipped 1350 steps");
        let line = monitor_render(&event, false).unwrap();
        // Exactly one line.
        assert!(!line.contains('\n'), "render must be one line: {line}");
        // Carries peer, kind and a body fragment.
        assert!(line.contains("willard"));
        assert!(line.contains("claim"));
        assert!(line.contains("real v8 train"));
        // Event id is shortened to its first 12 characters.
        assert!(line.contains("abcd12345678"));
        assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
        // Timestamp is cut to second precision.
        assert!(line.contains("2026-05-15T23:14:07"));
    }

    #[test]
    fn monitor_render_strips_newlines_from_body() {
        // Multi-line bodies (markdown lists, code, ...) have to collapse to
        // a single line; otherwise one message becomes several harness
        // notifications and breaks the "one event = one line" contract the
        // Monitor tool relies on.
        let event = ev("spark", "claim", "line one\nline two\nline three");
        let line = monitor_render(&event, false).unwrap();
        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
        assert!(line.contains("line one line two line three"));
    }

    #[test]
    fn monitor_render_json_is_valid_jsonl() {
        let event = ev("spark", "claim", "hi");
        let line = monitor_render(&event, true).unwrap();
        assert!(!line.contains('\n'));
        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
        assert_eq!(parsed["peer"], "spark");
        assert_eq!(parsed["kind"], "claim");
        assert_eq!(parsed["body_preview"], "hi");
    }

    #[test]
    fn monitor_does_not_drop_on_verified_null() {
        // Background (spark, 2026-05-15): a hand-rolled monitor pipeline ran
        // `select(.verified == true)` over inbox JSONL. The daemon writes
        // events with verified=null (verification happens at tail-time, not
        // write-time), so that filter silently rejected everything — the
        // P0.1 anti-pattern again, at the JSON/jq level. Four events stayed
        // invisible for ~30min.
        //
        // Rendering and noise filtering must never look at `.verified`.
        // This test exists so a later "be conservative, only emit verified"
        // patch cannot land unnoticed.
        let mut event = ev("spark", "claim", "from disk with verified=null");
        event.verified = false; // worst case: flagged unverified, still emitted
        let line = monitor_render(&event, false).unwrap();
        assert!(line.contains("from disk with verified=null"));
        // The noise filter looks at the kind only.
        assert!(!monitor_is_noise_kind("claim"));
    }
}
2223
2224// ---------- verify ----------
2225
2226fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2227    let body = if path == "-" {
2228        let mut buf = String::new();
2229        use std::io::Read;
2230        std::io::stdin().read_to_string(&mut buf)?;
2231        buf
2232    } else {
2233        std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2234    };
2235    let event: Value = serde_json::from_str(&body)?;
2236    let trust = config::read_trust()?;
2237    match verify_message_v31(&event, &trust) {
2238        Ok(()) => {
2239            if as_json {
2240                println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2241            } else {
2242                println!("verified ✓");
2243            }
2244            Ok(())
2245        }
2246        Err(e) => {
2247            let reason = e.to_string();
2248            if as_json {
2249                println!(
2250                    "{}",
2251                    serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2252                );
2253            } else {
2254                eprintln!("FAILED: {reason}");
2255            }
2256            std::process::exit(1);
2257        }
2258    }
2259}
2260
2261// ---------- mcp / relay-server stubs ----------
2262
2263fn cmd_mcp() -> Result<()> {
2264    crate::mcp::run()
2265}
2266
2267fn cmd_relay_server(bind: &str) -> Result<()> {
2268    // Default state dir for the relay process: $WIRE_HOME/state/wire-relay
2269    // (or `dirs::state_dir()/wire-relay`). Distinct from the CLI's state dir
2270    // so a single user can run both client and server on one machine.
2271    let state_dir = if let Ok(home) = std::env::var("WIRE_HOME") {
2272        std::path::PathBuf::from(home)
2273            .join("state")
2274            .join("wire-relay")
2275    } else {
2276        dirs::state_dir()
2277            .or_else(dirs::data_local_dir)
2278            .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2279            .join("wire-relay")
2280    };
2281    let runtime = tokio::runtime::Builder::new_multi_thread()
2282        .enable_all()
2283        .build()?;
2284    runtime.block_on(crate::relay_server::serve(bind, state_dir))
2285}
2286
2287// ---------- bind-relay ----------
2288
2289fn cmd_bind_relay(url: &str, as_json: bool) -> Result<()> {
2290    if !config::is_initialized()? {
2291        bail!("not initialized — run `wire init <handle>` first");
2292    }
2293    let card = config::read_agent_card()?;
2294    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2295    let handle = crate::agent_card::display_handle_from_did(did).to_string();
2296
2297    let normalized = url.trim_end_matches('/');
2298    let client = crate::relay_client::RelayClient::new(normalized);
2299    client.check_healthz()?;
2300    let alloc = client.allocate_slot(Some(&handle))?;
2301    let mut state = config::read_relay_state()?;
2302    state["self"] = json!({
2303        "relay_url": url,
2304        "slot_id": alloc.slot_id,
2305        "slot_token": alloc.slot_token,
2306    });
2307    config::write_relay_state(&state)?;
2308
2309    if as_json {
2310        println!(
2311            "{}",
2312            serde_json::to_string(&json!({
2313                "relay_url": url,
2314                "slot_id": alloc.slot_id,
2315                "slot_token_present": true,
2316            }))?
2317        );
2318    } else {
2319        println!("bound to relay {url}");
2320        println!("slot_id: {}", alloc.slot_id);
2321        println!(
2322            "(slot_token written to {} mode 0600)",
2323            config::relay_state_path()?.display()
2324        );
2325    }
2326    Ok(())
2327}
2328
2329// ---------- add-peer-slot ----------
2330
2331fn cmd_add_peer_slot(
2332    handle: &str,
2333    url: &str,
2334    slot_id: &str,
2335    slot_token: &str,
2336    as_json: bool,
2337) -> Result<()> {
2338    let mut state = config::read_relay_state()?;
2339    let peers = state["peers"]
2340        .as_object_mut()
2341        .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2342    peers.insert(
2343        handle.to_string(),
2344        json!({
2345            "relay_url": url,
2346            "slot_id": slot_id,
2347            "slot_token": slot_token,
2348        }),
2349    );
2350    config::write_relay_state(&state)?;
2351    if as_json {
2352        println!(
2353            "{}",
2354            serde_json::to_string(&json!({
2355                "handle": handle,
2356                "relay_url": url,
2357                "slot_id": slot_id,
2358                "added": true,
2359            }))?
2360        );
2361    } else {
2362        println!("pinned peer slot for {handle} at {url} ({slot_id})");
2363    }
2364    Ok(())
2365}
2366
2367// ---------- push ----------
2368
2369fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
2370    let state = config::read_relay_state()?;
2371    let peers = state["peers"].as_object().cloned().unwrap_or_default();
2372    if peers.is_empty() {
2373        bail!(
2374            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
2375        );
2376    }
2377    let outbox_dir = config::outbox_dir()?;
2378    if !outbox_dir.exists() {
2379        if as_json {
2380            println!(
2381                "{}",
2382                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
2383            );
2384        } else {
2385            println!("phyllis: nothing to dial out — write a message first with `wire send`");
2386        }
2387        return Ok(());
2388    }
2389
2390    let mut pushed = Vec::new();
2391    let mut skipped = Vec::new();
2392
2393    for (peer_handle, slot_info) in peers.iter() {
2394        if let Some(want) = peer_filter
2395            && peer_handle != want
2396        {
2397            continue;
2398        }
2399        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2400        if !outbox.exists() {
2401            continue;
2402        }
2403        let url = slot_info["relay_url"]
2404            .as_str()
2405            .ok_or_else(|| anyhow!("peer {peer_handle} missing relay_url"))?;
2406        let slot_id = slot_info["slot_id"]
2407            .as_str()
2408            .ok_or_else(|| anyhow!("peer {peer_handle} missing slot_id"))?;
2409        let slot_token = slot_info["slot_token"]
2410            .as_str()
2411            .ok_or_else(|| anyhow!("peer {peer_handle} missing slot_token"))?;
2412        let client = crate::relay_client::RelayClient::new(url);
2413        let body = std::fs::read_to_string(&outbox)?;
2414        for line in body.lines() {
2415            let event: Value = match serde_json::from_str(line) {
2416                Ok(v) => v,
2417                Err(_) => continue,
2418            };
2419            let event_id = event
2420                .get("event_id")
2421                .and_then(Value::as_str)
2422                .unwrap_or("")
2423                .to_string();
2424            match client.post_event(slot_id, slot_token, &event) {
2425                Ok(resp) => {
2426                    if resp.status == "duplicate" {
2427                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
2428                    } else {
2429                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
2430                    }
2431                }
2432                Err(e) => {
2433                    skipped.push(
2434                        json!({"peer": peer_handle, "event_id": event_id, "reason": e.to_string()}),
2435                    );
2436                }
2437            }
2438        }
2439    }
2440
2441    if as_json {
2442        println!(
2443            "{}",
2444            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
2445        );
2446    } else {
2447        println!(
2448            "pushed {} event(s); skipped {} ({})",
2449            pushed.len(),
2450            skipped.len(),
2451            if skipped.is_empty() {
2452                "none"
2453            } else {
2454                "see --json for detail"
2455            }
2456        );
2457    }
2458    Ok(())
2459}
2460
2461// ---------- pull ----------
2462
2463fn cmd_pull(as_json: bool) -> Result<()> {
2464    let state = config::read_relay_state()?;
2465    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2466    if self_state.is_null() {
2467        bail!("self slot not bound — run `wire bind-relay <url>` first");
2468    }
2469    let url = self_state["relay_url"]
2470        .as_str()
2471        .ok_or_else(|| anyhow!("self.relay_url missing"))?;
2472    let slot_id = self_state["slot_id"]
2473        .as_str()
2474        .ok_or_else(|| anyhow!("self.slot_id missing"))?;
2475    let slot_token = self_state["slot_token"]
2476        .as_str()
2477        .ok_or_else(|| anyhow!("self.slot_token missing"))?;
2478    let last_event_id = self_state
2479        .get("last_pulled_event_id")
2480        .and_then(Value::as_str)
2481        .map(str::to_string);
2482
2483    let client = crate::relay_client::RelayClient::new(url);
2484    let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
2485
2486    let inbox_dir = config::inbox_dir()?;
2487    config::ensure_dirs()?;
2488
2489    // P0.1 (0.5.11): cursor advancement now blocks on unknown kinds and
2490    // transient verify errors. See `pull::process_events`.
2491    let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
2492
2493    // P0.3 (0.5.11): cursor write goes through update_relay_state, which
2494    // takes an exclusive flock on relay.lock for the whole RMW. Concurrent
2495    // wire processes (multi-daemon, CLI vs daemon, CLI vs MCP) serialise
2496    // through the lock — no more racing the cursor like today's debug.
2497    if let Some(eid) = &result.advance_cursor_to {
2498        let eid = eid.clone();
2499        config::update_relay_state(|state| {
2500            if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
2501                self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
2502            }
2503            Ok(())
2504        })?;
2505    }
2506
2507    if as_json {
2508        println!(
2509            "{}",
2510            serde_json::to_string(&json!({
2511                "written": result.written,
2512                "rejected": result.rejected,
2513                "total_seen": events.len(),
2514                "cursor_blocked": result.blocked,
2515                "cursor_advanced_to": result.advance_cursor_to,
2516            }))?
2517        );
2518    } else {
2519        let blocking = result
2520            .rejected
2521            .iter()
2522            .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
2523            .count();
2524        if blocking > 0 {
2525            println!(
2526                "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
2527                events.len(),
2528                result.written.len(),
2529                result.rejected.len(),
2530                blocking,
2531            );
2532        } else {
2533            println!(
2534                "pulled {} event(s); wrote {}; rejected {}",
2535                events.len(),
2536                result.written.len(),
2537                result.rejected.len(),
2538            );
2539        }
2540    }
2541    Ok(())
2542}
2543
2544// ---------- rotate-slot ----------
2545
2546fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
2547    if !config::is_initialized()? {
2548        bail!("not initialized — run `wire init <handle>` first");
2549    }
2550    let mut state = config::read_relay_state()?;
2551    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2552    if self_state.is_null() {
2553        bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
2554    }
2555    let url = self_state["relay_url"]
2556        .as_str()
2557        .ok_or_else(|| anyhow!("self.relay_url missing"))?
2558        .to_string();
2559    let old_slot_id = self_state["slot_id"]
2560        .as_str()
2561        .ok_or_else(|| anyhow!("self.slot_id missing"))?
2562        .to_string();
2563    let old_slot_token = self_state["slot_token"]
2564        .as_str()
2565        .ok_or_else(|| anyhow!("self.slot_token missing"))?
2566        .to_string();
2567
2568    // Read identity to sign the announcement.
2569    let card = config::read_agent_card()?;
2570    let did = card
2571        .get("did")
2572        .and_then(Value::as_str)
2573        .unwrap_or("")
2574        .to_string();
2575    let handle = crate::agent_card::display_handle_from_did(&did).to_string();
2576    let pk_b64 = card
2577        .get("verify_keys")
2578        .and_then(Value::as_object)
2579        .and_then(|m| m.values().next())
2580        .and_then(|v| v.get("key"))
2581        .and_then(Value::as_str)
2582        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
2583        .to_string();
2584    let pk_bytes = crate::signing::b64decode(&pk_b64)?;
2585    let sk_seed = config::read_private_key()?;
2586
2587    // Allocate new slot on the same relay.
2588    let normalized = url.trim_end_matches('/').to_string();
2589    let client = crate::relay_client::RelayClient::new(&normalized);
2590    client
2591        .check_healthz()
2592        .context("aborting rotation; old slot still valid")?;
2593    let alloc = client.allocate_slot(Some(&handle))?;
2594    let new_slot_id = alloc.slot_id.clone();
2595    let new_slot_token = alloc.slot_token.clone();
2596
2597    // Optionally announce the rotation to every paired peer via the OLD slot.
2598    // Each peer's recipient-side `wire pull` will pick up this event before
2599    // their daemon next polls the new slot — but auto-update of peer's
2600    // relay.json from a wire_close event is a v0.2 daemon feature; for now
2601    // peers see the event and an operator must manually `add-peer-slot` the
2602    // new coords, OR re-pair via SAS.
2603    let mut announced: Vec<String> = Vec::new();
2604    if !no_announce {
2605        let now = time::OffsetDateTime::now_utc()
2606            .format(&time::format_description::well_known::Rfc3339)
2607            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
2608        let body = json!({
2609            "reason": "operator-initiated slot rotation",
2610            "new_relay_url": url,
2611            "new_slot_id": new_slot_id,
2612            // NOTE: new_slot_token deliberately NOT shared in the broadcast.
2613            // In v0.1 slot tokens are bilateral-shared, so peer can post via
2614            // existing add-peer-slot flow if operator chooses to re-issue.
2615        });
2616        let peers = state["peers"].as_object().cloned().unwrap_or_default();
2617        for (peer_handle, _peer_info) in peers.iter() {
2618            let event = json!({
2619                "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
2620                "timestamp": now.clone(),
2621                "from": did,
2622                "to": format!("did:wire:{peer_handle}"),
2623                "type": "wire_close",
2624                "kind": 1201,
2625                "body": body.clone(),
2626            });
2627            let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
2628                Ok(s) => s,
2629                Err(e) => {
2630                    eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
2631                    continue;
2632                }
2633            };
2634            // Post to OUR old slot (we're announcing on our own slot, NOT
2635            // peer's slot — peer reads from us). Wait, this is wrong: peers
2636            // read from THEIR OWN slot via wire pull. To reach peer A, we
2637            // post to peer A's slot. Use the existing per-peer slot mapping.
2638            let peer_info = match state["peers"].get(peer_handle) {
2639                Some(p) => p.clone(),
2640                None => continue,
2641            };
2642            let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
2643            let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
2644            let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
2645            if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
2646                continue;
2647            }
2648            let peer_client = if peer_url == url {
2649                client.clone()
2650            } else {
2651                crate::relay_client::RelayClient::new(peer_url)
2652            };
2653            match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
2654                Ok(_) => announced.push(peer_handle.clone()),
2655                Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
2656            }
2657        }
2658    }
2659
2660    // Swap the self-slot to the new one.
2661    state["self"] = json!({
2662        "relay_url": url,
2663        "slot_id": new_slot_id,
2664        "slot_token": new_slot_token,
2665    });
2666    config::write_relay_state(&state)?;
2667
2668    if as_json {
2669        println!(
2670            "{}",
2671            serde_json::to_string(&json!({
2672                "rotated": true,
2673                "old_slot_id": old_slot_id,
2674                "new_slot_id": new_slot_id,
2675                "relay_url": url,
2676                "announced_to": announced,
2677            }))?
2678        );
2679    } else {
2680        println!("rotated slot on {url}");
2681        println!(
2682            "  old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
2683        );
2684        println!("  new slot_id: {new_slot_id}");
2685        if !announced.is_empty() {
2686            println!(
2687                "  announced wire_close (kind=1201) to: {}",
2688                announced.join(", ")
2689            );
2690        }
2691        println!();
2692        println!("next steps:");
2693        println!("  - peers see the wire_close event in their next `wire pull`");
2694        println!(
2695            "  - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
2696        );
2697        println!("    (or full re-pair via `wire pair-host`/`wire join`)");
2698        println!("  - until they do, you'll receive but they won't be able to reach you");
2699        // Suppress unused warning
2700        let _ = old_slot_token;
2701    }
2702    Ok(())
2703}
2704
2705// ---------- forget-peer ----------
2706
2707fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
2708    let mut trust = config::read_trust()?;
2709    let mut removed_from_trust = false;
2710    if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
2711        && agents.remove(handle).is_some()
2712    {
2713        removed_from_trust = true;
2714    }
2715    config::write_trust(&trust)?;
2716
2717    let mut state = config::read_relay_state()?;
2718    let mut removed_from_relay = false;
2719    if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
2720        && peers.remove(handle).is_some()
2721    {
2722        removed_from_relay = true;
2723    }
2724    config::write_relay_state(&state)?;
2725
2726    let mut purged: Vec<String> = Vec::new();
2727    if purge {
2728        for dir in [config::inbox_dir()?, config::outbox_dir()?] {
2729            let path = dir.join(format!("{handle}.jsonl"));
2730            if path.exists() {
2731                std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
2732                purged.push(path.to_string_lossy().into());
2733            }
2734        }
2735    }
2736
2737    if !removed_from_trust && !removed_from_relay {
2738        if as_json {
2739            println!(
2740                "{}",
2741                serde_json::to_string(&json!({
2742                    "removed": false,
2743                    "reason": format!("peer {handle:?} not pinned"),
2744                }))?
2745            );
2746        } else {
2747            eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
2748        }
2749        return Ok(());
2750    }
2751
2752    if as_json {
2753        println!(
2754            "{}",
2755            serde_json::to_string(&json!({
2756                "handle": handle,
2757                "removed_from_trust": removed_from_trust,
2758                "removed_from_relay_state": removed_from_relay,
2759                "purged_files": purged,
2760            }))?
2761        );
2762    } else {
2763        println!("forgot peer {handle:?}");
2764        if removed_from_trust {
2765            println!("  - removed from trust.json");
2766        }
2767        if removed_from_relay {
2768            println!("  - removed from relay.json");
2769        }
2770        if !purged.is_empty() {
2771            for p in &purged {
2772                println!("  - deleted {p}");
2773            }
2774        } else if !purge {
2775            println!("  (inbox/outbox files preserved; pass --purge to delete them)");
2776        }
2777    }
2778    Ok(())
2779}
2780
2781// ---------- daemon (long-lived push+pull sync) ----------
2782
2783fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
2784    if !config::is_initialized()? {
2785        bail!("not initialized — run `wire init <handle>` first");
2786    }
2787    let interval = std::time::Duration::from_secs(interval_secs.max(1));
2788
2789    if !as_json {
2790        if once {
2791            eprintln!("wire daemon: single sync cycle, then exit");
2792        } else {
2793            eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
2794        }
2795    }
2796
2797    // Recover from prior crash: any pending pair in transient state had its
2798    // in-memory SPAKE2 secret lost when the previous daemon exited. Release
2799    // the relay slots and mark the files so the operator can re-issue.
2800    if let Err(e) = crate::pending_pair::cleanup_on_startup() {
2801        eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
2802    }
2803
2804    // R1 phase 2: spawn the SSE stream subscriber. On every event pushed
2805    // to our slot, the subscriber signals `wake_rx`; we use it as the
2806    // sleep-or-wake gate of the polling loop. Polling stays as the
2807    // safety net — stream errors fall back transparently to the existing
2808    // interval-based cadence.
2809    let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
2810    if !once {
2811        crate::daemon_stream::spawn_stream_subscriber(wake_tx);
2812    }
2813
2814    loop {
2815        let pushed = run_sync_push().unwrap_or_else(|e| {
2816            eprintln!("daemon: push error: {e:#}");
2817            json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
2818        });
2819        let pulled = run_sync_pull().unwrap_or_else(|e| {
2820            eprintln!("daemon: pull error: {e:#}");
2821            json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
2822        });
2823        let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
2824            eprintln!("daemon: pending-pair tick error: {e:#}");
2825            json!({"transitions": []})
2826        });
2827
2828        if as_json {
2829            println!(
2830                "{}",
2831                serde_json::to_string(&json!({
2832                    "ts": time::OffsetDateTime::now_utc()
2833                        .format(&time::format_description::well_known::Rfc3339)
2834                        .unwrap_or_default(),
2835                    "push": pushed,
2836                    "pull": pulled,
2837                    "pairs": pairs,
2838                }))?
2839            );
2840        } else {
2841            let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
2842            let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
2843            let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
2844            let pair_transitions = pairs["transitions"]
2845                .as_array()
2846                .map(|a| a.len())
2847                .unwrap_or(0);
2848            if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
2849                eprintln!(
2850                    "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
2851                );
2852            }
2853            // Loud per-transition logging so operator sees pair progress live.
2854            if let Some(arr) = pairs["transitions"].as_array() {
2855                for t in arr {
2856                    eprintln!(
2857                        "  pair {} : {} → {}",
2858                        t.get("code").and_then(Value::as_str).unwrap_or("?"),
2859                        t.get("from").and_then(Value::as_str).unwrap_or("?"),
2860                        t.get("to").and_then(Value::as_str).unwrap_or("?")
2861                    );
2862                    if let Some(sas) = t.get("sas").and_then(Value::as_str)
2863                        && t.get("to").and_then(Value::as_str) == Some("sas_ready")
2864                    {
2865                        eprintln!("    SAS digits: {}-{}", &sas[..3], &sas[3..]);
2866                        eprintln!(
2867                            "    Run: wire pair-confirm {} {}",
2868                            t.get("code").and_then(Value::as_str).unwrap_or("?"),
2869                            sas
2870                        );
2871                    }
2872                }
2873            }
2874        }
2875
2876        if once {
2877            return Ok(());
2878        }
2879        // Wait either for the next poll-interval tick OR for a stream
2880        // wake signal — whichever comes first. Drain any additional
2881        // wake-ups that accumulated during the previous cycle since one
2882        // pull catches up everything.
2883        let _ = wake_rx.recv_timeout(interval);
2884        while wake_rx.try_recv().is_ok() {}
2885    }
2886}
2887
2888/// Programmatic push (no stdout, no exit on errors). Returns the same JSON
2889/// shape `wire push --json` emits.
2890fn run_sync_push() -> Result<Value> {
2891    let state = config::read_relay_state()?;
2892    let peers = state["peers"].as_object().cloned().unwrap_or_default();
2893    if peers.is_empty() {
2894        return Ok(json!({"pushed": [], "skipped": []}));
2895    }
2896    let outbox_dir = config::outbox_dir()?;
2897    if !outbox_dir.exists() {
2898        return Ok(json!({"pushed": [], "skipped": []}));
2899    }
2900    let mut pushed = Vec::new();
2901    let mut skipped = Vec::new();
2902    for (peer_handle, slot_info) in peers.iter() {
2903        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2904        if !outbox.exists() {
2905            continue;
2906        }
2907        let url = slot_info["relay_url"].as_str().unwrap_or("");
2908        let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
2909        let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
2910        if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
2911            continue;
2912        }
2913        let client = crate::relay_client::RelayClient::new(url);
2914        let body = std::fs::read_to_string(&outbox)?;
2915        for line in body.lines() {
2916            let event: Value = match serde_json::from_str(line) {
2917                Ok(v) => v,
2918                Err(_) => continue,
2919            };
2920            let event_id = event
2921                .get("event_id")
2922                .and_then(Value::as_str)
2923                .unwrap_or("")
2924                .to_string();
2925            match client.post_event(slot_id, slot_token, &event) {
2926                Ok(resp) => {
2927                    if resp.status == "duplicate" {
2928                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
2929                    } else {
2930                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
2931                    }
2932                }
2933                Err(e) => {
2934                    skipped.push(
2935                        json!({"peer": peer_handle, "event_id": event_id, "reason": e.to_string()}),
2936                    );
2937                }
2938            }
2939        }
2940    }
2941    Ok(json!({"pushed": pushed, "skipped": skipped}))
2942}
2943
2944/// Programmatic pull. Same shape as `wire pull --json`.
2945fn run_sync_pull() -> Result<Value> {
2946    let state = config::read_relay_state()?;
2947    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2948    if self_state.is_null() {
2949        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
2950    }
2951    let url = self_state["relay_url"].as_str().unwrap_or("");
2952    let slot_id = self_state["slot_id"].as_str().unwrap_or("");
2953    let slot_token = self_state["slot_token"].as_str().unwrap_or("");
2954    let last_event_id = self_state
2955        .get("last_pulled_event_id")
2956        .and_then(Value::as_str)
2957        .map(str::to_string);
2958    if url.is_empty() {
2959        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
2960    }
2961    let client = crate::relay_client::RelayClient::new(url);
2962    let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
2963    let inbox_dir = config::inbox_dir()?;
2964    config::ensure_dirs()?;
2965
2966    // P0.1 (0.5.11): shared cursor-blocking logic. Daemon's --once path
2967    // must match the CLI's `wire pull` semantics or version-skew bugs
2968    // re-emerge by another route.
2969    let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
2970
2971    // P0.3 (0.5.11): same flock-protected RMW as cmd_pull.
2972    if let Some(eid) = &result.advance_cursor_to {
2973        let eid = eid.clone();
2974        config::update_relay_state(|state| {
2975            if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
2976                self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
2977            }
2978            Ok(())
2979        })?;
2980    }
2981
2982    Ok(json!({
2983        "written": result.written,
2984        "rejected": result.rejected,
2985        "total_seen": events.len(),
2986        "cursor_blocked": result.blocked,
2987        "cursor_advanced_to": result.advance_cursor_to,
2988    }))
2989}
2990
2991// ---------- pin (manual out-of-band peer pairing) ----------
2992
2993fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
2994    let body =
2995        std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
2996    let card: Value =
2997        serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
2998    crate::agent_card::verify_agent_card(&card)
2999        .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3000
3001    let mut trust = config::read_trust()?;
3002    crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3003
3004    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3005    let handle = crate::agent_card::display_handle_from_did(did).to_string();
3006    config::write_trust(&trust)?;
3007
3008    if as_json {
3009        println!(
3010            "{}",
3011            serde_json::to_string(&json!({
3012                "handle": handle,
3013                "did": did,
3014                "tier": "VERIFIED",
3015                "pinned": true,
3016            }))?
3017        );
3018    } else {
3019        println!("pinned {handle} ({did}) at tier VERIFIED");
3020    }
3021    Ok(())
3022}
3023
3024// ---------- pair-host / pair-join (the magic-wormhole flow) ----------
3025
3026fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3027    pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3028}
3029
3030fn cmd_pair_join(
3031    code_phrase: &str,
3032    relay_url: &str,
3033    auto_yes: bool,
3034    timeout_secs: u64,
3035) -> Result<()> {
3036    pair_orchestrate(
3037        relay_url,
3038        Some(code_phrase),
3039        "guest",
3040        auto_yes,
3041        timeout_secs,
3042    )
3043}
3044
3045/// Shared orchestration for both sides of the SAS pairing.
3046///
3047/// Now thin: delegates to `pair_session::pair_session_open` / `_try_sas` /
3048/// `_finalize`. CLI keeps its interactive y/N prompt; MCP uses
3049/// `pair_session_confirm_sas` instead.
3050fn pair_orchestrate(
3051    relay_url: &str,
3052    code_in: Option<&str>,
3053    role: &str,
3054    auto_yes: bool,
3055    timeout_secs: u64,
3056) -> Result<()> {
3057    use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3058
3059    let mut s = pair_session_open(role, relay_url, code_in)?;
3060
3061    if role == "host" {
3062        eprintln!();
3063        eprintln!("share this code phrase with your peer:");
3064        eprintln!();
3065        eprintln!("    {}", s.code);
3066        eprintln!();
3067        eprintln!(
3068            "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3069            s.code
3070        );
3071    } else {
3072        eprintln!();
3073        eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3074    }
3075
3076    // Stage 2 — poll for SAS-ready with periodic progress heartbeat. The bare
3077    // pair_session_wait_for_sas helper is silent; the CLI wraps it in a loop
3078    // that emits a "waiting (Ns / Ts)" line every HEARTBEAT_SECS so operators
3079    // see the process is alive while the other side connects.
3080    const HEARTBEAT_SECS: u64 = 10;
3081    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3082    let started = std::time::Instant::now();
3083    let mut last_heartbeat = started;
3084    let formatted = loop {
3085        if let Some(sas) = pair_session_try_sas(&mut s)? {
3086            break sas;
3087        }
3088        let now = std::time::Instant::now();
3089        if now >= deadline {
3090            return Err(anyhow!(
3091                "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3092            ));
3093        }
3094        if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3095            let elapsed = now.duration_since(started).as_secs();
3096            eprintln!("  ... still waiting ({elapsed}s / {timeout_secs}s)");
3097            last_heartbeat = now;
3098        }
3099        std::thread::sleep(std::time::Duration::from_millis(250));
3100    };
3101
3102    eprintln!();
3103    eprintln!("SAS digits (must match peer's terminal):");
3104    eprintln!();
3105    eprintln!("    {formatted}");
3106    eprintln!();
3107
3108    // Stage 3 — operator confirmation. CLI uses interactive y/N for backward
3109    // compatibility; MCP uses pair_session_confirm_sas with the typed digits.
3110    if !auto_yes {
3111        eprint!("does this match your peer's terminal? [y/N]: ");
3112        use std::io::Write;
3113        std::io::stderr().flush().ok();
3114        let mut input = String::new();
3115        std::io::stdin().read_line(&mut input)?;
3116        let trimmed = input.trim().to_lowercase();
3117        if trimmed != "y" && trimmed != "yes" {
3118            bail!("SAS confirmation declined — aborting pairing");
3119        }
3120    }
3121    s.sas_confirmed = true;
3122
3123    // Stage 4 — seal+exchange bootstrap, pin peer.
3124    let result = pair_session_finalize(&mut s, timeout_secs)?;
3125
3126    let peer_did = result["paired_with"].as_str().unwrap_or("");
3127    let peer_role = if role == "host" { "guest" } else { "host" };
3128    eprintln!("paired with {peer_did} (peer role: {peer_role})");
3129    eprintln!("peer card pinned at tier VERIFIED");
3130    eprintln!(
3131        "peer relay slot saved to {}",
3132        config::relay_state_path()?.display()
3133    );
3134
3135    println!("{}", serde_json::to_string(&result)?);
3136    Ok(())
3137}
3138
3139// (poll_until helper removed — pair flow now uses pair_session::pair_session_wait_for_sas
3140// and pair_session_finalize, both of which inline their own deadline loops.)
3141
3142// ---------- pair — single-shot init + pair-* + setup ----------
3143
3144fn cmd_pair(
3145    handle: &str,
3146    code: Option<&str>,
3147    relay: &str,
3148    auto_yes: bool,
3149    timeout_secs: u64,
3150    no_setup: bool,
3151) -> Result<()> {
3152    // Step 1 — idempotent identity. Safe if already initialized with the SAME handle;
3153    // bails loudly if a different handle is already set (operator must explicitly delete).
3154    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3155    let did = init_result
3156        .get("did")
3157        .and_then(|v| v.as_str())
3158        .unwrap_or("(unknown)")
3159        .to_string();
3160    let already = init_result
3161        .get("already_initialized")
3162        .and_then(|v| v.as_bool())
3163        .unwrap_or(false);
3164    if already {
3165        println!("(identity {did} already initialized — reusing)");
3166    } else {
3167        println!("initialized {did}");
3168    }
3169    println!();
3170
3171    // Step 2 — pair-host or pair-join based on code presence.
3172    match code {
3173        None => {
3174            println!("hosting pair on {relay} (no code = host) ...");
3175            cmd_pair_host(relay, auto_yes, timeout_secs)?;
3176        }
3177        Some(c) => {
3178            println!("joining pair with code {c} on {relay} ...");
3179            cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3180        }
3181    }
3182
3183    // Step 3 — register wire as MCP server in detected client configs (idempotent).
3184    if !no_setup {
3185        println!();
3186        println!("registering wire as MCP server in detected client configs ...");
3187        if let Err(e) = cmd_setup(true) {
3188            // Non-fatal — pair succeeded, just print the warning.
3189            eprintln!("warn: setup --apply failed: {e}");
3190            eprintln!("      pair succeeded; you can re-run `wire setup --apply` manually.");
3191        }
3192    }
3193
3194    println!();
3195    println!("pair complete. Next steps:");
3196    println!("  wire daemon start              # background sync of inbox/outbox vs relay");
3197    println!("  wire send <peer> claim <msg>   # send your peer something");
3198    println!("  wire tail                      # watch incoming events");
3199    Ok(())
3200}
3201
3202// ---------- detached pair (daemon-orchestrated) ----------
3203
3204/// `wire pair <handle> [--code <phrase>] --detach` — wraps init + detach
3205/// pair-host/-join into a single command. The non-detached variant lives in
3206/// `cmd_pair`; this one short-circuits to the daemon-orchestrated path.
3207fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3208    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3209    let did = init_result
3210        .get("did")
3211        .and_then(|v| v.as_str())
3212        .unwrap_or("(unknown)")
3213        .to_string();
3214    let already = init_result
3215        .get("already_initialized")
3216        .and_then(|v| v.as_bool())
3217        .unwrap_or(false);
3218    if already {
3219        println!("(identity {did} already initialized — reusing)");
3220    } else {
3221        println!("initialized {did}");
3222    }
3223    println!();
3224    match code {
3225        None => cmd_pair_host_detach(relay, false),
3226        Some(c) => cmd_pair_join_detach(c, relay, false),
3227    }
3228}
3229
3230fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3231    if !config::is_initialized()? {
3232        bail!("not initialized — run `wire init <handle>` first");
3233    }
3234    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3235        Ok(b) => b,
3236        Err(e) => {
3237            if !as_json {
3238                eprintln!(
3239                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3240                );
3241            }
3242            false
3243        }
3244    };
3245    let code = crate::sas::generate_code_phrase();
3246    let code_hash = crate::pair_session::derive_code_hash(&code);
3247    let now = time::OffsetDateTime::now_utc()
3248        .format(&time::format_description::well_known::Rfc3339)
3249        .unwrap_or_default();
3250    let p = crate::pending_pair::PendingPair {
3251        code: code.clone(),
3252        code_hash,
3253        role: "host".to_string(),
3254        relay_url: relay_url.to_string(),
3255        status: "request_host".to_string(),
3256        sas: None,
3257        peer_did: None,
3258        created_at: now,
3259        last_error: None,
3260        pair_id: None,
3261        our_slot_id: None,
3262        our_slot_token: None,
3263        spake2_seed_b64: None,
3264    };
3265    crate::pending_pair::write_pending(&p)?;
3266    if as_json {
3267        println!(
3268            "{}",
3269            serde_json::to_string(&json!({
3270                "state": "queued",
3271                "code_phrase": code,
3272                "relay_url": relay_url,
3273                "role": "host",
3274                "daemon_spawned": daemon_spawned,
3275            }))?
3276        );
3277    } else {
3278        if daemon_spawned {
3279            println!("(started wire daemon in background)");
3280        }
3281        println!("detached pair-host queued. Share this code with your peer:\n");
3282        println!("    {code}\n");
3283        println!("Next steps:");
3284        println!("  wire pair-list                                # check status");
3285        println!("  wire pair-confirm {code} <digits>   # when SAS shows up");
3286        println!("  wire pair-cancel  {code}            # to abort");
3287    }
3288    Ok(())
3289}
3290
3291fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3292    if !config::is_initialized()? {
3293        bail!("not initialized — run `wire init <handle>` first");
3294    }
3295    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3296        Ok(b) => b,
3297        Err(e) => {
3298            if !as_json {
3299                eprintln!(
3300                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3301                );
3302            }
3303            false
3304        }
3305    };
3306    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3307    let code_hash = crate::pair_session::derive_code_hash(&code);
3308    let now = time::OffsetDateTime::now_utc()
3309        .format(&time::format_description::well_known::Rfc3339)
3310        .unwrap_or_default();
3311    let p = crate::pending_pair::PendingPair {
3312        code: code.clone(),
3313        code_hash,
3314        role: "guest".to_string(),
3315        relay_url: relay_url.to_string(),
3316        status: "request_guest".to_string(),
3317        sas: None,
3318        peer_did: None,
3319        created_at: now,
3320        last_error: None,
3321        pair_id: None,
3322        our_slot_id: None,
3323        our_slot_token: None,
3324        spake2_seed_b64: None,
3325    };
3326    crate::pending_pair::write_pending(&p)?;
3327    if as_json {
3328        println!(
3329            "{}",
3330            serde_json::to_string(&json!({
3331                "state": "queued",
3332                "code_phrase": code,
3333                "relay_url": relay_url,
3334                "role": "guest",
3335                "daemon_spawned": daemon_spawned,
3336            }))?
3337        );
3338    } else {
3339        if daemon_spawned {
3340            println!("(started wire daemon in background)");
3341        }
3342        println!("detached pair-join queued for code {code}.");
3343        println!(
3344            "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3345        );
3346    }
3347    Ok(())
3348}
3349
3350fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3351    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3352    let typed: String = typed_digits
3353        .chars()
3354        .filter(|c| c.is_ascii_digit())
3355        .collect();
3356    if typed.len() != 6 {
3357        bail!(
3358            "expected 6 digits (got {} after stripping non-digits)",
3359            typed.len()
3360        );
3361    }
3362    let mut p = crate::pending_pair::read_pending(&code)?
3363        .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
3364    if p.status != "sas_ready" {
3365        bail!(
3366            "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
3367            p.status
3368        );
3369    }
3370    let stored = p
3371        .sas
3372        .as_ref()
3373        .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
3374        .clone();
3375    if stored == typed {
3376        p.status = "confirmed".to_string();
3377        crate::pending_pair::write_pending(&p)?;
3378        if as_json {
3379            println!(
3380                "{}",
3381                serde_json::to_string(&json!({
3382                    "state": "confirmed",
3383                    "code_phrase": code,
3384                }))?
3385            );
3386        } else {
3387            println!("digits match. Daemon will finalize the handshake on its next tick.");
3388            println!("Run `wire peers` after a few seconds to confirm.");
3389        }
3390    } else {
3391        p.status = "aborted".to_string();
3392        p.last_error = Some(format!(
3393            "SAS digit mismatch (typed {typed}, expected {stored})"
3394        ));
3395        let client = crate::relay_client::RelayClient::new(&p.relay_url);
3396        let _ = client.pair_abandon(&p.code_hash);
3397        crate::pending_pair::write_pending(&p)?;
3398        crate::os_notify::toast(
3399            &format!("wire — pair aborted ({})", p.code),
3400            p.last_error.as_deref().unwrap_or("digits mismatch"),
3401        );
3402        if as_json {
3403            println!(
3404                "{}",
3405                serde_json::to_string(&json!({
3406                    "state": "aborted",
3407                    "code_phrase": code,
3408                    "error": "digits mismatch",
3409                }))?
3410            );
3411        }
3412        bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
3413    }
3414    Ok(())
3415}
3416
3417fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
3418    if watch {
3419        return cmd_pair_list_watch(watch_interval_secs);
3420    }
3421    let items = crate::pending_pair::list_pending()?;
3422    if as_json {
3423        println!("{}", serde_json::to_string(&items)?);
3424        return Ok(());
3425    }
3426    if items.is_empty() {
3427        println!("no pending pair sessions.");
3428        return Ok(());
3429    }
3430    println!(
3431        "{:<15} {:<8} {:<18} {:<10} NOTE",
3432        "CODE", "ROLE", "STATUS", "SAS"
3433    );
3434    for p in items {
3435        let sas = p
3436            .sas
3437            .as_ref()
3438            .map(|d| format!("{}-{}", &d[..3], &d[3..]))
3439            .unwrap_or_else(|| "—".to_string());
3440        let note = p
3441            .last_error
3442            .as_deref()
3443            .or(p.peer_did.as_deref())
3444            .unwrap_or("");
3445        println!(
3446            "{:<15} {:<8} {:<18} {:<10} {}",
3447            p.code, p.role, p.status, sas, note
3448        );
3449    }
3450    Ok(())
3451}
3452
3453/// Stream-mode pair-list: never exits. Diffs per-code state every
3454/// `interval_secs` and prints one JSON line per transition (creation,
3455/// status flip, deletion). Useful for shell pipelines:
3456///
3457/// ```text
3458/// wire pair-list --watch | while read line; do
3459///     CODE=$(echo "$line" | jq -r .code)
3460///     STATUS=$(echo "$line" | jq -r .status)
3461///     ...
3462/// done
3463/// ```
3464fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
3465    use std::collections::HashMap;
3466    use std::io::Write;
3467    let interval = std::time::Duration::from_secs(interval_secs.max(1));
3468    // Emit a snapshot synthetic event for every currently-pending pair on
3469    // startup so a consumer that arrives mid-flight sees the current state.
3470    let mut prev: HashMap<String, String> = HashMap::new();
3471    {
3472        let items = crate::pending_pair::list_pending()?;
3473        for p in &items {
3474            println!("{}", serde_json::to_string(&p)?);
3475            prev.insert(p.code.clone(), p.status.clone());
3476        }
3477        // Flush so the consumer's `while read` gets the snapshot promptly.
3478        let _ = std::io::stdout().flush();
3479    }
3480    loop {
3481        std::thread::sleep(interval);
3482        let items = match crate::pending_pair::list_pending() {
3483            Ok(v) => v,
3484            Err(_) => continue,
3485        };
3486        let mut cur: HashMap<String, String> = HashMap::new();
3487        for p in &items {
3488            cur.insert(p.code.clone(), p.status.clone());
3489            match prev.get(&p.code) {
3490                None => {
3491                    // New code appeared.
3492                    println!("{}", serde_json::to_string(&p)?);
3493                }
3494                Some(prev_status) if prev_status != &p.status => {
3495                    // Status flipped.
3496                    println!("{}", serde_json::to_string(&p)?);
3497                }
3498                _ => {}
3499            }
3500        }
3501        for code in prev.keys() {
3502            if !cur.contains_key(code) {
3503                // File disappeared → finalized or cancelled. Emit a synthetic
3504                // "removed" marker so the consumer sees the terminal event.
3505                println!(
3506                    "{}",
3507                    serde_json::to_string(&json!({
3508                        "code": code,
3509                        "status": "removed",
3510                        "_synthetic": true,
3511                    }))?
3512                );
3513            }
3514        }
3515        let _ = std::io::stdout().flush();
3516        prev = cur;
3517    }
3518}
3519
3520/// Block until a pending pair reaches `target_status` or terminates. Process
3521/// exit code carries the outcome (0 success, 1 terminated abnormally, 2
3522/// timeout) so shell scripts can branch directly.
3523fn cmd_pair_watch(
3524    code_phrase: &str,
3525    target_status: &str,
3526    timeout_secs: u64,
3527    as_json: bool,
3528) -> Result<()> {
3529    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3530    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3531    let mut last_seen_status: Option<String> = None;
3532    loop {
3533        let p_opt = crate::pending_pair::read_pending(&code)?;
3534        let now = std::time::Instant::now();
3535        match p_opt {
3536            None => {
3537                // File gone — either finalized (success if target=sas_ready
3538                // since finalization implies it passed sas_ready) or never
3539                // existed. Distinguish by whether we ever saw it.
3540                if last_seen_status.is_some() {
3541                    if as_json {
3542                        println!(
3543                            "{}",
3544                            serde_json::to_string(&json!({"state": "finalized", "code": code}))?
3545                        );
3546                    } else {
3547                        println!("pair {code} finalized (file removed)");
3548                    }
3549                    return Ok(());
3550                } else {
3551                    if as_json {
3552                        println!(
3553                            "{}",
3554                            serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
3555                        );
3556                    }
3557                    std::process::exit(1);
3558                }
3559            }
3560            Some(p) => {
3561                let cur = p.status.clone();
3562                if Some(cur.clone()) != last_seen_status {
3563                    if as_json {
3564                        // Emit per-transition line so scripts can stream.
3565                        println!("{}", serde_json::to_string(&p)?);
3566                    }
3567                    last_seen_status = Some(cur.clone());
3568                }
3569                if cur == target_status {
3570                    if !as_json {
3571                        let sas_str = p
3572                            .sas
3573                            .as_ref()
3574                            .map(|s| format!("{}-{}", &s[..3], &s[3..]))
3575                            .unwrap_or_else(|| "—".to_string());
3576                        println!("pair {code} reached {target_status} (SAS: {sas_str})");
3577                    }
3578                    return Ok(());
3579                }
3580                if cur == "aborted" || cur == "aborted_restart" {
3581                    if !as_json {
3582                        let err = p.last_error.as_deref().unwrap_or("(no detail)");
3583                        eprintln!("pair {code} {cur}: {err}");
3584                    }
3585                    std::process::exit(1);
3586                }
3587            }
3588        }
3589        if now >= deadline {
3590            if !as_json {
3591                eprintln!(
3592                    "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
3593                );
3594            }
3595            std::process::exit(2);
3596        }
3597        std::thread::sleep(std::time::Duration::from_millis(250));
3598    }
3599}
3600
3601fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
3602    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3603    let p = crate::pending_pair::read_pending(&code)?
3604        .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
3605    let client = crate::relay_client::RelayClient::new(&p.relay_url);
3606    let _ = client.pair_abandon(&p.code_hash);
3607    crate::pending_pair::delete_pending(&code)?;
3608    if as_json {
3609        println!(
3610            "{}",
3611            serde_json::to_string(&json!({
3612                "state": "cancelled",
3613                "code_phrase": code,
3614            }))?
3615        );
3616    } else {
3617        println!("cancelled pending pair {code} (relay slot released, file removed).");
3618    }
3619    Ok(())
3620}
3621
3622// ---------- pair-abandon — release stuck pair-slot ----------
3623
3624fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
3625    // Accept either the raw phrase (e.g. "53-CKWIA5") or whatever the user
3626    // typed — normalize via the existing parser.
3627    let code = crate::sas::parse_code_phrase(code_phrase)?;
3628    let code_hash = crate::pair_session::derive_code_hash(code);
3629    let client = crate::relay_client::RelayClient::new(relay_url);
3630    client.pair_abandon(&code_hash)?;
3631    println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
3632    println!("host can now issue a fresh code; guest can re-join.");
3633    Ok(())
3634}
3635
3636// ---------- invite / accept — one-paste pair (v0.4.0) ----------
3637
3638fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
3639    let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
3640
3641    // If --share, register the invite at the relay's short-URL endpoint and
3642    // build the one-curl onboarding line for the peer to paste.
3643    let share_payload: Option<Value> = if share {
3644        let client = reqwest::blocking::Client::new();
3645        let single_use = if uses == 1 { Some(1u32) } else { None };
3646        let body = json!({
3647            "invite_url": url,
3648            "ttl_seconds": ttl,
3649            "uses": single_use,
3650        });
3651        let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
3652        let resp = client.post(&endpoint).json(&body).send()?;
3653        if !resp.status().is_success() {
3654            let code = resp.status();
3655            let txt = resp.text().unwrap_or_default();
3656            bail!("relay {code} on /v1/invite/register: {txt}");
3657        }
3658        let parsed: Value = resp.json()?;
3659        let token = parsed
3660            .get("token")
3661            .and_then(Value::as_str)
3662            .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
3663            .to_string();
3664        let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
3665        let curl_line = format!("curl -fsSL {share_url} | sh");
3666        Some(json!({
3667            "token": token,
3668            "share_url": share_url,
3669            "curl": curl_line,
3670            "expires_unix": parsed.get("expires_unix"),
3671        }))
3672    } else {
3673        None
3674    };
3675
3676    if as_json {
3677        let mut out = json!({
3678            "invite_url": url,
3679            "ttl_secs": ttl,
3680            "uses": uses,
3681            "relay": relay,
3682        });
3683        if let Some(s) = &share_payload {
3684            out["share"] = s.clone();
3685        }
3686        println!("{}", serde_json::to_string(&out)?);
3687    } else if let Some(s) = share_payload {
3688        let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
3689        eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
3690        eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
3691        println!("{curl}");
3692    } else {
3693        eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
3694        eprintln!("# TTL: {ttl}s. Uses: {uses}.");
3695        println!("{url}");
3696    }
3697    Ok(())
3698}
3699
3700fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
3701    // If the user pasted an HTTP(S) short URL (e.g. https://wireup.net/i/AB12),
3702    // resolve it to the underlying wire://pair?... URL via ?format=url before
3703    // accepting. Saves them from having to know which URL shape goes where.
3704    let resolved = if url.starts_with("http://") || url.starts_with("https://") {
3705        let sep = if url.contains('?') { '&' } else { '?' };
3706        let resolve_url = format!("{url}{sep}format=url");
3707        let client = reqwest::blocking::Client::new();
3708        let resp = client
3709            .get(&resolve_url)
3710            .send()
3711            .with_context(|| format!("GET {resolve_url}"))?;
3712        if !resp.status().is_success() {
3713            bail!("could not resolve short URL {url} (HTTP {})", resp.status());
3714        }
3715        let body = resp.text().unwrap_or_default().trim().to_string();
3716        if !body.starts_with("wire://pair?") {
3717            bail!(
3718                "short URL {url} did not resolve to a wire:// invite. \
3719                 (got: {}{})",
3720                body.chars().take(80).collect::<String>(),
3721                if body.chars().count() > 80 { "…" } else { "" }
3722            );
3723        }
3724        body
3725    } else {
3726        url.to_string()
3727    };
3728
3729    let result = crate::pair_invite::accept_invite(&resolved)?;
3730    if as_json {
3731        println!("{}", serde_json::to_string(&result)?);
3732    } else {
3733        let did = result
3734            .get("paired_with")
3735            .and_then(Value::as_str)
3736            .unwrap_or("?");
3737        println!("paired with {did}");
3738        println!(
3739            "you can now: wire send {} <kind> <body>",
3740            crate::agent_card::display_handle_from_did(did)
3741        );
3742    }
3743    Ok(())
3744}
3745
3746// ---------- whois / profile (v0.5) ----------
3747
3748fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
3749    if let Some(h) = handle {
3750        let parsed = crate::pair_profile::parse_handle(h)?;
3751        // Special-case: if the supplied handle matches our own, skip the
3752        // network round-trip and print local.
3753        if config::is_initialized()? {
3754            let card = config::read_agent_card()?;
3755            let local_handle = card
3756                .get("profile")
3757                .and_then(|p| p.get("handle"))
3758                .and_then(Value::as_str)
3759                .map(str::to_string);
3760            if local_handle.as_deref() == Some(h) {
3761                return cmd_whois(None, as_json, None);
3762            }
3763        }
3764        // Remote resolution via .well-known/wire/agent on the handle's domain.
3765        let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
3766        if as_json {
3767            println!("{}", serde_json::to_string(&resolved)?);
3768        } else {
3769            print_resolved_profile(&resolved);
3770        }
3771        return Ok(());
3772    }
3773    let card = config::read_agent_card()?;
3774    if as_json {
3775        let profile = card.get("profile").cloned().unwrap_or(Value::Null);
3776        println!(
3777            "{}",
3778            serde_json::to_string(&json!({
3779                "did": card.get("did").cloned().unwrap_or(Value::Null),
3780                "profile": profile,
3781            }))?
3782        );
3783    } else {
3784        print!("{}", crate::pair_profile::render_self_summary()?);
3785    }
3786    Ok(())
3787}
3788
3789fn print_resolved_profile(resolved: &Value) {
3790    let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
3791    let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
3792    let relay = resolved
3793        .get("relay_url")
3794        .and_then(Value::as_str)
3795        .unwrap_or("");
3796    let slot = resolved
3797        .get("slot_id")
3798        .and_then(Value::as_str)
3799        .unwrap_or("");
3800    let profile = resolved
3801        .get("card")
3802        .and_then(|c| c.get("profile"))
3803        .cloned()
3804        .unwrap_or(Value::Null);
3805    println!("{did}");
3806    println!("  nick:         {nick}");
3807    if !relay.is_empty() {
3808        println!("  relay_url:    {relay}");
3809    }
3810    if !slot.is_empty() {
3811        println!("  slot_id:      {slot}");
3812    }
3813    let pick =
3814        |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
3815    if let Some(s) = pick("display_name") {
3816        println!("  display_name: {s}");
3817    }
3818    if let Some(s) = pick("emoji") {
3819        println!("  emoji:        {s}");
3820    }
3821    if let Some(s) = pick("motto") {
3822        println!("  motto:        {s}");
3823    }
3824    if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
3825        let joined: Vec<String> = arr
3826            .iter()
3827            .filter_map(|v| v.as_str().map(str::to_string))
3828            .collect();
3829        println!("  vibe:         {}", joined.join(", "));
3830    }
3831    if let Some(s) = pick("pronouns") {
3832        println!("  pronouns:     {s}");
3833    }
3834}
3835
3836/// `wire add <nick@domain>` — zero-paste pair. Resolve handle, build a
3837/// signed pair_drop event with our card + slot coords, deliver via the
3838/// peer relay's `/v1/handle/intro/<nick>` endpoint (no slot_token needed).
3839/// Peer's daemon completes the bilateral pin on its next pull and emits a
3840/// pair_drop_ack carrying their slot_token so we can send back.
3841fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
3842    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
3843
3844    // 1. Auto-init self if needed + ensure a relay slot.
3845    let (our_did, our_relay, our_slot_id, our_slot_token) =
3846        crate::pair_invite::ensure_self_with_relay(relay_override)?;
3847    if our_did == format!("did:wire:{}", parsed.nick) {
3848        // Lazy guard — actual self-add would also be caught by FCFS later.
3849        bail!("refusing to add self (handle matches own DID)");
3850    }
3851
3852    // 2. Resolve peer via .well-known on their relay.
3853    let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
3854    let peer_card = resolved
3855        .get("card")
3856        .cloned()
3857        .ok_or_else(|| anyhow!("resolved missing card"))?;
3858    let peer_did = resolved
3859        .get("did")
3860        .and_then(Value::as_str)
3861        .ok_or_else(|| anyhow!("resolved missing did"))?
3862        .to_string();
3863    let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
3864    let peer_slot_id = resolved
3865        .get("slot_id")
3866        .and_then(Value::as_str)
3867        .ok_or_else(|| anyhow!("resolved missing slot_id"))?
3868        .to_string();
3869    let peer_relay = resolved
3870        .get("relay_url")
3871        .and_then(Value::as_str)
3872        .map(str::to_string)
3873        .or_else(|| relay_override.map(str::to_string))
3874        .unwrap_or_else(|| format!("https://{}", parsed.domain));
3875
3876    // 3. Pin peer in trust + relay-state. slot_token will arrive via ack.
3877    let mut trust = config::read_trust()?;
3878    crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
3879    config::write_trust(&trust)?;
3880    let mut relay_state = config::read_relay_state()?;
3881    let existing_token = relay_state
3882        .get("peers")
3883        .and_then(|p| p.get(&peer_handle))
3884        .and_then(|p| p.get("slot_token"))
3885        .and_then(Value::as_str)
3886        .map(str::to_string)
3887        .unwrap_or_default();
3888    relay_state["peers"][&peer_handle] = json!({
3889        "relay_url": peer_relay,
3890        "slot_id": peer_slot_id,
3891        "slot_token": existing_token, // empty until pair_drop_ack lands
3892    });
3893    config::write_relay_state(&relay_state)?;
3894
3895    // 4. Build signed pair_drop with our card + coords (no pair_nonce — this
3896    // is the v0.5 zero-paste open-mode path).
3897    let our_card = config::read_agent_card()?;
3898    let sk_seed = config::read_private_key()?;
3899    let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
3900    let pk_b64 = our_card
3901        .get("verify_keys")
3902        .and_then(Value::as_object)
3903        .and_then(|m| m.values().next())
3904        .and_then(|v| v.get("key"))
3905        .and_then(Value::as_str)
3906        .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
3907    let pk_bytes = crate::signing::b64decode(pk_b64)?;
3908    let now = time::OffsetDateTime::now_utc()
3909        .format(&time::format_description::well_known::Rfc3339)
3910        .unwrap_or_default();
3911    let event = json!({
3912        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
3913        "timestamp": now,
3914        "from": our_did,
3915        "to": peer_did,
3916        "type": "pair_drop",
3917        "kind": 1100u32,
3918        "body": {
3919            "card": our_card,
3920            "relay_url": our_relay,
3921            "slot_id": our_slot_id,
3922            "slot_token": our_slot_token,
3923        },
3924    });
3925    let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
3926
3927    // 5. Deliver via /v1/handle/intro/<nick> (auth-free; relay validates kind).
3928    let client = crate::relay_client::RelayClient::new(&peer_relay);
3929    let resp = client.handle_intro(&parsed.nick, &signed)?;
3930    let event_id = signed
3931        .get("event_id")
3932        .and_then(Value::as_str)
3933        .unwrap_or("")
3934        .to_string();
3935
3936    if as_json {
3937        println!(
3938            "{}",
3939            serde_json::to_string(&json!({
3940                "handle": handle_arg,
3941                "paired_with": peer_did,
3942                "peer_handle": peer_handle,
3943                "event_id": event_id,
3944                "drop_response": resp,
3945                "status": "drop_sent",
3946            }))?
3947        );
3948    } else {
3949        println!(
3950            "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
3951        );
3952    }
3953    Ok(())
3954}
3955
3956// ---------- diag (structured trace) ----------
3957
3958fn cmd_diag(action: DiagAction) -> Result<()> {
3959    let state = config::state_dir()?;
3960    let knob = state.join("diag.enabled");
3961    let log_path = state.join("diag.jsonl");
3962    match action {
3963        DiagAction::Tail { limit, json } => {
3964            let entries = crate::diag::tail(limit);
3965            if json {
3966                for e in entries {
3967                    println!("{}", serde_json::to_string(&e)?);
3968                }
3969            } else if entries.is_empty() {
3970                println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
3971            } else {
3972                for e in entries {
3973                    let ts = e["ts"].as_u64().unwrap_or(0);
3974                    let ty = e["type"].as_str().unwrap_or("?");
3975                    let pid = e["pid"].as_u64().unwrap_or(0);
3976                    let payload = e["payload"].to_string();
3977                    println!("[{ts}] pid={pid} {ty} {payload}");
3978                }
3979            }
3980        }
3981        DiagAction::Enable => {
3982            config::ensure_dirs()?;
3983            std::fs::write(&knob, "1")?;
3984            println!("wire diag: enabled at {knob:?}");
3985        }
3986        DiagAction::Disable => {
3987            if knob.exists() {
3988                std::fs::remove_file(&knob)?;
3989            }
3990            println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
3991        }
3992        DiagAction::Status { json } => {
3993            let enabled = crate::diag::is_enabled();
3994            let size = std::fs::metadata(&log_path)
3995                .map(|m| m.len())
3996                .unwrap_or(0);
3997            if json {
3998                println!(
3999                    "{}",
4000                    serde_json::to_string(&serde_json::json!({
4001                        "enabled": enabled,
4002                        "log_path": log_path,
4003                        "log_size_bytes": size,
4004                    }))?
4005                );
4006            } else {
4007                println!("wire diag status");
4008                println!("  enabled:    {enabled}");
4009                println!("  log:        {log_path:?}");
4010                println!("  log size:   {size} bytes");
4011            }
4012        }
4013    }
4014    Ok(())
4015}
4016
4017// ---------- service (install / uninstall / status) ----------
4018
4019fn cmd_service(action: ServiceAction) -> Result<()> {
4020    let (report, as_json) = match action {
4021        ServiceAction::Install { json } => (crate::service::install()?, json),
4022        ServiceAction::Uninstall { json } => (crate::service::uninstall()?, json),
4023        ServiceAction::Status { json } => (crate::service::status()?, json),
4024    };
4025    if as_json {
4026        println!("{}", serde_json::to_string(&report)?);
4027    } else {
4028        println!("wire service {}", report.action);
4029        println!("  platform:  {}", report.platform);
4030        println!("  unit:      {}", report.unit_path);
4031        println!("  status:    {}", report.status);
4032        println!("  detail:    {}", report.detail);
4033    }
4034    Ok(())
4035}
4036
4037// ---------- upgrade (atomic daemon swap) ----------
4038
4039/// `wire upgrade` — kill all running `wire daemon` processes, spawn a
4040/// fresh one from the currently-installed binary, write a new versioned
4041/// pidfile. The fix for today's exact failure mode: a daemon process that
4042/// kept running OLD binary text in memory under a symlink that had since
4043/// been repointed at a NEW binary on disk.
4044///
4045/// Idempotent. If no stale daemon is running, just starts a fresh one
4046/// (same as `wire daemon &` but with the wait-until-alive guard from
4047/// ensure_up::ensure_daemon_running).
4048///
4049/// `--check` mode reports drift without acting — lists the processes
4050/// that WOULD be killed and the binary version of each.
4051fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
4052    // 1. Identify all `wire daemon` processes.
4053    let pgrep_out = std::process::Command::new("pgrep")
4054        .args(["-f", "wire daemon"])
4055        .output();
4056    let running_pids: Vec<u32> = match pgrep_out {
4057        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
4058            .split_whitespace()
4059            .filter_map(|s| s.parse::<u32>().ok())
4060            .collect(),
4061        _ => Vec::new(),
4062    };
4063
4064    // 2. Read pidfile to surface what the daemon THINKS it is.
4065    let record = crate::ensure_up::read_pid_record("daemon");
4066    let recorded_version: Option<String> = match &record {
4067        crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
4068        crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
4069        _ => None,
4070    };
4071    let cli_version = env!("CARGO_PKG_VERSION").to_string();
4072
4073    if check_only {
4074        let report = json!({
4075            "running_pids": running_pids,
4076            "pidfile_version": recorded_version,
4077            "cli_version": cli_version,
4078            "would_kill": running_pids,
4079        });
4080        if as_json {
4081            println!("{}", serde_json::to_string(&report)?);
4082        } else {
4083            println!("wire upgrade --check");
4084            println!("  cli version:      {cli_version}");
4085            println!("  pidfile version:  {}", recorded_version.as_deref().unwrap_or("(missing)"));
4086            if running_pids.is_empty() {
4087                println!("  running daemons:  none");
4088            } else {
4089                let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
4090                println!("  running daemons:  pids {}", pids.join(", "));
4091                println!("  would kill all + spawn fresh");
4092            }
4093        }
4094        return Ok(());
4095    }
4096
4097    // 3. Kill every running wire daemon. Use SIGTERM first, then SIGKILL
4098    // after a brief grace period.
4099    let mut killed: Vec<u32> = Vec::new();
4100    for pid in &running_pids {
4101        // SIGTERM (15).
4102        let _ = std::process::Command::new("kill")
4103            .args(["-15", &pid.to_string()])
4104            .status();
4105        killed.push(*pid);
4106    }
4107    // Wait up to ~2s for graceful exit.
4108    if !killed.is_empty() {
4109        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
4110        loop {
4111            let still_alive: Vec<u32> = killed
4112                .iter()
4113                .copied()
4114                .filter(|p| process_alive_pid(*p))
4115                .collect();
4116            if still_alive.is_empty() {
4117                break;
4118            }
4119            if std::time::Instant::now() >= deadline {
4120                // SIGKILL hold-outs.
4121                for pid in still_alive {
4122                    let _ = std::process::Command::new("kill")
4123                        .args(["-9", &pid.to_string()])
4124                        .status();
4125                }
4126                break;
4127            }
4128            std::thread::sleep(std::time::Duration::from_millis(50));
4129        }
4130    }
4131
4132    // 4. Remove stale pidfile so ensure_daemon_running doesn't think the
4133    //    old daemon is still owning it.
4134    let pidfile = config::state_dir()?.join("daemon.pid");
4135    if pidfile.exists() {
4136        let _ = std::fs::remove_file(&pidfile);
4137    }
4138
4139    // 5. Spawn fresh daemon via ensure_up — atomically waits for
4140    //    process_alive + writes the versioned pidfile.
4141    let spawned = crate::ensure_up::ensure_daemon_running()?;
4142
4143    let new_record = crate::ensure_up::read_pid_record("daemon");
4144    let new_pid = new_record.pid();
4145    let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
4146        Some(d.version.clone())
4147    } else {
4148        None
4149    };
4150
4151    if as_json {
4152        println!(
4153            "{}",
4154            serde_json::to_string(&json!({
4155                "killed": killed,
4156                "spawned_fresh_daemon": spawned,
4157                "new_pid": new_pid,
4158                "new_version": new_version,
4159                "cli_version": cli_version,
4160            }))?
4161        );
4162    } else {
4163        if killed.is_empty() {
4164            println!("wire upgrade: no stale daemons running");
4165        } else {
4166            println!("wire upgrade: killed {} daemon(s) (pids {})",
4167                killed.len(),
4168                killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
4169        }
4170        if spawned {
4171            println!(
4172                "wire upgrade: spawned fresh daemon (pid {} v{})",
4173                new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
4174                new_version.as_deref().unwrap_or(&cli_version),
4175            );
4176        } else {
4177            println!("wire upgrade: daemon was already running on current binary");
4178        }
4179    }
4180    Ok(())
4181}
4182
/// Report whether a process with the given pid currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry. Everywhere else it runs
/// `kill -0 <pid>` with all stdio silenced and treats a successful exit as
/// "alive"; failing to spawn `kill` counts as "not alive".
fn process_alive_pid(pid: u32) -> bool {
    #[cfg(target_os = "linux")]
    {
        let proc_entry = format!("/proc/{pid}");
        std::path::Path::new(&proc_entry).exists()
    }
    #[cfg(not(target_os = "linux"))]
    {
        let probe = std::process::Command::new("kill")
            .args(["-0", &pid.to_string()])
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status();
        matches!(probe, Ok(exit) if exit.success())
    }
}
4200
4201// ---------- doctor (single-command diagnostic) ----------
4202
/// One DoctorCheck = one verdict on one health dimension.
///
/// Built through the `pass` / `warn` / `fail` constructors and serialized
/// as-is into the `checks` array of `wire doctor --json`.
#[derive(Clone, Debug, serde::Serialize)]
pub struct DoctorCheck {
    /// Short stable identifier (`daemon`, `relay`, `pair_rejections`, ...).
    /// Stable across versions for tooling consumption.
    pub id: String,
    /// Verdict string: `"PASS"`, `"WARN"` or `"FAIL"` (the values the
    /// constructors write).
    pub status: String,
    /// One-line human summary.
    pub detail: String,
    /// Optional remediation hint shown after the failing line. `None` for
    /// passing checks; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fix: Option<String>,
}
4217
4218impl DoctorCheck {
4219    fn pass(id: &str, detail: impl Into<String>) -> Self {
4220        Self {
4221            id: id.into(),
4222            status: "PASS".into(),
4223            detail: detail.into(),
4224            fix: None,
4225        }
4226    }
4227    fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
4228        Self {
4229            id: id.into(),
4230            status: "WARN".into(),
4231            detail: detail.into(),
4232            fix: Some(fix.into()),
4233        }
4234    }
4235    fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
4236        Self {
4237            id: id.into(),
4238            status: "FAIL".into(),
4239            detail: detail.into(),
4240            fix: Some(fix.into()),
4241        }
4242    }
4243}
4244
4245/// `wire doctor` — single-command diagnostic for the silent-fail classes
4246/// 0.5.11 ships fixes for. Surfaces what each fix produces (P0.1 cursor
4247/// blocks, P0.2 pair-rejection logs, P0.4 daemon version mismatch, etc.)
4248/// so operators don't have to know where each lives.
4249fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
4250    let mut checks: Vec<DoctorCheck> = Vec::new();
4251
4252    checks.push(check_daemon_health());
4253    checks.push(check_daemon_pid_consistency());
4254    checks.push(check_relay_reachable());
4255    checks.push(check_pair_rejections(recent_rejections));
4256    checks.push(check_cursor_progress());
4257
4258    let fails = checks.iter().filter(|c| c.status == "FAIL").count();
4259    let warns = checks.iter().filter(|c| c.status == "WARN").count();
4260
4261    if as_json {
4262        println!(
4263            "{}",
4264            serde_json::to_string(&json!({
4265                "checks": checks,
4266                "fail_count": fails,
4267                "warn_count": warns,
4268                "ok": fails == 0,
4269            }))?
4270        );
4271    } else {
4272        println!("wire doctor — {} checks", checks.len());
4273        for c in &checks {
4274            let bullet = match c.status.as_str() {
4275                "PASS" => "✓",
4276                "WARN" => "!",
4277                "FAIL" => "✗",
4278                _ => "?",
4279            };
4280            println!("  {bullet} [{}] {}: {}", c.status, c.id, c.detail);
4281            if let Some(fix) = &c.fix {
4282                println!("      fix: {fix}");
4283            }
4284        }
4285        println!();
4286        if fails == 0 && warns == 0 {
4287            println!("ALL GREEN");
4288        } else {
4289            println!("{fails} FAIL, {warns} WARN");
4290        }
4291    }
4292
4293    if fails > 0 {
4294        std::process::exit(1);
4295    }
4296    Ok(())
4297}
4298
4299/// Check: daemon running, exactly one instance, no orphans.
4300///
4301/// Today's debug surfaced PID 54017 (old-binary wire daemon running for 4
4302/// days, advancing cursor without pinning). `wire status` lied about it.
4303/// `wire doctor` must catch THIS class: multiple daemons running, OR
4304/// pid-file claims daemon down while a process is actually up.
4305fn check_daemon_health() -> DoctorCheck {
4306    let output = std::process::Command::new("pgrep")
4307        .args(["-f", "wire daemon"])
4308        .output();
4309    let pids: Vec<String> = match output {
4310        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
4311            .split_whitespace()
4312            .map(str::to_string)
4313            .collect(),
4314        _ => Vec::new(),
4315    };
4316
4317    match pids.len() {
4318        0 => DoctorCheck::fail(
4319            "daemon",
4320            "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
4321            "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
4322        ),
4323        1 => DoctorCheck::pass("daemon", format!("one daemon running (pid {})", pids[0])),
4324        n => DoctorCheck::warn(
4325            "daemon",
4326            format!(
4327                "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor — one writes, another overwrites. Today's exact bug class.",
4328                pids.join(", ")
4329            ),
4330            format!(
4331                "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`. P0.4 versioned-pidfile (0.5.11 spark side) will detect this automatically."
4332            ),
4333        ),
4334    }
4335}
4336
4337/// Check: structured pidfile matches running daemon. Spark's P0.4 5th
4338/// check. Surfaces version mismatch (daemon running old binary text in
4339/// memory under a current symlink — today's exact bug class), schema
4340/// drift (future format bumps), and identity contamination (daemon's
4341/// recorded DID doesn't match this box's configured DID).
4342fn check_daemon_pid_consistency() -> DoctorCheck {
4343    let record = crate::ensure_up::read_pid_record("daemon");
4344    match record {
4345        crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
4346            "daemon_pid_consistency",
4347            "no daemon.pid yet — fresh box or daemon never started",
4348        ),
4349        crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
4350            "daemon_pid_consistency",
4351            format!("daemon.pid is corrupt: {reason}"),
4352            "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
4353        ),
4354        crate::ensure_up::PidRecord::LegacyInt(pid) => DoctorCheck::warn(
4355            "daemon_pid_consistency",
4356            format!(
4357                "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
4358                 Daemon was started by a pre-0.5.11 binary."
4359            ),
4360            "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
4361        ),
4362        crate::ensure_up::PidRecord::Json(d) => {
4363            let mut issues: Vec<String> = Vec::new();
4364            if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
4365                issues.push(format!(
4366                    "schema={} (expected {})",
4367                    d.schema,
4368                    crate::ensure_up::DAEMON_PID_SCHEMA
4369                ));
4370            }
4371            let cli_version = env!("CARGO_PKG_VERSION");
4372            if d.version != cli_version {
4373                issues.push(format!(
4374                    "version daemon={} cli={cli_version}",
4375                    d.version
4376                ));
4377            }
4378            if !std::path::Path::new(&d.bin_path).exists() {
4379                issues.push(format!("bin_path {} missing on disk", d.bin_path));
4380            }
4381            // Cross-check DID + relay against current config (best-effort).
4382            if let Ok(card) = config::read_agent_card()
4383                && let Some(current_did) = card.get("did").and_then(Value::as_str)
4384                && let Some(recorded_did) = &d.did
4385                && recorded_did != current_did
4386            {
4387                issues.push(format!(
4388                    "did daemon={recorded_did} config={current_did} — identity drift"
4389                ));
4390            }
4391            if let Ok(state) = config::read_relay_state()
4392                && let Some(current_relay) = state
4393                    .get("self")
4394                    .and_then(|s| s.get("relay_url"))
4395                    .and_then(Value::as_str)
4396                && let Some(recorded_relay) = &d.relay_url
4397                && recorded_relay != current_relay
4398            {
4399                issues.push(format!(
4400                    "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
4401                ));
4402            }
4403            if issues.is_empty() {
4404                DoctorCheck::pass(
4405                    "daemon_pid_consistency",
4406                    format!(
4407                        "daemon v{} bound to {} as {}",
4408                        d.version,
4409                        d.relay_url.as_deref().unwrap_or("?"),
4410                        d.did.as_deref().unwrap_or("?")
4411                    ),
4412                )
4413            } else {
4414                DoctorCheck::warn(
4415                    "daemon_pid_consistency",
4416                    format!("daemon pidfile drift: {}", issues.join("; ")),
4417                    "`wire upgrade` to atomically restart daemon with current config".to_string(),
4418                )
4419            }
4420        }
4421    }
4422}
4423
4424/// Check: bound relay's /healthz returns 200.
4425fn check_relay_reachable() -> DoctorCheck {
4426    let state = match config::read_relay_state() {
4427        Ok(s) => s,
4428        Err(e) => return DoctorCheck::fail(
4429            "relay",
4430            format!("could not read relay state: {e}"),
4431            "run `wire up <handle>@<relay>` to bootstrap",
4432        ),
4433    };
4434    let url = state
4435        .get("self")
4436        .and_then(|s| s.get("relay_url"))
4437        .and_then(Value::as_str)
4438        .unwrap_or("");
4439    if url.is_empty() {
4440        return DoctorCheck::warn(
4441            "relay",
4442            "no relay bound — wire send/pull will not work",
4443            "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
4444        );
4445    }
4446    let client = crate::relay_client::RelayClient::new(url);
4447    match client.check_healthz() {
4448        Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
4449        Err(e) => DoctorCheck::fail(
4450            "relay",
4451            format!("{url} unreachable: {e}"),
4452            format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
4453        ),
4454    }
4455}
4456
4457/// Check: count recent entries in pair-rejected.jsonl (P0.2 output). Every
4458/// entry there is a silent failure that, pre-0.5.11, would have left the
4459/// operator wondering why pairing didn't complete.
4460fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
4461    let path = match config::state_dir() {
4462        Ok(d) => d.join("pair-rejected.jsonl"),
4463        Err(e) => return DoctorCheck::warn(
4464            "pair_rejections",
4465            format!("could not resolve state dir: {e}"),
4466            "set WIRE_HOME or fix XDG_STATE_HOME",
4467        ),
4468    };
4469    if !path.exists() {
4470        return DoctorCheck::pass(
4471            "pair_rejections",
4472            "no pair-rejected.jsonl — no recorded pair failures",
4473        );
4474    }
4475    let body = match std::fs::read_to_string(&path) {
4476        Ok(b) => b,
4477        Err(e) => return DoctorCheck::warn(
4478            "pair_rejections",
4479            format!("could not read {path:?}: {e}"),
4480            "check file permissions",
4481        ),
4482    };
4483    let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
4484    if lines.is_empty() {
4485        return DoctorCheck::pass(
4486            "pair_rejections",
4487            "pair-rejected.jsonl present but empty",
4488        );
4489    }
4490    let total = lines.len();
4491    let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
4492    let mut summary: Vec<String> = Vec::new();
4493    for line in &recent {
4494        if let Ok(rec) = serde_json::from_str::<Value>(line) {
4495            let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
4496            let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
4497            summary.push(format!("{peer}/{code}"));
4498        }
4499    }
4500    DoctorCheck::warn(
4501        "pair_rejections",
4502        format!(
4503            "{total} pair failures recorded. recent: [{}]",
4504            summary.join(", ")
4505        ),
4506        format!(
4507            "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
4508        ),
4509    )
4510}
4511
4512/// Check: cursor isn't stuck. We can't tell without polling — but we can
4513/// report the current cursor position so operators see if it changes.
4514/// Real "stuck" detection needs two pulls separated in time; defer that
4515/// behaviour to a `wire doctor --watch` mode.
4516fn check_cursor_progress() -> DoctorCheck {
4517    let state = match config::read_relay_state() {
4518        Ok(s) => s,
4519        Err(e) => return DoctorCheck::warn(
4520            "cursor",
4521            format!("could not read relay state: {e}"),
4522            "check ~/Library/Application Support/wire/relay.json",
4523        ),
4524    };
4525    let cursor = state
4526        .get("self")
4527        .and_then(|s| s.get("last_pulled_event_id"))
4528        .and_then(Value::as_str)
4529        .map(|s| s.chars().take(16).collect::<String>())
4530        .unwrap_or_else(|| "<none>".to_string());
4531    DoctorCheck::pass(
4532        "cursor",
4533        format!(
4534            "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
4535        ),
4536    )
4537}
4538
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// pass/warn/fail must yield visibly different statuses, and only
    /// warn/fail carry a remediation hint. If a constructor let the wrong
    /// status through, `wire doctor` would misreport health.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passing = DoctorCheck::pass("x", "ok");
        assert_eq!(passing.status, "PASS");
        assert!(passing.fix.is_none());

        let warning = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warning.status, "WARN");
        assert_eq!(warning.fix.as_deref(), Some("do this"));

        let failing = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failing.status, "FAIL");
        assert_eq!(failing.fix.as_deref(), Some("fix it"));
    }

    /// Fresh box: no pair-rejected.jsonl exists yet, which must NOT be
    /// reported as a problem.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let c = check_pair_rejections(5);
            assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
        });
    }

    /// Any recorded rejection is a signal the operator wants to see, even
    /// when each entry is a "known good failure".
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let verdict = check_pair_rejections(5);
            assert_eq!(verdict.status, "WARN");
            for needle in ["1 pair failures", "willard/pair_drop_ack_send_failed"] {
                assert!(verdict.detail.contains(needle));
            }
        });
    }
}
4592
4593// ---------- up megacommand (full bootstrap) ----------
4594
4595/// `wire up <nick@relay-host>` — single command from fresh box to ready-to-
4596/// pair. Composes the steps that today's onboarding walks operators through
4597/// one by one (init / bind-relay / claim / background daemon / arm monitor
4598/// recipe). Idempotent: every step checks current state and skips if done.
4599///
4600/// Argument parsing accepts:
4601///   - `<nick>@<relay-host>` — explicit relay
4602///   - `<nick>`              — defaults to wireup.net (the configured public
4603///                             relay)
4604fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
4605    let (nick, relay_url) = match handle_arg.split_once('@') {
4606        Some((n, host)) => {
4607            let url = if host.starts_with("http://") || host.starts_with("https://") {
4608                host.to_string()
4609            } else {
4610                format!("https://{host}")
4611            };
4612            (n.to_string(), url)
4613        }
4614        None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
4615    };
4616
4617    let mut report: Vec<(String, String)> = Vec::new();
4618    let mut step = |stage: &str, detail: String| {
4619        report.push((stage.to_string(), detail.clone()));
4620        if !as_json {
4621            eprintln!("wire up: {stage} — {detail}");
4622        }
4623    };
4624
4625    // 1. init (or verify existing identity matches the requested nick).
4626    if config::is_initialized()? {
4627        let card = config::read_agent_card()?;
4628        let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
4629        let existing_handle =
4630            crate::agent_card::display_handle_from_did(existing_did).to_string();
4631        if existing_handle != nick {
4632            bail!(
4633                "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
4634                 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
4635                 delete `{:?}` to start fresh.",
4636                config::config_dir()?
4637            );
4638        }
4639        step("init", format!("already initialized as {existing_handle}"));
4640    } else {
4641        cmd_init(&nick, name, Some(&relay_url), /* as_json */ false)?;
4642        step("init", format!("created identity {nick} bound to {relay_url}"));
4643    }
4644
4645    // 2. Ensure relay binding matches. cmd_init with --relay binds it; if
4646    // already initialized we may need to bind to the requested relay
4647    // separately (operator switched relays).
4648    let relay_state = config::read_relay_state()?;
4649    let bound_relay = relay_state
4650        .get("self")
4651        .and_then(|s| s.get("relay_url"))
4652        .and_then(Value::as_str)
4653        .unwrap_or("")
4654        .to_string();
4655    if bound_relay.is_empty() {
4656        // Identity exists but never bound to a relay — bind now.
4657        cmd_bind_relay(&relay_url, /* as_json */ false)?;
4658        step("bind-relay", format!("bound to {relay_url}"));
4659    } else if bound_relay != relay_url {
4660        step(
4661            "bind-relay",
4662            format!(
4663                "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
4664                 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
4665            ),
4666        );
4667    } else {
4668        step("bind-relay", format!("already bound to {bound_relay}"));
4669    }
4670
4671    // 3. Claim nick on the relay's handle directory. Idempotent — same-DID
4672    // re-claims are accepted by the relay.
4673    match cmd_claim(&nick, Some(&relay_url), None, /* as_json */ false) {
4674        Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
4675        Err(e) => step(
4676            "claim",
4677            format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
4678        ),
4679    }
4680
4681    // 4. Background daemon — must be running for pull/push/ack to flow.
4682    match crate::ensure_up::ensure_daemon_running() {
4683        Ok(true) => step("daemon", "started fresh background daemon".to_string()),
4684        Ok(false) => step("daemon", "already running".to_string()),
4685        Err(e) => step(
4686            "daemon",
4687            format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
4688        ),
4689    }
4690
4691    // 5. Final summary — point operator at the next commands.
4692    let summary = format!(
4693        "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
4694         `wire monitor` to watch incoming events."
4695    );
4696    step("ready", summary.clone());
4697
4698    if as_json {
4699        let steps_json: Vec<_> = report
4700            .iter()
4701            .map(|(k, v)| json!({"stage": k, "detail": v}))
4702            .collect();
4703        println!(
4704            "{}",
4705            serde_json::to_string(&json!({
4706                "nick": nick,
4707                "relay": relay_url,
4708                "steps": steps_json,
4709            }))?
4710        );
4711    }
4712    Ok(())
4713}
4714
/// Strip a single leading `https://` or `http://` scheme for display in
/// `wire up` step output.
///
/// At most one scheme prefix is removed. Input without a recognised scheme
/// is returned unchanged (as an owned `String`).
fn strip_proto(url: &str) -> String {
    // `trim_start_matches` removes the pattern *repeatedly* and would also
    // eat an `http://` that only becomes the prefix after `https://` was
    // removed. `strip_prefix` removes at most one occurrence.
    url.strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
        .unwrap_or(url)
        .to_string()
}
4721
4722// ---------- pair megacommand (zero-paste handle-based) ----------
4723
4724/// `wire pair <nick@domain>` zero-shot. Dispatched from Command::Pair when
4725/// the handle is in `nick@domain` form. Wraps:
4726///
4727///   1. cmd_add — resolve, pin, drop intro
4728///   2. Wait up to `timeout_secs` for the peer's `pair_drop_ack` to arrive
4729///      (signalled by `peers.<handle>.slot_token` populating in relay state)
4730///   3. Verify bilateral pin: trust contains peer + relay state has token
4731///   4. Print final state — both sides VERIFIED + can `wire send`
4732///
4733/// On timeout: hard-errors with the specific stuck step so the operator
4734/// knows which side to chase. No silent partial success.
4735fn cmd_pair_megacommand(
4736    handle_arg: &str,
4737    relay_override: Option<&str>,
4738    timeout_secs: u64,
4739    _as_json: bool,
4740) -> Result<()> {
4741    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
4742    let peer_handle = parsed.nick.clone();
4743
4744    eprintln!("wire pair: resolving {handle_arg}...");
4745    cmd_add(handle_arg, relay_override, /* as_json */ false)?;
4746
4747    eprintln!(
4748        "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
4749         to ack (their daemon must be running + pulling)..."
4750    );
4751
4752    // Trigger an immediate daemon-style pull so we don't wait the full daemon
4753    // interval. Best-effort — if it fails, we still fall through to the
4754    // polling loop.
4755    let _ = run_sync_pull();
4756
4757    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
4758    let poll_interval = std::time::Duration::from_millis(500);
4759
4760    loop {
4761        // Drain anything new from the relay (e.g. our pair_drop_ack landing).
4762        let _ = run_sync_pull();
4763        let relay_state = config::read_relay_state()?;
4764        let peer_entry = relay_state
4765            .get("peers")
4766            .and_then(|p| p.get(&peer_handle))
4767            .cloned();
4768        let token = peer_entry
4769            .as_ref()
4770            .and_then(|e| e.get("slot_token"))
4771            .and_then(Value::as_str)
4772            .unwrap_or("");
4773
4774        if !token.is_empty() {
4775            // Bilateral pin complete — we have their slot_token, we can send.
4776            let trust = config::read_trust()?;
4777            let pinned_in_trust = trust
4778                .get("agents")
4779                .and_then(|a| a.get(&peer_handle))
4780                .is_some();
4781            println!(
4782                "wire pair: paired with {peer_handle}.\n  trust: {}  bilateral: yes (slot_token recorded)\n  next: `wire send {peer_handle} \"<msg>\"`",
4783                if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
4784            );
4785            return Ok(());
4786        }
4787
4788        if std::time::Instant::now() >= deadline {
4789            // Timeout — surface the EXACT stuck step. Likely culprits:
4790            //   - peer daemon not running on their box
4791            //   - peer's relay slot is offline
4792            //   - their daemon is on an older binary that doesn't know
4793            //     pair_drop kind=1100 (the P0.1 class — now visible via
4794            //     wire pull --json on their side as a blocking rejection)
4795            bail!(
4796                "wire pair: timed out after {timeout_secs}s. \
4797                 peer {peer_handle} never sent pair_drop_ack. \
4798                 likely causes: (a) their daemon is down — ask them to run \
4799                 `wire status` and `wire daemon &`; (b) their binary is older \
4800                 than 0.5.x and doesn't understand pair_drop events — ask \
4801                 them to `wire upgrade`; (c) network / relay blip — re-run \
4802                 `wire pair {handle_arg}` to retry."
4803            );
4804        }
4805
4806        std::thread::sleep(poll_interval);
4807    }
4808}
4809
4810fn cmd_claim(
4811    nick: &str,
4812    relay_override: Option<&str>,
4813    public_url: Option<&str>,
4814    as_json: bool,
4815) -> Result<()> {
4816    if !crate::pair_profile::is_valid_nick(nick) {
4817        bail!(
4818            "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
4819        );
4820    }
4821    // `wire claim` is the one-step bootstrap: auto-init + auto-allocate slot
4822    // + claim handle. Operator should never have to run init/bind-relay first.
4823    let (_did, relay_url, slot_id, slot_token) =
4824        crate::pair_invite::ensure_self_with_relay(relay_override)?;
4825    let card = config::read_agent_card()?;
4826
4827    let client = crate::relay_client::RelayClient::new(&relay_url);
4828    let resp = client.handle_claim(nick, &slot_id, &slot_token, public_url, &card)?;
4829
4830    if as_json {
4831        println!(
4832            "{}",
4833            serde_json::to_string(&json!({
4834                "nick": nick,
4835                "relay": relay_url,
4836                "response": resp,
4837            }))?
4838        );
4839    } else {
4840        // Best-effort: derive the public domain from the relay URL. If
4841        // operator passed --public-url that's the canonical address; else
4842        // the relay URL itself. Falls back to a placeholder if both miss.
4843        let domain = public_url
4844            .unwrap_or(&relay_url)
4845            .trim_start_matches("https://")
4846            .trim_start_matches("http://")
4847            .trim_end_matches('/')
4848            .split('/')
4849            .next()
4850            .unwrap_or("<this-relay-domain>")
4851            .to_string();
4852        println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
4853        println!("verify with: wire whois {nick}@{domain}");
4854    }
4855    Ok(())
4856}
4857
4858fn cmd_profile(action: ProfileAction) -> Result<()> {
4859    match action {
4860        ProfileAction::Set { field, value, json } => {
4861            // Try parsing the value as JSON; if that fails, treat it as a
4862            // bare string. Lets operators pass either `42` or `"hello"` or
4863            // `["rust","late-night"]` without quoting hell.
4864            let parsed: Value =
4865                serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
4866            let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
4867            if json {
4868                println!(
4869                    "{}",
4870                    serde_json::to_string(&json!({
4871                        "field": field,
4872                        "profile": new_profile,
4873                    }))?
4874                );
4875            } else {
4876                println!("profile.{field} set");
4877            }
4878        }
4879        ProfileAction::Get { json } => return cmd_whois(None, json, None),
4880        ProfileAction::Clear { field, json } => {
4881            let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
4882            if json {
4883                println!(
4884                    "{}",
4885                    serde_json::to_string(&json!({
4886                        "field": field,
4887                        "cleared": true,
4888                        "profile": new_profile,
4889                    }))?
4890                );
4891            } else {
4892                println!("profile.{field} cleared");
4893            }
4894        }
4895    }
4896    Ok(())
4897}
4898
4899// ---------- setup — one-shot MCP host registration ----------
4900
4901fn cmd_setup(apply: bool) -> Result<()> {
4902    use std::path::PathBuf;
4903
4904    let entry = json!({"command": "wire", "args": ["mcp"]});
4905    let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
4906
4907    // Detect probable MCP host config locations. Cross-platform — we only
4908    // touch the file if it already exists OR --apply was passed.
4909    let mut targets: Vec<(&str, PathBuf)> = Vec::new();
4910    if let Some(home) = dirs::home_dir() {
4911        // Claude Code (CLI) — real config path is ~/.claude.json on all platforms (Linux/macOS/Windows).
4912        // The mcpServers map lives at the top level of that file.
4913        targets.push(("Claude Code", home.join(".claude.json")));
4914        // Legacy / alternate Claude Code XDG path — still try, harmless if absent.
4915        targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
4916        // Claude Desktop macOS
4917        #[cfg(target_os = "macos")]
4918        targets.push((
4919            "Claude Desktop (macOS)",
4920            home.join("Library/Application Support/Claude/claude_desktop_config.json"),
4921        ));
4922        // Claude Desktop Windows
4923        #[cfg(target_os = "windows")]
4924        if let Ok(appdata) = std::env::var("APPDATA") {
4925            targets.push((
4926                "Claude Desktop (Windows)",
4927                PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
4928            ));
4929        }
4930        // Cursor
4931        targets.push(("Cursor", home.join(".cursor/mcp.json")));
4932    }
4933    // Project-local — works for several MCP-aware tools
4934    targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
4935
4936    println!("wire setup\n");
4937    println!("MCP server snippet (add this to your client's mcpServers):");
4938    println!();
4939    println!("{entry_pretty}");
4940    println!();
4941
4942    if !apply {
4943        println!("Probable MCP host config locations on this machine:");
4944        for (name, path) in &targets {
4945            let marker = if path.exists() {
4946                "✓ found"
4947            } else {
4948                "  (would create)"
4949            };
4950            println!("  {marker:14}  {name}: {}", path.display());
4951        }
4952        println!();
4953        println!("Run `wire setup --apply` to merge wire into each config above.");
4954        println!(
4955            "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
4956        );
4957        return Ok(());
4958    }
4959
4960    let mut modified: Vec<String> = Vec::new();
4961    let mut skipped: Vec<String> = Vec::new();
4962    for (name, path) in &targets {
4963        match upsert_mcp_entry(path, "wire", &entry) {
4964            Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
4965            Ok(false) => skipped.push(format!("  {name} ({}): already configured", path.display())),
4966            Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
4967        }
4968    }
4969    if !modified.is_empty() {
4970        println!("Modified:");
4971        for line in &modified {
4972            println!("  {line}");
4973        }
4974        println!();
4975        println!("Restart the app(s) above to load wire MCP.");
4976    }
4977    if !skipped.is_empty() {
4978        println!();
4979        println!("Skipped:");
4980        for line in &skipped {
4981            println!("  {line}");
4982        }
4983    }
4984    Ok(())
4985}
4986
4987/// Idempotent merge of an `mcpServers.<name>` entry into a JSON config file.
4988/// Returns Ok(true) if file was changed, Ok(false) if entry already matched.
4989fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
4990    let mut cfg: Value = if path.exists() {
4991        let body = std::fs::read_to_string(path).context("reading config")?;
4992        serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
4993    } else {
4994        json!({})
4995    };
4996    if !cfg.is_object() {
4997        cfg = json!({});
4998    }
4999    let root = cfg.as_object_mut().unwrap();
5000    let servers = root
5001        .entry("mcpServers".to_string())
5002        .or_insert_with(|| json!({}));
5003    if !servers.is_object() {
5004        *servers = json!({});
5005    }
5006    let map = servers.as_object_mut().unwrap();
5007    if map.get(server_name) == Some(entry) {
5008        return Ok(false);
5009    }
5010    map.insert(server_name.to_string(), entry.clone());
5011    if let Some(parent) = path.parent()
5012        && !parent.as_os_str().is_empty()
5013    {
5014        std::fs::create_dir_all(parent).context("creating parent dir")?;
5015    }
5016    let out = serde_json::to_string_pretty(&cfg)? + "\n";
5017    std::fs::write(path, out).context("writing config")?;
5018    Ok(true)
5019}
5020
5021// ---------- reactor — event-handler dispatch loop ----------
5022
5023#[allow(clippy::too_many_arguments)]
5024fn cmd_reactor(
5025    on_event: &str,
5026    peer_filter: Option<&str>,
5027    kind_filter: Option<&str>,
5028    verified_only: bool,
5029    interval_secs: u64,
5030    once: bool,
5031    dry_run: bool,
5032    max_per_minute: u32,
5033    max_chain_depth: u32,
5034) -> Result<()> {
5035    use crate::inbox_watch::{InboxEvent, InboxWatcher};
5036    use std::collections::{HashMap, HashSet, VecDeque};
5037    use std::io::Write;
5038    use std::process::{Command, Stdio};
5039    use std::time::{Duration, Instant};
5040
5041    let cursor_path = config::state_dir()?.join("reactor.cursor");
5042    // event_ids THIS reactor's handler has caused to be sent (via wire send).
5043    // Used by chain-depth check — an incoming `(re:X)` where X is in this set
5044    // means peer is replying to something we just said → don't reply back.
5045    //
5046    // Persisted across restarts so a reactor that crashes mid-conversation
5047    // doesn't re-enter the loop. Reads on startup, writes after each
5048    // outbox-grow detection. Capped at 500 entries (LRU-ish — old entries
5049    // dropped from front of file).
5050    let emitted_path = config::state_dir()?.join("reactor-emitted.log");
5051    let mut emitted_ids: HashSet<String> = HashSet::new();
5052    if emitted_path.exists()
5053        && let Ok(body) = std::fs::read_to_string(&emitted_path)
5054    {
5055        for line in body.lines() {
5056            let t = line.trim();
5057            if !t.is_empty() {
5058                emitted_ids.insert(t.to_string());
5059            }
5060        }
5061    }
5062    // Outbox file paths the reactor watches for new sent-event_ids.
5063    let outbox_dir = config::outbox_dir()?;
5064    // (peer → file size we've already scanned). Lets us notice new outbox
5065    // appends without re-reading the whole file each sweep.
5066    let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
5067
5068    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
5069
5070    let kind_num: Option<u32> = match kind_filter {
5071        Some(k) => Some(parse_kind(k)?),
5072        None => None,
5073    };
5074
5075    // Per-peer sliding window of dispatch instants for rate-limit check.
5076    let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
5077
5078    let dispatch = |ev: &InboxEvent,
5079                    peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
5080                    emitted_ids: &HashSet<String>|
5081     -> Result<bool> {
5082        if let Some(p) = peer_filter
5083            && ev.peer != p
5084        {
5085            return Ok(false);
5086        }
5087        if verified_only && !ev.verified {
5088            return Ok(false);
5089        }
5090        if let Some(want) = kind_num {
5091            let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
5092            if ev_kind != Some(want) {
5093                return Ok(false);
5094            }
5095        }
5096
5097        // Chain-depth check: if the body contains `(re:<event_id>)` and that
5098        // event_id is in our emitted set, this is a reply to one of our
5099        // replies → loop suspected, skip.
5100        if max_chain_depth > 0 {
5101            let body_str = match &ev.raw["body"] {
5102                Value::String(s) => s.clone(),
5103                other => serde_json::to_string(other).unwrap_or_default(),
5104            };
5105            if let Some(referenced) = parse_re_marker(&body_str) {
5106                // Handler scripts usually truncate event_id (e.g. ${ID:0:12}).
5107                // Match emitted set by prefix to catch both full + truncated.
5108                let matched = emitted_ids.contains(&referenced)
5109                    || emitted_ids.iter().any(|full| full.starts_with(&referenced));
5110                if matched {
5111                    eprintln!(
5112                        "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
5113                        ev.event_id, ev.peer, referenced
5114                    );
5115                    return Ok(false);
5116                }
5117            }
5118        }
5119
5120        // Per-peer rate-limit check (sliding 60s window).
5121        if max_per_minute > 0 {
5122            let now = Instant::now();
5123            let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
5124            while let Some(&front) = win.front() {
5125                if now.duration_since(front) > Duration::from_secs(60) {
5126                    win.pop_front();
5127                } else {
5128                    break;
5129                }
5130            }
5131            if win.len() as u32 >= max_per_minute {
5132                eprintln!(
5133                    "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
5134                    ev.event_id, ev.peer, max_per_minute
5135                );
5136                return Ok(false);
5137            }
5138            win.push_back(now);
5139        }
5140
5141        if dry_run {
5142            println!("{}", serde_json::to_string(&ev.raw)?);
5143            return Ok(true);
5144        }
5145
5146        let mut child = Command::new("sh")
5147            .arg("-c")
5148            .arg(on_event)
5149            .stdin(Stdio::piped())
5150            .stdout(Stdio::inherit())
5151            .stderr(Stdio::inherit())
5152            .env("WIRE_EVENT_PEER", &ev.peer)
5153            .env("WIRE_EVENT_ID", &ev.event_id)
5154            .env("WIRE_EVENT_KIND", &ev.kind)
5155            .spawn()
5156            .with_context(|| format!("spawning reactor handler: {on_event}"))?;
5157        if let Some(mut stdin) = child.stdin.take() {
5158            let body = serde_json::to_vec(&ev.raw)?;
5159            let _ = stdin.write_all(&body);
5160            let _ = stdin.write_all(b"\n");
5161        }
5162        std::mem::drop(child);
5163        Ok(true)
5164    };
5165
5166    // Scan outbox files for newly-appended event_ids and add to emitted set.
5167    let scan_outbox = |emitted_ids: &mut HashSet<String>,
5168                       outbox_cursors: &mut HashMap<String, u64>|
5169     -> Result<usize> {
5170        if !outbox_dir.exists() {
5171            return Ok(0);
5172        }
5173        let mut added = 0;
5174        let mut new_ids: Vec<String> = Vec::new();
5175        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
5176            let path = entry.path();
5177            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
5178                continue;
5179            }
5180            let peer = match path.file_stem().and_then(|s| s.to_str()) {
5181                Some(s) => s.to_string(),
5182                None => continue,
5183            };
5184            let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
5185            let start = *outbox_cursors.get(&peer).unwrap_or(&0);
5186            if cur_len <= start {
5187                outbox_cursors.insert(peer, start);
5188                continue;
5189            }
5190            let body = std::fs::read_to_string(&path).unwrap_or_default();
5191            let tail = &body[start as usize..];
5192            for line in tail.lines() {
5193                if let Ok(v) = serde_json::from_str::<Value>(line)
5194                    && let Some(eid) = v.get("event_id").and_then(Value::as_str)
5195                    && emitted_ids.insert(eid.to_string())
5196                {
5197                    new_ids.push(eid.to_string());
5198                    added += 1;
5199                }
5200            }
5201            outbox_cursors.insert(peer, cur_len);
5202        }
5203        if !new_ids.is_empty() {
5204            // Append new ids to disk, cap on-disk file at 500 entries.
5205            let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
5206            if all.len() > 500 {
5207                all.sort();
5208                let drop_n = all.len() - 500;
5209                let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
5210                emitted_ids.retain(|x| !dropped.contains(x));
5211                all = emitted_ids.iter().cloned().collect();
5212            }
5213            let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
5214        }
5215        Ok(added)
5216    };
5217
5218    let sweep = |watcher: &mut InboxWatcher,
5219                 emitted_ids: &mut HashSet<String>,
5220                 outbox_cursors: &mut HashMap<String, u64>,
5221                 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
5222     -> Result<usize> {
5223        // Pick up any event_ids we sent since last sweep.
5224        let _ = scan_outbox(emitted_ids, outbox_cursors);
5225
5226        let events = watcher.poll()?;
5227        let mut fired = 0usize;
5228        for ev in &events {
5229            match dispatch(ev, peer_dispatch_log, emitted_ids) {
5230                Ok(true) => fired += 1,
5231                Ok(false) => {}
5232                Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
5233            }
5234        }
5235        watcher.save_cursors(&cursor_path)?;
5236        Ok(fired)
5237    };
5238
5239    if once {
5240        sweep(
5241            &mut watcher,
5242            &mut emitted_ids,
5243            &mut outbox_cursors,
5244            &mut peer_dispatch_log,
5245        )?;
5246        return Ok(());
5247    }
5248    let interval = std::time::Duration::from_secs(interval_secs.max(1));
5249    loop {
5250        if let Err(e) = sweep(
5251            &mut watcher,
5252            &mut emitted_ids,
5253            &mut outbox_cursors,
5254            &mut peer_dispatch_log,
5255        ) {
5256            eprintln!("wire reactor: sweep error: {e}");
5257        }
5258        std::thread::sleep(interval);
5259    }
5260}
5261
/// Extract the id from the first `(re:<event_id>)` marker in `body`.
///
/// Only the first `(re:` occurrence is considered; the id is the text up to
/// the next `)`, with surrounding whitespace trimmed. Returns `None` when
/// there is no marker, no closing parenthesis, or the trimmed id is empty.
/// The returned id may be a full event_id or a prefix of one.
fn parse_re_marker(body: &str) -> Option<String> {
    let (_, after_marker) = body.split_once("(re:")?;
    let (inner, _) = after_marker.split_once(')')?;
    let id = inner.trim();
    (!id.is_empty()).then(|| id.to_string())
}
5275
5276// ---------- notify (Goal 2) ----------
5277
5278fn cmd_notify(
5279    interval_secs: u64,
5280    peer_filter: Option<&str>,
5281    once: bool,
5282    as_json: bool,
5283) -> Result<()> {
5284    use crate::inbox_watch::InboxWatcher;
5285    let cursor_path = config::state_dir()?.join("notify.cursor");
5286    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
5287
5288    let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
5289        let events = watcher.poll()?;
5290        for ev in events {
5291            if let Some(p) = peer_filter
5292                && ev.peer != p
5293            {
5294                continue;
5295            }
5296            if as_json {
5297                println!("{}", serde_json::to_string(&ev)?);
5298            } else {
5299                os_notify_inbox_event(&ev);
5300            }
5301        }
5302        watcher.save_cursors(&cursor_path)?;
5303        Ok(())
5304    };
5305
5306    if once {
5307        return sweep(&mut watcher);
5308    }
5309
5310    let interval = std::time::Duration::from_secs(interval_secs.max(1));
5311    loop {
5312        if let Err(e) = sweep(&mut watcher) {
5313            eprintln!("wire notify: sweep error: {e}");
5314        }
5315        std::thread::sleep(interval);
5316    }
5317}
5318
5319fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
5320    let title = if ev.verified {
5321        format!("wire ← {}", ev.peer)
5322    } else {
5323        format!("wire ← {} (UNVERIFIED)", ev.peer)
5324    };
5325    let body = format!("{}: {}", ev.kind, ev.body_preview);
5326    crate::os_notify::toast(&title, &body);
5327}
5328
/// Stderr stand-in for a desktop notification, compiled only on targets
/// other than Linux, macOS and Windows. Prints the title on one line and
/// the body, indented, on the next.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn os_toast(title: &str, body: &str) {
    eprintln!("[wire notify] {}\n  {}", title, body);
}
5333
5334// Integration tests for the CLI live in `tests/cli.rs` (cargo's tests/ dir).