Skip to main content

wire/
cli.rs

1//! `wire` CLI surface.
2//!
3//! Every subcommand emits human-readable text by default and structured JSON
4//! when `--json` is passed. Stable JSON shape is part of the API contract —
5//! see `docs/AGENT_INTEGRATION.md`.
6//!
7//! Subcommand split:
8//!   - **agent-safe**: `whoami`, `peers`, `verify`, `send`, `tail` — pure
9//!     message-layer ops, no trust establishment.
10//!   - **trust-establishing**: `init`, `pair-host`, `pair-join`. The CLI
11//!     uses interactive `y/N` prompts here. The MCP equivalents
12//!     (`wire_init`, `wire_pair_initiate`, `wire_pair_join`, `wire_pair_check`,
13//!     `wire_pair_confirm`) preserve the human gate by requiring the user to
14//!     type the 6 SAS digits back into chat — see `docs/THREAT_MODEL.md` T10/T14.
15
16use anyhow::{Context, Result, anyhow, bail};
17use clap::{Parser, Subcommand};
18use serde_json::{Value, json};
19
20use crate::{
21    agent_card::{build_agent_card, sign_agent_card},
22    config,
23    signing::{fingerprint, generate_keypair, make_key_id, sign_message_v31, verify_message_v31},
24    trust::{add_self_to_trust, empty_trust},
25};
26
27/// Top-level CLI.
///
/// Derives clap's `Parser`; the only field is the subcommand selected on the
/// command line (see [`Command`]).
28#[derive(Parser, Debug)]
29#[command(name = "wire", version, about = "Magic-wormhole for AI agents — bilateral signed-message bus", long_about = None)]
30pub struct Cli {
    /// The subcommand to run.
31    #[command(subcommand)]
32    pub command: Command,
33}
34
35#[derive(Subcommand, Debug)]
36pub enum Command {
37    /// Generate a keypair, write self-card, and prepare to pair. (HUMAN-ONLY — DO NOT exec from agents.)
38    Init {
39        /// Short handle for this agent (becomes did:wire:<handle>).
40        handle: String,
41        /// Optional display name (defaults to capitalized handle).
42        #[arg(long)]
43        name: Option<String>,
44        /// Optional relay URL — if set, also allocates a relay slot in one step
45        /// (equivalent to running `wire init` then `wire bind-relay <url>`).
46        #[arg(long)]
47        relay: Option<String>,
48        /// Emit JSON.
49        #[arg(long)]
50        json: bool,
51    },
52    // (Old `Join` stub removed in iter 11 — superseded by `pair-join` with
53    // `join` alias. See PairJoin below.)
54    /// Print this agent's identity (DID, fingerprint, mailbox slot).
55    Whoami {
        /// Emit JSON.
56        #[arg(long)]
57        json: bool,
58    },
59    /// List pinned peers with their tiers and capabilities.
60    Peers {
        /// Emit JSON.
61        #[arg(long)]
62        json: bool,
63    },
64    /// Sign and queue an event to a peer.
65    ///
66    /// Forms (P0.S 0.5.11):
67    ///   wire send <peer> <body>              # kind defaults to "claim"
68    ///   wire send <peer> <kind> <body>       # explicit kind (back-compat)
69    ///   wire send <peer> -                   # body from stdin (kind=claim)
70    ///   wire send <peer> @/path/to/body.json # body from file
71    Send {
72        /// Peer handle (without `did:wire:` prefix).
73        peer: String,
74        /// When `<body>` is omitted, this is the event body (kind defaults
75        /// to `claim`). When both this and `<body>` are given, this is the
76        /// event kind (`decision`, `claim`, etc., or numeric kind id) and
77        /// the next positional is the body.
78        kind_or_body: String,
79        /// Event body — free-form text, `@/path/to/body.json` to load from
80        /// a file, or `-` to read from stdin. Optional; omit to use
81        /// `<kind_or_body>` as the body with kind=`claim`.
82        body: Option<String>,
83        /// Advisory deadline: duration (`30m`, `2h`, `1d`) or RFC3339 timestamp.
84        #[arg(long)]
85        deadline: Option<String>,
86        /// Emit JSON.
87        #[arg(long)]
88        json: bool,
89    },
90    /// Stream signed events from peers.
91    Tail {
92        /// Optional peer filter; if omitted, tails all peers.
93        peer: Option<String>,
94        /// Emit JSONL (one event per line).
95        #[arg(long)]
96        json: bool,
97        /// Maximum events to read before exiting (0 = stream until SIGINT).
98        #[arg(long, default_value_t = 0)]
99        limit: usize,
100    },
101    /// Live tail of new inbox events across all pinned peers — one line per
102    /// new event, handshake (pair_drop / pair_drop_ack / heartbeat) filtered
103    /// by default.
104    ///
105    /// Designed to be left running in an agent harness's stream-watcher
106    /// (Claude Code Monitor tool, etc.) so peer messages surface in the
107    /// session as they arrive, not on next manual `wire pull`.
108    ///
109    /// See docs/AGENT_INTEGRATION.md for the recommended Monitor invocation
110    /// template.
111    Monitor {
112        /// Only show events from this peer.
113        #[arg(long)]
114        peer: Option<String>,
115        /// Emit JSONL (one InboxEvent per line) for tooling consumption.
116        #[arg(long)]
117        json: bool,
118        /// Include handshake events (pair_drop, pair_drop_ack, heartbeat).
119        /// Default filters them out as noise.
120        #[arg(long)]
121        include_handshake: bool,
122        /// Poll interval in milliseconds. Lower = lower latency, higher CPU.
123        #[arg(long, default_value_t = 500)]
124        interval_ms: u64,
125        /// Replay last N events from history before going live (0 = none).
126        #[arg(long, default_value_t = 0)]
127        replay: usize,
128    },
129    /// Verify a signed event from a JSON file or stdin (`-`).
130    Verify {
131        /// Path to event JSON, or `-` for stdin.
132        path: String,
133        /// Emit JSON.
134        #[arg(long)]
135        json: bool,
136    },
137    /// Run the MCP (Model Context Protocol) server over stdio.
138    /// This is how Claude Desktop / Claude Code / Cursor / etc. expose
139    /// `wire_send`, `wire_tail`, etc. as native tools.
140    Mcp,
141    /// Run a relay server on this host.
142    RelayServer {
143        /// Bind address (e.g. `127.0.0.1:8770`).
144        #[arg(long, default_value = "127.0.0.1:8770")]
145        bind: String,
146        /// v0.5.17: refuse non-loopback binds, skip phonebook listing,
147        /// skip `.well-known/wire/agent` serving. The relay becomes
148        /// invisible from outside the box — only same-machine processes
149        /// can pair through it. Right call for within-machine agent
150        /// coordination where you don't want metadata leaking to a
151        /// public relay. Pair this with `wire session new` which probes
152        /// `127.0.0.1:8771` and allocates a local slot automatically.
153        #[arg(long)]
154        local_only: bool,
155    },
156    /// Allocate a slot on a relay; bind it to this agent's identity.
157    BindRelay {
158        /// Relay base URL, e.g. `http://127.0.0.1:8770`.
159        url: String,
        /// Emit JSON.
160        #[arg(long)]
161        json: bool,
162    },
163    /// Manually pin a peer's relay slot. (Manual fallback from the v0.1 bootstrap;
164    /// SAS pairing is `pair-host` / `pair-join`, the latter aliased as `wire join`.)
165    AddPeerSlot {
166        /// Peer handle (becomes did:wire:<handle>).
167        handle: String,
168        /// Peer's relay base URL.
169        url: String,
170        /// Peer's slot id.
171        slot_id: String,
172        /// Slot bearer token (shared between paired peers in v0.1).
173        slot_token: String,
        /// Emit JSON.
174        #[arg(long)]
175        json: bool,
176    },
177    /// Drain outbox JSONL files to peers' relay slots.
178    Push {
179        /// Optional peer filter; default = all peers with outbox entries.
180        peer: Option<String>,
        /// Emit JSON.
181        #[arg(long)]
182        json: bool,
183    },
184    /// Pull events from our relay slot, verify, write to inbox.
185    Pull {
        /// Emit JSON.
186        #[arg(long)]
187        json: bool,
188    },
189    /// Print a summary of identity, relay binding, peers, inbox/outbox queue depth.
190    /// Useful as a single "where am I" check.
191    Status {
192        /// Inspect a paired peer's transport / attention / responder health.
193        #[arg(long)]
194        peer: Option<String>,
        /// Emit JSON.
195        #[arg(long)]
196        json: bool,
197    },
198    /// Publish or inspect auto-responder health for this slot.
199    Responder {
        /// Responder operation to run (see `ResponderCommand`).
200        #[command(subcommand)]
201        command: ResponderCommand,
202    },
203    /// Pin a peer's signed agent-card from a file. (Manual out-of-band pairing
204    /// — fallback path; the magic-wormhole flow is `pair-host` / `pair-join`.)
205    Pin {
206        /// Path to peer's signed agent-card JSON.
207        card_file: String,
        /// Emit JSON.
208        #[arg(long)]
209        json: bool,
210    },
211    /// Allocate a NEW slot on the same relay and abandon the old one.
212    /// Sends a kind=1201 wire_close event to every paired peer over the OLD
213    /// slot announcing the new mailbox before swapping. After rotation,
214    /// peers must re-pair (or operator runs `add-peer-slot` with the new
215    /// coords) — auto-update via wire_close is a v0.2 daemon feature.
216    ///
217    /// Use case: a paired peer turned hostile (T11 in THREAT_MODEL.md —
218    /// abusive bearer-holder spamming your slot). Rotate → old slot is
219    /// orphaned → attacker's leverage gone. Operator pairs again with
220    /// peers they still want.
221    RotateSlot {
222        /// Skip the wire_close announcement to peers (faster but they won't know
223        /// where you went).
224        #[arg(long)]
225        no_announce: bool,
        /// Emit JSON.
226        #[arg(long)]
227        json: bool,
228    },
229    /// Remove a peer from trust + relay state. Inbox/outbox files for that
230    /// peer are NOT deleted (operator can grep history); pass --purge to
231    /// also wipe the JSONL files.
232    ForgetPeer {
233        /// Peer handle to forget.
234        handle: String,
235        /// Also delete inbox/<handle>.jsonl and outbox/<handle>.jsonl.
236        #[arg(long)]
237        purge: bool,
        /// Emit JSON.
238        #[arg(long)]
239        json: bool,
240    },
241    /// Run a long-lived sync loop: every <interval> seconds, push outbox to
242    /// peers' relay slots and pull inbox from our own slot. Foreground process;
243    /// background it with systemd / `&` / tmux as you prefer.
244    Daemon {
245        /// Sync interval in seconds. Default 5.
246        #[arg(long, default_value_t = 5)]
247        interval: u64,
248        /// Run a single sync cycle and exit (useful for cron-driven setups).
249        #[arg(long)]
250        once: bool,
        /// Emit JSON.
251        #[arg(long)]
252        json: bool,
253    },
254    /// Host a SAS-confirmed pairing. Generates a code phrase, prints it, waits
255    /// for a peer to `pair-join`, exchanges signed agent-cards via SPAKE2 +
256    /// ChaCha20-Poly1305. Auto-pins on success. (HUMAN-ONLY — operator must
257    /// read the SAS digits aloud and confirm.)
258    PairHost {
259        /// Relay base URL.
260        #[arg(long)]
261        relay: String,
262        /// Skip the SAS confirmation prompt. ONLY use when piping under
263        /// automated tests or when the SAS has already been verified by
264        /// another channel. Documented as test-only.
265        #[arg(long)]
266        yes: bool,
267        /// How long (seconds) to wait for the peer to join before timing out.
268        #[arg(long, default_value_t = 300)]
269        timeout: u64,
270        /// Detach: write a pending-pair file, print the code phrase, and exit
271        /// immediately. The running `wire daemon` does the handshake in the
272        /// background; confirm SAS later via `wire pair-confirm <code> <digits>`.
273        /// `wire pair-list` shows pending sessions. Default is foreground
274        /// blocking behavior for backward compat.
275        #[arg(long)]
276        detach: bool,
277        /// Emit JSON instead of text. Currently only meaningful with --detach.
278        #[arg(long)]
279        json: bool,
280    },
281    /// Join a pair-slot using a code phrase from the host. (HUMAN-ONLY.)
282    ///
283    /// Aliased as `wire join <code>` for magic-wormhole muscle-memory.
284    #[command(alias = "join")]
285    PairJoin {
286        /// Code phrase from the host's `pair-host` output (e.g. `73-2QXC4P`).
287        code_phrase: String,
288        /// Relay base URL (must match the host's relay).
289        #[arg(long)]
290        relay: String,
        /// Skip the SAS confirmation prompt: see `pair-host --yes`.
291        #[arg(long)]
292        yes: bool,
        /// Pair-step timeout in seconds (default 300).
293        #[arg(long, default_value_t = 300)]
294        timeout: u64,
295        /// Detach: see `pair-host --detach`.
296        #[arg(long)]
297        detach: bool,
298        /// Emit JSON instead of text. Currently only meaningful with --detach.
299        #[arg(long)]
300        json: bool,
301    },
302    /// Confirm SAS digits for a detached pending pair. The daemon must be
303    /// running for this to do anything — it picks up the confirmation on its
304    /// next tick. Mismatch aborts the pair.
305    PairConfirm {
306        /// The code phrase the original `wire pair-host --detach` printed.
307        code_phrase: String,
308        /// 6 digits as displayed by `wire pair-list` (dashes/spaces stripped).
309        digits: String,
310        /// Emit JSON instead of human-readable text.
311        #[arg(long)]
312        json: bool,
313    },
314    /// List all pending detached pair sessions and their state.
315    PairList {
316        /// Emit JSON instead of the table.
317        #[arg(long)]
318        json: bool,
319        /// Stream mode: never exit; print one JSON line per status transition
320        /// (creation, status change, deletion) across all pending pairs.
321        /// Compose with bash `while read` to react in shell. Implies --json.
322        #[arg(long)]
323        watch: bool,
324        /// Poll interval in seconds for --watch.
325        #[arg(long, default_value_t = 1)]
326        watch_interval: u64,
327    },
328    /// Cancel a pending pair. Releases the relay slot and removes the pending file.
329    PairCancel {
        /// Code phrase of the pending pair to cancel.
330        code_phrase: String,
        /// Emit JSON.
331        #[arg(long)]
332        json: bool,
333    },
334    /// Block until a pending pair reaches a target status (default sas_ready),
335    /// or terminates (finalized = file removed, aborted, aborted_restart), or
336    /// the timeout expires. Useful for shell scripts that want to drive the
337    /// detached flow without polling pair-list themselves.
338    ///
339    /// Exit codes:
340    ///   0 — reached target status (or finalized, if target was sas_ready)
341    ///   1 — terminated abnormally (aborted, aborted_restart, no such code)
342    ///   2 — timeout
343    PairWatch {
        /// Code phrase of the pending pair to watch.
344        code_phrase: String,
345        /// Target status to wait for. Default: sas_ready.
346        #[arg(long, default_value = "sas_ready")]
347        status: String,
348        /// Max seconds to wait.
349        #[arg(long, default_value_t = 300)]
350        timeout: u64,
351        /// Emit JSON on each status change (one per line) instead of just on exit.
352        #[arg(long)]
353        json: bool,
354    },
355    /// One-shot bootstrap. Inits identity (idempotent), opens pair-host or
356    /// pair-join, then registers wire as an MCP server. Single command from
357    /// nothing to paired and ready — no separate init/pair-host/setup steps.
358    /// Operator still must confirm SAS digits.
359    ///
360    /// Examples:
361    ///   wire pair paul                          # host a new pair on default relay
362    ///   wire pair willard --code 58-NMTY7A      # join paul's pair
363    Pair {
364        /// Short handle for this agent (becomes did:wire:<handle>). Used by init
365        /// step if no identity exists; ignored if already initialized.
366        handle: String,
367        /// Code phrase from peer's pair-host output. Omit to be the host
368        /// (this command will print one for you to share).
369        #[arg(long)]
370        code: Option<String>,
371        /// Relay base URL. Defaults to the laulpogan public-good relay.
372        #[arg(long, default_value = "https://wireup.net")]
373        relay: String,
374        /// Skip SAS prompt. Test-only.
375        #[arg(long)]
376        yes: bool,
377        /// Pair-step timeout in seconds.
378        #[arg(long, default_value_t = 300)]
379        timeout: u64,
380        /// Skip the post-pair `setup --apply` step (don't register wire as
381        /// an MCP server in detected client configs).
382        #[arg(long)]
383        no_setup: bool,
384        /// Run via the daemon-orchestrated detached path (auto-starts daemon,
385        /// exits immediately, daemon does the handshake). Confirm via
386        /// `wire pair-confirm <code> <digits>` from any terminal. See
387        /// `pair-host --detach` for details.
388        #[arg(long)]
389        detach: bool,
390    },
391    /// Forget a half-finished pair-slot on the relay. Use this if `pair-host`
392    /// or `pair-join` crashed (process killed, network blip, OOM) before SAS
393    /// confirmation, leaving the relay-side slot stuck with "guest already
394    /// registered" or "host already registered" until the 5-minute TTL expires.
395    /// Either side can call. Idempotent.
396    PairAbandon {
397        /// The code phrase from the original pair-host (e.g. `58-NMTY7A`).
398        code_phrase: String,
399        /// Relay base URL.
400        #[arg(long, default_value = "https://wireup.net")]
401        relay: String,
402    },
403    /// Accept a pending-inbound pair request (v0.5.14). Explicit alias for
404    /// the bilateral-completion path that `wire add <peer>@<relay>` also
405    /// drives — but doesn't require remembering the peer's relay domain
406    /// (the relay coords come from the stored pair_drop). Errors if no
407    /// pending-inbound record exists for that peer.
408    PairAccept {
409        /// Bare peer handle (without `@<relay>`).
410        peer: String,
411        /// Emit JSON.
412        #[arg(long)]
413        json: bool,
414    },
415    /// Reject a pending pair request (v0.5.14). When someone runs `wire add
416    /// you@<your-relay>` against your handle, their signed pair_drop lands
417    /// in pending-inbound — visible via `wire pair-list`. Run `wire pair-reject
418    /// <peer>` to delete the record without pairing. The peer never receives
419    /// our slot_token; from their side the pair stays pending until they
420    /// time out.
421    PairReject {
422        /// Bare peer handle (without `@<relay>`).
423        peer: String,
424        /// Emit JSON.
425        #[arg(long)]
426        json: bool,
427    },
428    /// Programmatic-shape list of pending-inbound pair requests (v0.5.14).
429    /// `--json` returns a flat array (matching the v0.5.13-and-earlier
430    /// `pair-list --json` shape but for inbound). Use this in scripts that
431    /// need to enumerate inbound pair requests without parsing the SPAKE2
432    /// table format from `wire pair-list`.
433    PairListInbound {
434        /// Emit JSON.
435        #[arg(long)]
436        json: bool,
437    },
438    /// Manage isolated wire sessions on this machine (v0.5.16).
439    ///
440    /// Each session = its own DID + handle + relay slot + daemon + inbox/
441    /// outbox tree. Use when multiple agents (e.g. Claude Code sessions
442    /// in different projects) run on the same machine — without sessions
443    /// they all share one identity and race the inbox cursor.
444    ///
445    /// Names are derived from `basename(cwd)` and cached in a registry,
446    /// so re-entering the same project reuses the same identity.
447    #[command(subcommand)]
448    Session(SessionCommand),
449    /// Detect known MCP host config locations (Claude Desktop, Claude Code,
450    /// Cursor, project-local) and either print or auto-merge the wire MCP
451    /// server entry. Default prints; pass `--apply` to actually modify config
452    /// files. Idempotent — re-running is safe.
453    Setup {
454        /// Actually write the changes (default = print only).
455        #[arg(long)]
456        apply: bool,
457    },
458    /// Show an agent's profile. With no arg, prints local self. With a
459    /// `nick@domain` arg, resolves via that domain's `.well-known/wire/agent`
460    /// endpoint and verifies the returned signed card before display.
461    Whois {
462        /// Optional handle (`nick@domain`). Omit to show self.
463        handle: Option<String>,
        /// Emit JSON.
464        #[arg(long)]
465        json: bool,
466        /// Override the relay base URL used for resolution (default:
467        /// `https://<domain>` from the handle).
468        #[arg(long)]
469        relay: Option<String>,
470    },
471    /// Zero-paste pair with a known handle. Resolves `nick@domain` via that
472    /// domain's `.well-known/wire/agent`, then delivers a signed pair-intro
473    /// to the peer's slot via `/v1/handle/intro`. Peer's daemon completes
474    /// the bilateral pin on its next pull (sends back pair_drop_ack carrying
475    /// their slot_token so we can `wire send` to them).
476    Add {
477        /// Peer handle (`nick@domain`).
478        handle: String,
479        /// Override the relay base URL used for resolution.
480        #[arg(long)]
481        relay: Option<String>,
        /// Emit JSON.
482        #[arg(long)]
483        json: bool,
484    },
485    /// One-shot full bootstrap — `wire up <nick@relay-host>` does in one
486    /// command what 0.5.10 took five (init + bind-relay + claim + daemon-
487    /// background + remember-to-restart-on-login). Idempotent: re-run on
488    /// an already-set-up box prints state without churn.
489    ///
490    /// Examples:
491    ///   wire up paul@wireup.net           # full bootstrap
492    ///   wire up paul-mac@wireup.net       # ditto, nick = paul-mac
493    ///   wire up paul                      # bootstrap, default relay
494    Up {
495        /// Full handle in `nick@relay-host` form, or just `nick` (defaults
496        /// to the configured public relay wireup.net).
497        handle: String,
498        /// Optional display name (defaults to capitalized nick).
499        #[arg(long)]
500        name: Option<String>,
        /// Emit JSON.
501        #[arg(long)]
502        json: bool,
503    },
504    /// Diagnose wire setup health. Single command that surfaces every
505    /// silent-fail class — daemon down or duplicated, relay unreachable,
506    /// cursor stuck, pair rejections piling up, trust ↔ directory drift.
507    /// Replaces a lengthy manual debugging session.
508    ///
509    /// Exit code is non-zero if there are any FAIL findings.
510    Doctor {
511        /// Emit JSON.
512        #[arg(long)]
513        json: bool,
514        /// Show last N entries from pair-rejected.jsonl in the report (default 5).
515        #[arg(long, default_value_t = 5)]
516        recent_rejections: usize,
517    },
518    /// Atomic upgrade: kill every `wire daemon` process, spawn a fresh
519    /// one from the current binary, write a new pidfile. Eliminates the
520    /// "stale binary text in memory under a fresh symlink" bug class.
521    /// Pass `--check` to preview the affected processes without acting.
522    Upgrade {
523        /// Report drift without taking action (lists processes that would
524        /// be killed + the version of each).
525        #[arg(long)]
526        check: bool,
        /// Emit JSON.
527        #[arg(long)]
528        json: bool,
529    },
530    /// Install / inspect / remove a launchd plist (macOS) or systemd
531    /// user unit (linux) that runs `wire daemon` on login + restarts
532    /// on crash. Replaces the manual "background it with tmux/&/systemd
533    /// as you prefer" footgun.
534    Service {
        /// Service operation to run (see `ServiceAction`).
535        #[command(subcommand)]
536        action: ServiceAction,
537    },
538    /// Inspect or toggle the structured diagnostic trace
539    /// (`$WIRE_HOME/state/wire/diag.jsonl`). Off by default. Enable per
540    /// process via `WIRE_DIAG=1`, or per-machine via `wire diag enable`
541    /// (writes the file knob a running daemon picks up automatically).
542    Diag {
        /// Diagnostic operation to run (see `DiagAction`).
543        #[command(subcommand)]
544        action: DiagAction,
545    },
546    /// Claim a nick on a relay's handle directory. Anyone can then reach
547    /// this agent by `<nick>@<relay-domain>` via the relay's
548    /// `.well-known/wire/agent` endpoint. First-come-first-served; same-DID re-claims allowed.
549    Claim {
        /// Nick to claim (the `<nick>` part of `<nick>@<relay-domain>`).
550        nick: String,
551        /// Relay to claim the nick on. Default = relay our slot is on.
552        #[arg(long)]
553        relay: Option<String>,
554        /// Public URL the relay should advertise to resolvers (default = relay).
555        #[arg(long)]
556        public_url: Option<String>,
        /// Emit JSON.
557        #[arg(long)]
558        json: bool,
559    },
560    /// Edit profile fields (display_name, emoji, motto, vibe, pronouns,
561    /// avatar_url, handle, now). Re-signs the agent-card atomically.
562    ///
563    /// Examples:
564    ///   wire profile set motto "compiles or dies trying"
565    ///   wire profile set emoji "🦀"
566    ///   wire profile set vibe '["rust","late-night","no-async-please"]'
567    ///   wire profile set handle "coffee-ghost@anthropic.dev"
568    ///   wire profile get
569    Profile {
        /// Profile operation to run (see `ProfileAction`).
570        #[command(subcommand)]
571        action: ProfileAction,
572    },
573    /// Mint a one-paste invite URL. Anyone with this URL can pair to us in a
574    /// single step (no SAS digits, no code typing). Auto-inits + auto-allocates
575    /// a relay slot on first use. Default TTL 24h, single-use.
576    Invite {
577        /// Override the relay URL for first-time auto-allocation.
578        #[arg(long, default_value = "https://wireup.net")]
579        relay: String,
580        /// Invite lifetime in seconds (default 86400 = 24h).
581        #[arg(long, default_value_t = 86_400)]
582        ttl: u64,
583        /// Number of distinct peers that can accept this invite before it's
584        /// consumed (default 1).
585        #[arg(long, default_value_t = 1)]
586        uses: u32,
587        /// Register the invite at the relay's short-URL endpoint and print
588        /// a `curl ... | sh` one-liner the peer can run on a fresh machine.
589        /// Installs wire if missing, then accepts the invite, then pairs.
590        #[arg(long)]
591        share: bool,
592        /// Emit JSON.
593        #[arg(long)]
594        json: bool,
595    },
596    /// Accept a wire invite URL. Single-step pair — pins issuer, sends our
597    /// signed card to issuer's slot. Auto-inits + auto-allocates if needed.
598    Accept {
599        /// The full invite URL (starts with `wire://pair?v=1&inv=...`).
600        url: String,
601        /// Emit JSON.
602        #[arg(long)]
603        json: bool,
604    },
605    /// Long-running event dispatcher. Watches inbox for new verified events
606    /// and spawns the given shell command per event, passing the event JSON
607    /// on stdin. Use to wire up autonomous reply loops:
608    ///   wire reactor --on-event 'claude -p "respond via wire send"'
609    /// Cursor persisted to `$WIRE_HOME/state/wire/reactor.cursor`.
610    Reactor {
611        /// Shell command to spawn per event. Event JSON written to its stdin.
612        #[arg(long)]
613        on_event: String,
614        /// Only fire for events from this peer.
615        #[arg(long)]
616        peer: Option<String>,
617        /// Only fire for events of this kind (numeric or name, e.g. 1 / decision).
618        #[arg(long)]
619        kind: Option<String>,
620        /// Skip events whose verified flag is false (default true).
        /// Pass `--verified-only=false` to also fire for unverified events.
        // A bare `bool` flag gets clap's implicit SetTrue action, so with a
        // `true` default it could never be switched off. `ArgAction::Set` makes
        // the flag take a value; `num_args = 0..=1` + `default_missing_value`
        // keep the bare `--verified-only` spelling working as before.
621        #[arg(long, default_value_t = true, action = clap::ArgAction::Set, num_args = 0..=1, default_missing_value = "true")]
622        verified_only: bool,
623        /// Poll interval in seconds.
624        #[arg(long, default_value_t = 2)]
625        interval: u64,
626        /// Process one sweep and exit.
627        #[arg(long)]
628        once: bool,
629        /// Don't actually spawn — print one JSONL line per event for smoke-testing.
630        #[arg(long)]
631        dry_run: bool,
632        /// Hard rate-limit: max events handler is fired for per peer per minute.
633        /// 0 = unlimited. Default 6 — covers normal conversational tempo, kills
634        /// LLM-vs-LLM feedback loops (which fire 10+/sec).
635        #[arg(long, default_value_t = 6)]
636        max_per_minute: u32,
637        /// Anti-loop chain depth. Track event_ids this reactor emitted; if an
638        /// incoming event body contains `(re:X)` where X is in our emitted log,
639        /// skip — that's a reply-to-our-reply, depth ≥ 2. Disable with 0.
640        #[arg(long, default_value_t = 1)]
641        max_chain_depth: u32,
642    },
643    /// Watch the inbox for new verified events and fire an OS notification per
644    /// event. Long-running; background under systemd / `&` / tmux. Cursor is
645    /// persisted to `$WIRE_HOME/state/wire/notify.cursor` so restarts don't
646    /// re-emit history.
647    Notify {
648        /// Poll interval in seconds.
649        #[arg(long, default_value_t = 2)]
650        interval: u64,
651        /// Only notify for events from this peer (handle, no did: prefix).
652        #[arg(long)]
653        peer: Option<String>,
654        /// Run a single sweep and exit (useful for cron / tests).
655        #[arg(long)]
656        once: bool,
657        /// Suppress the OS notification call; print one JSON line per event to
658        /// stdout instead (for piping into other tooling or smoke-testing
659        /// without a desktop session).
660        #[arg(long)]
661        json: bool,
662    },
663}
664
// Subcommands of `wire diag` (the `action` field of `Command::Diag`).
// NOTE(review): kept as a plain `//` comment on purpose — a `///` doc comment
// on a clap `Subcommand` enum may be picked up as the parent command's about
// text; confirm before converting.
665#[derive(Subcommand, Debug)]
666pub enum DiagAction {
667    /// Tail the last N entries from diag.jsonl.
668    Tail {
        /// Number of trailing entries to show (default 20).
669        #[arg(long, default_value_t = 20)]
670        limit: usize,
        /// Emit JSON.
671        #[arg(long)]
672        json: bool,
673    },
674    /// Flip the file-based knob ON. Running daemons pick this up on
675    /// the next emit call without restart.
676    Enable,
677    /// Flip the file-based knob OFF.
678    Disable,
679    /// Report whether diag is currently enabled + the file's size.
680    Status {
        /// Emit JSON.
681        #[arg(long)]
682        json: bool,
683    },
684}
685
// Subcommands of `wire session` (wrapped by `Command::Session`).
// NOTE(review): kept as a plain `//` comment on purpose — a `///` doc comment
// on a clap `Subcommand` enum may be picked up as the parent command's about
// text; confirm before converting.
686#[derive(Subcommand, Debug)]
687pub enum SessionCommand {
688    /// Bootstrap a new isolated session in this machine's sessions root.
689    /// With no name, derives one from `basename(cwd)` and caches it in
690    /// the registry so re-running from the same project reuses it.
691    /// Runs `init` + `claim` + spawns a session-local daemon, all inside
692    /// the new session's WIRE_HOME. Output includes the `export
693    /// WIRE_HOME=...` line operators paste into their shell to activate
694    /// it.
695    New {
696        /// Optional session name. Default = derived from `basename(cwd)`.
697        name: Option<String>,
698        /// Relay URL for the session's slot allocation + handle claim.
699        #[arg(long, default_value = "https://wireup.net")]
700        relay: String,
701        /// v0.5.17: also allocate a second slot on a same-machine local
702        /// relay (defaults to `http://127.0.0.1:8771`). Within-machine
703        /// sister-session traffic prefers this path: zero round-trip
704        /// latency, zero metadata exposure to the public relay. Probes
705        /// `<local-relay>/healthz` first; silently skips if the local
706        /// relay isn't running.
707        #[arg(long)]
708        with_local: bool,
709        /// v0.5.17: override the local relay URL probed by `--with-local`.
710        /// Default is `http://127.0.0.1:8771` to match
711        /// `wire relay-server --bind 127.0.0.1:8771 --local-only`.
712        #[arg(long, default_value = "http://127.0.0.1:8771")]
713        local_relay: String,
714        /// Skip spawning the session-local daemon. Use when you want
715        /// to drive sync explicitly from the agent or test rig.
716        #[arg(long)]
717        no_daemon: bool,
718        /// Emit JSON.
719        #[arg(long)]
720        json: bool,
721    },
722    /// List all sessions on this machine with their handle, DID,
723    /// daemon liveness, and the cwd they're associated with.
724    List {
        /// Emit JSON.
725        #[arg(long)]
726        json: bool,
727    },
728    /// Print the `export WIRE_HOME=...` line for a session, so a shell
729    /// can `eval $(wire session env <name>)` to activate it. With no
730    /// name, resolves the cwd through the registry.
731    Env {
732        /// Session name. Default = derived from cwd via the registry.
733        name: Option<String>,
        /// Emit JSON.
734        #[arg(long)]
735        json: bool,
736    },
737    /// Identify which session the current cwd maps to in the registry.
738    /// Prints `(none)` if cwd isn't registered — `wire session new`
739    /// would create one.
740    Current {
        /// Emit JSON.
741        #[arg(long)]
742        json: bool,
743    },
744    /// Tear down a session: kills its daemon (if running), deletes its
745    /// state directory, and removes it from the registry. Requires
746    /// `--force` because state loss is unrecoverable (keypair gone).
747    Destroy {
        /// Name of the session to destroy.
748        name: String,
749        /// Confirm state-deleting operation.
750        #[arg(long)]
751        force: bool,
        /// Emit JSON.
752        #[arg(long)]
753        json: bool,
754    },
755}
756
757#[derive(Subcommand, Debug)]
758pub enum ServiceAction {
759    /// Write the launchd plist (macOS) or systemd user unit (linux) and
760    /// load it. Idempotent — re-running re-bootstraps an existing service.
761    Install {
762        #[arg(long)]
763        json: bool,
764    },
765    /// Unload + delete the service unit. Daemon keeps running until the
766    /// next reboot or `wire upgrade`; this only changes the boot-time
767    /// behaviour.
768    Uninstall {
769        #[arg(long)]
770        json: bool,
771    },
772    /// Report whether the unit is installed + active.
773    Status {
774        #[arg(long)]
775        json: bool,
776    },
777}
778
// Subcommands of `wire responder`; `run()` dispatches `Set` to
// `cmd_responder_set` and `Get` to `cmd_responder_get`.
// NOTE(review): kept as a plain `//` comment on purpose — clap's derive may
// treat a `///` on a `Subcommand` enum as `about` text for the parent
// command. Confirm before converting this to a doc comment.
#[derive(Subcommand, Debug)]
pub enum ResponderCommand {
    /// Publish this agent's auto-responder health.
    Set {
        /// One of: online, offline, oauth_locked, rate_limited, degraded.
        // Free-form at the clap layer; `cmd_responder_set` rejects anything
        // outside the list above via `responder_status_allowed`.
        status: String,
        /// Optional operator-facing reason.
        #[arg(long)]
        reason: Option<String>,
        /// Emit JSON.
        #[arg(long)]
        json: bool,
    },
    /// Read responder health for self, or for a paired peer.
    Get {
        /// Optional peer handle; omitted means this agent's own slot.
        peer: Option<String>,
        /// Emit JSON.
        #[arg(long)]
        json: bool,
    },
}
801
802#[derive(Subcommand, Debug)]
803pub enum ProfileAction {
804    /// Set a profile field. Field names: display_name, emoji, motto, vibe,
805    /// pronouns, avatar_url, handle, now. Values are strings except `vibe`
806    /// (JSON array) and `now` (JSON object).
807    Set {
808        field: String,
809        value: String,
810        #[arg(long)]
811        json: bool,
812    },
813    /// Show all profile fields. Equivalent to `wire whois`.
814    Get {
815        #[arg(long)]
816        json: bool,
817    },
818    /// Clear a profile field.
819    Clear {
820        field: String,
821        #[arg(long)]
822        json: bool,
823    },
824}
825
/// Entry point — parse and dispatch.
///
/// Parses the process arguments into [`Cli`] and routes the selected
/// [`Command`] variant to its `cmd_*` handler, unpacking owned fields into
/// the borrowed forms (`&str`, `Option<&str>`) the handlers take.
///
/// # Errors
/// Returns whatever error the selected handler returns. Argument-parsing
/// failures do not surface here: `Cli::parse` is clap's exiting parser
/// (it prints usage/help and terminates the process itself).
pub fn run() -> Result<()> {
    let cli = Cli::parse();
    match cli.command {
        Command::Init {
            handle,
            name,
            relay,
            json,
        } => cmd_init(&handle, name.as_deref(), relay.as_deref(), json),
        // `wire status <peer>` reports on that peer; bare `wire status`
        // reports on the local node.
        Command::Status { peer, json } => {
            if let Some(peer) = peer {
                cmd_status_peer(&peer, json)
            } else {
                cmd_status(json)
            }
        }
        Command::Whoami { json } => cmd_whoami(json),
        Command::Peers { json } => cmd_peers(json),
        Command::Send {
            peer,
            kind_or_body,
            body,
            deadline,
            json,
        } => {
            // P0.S: smart-positional API. `wire send peer body` =
            // kind=claim. `wire send peer kind body` = explicit kind.
            let (kind, body) = match body {
                Some(real_body) => (kind_or_body, real_body),
                None => ("claim".to_string(), kind_or_body),
            };
            cmd_send(&peer, &kind, &body, deadline.as_deref(), json)
        }
        Command::Tail { peer, json, limit } => cmd_tail(peer.as_deref(), json, limit),
        Command::Monitor {
            peer,
            json,
            include_handshake,
            interval_ms,
            replay,
        } => cmd_monitor(peer.as_deref(), json, include_handshake, interval_ms, replay),
        Command::Verify { path, json } => cmd_verify(&path, json),
        // Nested subcommand: `wire responder set|get`.
        Command::Responder { command } => match command {
            ResponderCommand::Set {
                status,
                reason,
                json,
            } => cmd_responder_set(&status, reason.as_deref(), json),
            ResponderCommand::Get { peer, json } => cmd_responder_get(peer.as_deref(), json),
        },
        Command::Mcp => cmd_mcp(),
        Command::RelayServer { bind, local_only } => cmd_relay_server(&bind, local_only),
        Command::BindRelay { url, json } => cmd_bind_relay(&url, json),
        Command::AddPeerSlot {
            handle,
            url,
            slot_id,
            slot_token,
            json,
        } => cmd_add_peer_slot(&handle, &url, &slot_id, &slot_token, json),
        Command::Push { peer, json } => cmd_push(peer.as_deref(), json),
        Command::Pull { json } => cmd_pull(json),
        Command::Pin { card_file, json } => cmd_pin(&card_file, json),
        Command::RotateSlot { no_announce, json } => cmd_rotate_slot(no_announce, json),
        Command::ForgetPeer {
            handle,
            purge,
            json,
        } => cmd_forget_peer(&handle, purge, json),
        Command::Daemon {
            interval,
            once,
            json,
        } => cmd_daemon(interval, once, json),
        Command::PairHost {
            relay,
            yes,
            timeout,
            detach,
            json,
        } => {
            // NOTE(review): `json` is only forwarded on the detach path, and
            // `yes`/`timeout` only on the interactive path — confirm the
            // dropped flags are intentionally ignored in each mode.
            if detach {
                cmd_pair_host_detach(&relay, json)
            } else {
                cmd_pair_host(&relay, yes, timeout)
            }
        }
        Command::PairJoin {
            code_phrase,
            relay,
            yes,
            timeout,
            detach,
            json,
        } => {
            // Same split as `PairHost`: detach takes `json`, interactive
            // takes `yes`/`timeout`.
            if detach {
                cmd_pair_join_detach(&code_phrase, &relay, json)
            } else {
                cmd_pair_join(&code_phrase, &relay, yes, timeout)
            }
        }
        Command::PairConfirm {
            code_phrase,
            digits,
            json,
        } => cmd_pair_confirm(&code_phrase, &digits, json),
        Command::PairList {
            json,
            watch,
            watch_interval,
        } => cmd_pair_list(json, watch, watch_interval),
        Command::PairCancel { code_phrase, json } => cmd_pair_cancel(&code_phrase, json),
        Command::PairWatch {
            code_phrase,
            status,
            timeout,
            json,
        } => cmd_pair_watch(&code_phrase, &status, timeout, json),
        Command::Pair {
            handle,
            code,
            relay,
            yes,
            timeout,
            no_setup,
            detach,
        } => {
            // P0.P (0.5.11): if the handle is in `nick@domain` form, route to
            // the zero-paste megacommand path — `wire pair slancha-spark@
            // wireup.net` does add + poll-for-ack + verify in one shot. The
            // SAS / code-based pair flow stays available for handles without
            // `@` (bootstrap pairing between two boxes that don't yet share a
            // relay directory).
            // NOTE(review): the megacommand branch is checked first, so it
            // wins even when `--detach` is set, and it does not receive
            // `yes`, `no_setup` or `detach`. The trailing `false` is a fixed
            // argument whose meaning lives in `cmd_pair_megacommand` — confirm.
            if handle.contains('@') && code.is_none() {
                cmd_pair_megacommand(&handle, Some(&relay), timeout, false)
            } else if detach {
                cmd_pair_detach(&handle, code.as_deref(), &relay)
            } else {
                cmd_pair(&handle, code.as_deref(), &relay, yes, timeout, no_setup)
            }
        }
        Command::PairAbandon { code_phrase, relay } => cmd_pair_abandon(&code_phrase, &relay),
        Command::PairAccept { peer, json } => cmd_pair_accept(&peer, json),
        Command::PairReject { peer, json } => cmd_pair_reject(&peer, json),
        Command::PairListInbound { json } => cmd_pair_list_inbound(json),
        // Nested subcommand enums are passed through whole; their handlers
        // do the second-level dispatch.
        Command::Session(cmd) => cmd_session(cmd),
        Command::Invite {
            relay,
            ttl,
            uses,
            share,
            json,
        } => cmd_invite(&relay, ttl, uses, share, json),
        Command::Accept { url, json } => cmd_accept(&url, json),
        Command::Whois {
            handle,
            json,
            relay,
        } => cmd_whois(handle.as_deref(), json, relay.as_deref()),
        Command::Add {
            handle,
            relay,
            json,
        } => cmd_add(&handle, relay.as_deref(), json),
        Command::Up {
            handle,
            name,
            json,
        } => cmd_up(&handle, name.as_deref(), json),
        Command::Doctor {
            json,
            recent_rejections,
        } => cmd_doctor(json, recent_rejections),
        Command::Upgrade { check, json } => cmd_upgrade(check, json),
        Command::Service { action } => cmd_service(action),
        Command::Diag { action } => cmd_diag(action),
        Command::Claim {
            nick,
            relay,
            public_url,
            json,
        } => cmd_claim(&nick, relay.as_deref(), public_url.as_deref(), json),
        Command::Profile { action } => cmd_profile(action),
        Command::Setup { apply } => cmd_setup(apply),
        Command::Reactor {
            on_event,
            peer,
            kind,
            verified_only,
            interval,
            once,
            dry_run,
            max_per_minute,
            max_chain_depth,
        } => cmd_reactor(
            &on_event,
            peer.as_deref(),
            kind.as_deref(),
            verified_only,
            interval,
            once,
            dry_run,
            max_per_minute,
            max_chain_depth,
        ),
        Command::Notify {
            interval,
            peer,
            once,
            json,
        } => cmd_notify(interval, peer.as_deref(), once, json),
    }
}
1040
1041// ---------- init ----------
1042
1043fn cmd_init(handle: &str, name: Option<&str>, relay: Option<&str>, as_json: bool) -> Result<()> {
1044    if !handle
1045        .chars()
1046        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
1047    {
1048        bail!("handle must be ASCII alphanumeric / '-' / '_' (got {handle:?})");
1049    }
1050    if config::is_initialized()? {
1051        bail!(
1052            "already initialized — config exists at {:?}. Delete it first if you want a fresh identity.",
1053            config::config_dir()?
1054        );
1055    }
1056
1057    config::ensure_dirs()?;
1058    let (sk_seed, pk_bytes) = generate_keypair();
1059    config::write_private_key(&sk_seed)?;
1060
1061    let card = build_agent_card(handle, &pk_bytes, name, None, None);
1062    let signed = sign_agent_card(&card, &sk_seed);
1063    config::write_agent_card(&signed)?;
1064
1065    let mut trust = empty_trust();
1066    add_self_to_trust(&mut trust, handle, &pk_bytes);
1067    config::write_trust(&trust)?;
1068
1069    let fp = fingerprint(&pk_bytes);
1070    let key_id = make_key_id(handle, &pk_bytes);
1071
1072    // If --relay was passed, also bind a slot inline so init+bind happen in one step.
1073    let mut relay_info: Option<(String, String)> = None;
1074    if let Some(url) = relay {
1075        let normalized = url.trim_end_matches('/');
1076        let client = crate::relay_client::RelayClient::new(normalized);
1077        client.check_healthz()?;
1078        let alloc = client.allocate_slot(Some(handle))?;
1079        let mut state = config::read_relay_state()?;
1080        state["self"] = json!({
1081            "relay_url": normalized,
1082            "slot_id": alloc.slot_id.clone(),
1083            "slot_token": alloc.slot_token,
1084        });
1085        config::write_relay_state(&state)?;
1086        relay_info = Some((normalized.to_string(), alloc.slot_id));
1087    }
1088
1089    let did_str = crate::agent_card::did_for_with_key(handle, &pk_bytes);
1090    if as_json {
1091        let mut out = json!({
1092            "did": did_str.clone(),
1093            "fingerprint": fp,
1094            "key_id": key_id,
1095            "config_dir": config::config_dir()?.to_string_lossy(),
1096        });
1097        if let Some((url, slot_id)) = &relay_info {
1098            out["relay_url"] = json!(url);
1099            out["slot_id"] = json!(slot_id);
1100        }
1101        println!("{}", serde_json::to_string(&out)?);
1102    } else {
1103        println!("generated {did_str} (ed25519:{key_id})");
1104        println!(
1105            "config written to {}",
1106            config::config_dir()?.to_string_lossy()
1107        );
1108        if let Some((url, slot_id)) = &relay_info {
1109            println!("bound to relay {url} (slot {slot_id})");
1110            println!();
1111            println!(
1112                "next step: `wire pair-host --relay {url}` to print a code phrase for a peer."
1113            );
1114        } else {
1115            println!();
1116            println!(
1117                "next step: `wire pair-host --relay <url>` to bind a relay + open a pair-slot."
1118            );
1119        }
1120    }
1121    Ok(())
1122}
1123
1124// ---------- status ----------
1125
/// `wire status` (no peer argument): summarise the local node.
///
/// Builds one JSON `summary` object and then either prints it verbatim
/// (`as_json`) or renders it as aligned human-readable lines. When the node
/// is not initialized the summary contains only `initialized: false`.
///
/// Keys written when initialized: `did`, `handle`, `fingerprint`,
/// `capabilities`, `peers`, `self_relay` (with `slot_token` removed),
/// `peer_slots_count`, `outbox`, `inbox`, `daemon`, `pending_pairs`.
///
/// # Errors
/// Propagates failures from reading the agent card, trust store, relay
/// state, or the inbox/outbox directories, and from decoding the card's
/// public key. Pending-pair listings and the daemon probes are best-effort
/// and fall back to empty/false instead of failing.
fn cmd_status(as_json: bool) -> Result<()> {
    let initialized = config::is_initialized()?;

    let mut summary = json!({
        "initialized": initialized,
    });

    if initialized {
        let card = config::read_agent_card()?;
        let did = card
            .get("did")
            .and_then(Value::as_str)
            .unwrap_or("")
            .to_string();
        // Prefer the explicit `handle` field added in v0.5.7. Fall back to
        // stripping the DID prefix (and the v0.5.7+ pubkey suffix) for
        // legacy cards.
        let handle = card
            .get("handle")
            .and_then(Value::as_str)
            .map(str::to_string)
            .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
        // Takes whichever entry `verify_keys` yields first.
        // NOTE(review): presumably a card carries exactly one key — confirm.
        let pk_b64 = card
            .get("verify_keys")
            .and_then(Value::as_object)
            .and_then(|m| m.values().next())
            .and_then(|v| v.get("key"))
            .and_then(Value::as_str)
            .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
        let pk_bytes = crate::signing::b64decode(pk_b64)?;
        summary["did"] = json!(did);
        summary["handle"] = json!(handle);
        summary["fingerprint"] = json!(fingerprint(&pk_bytes));
        summary["capabilities"] = card
            .get("capabilities")
            .cloned()
            .unwrap_or_else(|| json!([]));

        let trust = config::read_trust()?;
        // NOTE(review): relay state is read twice — tolerantly here, then
        // with `?` further down. A read failure therefore still aborts the
        // command, so this fallback only matters if the two reads disagree.
        let relay_state_for_tier = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
        let mut peers = Vec::new();
        if let Some(agents) = trust.get("agents").and_then(Value::as_object) {
            for (peer_handle, _agent) in agents {
                if peer_handle == &handle {
                    continue; // self
                }
                // P0.Y (0.5.11): use effective tier — surfaces PENDING_ACK
                // for peers we've pinned but never received a pair_drop_ack
                // from, so the operator sees the "we can't send to them yet"
                // state instead of seeing a misleading VERIFIED.
                peers.push(json!({
                    "handle": peer_handle,
                    "tier": effective_peer_tier(&trust, &relay_state_for_tier, peer_handle),
                }));
            }
        }
        summary["peers"] = json!(peers);

        let relay_state = config::read_relay_state()?;
        summary["self_relay"] = relay_state.get("self").cloned().unwrap_or(Value::Null);
        if !summary["self_relay"].is_null() {
            // Hide slot_token from default view.
            if let Some(obj) = summary["self_relay"].as_object_mut() {
                obj.remove("slot_token");
            }
        }
        summary["peer_slots_count"] = json!(
            relay_state
                .get("peers")
                .and_then(Value::as_object)
                .map(|m| m.len())
                .unwrap_or(0)
        );

        // Outbox / inbox queue depth (file count + total events)
        let outbox = config::outbox_dir()?;
        let inbox = config::inbox_dir()?;
        summary["outbox"] = json!(scan_jsonl_dir(&outbox)?);
        summary["inbox"] = json!(scan_jsonl_dir(&inbox)?);

        // P1.7 (0.5.11): daemon liveness now consults the structured
        // pidfile (P0.4) AND `pgrep -f "wire daemon"` to detect orphans
        // that the pidfile didn't record. Today's debug had a 4-day-old
        // 0.2.4 daemon (PID 54017) running while the pidfile pointed at
        // an unrelated dead PID — wire status said `daemon: DOWN` while
        // the box was actually full of stale-daemon-eating-events
        // behaviour. Catch THAT class here.
        let record = crate::ensure_up::read_pid_record("daemon");
        let pidfile_pid = record.pid();
        // Liveness probe for the pidfile's PID: `/proc/<pid>` on Linux,
        // `kill -0 <pid>` elsewhere. No pidfile PID, or a probe that cannot
        // run, counts as "not alive".
        let pidfile_alive = pidfile_pid
            .map(|pid| {
                #[cfg(target_os = "linux")]
                {
                    std::path::Path::new(&format!("/proc/{pid}")).exists()
                }
                #[cfg(not(target_os = "linux"))]
                {
                    std::process::Command::new("kill")
                        .args(["-0", &pid.to_string()])
                        .output()
                        .map(|o| o.status.success())
                        .unwrap_or(false)
                }
            })
            .unwrap_or(false);

        // Cross-check with pgrep — surfaces orphan daemons not in pidfile.
        // A missing `pgrep` binary or a non-zero exit yields an empty list.
        let pgrep_pids: Vec<u32> = std::process::Command::new("pgrep")
            .args(["-f", "wire daemon"])
            .output()
            .ok()
            .filter(|o| o.status.success())
            .map(|o| {
                String::from_utf8_lossy(&o.stdout)
                    .split_whitespace()
                    .filter_map(|s| s.parse::<u32>().ok())
                    .collect()
            })
            .unwrap_or_default();
        // "Orphan" = any pgrep hit whose PID is not the one in the pidfile
        // (so every hit is an orphan when there is no pidfile PID).
        let orphan_pids: Vec<u32> = pgrep_pids
            .iter()
            .filter(|p| Some(**p) != pidfile_pid)
            .copied()
            .collect();

        let mut daemon = json!({
            "running": pidfile_alive,
            "pid": pidfile_pid,
            "all_running_pids": pgrep_pids,
            "orphans": orphan_pids,
        });
        // Structured pidfiles carry build metadata; legacy integer-only
        // pidfiles carry none, so they are always flagged as a mismatch.
        if let crate::ensure_up::PidRecord::Json(d) = &record {
            daemon["version"] = json!(d.version);
            daemon["bin_path"] = json!(d.bin_path);
            daemon["did"] = json!(d.did);
            daemon["relay_url"] = json!(d.relay_url);
            daemon["started_at"] = json!(d.started_at);
            daemon["schema"] = json!(d.schema);
            if d.version != env!("CARGO_PKG_VERSION") {
                daemon["version_mismatch"] = json!({
                    "daemon": d.version.clone(),
                    "cli": env!("CARGO_PKG_VERSION"),
                });
            }
        } else if matches!(record, crate::ensure_up::PidRecord::LegacyInt(_)) {
            daemon["pidfile_form"] = json!("legacy-int");
            daemon["version_mismatch"] = json!({
                "daemon": "<pre-0.5.11>",
                "cli": env!("CARGO_PKG_VERSION"),
            });
        }
        summary["daemon"] = daemon;

        // Pending pair sessions — counts by status.
        let pending = crate::pending_pair::list_pending().unwrap_or_default();
        let mut counts: std::collections::BTreeMap<String, u32> = Default::default();
        for p in &pending {
            *counts.entry(p.status.clone()).or_default() += 1;
        }
        // v0.5.14: pending-inbound zero-paste pair_drops awaiting accept.
        let pending_inbound =
            crate::pending_inbound_pair::list_pending_inbound().unwrap_or_default();
        let inbound_handles: Vec<&str> = pending_inbound
            .iter()
            .map(|p| p.peer_handle.as_str())
            .collect();
        summary["pending_pairs"] = json!({
            "total": pending.len(),
            "by_status": counts,
            "inbound_count": pending_inbound.len(),
            "inbound_handles": inbound_handles,
        });
    }

    // Rendering: everything below reads back out of `summary`, so the human
    // view and the JSON view cannot drift apart.
    if as_json {
        println!("{}", serde_json::to_string(&summary)?);
    } else if !initialized {
        println!("not initialized — run `wire init <handle>` first");
    } else {
        println!("did:           {}", summary["did"].as_str().unwrap_or("?"));
        println!(
            "fingerprint:   {}",
            summary["fingerprint"].as_str().unwrap_or("?")
        );
        println!("capabilities:  {}", summary["capabilities"]);
        if !summary["self_relay"].is_null() {
            println!(
                "self relay:    {} (slot {})",
                summary["self_relay"]["relay_url"].as_str().unwrap_or("?"),
                summary["self_relay"]["slot_id"].as_str().unwrap_or("?")
            );
        } else {
            println!("self relay:    (not bound — run `wire pair-host --relay <url>` to bind)");
        }
        println!(
            "peers:         {}",
            summary["peers"].as_array().map(|a| a.len()).unwrap_or(0)
        );
        for p in summary["peers"].as_array().unwrap_or(&Vec::new()) {
            println!(
                "  - {:<20} tier={}",
                p["handle"].as_str().unwrap_or(""),
                p["tier"].as_str().unwrap_or("?")
            );
        }
        println!(
            "outbox:        {} file(s), {} event(s) queued",
            summary["outbox"]["files"].as_u64().unwrap_or(0),
            summary["outbox"]["events"].as_u64().unwrap_or(0)
        );
        println!(
            "inbox:         {} file(s), {} event(s) received",
            summary["inbox"]["files"].as_u64().unwrap_or(0),
            summary["inbox"]["events"].as_u64().unwrap_or(0)
        );
        let daemon_running = summary["daemon"]["running"].as_bool().unwrap_or(false);
        let daemon_pid = summary["daemon"]["pid"]
            .as_u64()
            .map(|p| p.to_string())
            .unwrap_or_else(|| "—".to_string());
        let daemon_version = summary["daemon"]["version"].as_str().unwrap_or("");
        let version_suffix = if !daemon_version.is_empty() {
            format!(" v{daemon_version}")
        } else {
            String::new()
        };
        println!(
            "daemon:        {} (pid {}{})",
            if daemon_running { "running" } else { "DOWN" },
            daemon_pid,
            version_suffix,
        );
        // P1.7: surface version mismatch + orphan procs loudly.
        if let Some(mm) = summary["daemon"].get("version_mismatch") {
            println!(
                "               !! version mismatch: daemon={} CLI={}. \
                 run `wire upgrade` to swap atomically.",
                mm["daemon"].as_str().unwrap_or("?"),
                mm["cli"].as_str().unwrap_or("?"),
            );
        }
        if let Some(orphans) = summary["daemon"]["orphans"].as_array()
            && !orphans.is_empty()
        {
            let pids: Vec<String> = orphans
                .iter()
                .filter_map(|v| v.as_u64().map(|p| p.to_string()))
                .collect();
            println!(
                "               !! orphan daemon process(es): pids {}. \
                 pgrep saw them but pidfile didn't — likely stale process from \
                 prior install. Multiple daemons race the relay cursor.",
                pids.join(", ")
            );
        }
        let pending_total = summary["pending_pairs"]["total"].as_u64().unwrap_or(0);
        let inbound_count = summary["pending_pairs"]["inbound_count"]
            .as_u64()
            .unwrap_or(0);
        // "none" is printed only when there are neither pending sessions nor
        // inbound requests; inbound-only state is covered by the line below.
        if pending_total > 0 {
            print!("pending pairs: {pending_total}");
            if let Some(obj) = summary["pending_pairs"]["by_status"].as_object() {
                let parts: Vec<String> = obj
                    .iter()
                    .map(|(k, v)| format!("{}={}", k, v.as_u64().unwrap_or(0)))
                    .collect();
                if !parts.is_empty() {
                    print!(" ({})", parts.join(", "));
                }
            }
            println!();
        } else if inbound_count == 0 {
            println!("pending pairs: none");
        }
        // v0.5.14: separate line for pending-inbound zero-paste requests.
        // Loud because each one is awaiting an operator gesture and the
        // capability hasn't flowed yet.
        if inbound_count > 0 {
            let handles: Vec<String> = summary["pending_pairs"]["inbound_handles"]
                .as_array()
                .map(|a| {
                    a.iter()
                        .filter_map(|v| v.as_str().map(str::to_string))
                        .collect()
                })
                .unwrap_or_default();
            println!(
                "inbound pair requests ({inbound_count}): {} — `wire pair-list` to inspect, `wire pair-accept <peer>` to accept, `wire pair-reject <peer>` to refuse",
                handles.join(", "),
            );
        }
    }
    Ok(())
}
1420
1421fn scan_jsonl_dir(dir: &std::path::Path) -> Result<Value> {
1422    if !dir.exists() {
1423        return Ok(json!({"files": 0, "events": 0}));
1424    }
1425    let mut files = 0usize;
1426    let mut events = 0usize;
1427    for entry in std::fs::read_dir(dir)? {
1428        let path = entry?.path();
1429        if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
1430            files += 1;
1431            if let Ok(body) = std::fs::read_to_string(&path) {
1432                events += body.lines().filter(|l| !l.trim().is_empty()).count();
1433            }
1434        }
1435    }
1436    Ok(json!({"files": files, "events": events}))
1437}
1438
1439// ---------- responder health ----------
1440
/// Returns `true` when `status` is exactly one of the responder health
/// states accepted by `wire responder set`. The match is case-sensitive and
/// does no trimming.
fn responder_status_allowed(status: &str) -> bool {
    const ALLOWED: [&str; 5] = [
        "online",
        "offline",
        "oauth_locked",
        "rate_limited",
        "degraded",
    ];
    ALLOWED.contains(&status)
}
1447
1448fn relay_slot_for(peer: Option<&str>) -> Result<(String, String, String, String)> {
1449    let state = config::read_relay_state()?;
1450    let (label, slot_info) = match peer {
1451        Some(peer) => (
1452            peer.to_string(),
1453            state
1454                .get("peers")
1455                .and_then(|p| p.get(peer))
1456                .ok_or_else(|| {
1457                    anyhow!(
1458                        "unknown peer {peer:?} in relay state — pair with them first:\n  \
1459                         wire add {peer}@wireup.net   (or {peer}@<their-relay>)\n\
1460                         (`wire peers` lists who you've already paired with.)"
1461                    )
1462                })?,
1463        ),
1464        None => (
1465            "self".to_string(),
1466            state.get("self").filter(|v| !v.is_null()).ok_or_else(|| {
1467                anyhow!("self slot not bound — run `wire bind-relay <url>` first")
1468            })?,
1469        ),
1470    };
1471    let relay_url = slot_info["relay_url"]
1472        .as_str()
1473        .ok_or_else(|| anyhow!("{label} relay_url missing"))?
1474        .to_string();
1475    let slot_id = slot_info["slot_id"]
1476        .as_str()
1477        .ok_or_else(|| anyhow!("{label} slot_id missing"))?
1478        .to_string();
1479    let slot_token = slot_info["slot_token"]
1480        .as_str()
1481        .ok_or_else(|| anyhow!("{label} slot_token missing"))?
1482        .to_string();
1483    Ok((label, relay_url, slot_id, slot_token))
1484}
1485
1486fn cmd_responder_set(status: &str, reason: Option<&str>, as_json: bool) -> Result<()> {
1487    if !responder_status_allowed(status) {
1488        bail!("status must be one of: online, offline, oauth_locked, rate_limited, degraded");
1489    }
1490    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(None)?;
1491    let now = time::OffsetDateTime::now_utc()
1492        .format(&time::format_description::well_known::Rfc3339)
1493        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1494    let mut record = json!({
1495        "status": status,
1496        "set_at": now,
1497    });
1498    if let Some(reason) = reason {
1499        record["reason"] = json!(reason);
1500    }
1501    if status == "online" {
1502        record["last_success_at"] = json!(now);
1503    }
1504    let client = crate::relay_client::RelayClient::new(&relay_url);
1505    let saved = client.responder_health_set(&slot_id, &slot_token, &record)?;
1506    if as_json {
1507        println!("{}", serde_json::to_string(&saved)?);
1508    } else {
1509        let reason = saved
1510            .get("reason")
1511            .and_then(Value::as_str)
1512            .map(|r| format!(" — {r}"))
1513            .unwrap_or_default();
1514        println!(
1515            "responder {}{}",
1516            saved
1517                .get("status")
1518                .and_then(Value::as_str)
1519                .unwrap_or(status),
1520            reason
1521        );
1522    }
1523    Ok(())
1524}
1525
1526fn cmd_responder_get(peer: Option<&str>, as_json: bool) -> Result<()> {
1527    let (label, relay_url, slot_id, slot_token) = relay_slot_for(peer)?;
1528    let client = crate::relay_client::RelayClient::new(&relay_url);
1529    let health = client.responder_health_get(&slot_id, &slot_token)?;
1530    if as_json {
1531        println!(
1532            "{}",
1533            serde_json::to_string(&json!({
1534                "target": label,
1535                "responder_health": health,
1536            }))?
1537        );
1538    } else if health.is_null() {
1539        println!("{label}: responder health not reported");
1540    } else {
1541        let status = health
1542            .get("status")
1543            .and_then(Value::as_str)
1544            .unwrap_or("unknown");
1545        let reason = health
1546            .get("reason")
1547            .and_then(Value::as_str)
1548            .map(|r| format!(" — {r}"))
1549            .unwrap_or_default();
1550        let last_success = health
1551            .get("last_success_at")
1552            .and_then(Value::as_str)
1553            .map(|t| format!(" (last_success: {t})"))
1554            .unwrap_or_default();
1555        println!("{label}: {status}{reason}{last_success}");
1556    }
1557    Ok(())
1558}
1559
1560fn cmd_status_peer(peer: &str, as_json: bool) -> Result<()> {
1561    let (_label, relay_url, slot_id, slot_token) = relay_slot_for(Some(peer))?;
1562    let client = crate::relay_client::RelayClient::new(&relay_url);
1563
1564    let started = std::time::Instant::now();
1565    let transport_ok = client.healthz().unwrap_or(false);
1566    let latency_ms = started.elapsed().as_millis() as u64;
1567
1568    let (event_count, last_pull_at_unix) = client.slot_state(&slot_id, &slot_token)?;
1569    let now = std::time::SystemTime::now()
1570        .duration_since(std::time::UNIX_EPOCH)
1571        .map(|d| d.as_secs())
1572        .unwrap_or(0);
1573    let attention = match last_pull_at_unix {
1574        Some(last) if now.saturating_sub(last) <= 300 => json!({
1575            "status": "ok",
1576            "last_pull_at_unix": last,
1577            "age_seconds": now.saturating_sub(last),
1578            "event_count": event_count,
1579        }),
1580        Some(last) => json!({
1581            "status": "stale",
1582            "last_pull_at_unix": last,
1583            "age_seconds": now.saturating_sub(last),
1584            "event_count": event_count,
1585        }),
1586        None => json!({
1587            "status": "never_pulled",
1588            "last_pull_at_unix": Value::Null,
1589            "event_count": event_count,
1590        }),
1591    };
1592
1593    let responder_health = client.responder_health_get(&slot_id, &slot_token)?;
1594    let responder = if responder_health.is_null() {
1595        json!({"status": "not_reported", "record": Value::Null})
1596    } else {
1597        json!({
1598            "status": responder_health
1599                .get("status")
1600                .and_then(Value::as_str)
1601                .unwrap_or("unknown"),
1602            "record": responder_health,
1603        })
1604    };
1605
1606    let report = json!({
1607        "peer": peer,
1608        "transport": {
1609            "status": if transport_ok { "ok" } else { "error" },
1610            "relay_url": relay_url,
1611            "latency_ms": latency_ms,
1612        },
1613        "attention": attention,
1614        "responder": responder,
1615    });
1616
1617    if as_json {
1618        println!("{}", serde_json::to_string(&report)?);
1619    } else {
1620        let transport_line = if transport_ok {
1621            format!("ok relay reachable ({latency_ms}ms)")
1622        } else {
1623            "error relay unreachable".to_string()
1624        };
1625        println!("transport      {transport_line}");
1626        match report["attention"]["status"].as_str().unwrap_or("unknown") {
1627            "ok" => println!(
1628                "attention      ok last pull {}s ago",
1629                report["attention"]["age_seconds"].as_u64().unwrap_or(0)
1630            ),
1631            "stale" => println!(
1632                "attention      stale last pull {}m ago",
1633                report["attention"]["age_seconds"].as_u64().unwrap_or(0) / 60
1634            ),
1635            "never_pulled" => println!("attention      never pulled since relay reset"),
1636            other => println!("attention      {other}"),
1637        }
1638        if report["responder"]["status"] == "not_reported" {
1639            println!("auto-responder not reported");
1640        } else {
1641            let record = &report["responder"]["record"];
1642            let status = record
1643                .get("status")
1644                .and_then(Value::as_str)
1645                .unwrap_or("unknown");
1646            let reason = record
1647                .get("reason")
1648                .and_then(Value::as_str)
1649                .map(|r| format!(" — {r}"))
1650                .unwrap_or_default();
1651            println!("auto-responder {status}{reason}");
1652        }
1653    }
1654    Ok(())
1655}
1656
1657// (Old cmd_join stub removed — superseded by cmd_pair_join below.)
1658
1659// ---------- whoami ----------
1660
1661fn cmd_whoami(as_json: bool) -> Result<()> {
1662    if !config::is_initialized()? {
1663        bail!("not initialized — run `wire init <handle>` first");
1664    }
1665    let card = config::read_agent_card()?;
1666    let did = card
1667        .get("did")
1668        .and_then(Value::as_str)
1669        .unwrap_or("")
1670        .to_string();
1671    let handle = card
1672        .get("handle")
1673        .and_then(Value::as_str)
1674        .map(str::to_string)
1675        .unwrap_or_else(|| crate::agent_card::display_handle_from_did(&did).to_string());
1676    let pk_b64 = card
1677        .get("verify_keys")
1678        .and_then(Value::as_object)
1679        .and_then(|m| m.values().next())
1680        .and_then(|v| v.get("key"))
1681        .and_then(Value::as_str)
1682        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1683    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1684    let fp = fingerprint(&pk_bytes);
1685    let key_id = make_key_id(&handle, &pk_bytes);
1686    let capabilities = card
1687        .get("capabilities")
1688        .cloned()
1689        .unwrap_or_else(|| json!(["wire/v3.1"]));
1690
1691    if as_json {
1692        println!(
1693            "{}",
1694            serde_json::to_string(&json!({
1695                "did": did,
1696                "handle": handle,
1697                "fingerprint": fp,
1698                "key_id": key_id,
1699                "public_key_b64": pk_b64,
1700                "capabilities": capabilities,
1701                "config_dir": config::config_dir()?.to_string_lossy(),
1702            }))?
1703        );
1704    } else {
1705        println!("{did} (ed25519:{key_id})");
1706        println!("fingerprint: {fp}");
1707        println!("capabilities: {capabilities}");
1708    }
1709    Ok(())
1710}
1711
1712// ---------- peers ----------
1713
1714/// P0.Y (0.5.11): effective tier shown to operators. `wire add` pins a
1715/// peer's card into trust at VERIFIED immediately, but the bilateral pin
1716/// isn't complete until that peer's `pair_drop_ack` arrives carrying their
1717/// slot_token. Until then we CAN'T send to them. Displaying VERIFIED is
1718/// misleading — spark observed this in real usage.
1719///
1720/// Effective rules:
1721///   trust.tier == VERIFIED + relay_state.peers[h].slot_token empty -> "PENDING_ACK"
1722///   otherwise -> raw trust tier (UNTRUSTED / VERIFIED / etc.)
1723///
1724/// Strictly a display concern — trust state machine itself is untouched
1725/// so existing promote/demote logic still works.
1726fn effective_peer_tier(trust: &Value, relay_state: &Value, handle: &str) -> String {
1727    let raw = crate::trust::get_tier(trust, handle);
1728    if raw != "VERIFIED" {
1729        return raw.to_string();
1730    }
1731    let token = relay_state
1732        .get("peers")
1733        .and_then(|p| p.get(handle))
1734        .and_then(|p| p.get("slot_token"))
1735        .and_then(Value::as_str)
1736        .unwrap_or("");
1737    if token.is_empty() {
1738        "PENDING_ACK".to_string()
1739    } else {
1740        raw.to_string()
1741    }
1742}
1743
1744fn cmd_peers(as_json: bool) -> Result<()> {
1745    let trust = config::read_trust()?;
1746    let agents = trust
1747        .get("agents")
1748        .and_then(Value::as_object)
1749        .cloned()
1750        .unwrap_or_default();
1751    let relay_state = config::read_relay_state().unwrap_or_else(|_| json!({"peers": {}}));
1752
1753    let mut self_did: Option<String> = None;
1754    if let Ok(card) = config::read_agent_card() {
1755        self_did = card.get("did").and_then(Value::as_str).map(str::to_string);
1756    }
1757
1758    let mut peers = Vec::new();
1759    for (handle, agent) in agents.iter() {
1760        let did = agent
1761            .get("did")
1762            .and_then(Value::as_str)
1763            .unwrap_or("")
1764            .to_string();
1765        if Some(did.as_str()) == self_did.as_deref() {
1766            continue; // skip self-attestation
1767        }
1768        let tier = effective_peer_tier(&trust, &relay_state, handle);
1769        let capabilities = agent
1770            .get("card")
1771            .and_then(|c| c.get("capabilities"))
1772            .cloned()
1773            .unwrap_or_else(|| json!([]));
1774        peers.push(json!({
1775            "handle": handle,
1776            "did": did,
1777            "tier": tier,
1778            "capabilities": capabilities,
1779        }));
1780    }
1781
1782    if as_json {
1783        println!("{}", serde_json::to_string(&peers)?);
1784    } else if peers.is_empty() {
1785        println!("no peers pinned (run `wire join <code>` to pair)");
1786    } else {
1787        for p in &peers {
1788            println!(
1789                "{:<20} {:<10} {}",
1790                p["handle"].as_str().unwrap_or(""),
1791                p["tier"].as_str().unwrap_or(""),
1792                p["did"].as_str().unwrap_or(""),
1793            );
1794        }
1795    }
1796    Ok(())
1797}
1798
1799// ---------- send ----------
1800
1801/// R4 attentiveness pre-flight. Best-effort: any failure is silent.
1802///
1803/// Looks up `peer` in relay-state for slot_id + slot_token + relay_url, asks
1804/// the relay for the slot's `last_pull_at_unix`, and prints a warning to
1805/// stderr if the peer hasn't polled in > 5min (or never has). Threshold of
1806/// 300s is the same wire daemon polling cadence rule-of-thumb — a peer
1807/// hasn't crossed two heartbeats means probably degraded.
1808fn maybe_warn_peer_attentiveness(peer: &str) {
1809    let state = match config::read_relay_state() {
1810        Ok(s) => s,
1811        Err(_) => return,
1812    };
1813    let p = state.get("peers").and_then(|p| p.get(peer));
1814    let slot_id = match p.and_then(|p| p.get("slot_id")).and_then(Value::as_str) {
1815        Some(s) if !s.is_empty() => s,
1816        _ => return,
1817    };
1818    let slot_token = match p.and_then(|p| p.get("slot_token")).and_then(Value::as_str) {
1819        Some(s) if !s.is_empty() => s,
1820        _ => return,
1821    };
1822    let relay_url = match p.and_then(|p| p.get("relay_url")).and_then(Value::as_str) {
1823        Some(s) if !s.is_empty() => s.to_string(),
1824        _ => match state
1825            .get("self")
1826            .and_then(|s| s.get("relay_url"))
1827            .and_then(Value::as_str)
1828        {
1829            Some(s) if !s.is_empty() => s.to_string(),
1830            _ => return,
1831        },
1832    };
1833    let client = crate::relay_client::RelayClient::new(&relay_url);
1834    let (_count, last_pull) = match client.slot_state(slot_id, slot_token) {
1835        Ok(t) => t,
1836        Err(_) => return,
1837    };
1838    let now = std::time::SystemTime::now()
1839        .duration_since(std::time::UNIX_EPOCH)
1840        .map(|d| d.as_secs())
1841        .unwrap_or(0);
1842    match last_pull {
1843        None => {
1844            eprintln!(
1845                "phyllis: {peer}'s line is silent — relay sees no pulls yet. message will queue, but they may not be listening."
1846            );
1847        }
1848        Some(t) if now.saturating_sub(t) > 300 => {
1849            let mins = now.saturating_sub(t) / 60;
1850            eprintln!(
1851                "phyllis: {peer} hasn't picked up in {mins}m — message will queue, but they may be away."
1852            );
1853        }
1854        _ => {}
1855    }
1856}
1857
1858pub(crate) fn parse_deadline_until(input: &str) -> Result<String> {
1859    let trimmed = input.trim();
1860    if time::OffsetDateTime::parse(trimmed, &time::format_description::well_known::Rfc3339).is_ok()
1861    {
1862        return Ok(trimmed.to_string());
1863    }
1864    let (amount, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
1865    let n: i64 = amount
1866        .parse()
1867        .with_context(|| format!("deadline must be `30m`, `2h`, `1d`, or RFC3339: {input:?}"))?;
1868    if n <= 0 {
1869        bail!("deadline duration must be positive: {input:?}");
1870    }
1871    let duration = match unit {
1872        "m" => time::Duration::minutes(n),
1873        "h" => time::Duration::hours(n),
1874        "d" => time::Duration::days(n),
1875        _ => bail!("deadline must end in m, h, d, or be RFC3339: {input:?}"),
1876    };
1877    Ok((time::OffsetDateTime::now_utc() + duration)
1878        .format(&time::format_description::well_known::Rfc3339)
1879        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()))
1880}
1881
1882fn cmd_send(
1883    peer: &str,
1884    kind: &str,
1885    body_arg: &str,
1886    deadline: Option<&str>,
1887    as_json: bool,
1888) -> Result<()> {
1889    if !config::is_initialized()? {
1890        bail!("not initialized — run `wire init <handle>` first");
1891    }
1892    let peer = crate::agent_card::bare_handle(peer);
1893    let sk_seed = config::read_private_key()?;
1894    let card = config::read_agent_card()?;
1895    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
1896    let handle = crate::agent_card::display_handle_from_did(did).to_string();
1897    let pk_b64 = card
1898        .get("verify_keys")
1899        .and_then(Value::as_object)
1900        .and_then(|m| m.values().next())
1901        .and_then(|v| v.get("key"))
1902        .and_then(Value::as_str)
1903        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?;
1904    let pk_bytes = crate::signing::b64decode(pk_b64)?;
1905
1906    // Body: literal string, `@/path/to/body.json`, or `-` for stdin.
1907    // P0.S (0.5.11): stdin support lets shells pipe in long content
1908    // without quoting/escaping ceremony, and supports heredocs naturally:
1909    //   wire send peer - <<EOF ... EOF
1910    let body_value: Value = if body_arg == "-" {
1911        use std::io::Read;
1912        let mut raw = String::new();
1913        std::io::stdin()
1914            .read_to_string(&mut raw)
1915            .with_context(|| "reading body from stdin")?;
1916        // Try parsing as JSON first; fall back to string literal for
1917        // plain-text bodies.
1918        serde_json::from_str(raw.trim_end()).unwrap_or(Value::String(raw))
1919    } else if let Some(path) = body_arg.strip_prefix('@') {
1920        let raw =
1921            std::fs::read_to_string(path).with_context(|| format!("reading body file {path:?}"))?;
1922        serde_json::from_str(&raw).unwrap_or(Value::String(raw))
1923    } else {
1924        Value::String(body_arg.to_string())
1925    };
1926
1927    let kind_id = parse_kind(kind)?;
1928
1929    let now = time::OffsetDateTime::now_utc()
1930        .format(&time::format_description::well_known::Rfc3339)
1931        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
1932
1933    let mut event = json!({
1934        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
1935        "timestamp": now,
1936        "from": did,
1937        "to": format!("did:wire:{peer}"),
1938        "type": kind,
1939        "kind": kind_id,
1940        "body": body_value,
1941    });
1942    if let Some(deadline) = deadline {
1943        event["time_sensitive_until"] = json!(parse_deadline_until(deadline)?);
1944    }
1945    let signed = sign_message_v31(&event, &sk_seed, &pk_bytes, &handle)?;
1946    let event_id = signed["event_id"].as_str().unwrap_or("").to_string();
1947
1948    // R4: best-effort attentiveness pre-flight. Look up the peer's slot
1949    // coords in relay-state and ask the relay how recently the peer pulled.
1950    // Warn on stderr if the peer hasn't pulled in >5min OR has never pulled.
1951    // Never blocks the send — the event still queues to outbox.
1952    maybe_warn_peer_attentiveness(peer);
1953
1954    // For now we append to outbox JSONL and rely on a future daemon to push
1955    // to the relay. That's the file-system contract from AGENT_INTEGRATION.md.
1956    // Append goes through `config::append_outbox_record` which holds a per-
1957    // path mutex so concurrent senders cannot interleave bytes mid-line.
1958    let line = serde_json::to_vec(&signed)?;
1959    let outbox = config::append_outbox_record(peer, &line)?;
1960
1961    if as_json {
1962        println!(
1963            "{}",
1964            serde_json::to_string(&json!({
1965                "event_id": event_id,
1966                "status": "queued",
1967                "peer": peer,
1968                "outbox": outbox.to_string_lossy(),
1969            }))?
1970        );
1971    } else {
1972        println!(
1973            "queued event {event_id} → {peer} (outbox: {})",
1974            outbox.display()
1975        );
1976    }
1977    Ok(())
1978}
1979
1980fn parse_kind(s: &str) -> Result<u32> {
1981    if let Ok(n) = s.parse::<u32>() {
1982        return Ok(n);
1983    }
1984    for (id, name) in crate::signing::kinds() {
1985        if *name == s {
1986            return Ok(*id);
1987        }
1988    }
1989    // Unknown name — default to kind 1 (decision) for v0.1.
1990    Ok(1)
1991}
1992
1993// ---------- tail ----------
1994
1995fn cmd_tail(peer: Option<&str>, as_json: bool, limit: usize) -> Result<()> {
1996    let inbox = config::inbox_dir()?;
1997    if !inbox.exists() {
1998        if !as_json {
1999            eprintln!("no inbox yet — daemon hasn't run, or no events received");
2000        }
2001        return Ok(());
2002    }
2003    let trust = config::read_trust()?;
2004    let mut count = 0usize;
2005
2006    let entries: Vec<_> = std::fs::read_dir(&inbox)?
2007        .filter_map(|e| e.ok())
2008        .map(|e| e.path())
2009        .filter(|p| {
2010            p.extension().map(|x| x == "jsonl").unwrap_or(false)
2011                && match peer {
2012                    Some(want) => p.file_stem().and_then(|s| s.to_str()) == Some(want),
2013                    None => true,
2014                }
2015        })
2016        .collect();
2017
2018    for path in entries {
2019        let body = std::fs::read_to_string(&path)?;
2020        for line in body.lines() {
2021            let event: Value = match serde_json::from_str(line) {
2022                Ok(v) => v,
2023                Err(_) => continue,
2024            };
2025            let verified = verify_message_v31(&event, &trust).is_ok();
2026            if as_json {
2027                let mut event_with_meta = event.clone();
2028                if let Some(obj) = event_with_meta.as_object_mut() {
2029                    obj.insert("verified".into(), json!(verified));
2030                }
2031                println!("{}", serde_json::to_string(&event_with_meta)?);
2032            } else {
2033                let ts = event
2034                    .get("timestamp")
2035                    .and_then(Value::as_str)
2036                    .unwrap_or("?");
2037                let from = event.get("from").and_then(Value::as_str).unwrap_or("?");
2038                let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0);
2039                let kind_name = event.get("type").and_then(Value::as_str).unwrap_or("?");
2040                let summary = event
2041                    .get("body")
2042                    .map(|b| match b {
2043                        Value::String(s) => s.clone(),
2044                        _ => b.to_string(),
2045                    })
2046                    .unwrap_or_default();
2047                let mark = if verified { "✓" } else { "✗" };
2048                let deadline = event
2049                    .get("time_sensitive_until")
2050                    .and_then(Value::as_str)
2051                    .map(|d| format!(" deadline: {d}"))
2052                    .unwrap_or_default();
2053                println!("[{ts} {from} kind={kind} {kind_name}{deadline}] {summary} | sig {mark}");
2054            }
2055            count += 1;
2056            if limit > 0 && count >= limit {
2057                return Ok(());
2058            }
2059        }
2060    }
2061    Ok(())
2062}
2063
2064// ---------- monitor (live-tail across all peers, harness-friendly) ----------
2065
/// Whether `kind` is one of the event kinds `wire monitor` hides by default:
/// the pair handshake (`pair_drop`, `pair_drop_ack`) and liveness pings
/// (`heartbeat`). Matching is exact and case-sensitive, so any other kind —
/// including ones this build does not know about — is not noise. Operators
/// opt back in with `--include-handshake`.
fn monitor_is_noise_kind(kind: &str) -> bool {
    const NOISE_KINDS: [&str; 3] = ["pair_drop", "pair_drop_ack", "heartbeat"];
    NOISE_KINDS.contains(&kind)
}
2072
2073/// Render a single InboxEvent for `wire monitor` output. JSON form emits the
2074/// full structured event for tooling consumption; the plain form is a tight
2075/// one-line summary suitable as a harness stream-watcher notification.
2076fn monitor_render(e: &crate::inbox_watch::InboxEvent, as_json: bool) -> Result<String> {
2077    if as_json {
2078        Ok(serde_json::to_string(e)?)
2079    } else {
2080        let eid_short: String = e.event_id.chars().take(12).collect();
2081        let body = e.body_preview.replace('\n', " ");
2082        let ts: String = e.timestamp.chars().take(19).collect();
2083        Ok(format!("[{ts}] {}/{} ({eid_short}) {body}", e.peer, e.kind))
2084    }
2085}
2086
2087/// `wire monitor` — long-running line-per-event stream of new inbox events.
2088///
2089/// Built for agent harnesses that have an "every stdout line is a chat
2090/// notification" stream watcher (Claude Code Monitor tool, etc.). One
2091/// command, persistent, filtered. Replaces the manual `tail -F inbox/*.jsonl
2092/// | python parse | grep -v pair_drop` pipeline operators improvise on day
2093/// one of every wire session.
2094///
2095/// Default filter strips `pair_drop`, `pair_drop_ack`, and `heartbeat` —
2096/// pure handshake / liveness noise that operators almost never want
2097/// surfaced. Pass `--include-handshake` if you do.
2098///
2099/// Cursor: in-memory only. Starts from EOF (so a fresh `wire monitor`
2100/// doesn't drown the operator in replay), with optional `--replay N` to
2101/// emit the last N events first.
2102fn cmd_monitor(
2103    peer_filter: Option<&str>,
2104    as_json: bool,
2105    include_handshake: bool,
2106    interval_ms: u64,
2107    replay: usize,
2108) -> Result<()> {
2109    let inbox_dir = config::inbox_dir()?;
2110    if !inbox_dir.exists() {
2111        if !as_json {
2112            eprintln!(
2113                "wire monitor: inbox dir {inbox_dir:?} missing — has the daemon ever run?"
2114            );
2115        }
2116        // Still proceed — InboxWatcher::from_dir_head handles missing dir.
2117    }
2118
2119    // Optional replay — read existing files and emit the last `replay` events
2120    // (post-filter) before going live. Useful when the harness restarts and
2121    // wants recent context.
2122    if replay > 0 && inbox_dir.exists() {
2123        let mut all: Vec<crate::inbox_watch::InboxEvent> = Vec::new();
2124        for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
2125            let path = entry.path();
2126            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2127                continue;
2128            }
2129            let peer = match path.file_stem().and_then(|s| s.to_str()) {
2130                Some(s) => s.to_string(),
2131                None => continue,
2132            };
2133            if let Some(filter) = peer_filter {
2134                if peer != filter {
2135                    continue;
2136                }
2137            }
2138            let body = std::fs::read_to_string(&path).unwrap_or_default();
2139            for line in body.lines() {
2140                let line = line.trim();
2141                if line.is_empty() {
2142                    continue;
2143                }
2144                let signed: Value = match serde_json::from_str(line) {
2145                    Ok(v) => v,
2146                    Err(_) => continue,
2147                };
2148                let ev = crate::inbox_watch::InboxEvent::from_signed(
2149                    &peer,
2150                    signed,
2151                    /* verified */ true,
2152                );
2153                if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2154                    continue;
2155                }
2156                all.push(ev);
2157            }
2158        }
2159        // Sort by timestamp string (RFC3339-ish — lexicographic order matches
2160        // chronological for same-zoned timestamps).
2161        all.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
2162        let start = all.len().saturating_sub(replay);
2163        for ev in &all[start..] {
2164            println!("{}", monitor_render(ev, as_json)?);
2165        }
2166        use std::io::Write;
2167        std::io::stdout().flush().ok();
2168    }
2169
2170    // Live loop. InboxWatcher::from_head() seeds cursors at current EOF, so
2171    // the first poll only returns events that arrived AFTER startup.
2172    let mut w = crate::inbox_watch::InboxWatcher::from_head()?;
2173    let sleep_dur = std::time::Duration::from_millis(interval_ms.max(50));
2174
2175    loop {
2176        let events = w.poll()?;
2177        let mut wrote = false;
2178        for ev in events {
2179            if let Some(filter) = peer_filter {
2180                if ev.peer != filter {
2181                    continue;
2182                }
2183            }
2184            if !include_handshake && monitor_is_noise_kind(&ev.kind) {
2185                continue;
2186            }
2187            println!("{}", monitor_render(&ev, as_json)?);
2188            wrote = true;
2189        }
2190        if wrote {
2191            use std::io::Write;
2192            std::io::stdout().flush().ok();
2193        }
2194        std::thread::sleep(sleep_dur);
2195    }
2196}
2197
#[cfg(test)]
mod tier_tests {
    use super::*;
    use serde_json::json;

    /// Handle used by every case in this module.
    const PEER: &str = "willard";

    /// Trust document that pins exactly one agent, `handle`, at `tier`.
    fn trust_with(handle: &str, tier: &str) -> Value {
        json!({
            "version": 1,
            "agents": {
                handle: {
                    "tier": tier,
                    "did": format!("did:wire:{handle}"),
                    "card": {"capabilities": ["wire/v3.1"]}
                }
            }
        })
    }

    /// Relay state holding a full slot entry for `PEER` with the given token.
    fn relay_state_with_token(token: &str) -> Value {
        json!({
            "peers": {
                PEER: {
                    "relay_url": "https://relay",
                    "slot_id": "abc",
                    "slot_token": token,
                }
            }
        })
    }

    /// Effective tier of `PEER` for a raw trust tier and a relay state.
    fn shown_tier(raw_tier: &str, relay_state: &Value) -> String {
        effective_peer_tier(&trust_with(PEER, raw_tier), relay_state, PEER)
    }

    #[test]
    fn pending_ack_when_verified_but_no_slot_token() {
        // P0.Y: right after `wire add` the trust tier is VERIFIED but the
        // peer's slot_token is still empty, so `wire send` cannot work yet.
        // The operator must see PENDING_ACK.
        let relay_state = relay_state_with_token("");
        assert_eq!(shown_tier("VERIFIED", &relay_state), "PENDING_ACK");
    }

    #[test]
    fn verified_when_slot_token_present() {
        let relay_state = relay_state_with_token("tok123");
        assert_eq!(shown_tier("VERIFIED", &relay_state), "VERIFIED");
    }

    #[test]
    fn raw_tier_passes_through_for_non_verified() {
        // PENDING_ACK decorates VERIFIED only; an UNTRUSTED peer stays
        // UNTRUSTED whatever the slot_token looks like.
        let relay_state = json!({
            "peers": {"willard": {"slot_token": ""}}
        });
        assert_eq!(shown_tier("UNTRUSTED", &relay_state), "UNTRUSTED");
    }

    #[test]
    fn pending_ack_when_relay_state_missing_peer() {
        // Trust is updated before relay_state.peers, so the peer may have no
        // relay entry at all. The bilateral pin is still incomplete in that
        // case and must also show PENDING_ACK.
        let relay_state = json!({"peers": {}});
        assert_eq!(shown_tier("VERIFIED", &relay_state), "PENDING_ACK");
    }
}
2282
2283#[cfg(test)]
2284mod monitor_tests {
2285    use super::*;
2286    use crate::inbox_watch::InboxEvent;
2287    use serde_json::Value;
2288
2289    fn ev(peer: &str, kind: &str, body: &str) -> InboxEvent {
2290        InboxEvent {
2291            peer: peer.to_string(),
2292            event_id: "abcd1234567890ef".to_string(),
2293            kind: kind.to_string(),
2294            body_preview: body.to_string(),
2295            verified: true,
2296            timestamp: "2026-05-15T23:14:07.123456Z".to_string(),
2297            raw: Value::Null,
2298        }
2299    }
2300
2301    #[test]
2302    fn monitor_filter_drops_handshake_kinds_by_default() {
2303        // The whole point: pair_drop / pair_drop_ack / heartbeat are
2304        // protocol noise. If they leak into the operator's chat stream by
2305        // default, the recipe is useless ("wire monitor talks too much,
2306        // disabled it"). Burn this rule in.
2307        assert!(monitor_is_noise_kind("pair_drop"));
2308        assert!(monitor_is_noise_kind("pair_drop_ack"));
2309        assert!(monitor_is_noise_kind("heartbeat"));
2310
2311        // Real-payload kinds — operator wants every one.
2312        assert!(!monitor_is_noise_kind("claim"));
2313        assert!(!monitor_is_noise_kind("decision"));
2314        assert!(!monitor_is_noise_kind("ack"));
2315        assert!(!monitor_is_noise_kind("request"));
2316        assert!(!monitor_is_noise_kind("note"));
2317        // Unknown future kinds shouldn't be filtered as noise either —
2318        // operator probably wants to see something they don't recognise,
2319        // not have it silently dropped (the P0.1 lesson at the UX layer).
2320        assert!(!monitor_is_noise_kind("future_kind_we_dont_know"));
2321    }
2322
2323    #[test]
2324    fn monitor_render_plain_is_one_short_line() {
2325        let e = ev("willard", "claim", "real v8 train shipped 1350 steps");
2326        let line = monitor_render(&e, false).unwrap();
2327        // Must be single-line.
2328        assert!(!line.contains('\n'), "render must be one line: {line}");
2329        // Must include peer, kind, body fragment, short event_id.
2330        assert!(line.contains("willard"));
2331        assert!(line.contains("claim"));
2332        assert!(line.contains("real v8 train"));
2333        // Short event id (first 12 chars).
2334        assert!(line.contains("abcd12345678"));
2335        assert!(!line.contains("abcd1234567890ef"), "should truncate full id");
2336        // RFC3339-ish second precision.
2337        assert!(line.contains("2026-05-15T23:14:07"));
2338    }
2339
2340    #[test]
2341    fn monitor_render_strips_newlines_from_body() {
2342        // Multi-line bodies (markdown lists, code, etc.) must collapse to
2343        // one line — otherwise a single message produces multiple
2344        // notifications in the harness, ruining the "one event = one line"
2345        // contract the Monitor tool relies on.
2346        let e = ev("spark", "claim", "line one\nline two\nline three");
2347        let line = monitor_render(&e, false).unwrap();
2348        assert!(!line.contains('\n'), "newlines must be stripped: {line}");
2349        assert!(line.contains("line one line two line three"));
2350    }
2351
2352    #[test]
2353    fn monitor_render_json_is_valid_jsonl() {
2354        let e = ev("spark", "claim", "hi");
2355        let line = monitor_render(&e, true).unwrap();
2356        assert!(!line.contains('\n'));
2357        let parsed: Value = serde_json::from_str(&line).expect("valid JSONL");
2358        assert_eq!(parsed["peer"], "spark");
2359        assert_eq!(parsed["kind"], "claim");
2360        assert_eq!(parsed["body_preview"], "hi");
2361    }
2362
2363    #[test]
2364    fn monitor_does_not_drop_on_verified_null() {
2365        // Spark's bug confession on 2026-05-15: their monitor pipeline ran
2366        // `select(.verified == true)` against inbox JSONL. Daemon writes
2367        // events with verified=null (verification happens at tail-time, not
2368        // write-time), so the filter silently rejected everything — same
2369        // anti-pattern as P0.1 at the JSON-jq level. Cost: 4 of my events
2370        // never surfaced for ~30min.
2371        //
2372        // wire monitor's render path must NOT consult `.verified` for any
2373        // filter decision. Lock that in here so a future "be conservative,
2374        // only emit verified" patch can't quietly land.
2375        let mut e = ev("spark", "claim", "from disk with verified=null");
2376        e.verified = false; // worst case — even if disk says unverified, emit
2377        let line = monitor_render(&e, false).unwrap();
2378        assert!(line.contains("from disk with verified=null"));
2379        // Noise filter operates purely on kind, never on verified.
2380        assert!(!monitor_is_noise_kind("claim"));
2381    }
2382}
2383
2384// ---------- verify ----------
2385
2386fn cmd_verify(path: &str, as_json: bool) -> Result<()> {
2387    let body = if path == "-" {
2388        let mut buf = String::new();
2389        use std::io::Read;
2390        std::io::stdin().read_to_string(&mut buf)?;
2391        buf
2392    } else {
2393        std::fs::read_to_string(path).with_context(|| format!("reading {path}"))?
2394    };
2395    let event: Value = serde_json::from_str(&body)?;
2396    let trust = config::read_trust()?;
2397    match verify_message_v31(&event, &trust) {
2398        Ok(()) => {
2399            if as_json {
2400                println!("{}", serde_json::to_string(&json!({"verified": true}))?);
2401            } else {
2402                println!("verified ✓");
2403            }
2404            Ok(())
2405        }
2406        Err(e) => {
2407            let reason = e.to_string();
2408            if as_json {
2409                println!(
2410                    "{}",
2411                    serde_json::to_string(&json!({"verified": false, "reason": reason}))?
2412                );
2413            } else {
2414                eprintln!("FAILED: {reason}");
2415            }
2416            std::process::exit(1);
2417        }
2418    }
2419}
2420
2421// ---------- mcp / relay-server stubs ----------
2422
2423fn cmd_mcp() -> Result<()> {
2424    crate::mcp::run()
2425}
2426
2427fn cmd_relay_server(bind: &str, local_only: bool) -> Result<()> {
2428    // v0.5.17: --local-only refuses non-loopback binds. Catches the
2429    // "wait did I just bind a publicly-reachable local-only relay" mistake
2430    // at startup rather than discovering it via an empty phonebook later.
2431    if local_only {
2432        validate_loopback_bind(bind)?;
2433    }
2434    // Default state dir for the relay process: $WIRE_HOME/state/wire-relay
2435    // (or `dirs::state_dir()/wire-relay`). Distinct from the CLI's state dir
2436    // so a single user can run both client and server on one machine.
2437    // For --local-only, suffix with /local so a single operator can run
2438    // both a federation relay and a local-only relay without state collision.
2439    let base = if let Ok(home) = std::env::var("WIRE_HOME") {
2440        std::path::PathBuf::from(home)
2441            .join("state")
2442            .join("wire-relay")
2443    } else {
2444        dirs::state_dir()
2445            .or_else(dirs::data_local_dir)
2446            .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
2447            .join("wire-relay")
2448    };
2449    let state_dir = if local_only { base.join("local") } else { base };
2450    let runtime = tokio::runtime::Builder::new_multi_thread()
2451        .enable_all()
2452        .build()?;
2453    runtime.block_on(crate::relay_server::serve_with_mode(
2454        bind,
2455        state_dir,
2456        crate::relay_server::ServerMode { local_only },
2457    ))
2458}
2459
2460/// v0.5.17 loopback-bind guard. Refuses any address whose host portion
2461/// resolves to something outside `127.0.0.0/8` or `::1`. Specifically
2462/// rejects `0.0.0.0`, `::`, `0:0:0:0:0:0:0:0`, and any non-loopback
2463/// IPv4/IPv6 literal. Hostname-form addresses (e.g. `localhost`) are
2464/// accepted only if they resolve to a loopback address.
2465fn validate_loopback_bind(bind: &str) -> Result<()> {
2466    // Split host:port. IPv6 literals use `[::]:port` form.
2467    let host = if let Some(stripped) = bind.strip_prefix('[') {
2468        let close = stripped
2469            .find(']')
2470            .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
2471        stripped[..close].to_string()
2472    } else {
2473        bind.rsplit_once(':')
2474            .map(|(h, _)| h.to_string())
2475            .unwrap_or_else(|| bind.to_string())
2476    };
2477    use std::net::ToSocketAddrs;
2478    let probe = format!("{host}:0");
2479    let resolved: Vec<_> = probe
2480        .to_socket_addrs()
2481        .with_context(|| format!("resolving bind host {host:?}"))?
2482        .collect();
2483    if resolved.is_empty() {
2484        bail!("--local-only: bind host {host:?} resolved to no addresses");
2485    }
2486    for addr in &resolved {
2487        if !addr.ip().is_loopback() {
2488            bail!(
2489                "--local-only refuses non-loopback bind: {host:?} resolves to {} \
2490                 which is not in 127.0.0.0/8 or [::1]. Remove --local-only to bind \
2491                 publicly, or use 127.0.0.1 / [::1] / localhost.",
2492                addr.ip()
2493            );
2494        }
2495    }
2496    Ok(())
2497}
2498
2499// ---------- bind-relay ----------
2500
2501fn cmd_bind_relay(url: &str, as_json: bool) -> Result<()> {
2502    if !config::is_initialized()? {
2503        bail!("not initialized — run `wire init <handle>` first");
2504    }
2505    let card = config::read_agent_card()?;
2506    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
2507    let handle = crate::agent_card::display_handle_from_did(did).to_string();
2508
2509    let normalized = url.trim_end_matches('/');
2510    let client = crate::relay_client::RelayClient::new(normalized);
2511    client.check_healthz()?;
2512    let alloc = client.allocate_slot(Some(&handle))?;
2513    let mut state = config::read_relay_state()?;
2514    state["self"] = json!({
2515        "relay_url": url,
2516        "slot_id": alloc.slot_id,
2517        "slot_token": alloc.slot_token,
2518    });
2519    config::write_relay_state(&state)?;
2520
2521    if as_json {
2522        println!(
2523            "{}",
2524            serde_json::to_string(&json!({
2525                "relay_url": url,
2526                "slot_id": alloc.slot_id,
2527                "slot_token_present": true,
2528            }))?
2529        );
2530    } else {
2531        println!("bound to relay {url}");
2532        println!("slot_id: {}", alloc.slot_id);
2533        println!(
2534            "(slot_token written to {} mode 0600)",
2535            config::relay_state_path()?.display()
2536        );
2537    }
2538    Ok(())
2539}
2540
2541// ---------- add-peer-slot ----------
2542
2543fn cmd_add_peer_slot(
2544    handle: &str,
2545    url: &str,
2546    slot_id: &str,
2547    slot_token: &str,
2548    as_json: bool,
2549) -> Result<()> {
2550    let mut state = config::read_relay_state()?;
2551    let peers = state["peers"]
2552        .as_object_mut()
2553        .ok_or_else(|| anyhow!("relay state missing 'peers' object"))?;
2554    peers.insert(
2555        handle.to_string(),
2556        json!({
2557            "relay_url": url,
2558            "slot_id": slot_id,
2559            "slot_token": slot_token,
2560        }),
2561    );
2562    config::write_relay_state(&state)?;
2563    if as_json {
2564        println!(
2565            "{}",
2566            serde_json::to_string(&json!({
2567                "handle": handle,
2568                "relay_url": url,
2569                "slot_id": slot_id,
2570                "added": true,
2571            }))?
2572        );
2573    } else {
2574        println!("pinned peer slot for {handle} at {url} ({slot_id})");
2575    }
2576    Ok(())
2577}
2578
2579// ---------- push ----------
2580
2581fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
2582    let state = config::read_relay_state()?;
2583    let peers = state["peers"].as_object().cloned().unwrap_or_default();
2584    if peers.is_empty() {
2585        bail!(
2586            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
2587        );
2588    }
2589    let outbox_dir = config::outbox_dir()?;
2590    // v0.5.13 loud-fail: warn on outbox files that don't match a pinned peer.
2591    // Pre-v0.5.13 `wire send peer@relay` wrote to `peer@relay.jsonl` while
2592    // push only enumerated bare-handle files. After upgrade, stale FQDN-named
2593    // files sit on disk forever; warn so operator can `cat fqdn.jsonl >> handle.jsonl`.
2594    if outbox_dir.exists() {
2595        let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
2596        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
2597            let path = entry.path();
2598            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
2599                continue;
2600            }
2601            let stem = match path.file_stem().and_then(|s| s.to_str()) {
2602                Some(s) => s.to_string(),
2603                None => continue,
2604            };
2605            if pinned.contains(&stem) {
2606                continue;
2607            }
2608            // Try the bare-handle of the orphaned stem — if THAT matches a
2609            // pinned peer, the stem is a stale FQDN-suffixed file.
2610            let bare = crate::agent_card::bare_handle(&stem);
2611            if pinned.contains(bare) {
2612                eprintln!(
2613                    "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
2614                     Merge with: `cat {} >> {}` then delete the FQDN file.",
2615                    stem,
2616                    path.display(),
2617                    outbox_dir.join(format!("{bare}.jsonl")).display(),
2618                );
2619            }
2620        }
2621    }
2622    if !outbox_dir.exists() {
2623        if as_json {
2624            println!(
2625                "{}",
2626                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
2627            );
2628        } else {
2629            println!("phyllis: nothing to dial out — write a message first with `wire send`");
2630        }
2631        return Ok(());
2632    }
2633
2634    let mut pushed = Vec::new();
2635    let mut skipped = Vec::new();
2636
2637    // v0.5.17: walk each peer's pinned endpoints in priority order (local
2638    // first if we share a local relay, federation second). Try POST on the
2639    // first endpoint; on transport failure, fall through to the next.
2640    // Falls back to the v0.5.16 legacy single-endpoint code path when the
2641    // peer record carries no `endpoints[]` array (back-compat).
2642    for (peer_handle, _) in peers.iter() {
2643        if let Some(want) = peer_filter
2644            && peer_handle != want
2645        {
2646            continue;
2647        }
2648        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
2649        if !outbox.exists() {
2650            continue;
2651        }
2652        let ordered_endpoints =
2653            crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
2654        if ordered_endpoints.is_empty() {
2655            // Unreachable peer (no federation endpoint AND our local
2656            // relay doesn't match the peer's). Skip with a loud reason
2657            // rather than silently dropping events.
2658            for line in std::fs::read_to_string(&outbox)
2659                .unwrap_or_default()
2660                .lines()
2661            {
2662                let event: Value = match serde_json::from_str(line) {
2663                    Ok(v) => v,
2664                    Err(_) => continue,
2665                };
2666                let event_id = event
2667                    .get("event_id")
2668                    .and_then(Value::as_str)
2669                    .unwrap_or("")
2670                    .to_string();
2671                skipped.push(json!({
2672                    "peer": peer_handle,
2673                    "event_id": event_id,
2674                    "reason": "no reachable endpoint pinned for peer",
2675                }));
2676            }
2677            continue;
2678        }
2679        let body = std::fs::read_to_string(&outbox)?;
2680        for line in body.lines() {
2681            let event: Value = match serde_json::from_str(line) {
2682                Ok(v) => v,
2683                Err(_) => continue,
2684            };
2685            let event_id = event
2686                .get("event_id")
2687                .and_then(Value::as_str)
2688                .unwrap_or("")
2689                .to_string();
2690
2691            let mut delivered = false;
2692            let mut last_err_reason: Option<String> = None;
2693            for endpoint in &ordered_endpoints {
2694                let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2695                match client.post_event(&endpoint.slot_id, &endpoint.slot_token, &event) {
2696                    Ok(resp) => {
2697                        if resp.status == "duplicate" {
2698                            skipped.push(json!({
2699                                "peer": peer_handle,
2700                                "event_id": event_id,
2701                                "reason": "duplicate",
2702                                "endpoint": endpoint.relay_url,
2703                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2704                            }));
2705                        } else {
2706                            pushed.push(json!({
2707                                "peer": peer_handle,
2708                                "event_id": event_id,
2709                                "endpoint": endpoint.relay_url,
2710                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
2711                            }));
2712                        }
2713                        delivered = true;
2714                        break;
2715                    }
2716                    Err(e) => {
2717                        // Local-first endpoint failed; record reason and
2718                        // try the next endpoint silently (operator sees
2719                        // the federation success). If every endpoint
2720                        // fails, the last reason is what gets reported.
2721                        last_err_reason =
2722                            Some(crate::relay_client::format_transport_error(&e));
2723                    }
2724                }
2725            }
2726            if !delivered {
2727                skipped.push(json!({
2728                    "peer": peer_handle,
2729                    "event_id": event_id,
2730                    "reason": last_err_reason.unwrap_or_else(|| "all endpoints failed".to_string()),
2731                }));
2732            }
2733        }
2734    }
2735
2736    if as_json {
2737        println!(
2738            "{}",
2739            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
2740        );
2741    } else {
2742        println!(
2743            "pushed {} event(s); skipped {} ({})",
2744            pushed.len(),
2745            skipped.len(),
2746            if skipped.is_empty() {
2747                "none"
2748            } else {
2749                "see --json for detail"
2750            }
2751        );
2752    }
2753    Ok(())
2754}
2755
2756// ---------- pull ----------
2757
2758fn cmd_pull(as_json: bool) -> Result<()> {
2759    let state = config::read_relay_state()?;
2760    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2761    if self_state.is_null() {
2762        bail!("self slot not bound — run `wire bind-relay <url>` first");
2763    }
2764
2765    // v0.5.17: pull from every endpoint in self.endpoints (federation +
2766    // optional local). Each endpoint has its own per-scope cursor so we
2767    // don't re-pull events we've already seen on that path. Events from
2768    // all endpoints feed into the same inbox JSONL via process_events;
2769    // dedup by event_id is the last line of defense.
2770    // Falls back to a single federation endpoint synthesized from the
2771    // top-level legacy fields when self.endpoints is absent (v0.5.16
2772    // back-compat).
2773    let endpoints = crate::endpoints::self_endpoints(&state);
2774    if endpoints.is_empty() {
2775        bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
2776    }
2777
2778    let inbox_dir = config::inbox_dir()?;
2779    config::ensure_dirs()?;
2780
2781    let mut total_seen = 0usize;
2782    let mut all_written: Vec<Value> = Vec::new();
2783    let mut all_rejected: Vec<Value> = Vec::new();
2784    let mut all_blocked = false;
2785    let mut all_advance_cursor_to: Option<String> = None;
2786
2787    for endpoint in &endpoints {
2788        let cursor_key = endpoint_cursor_key(endpoint.scope);
2789        let last_event_id = self_state
2790            .get(&cursor_key)
2791            .and_then(Value::as_str)
2792            .map(str::to_string);
2793        let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
2794        let events = match client.list_events(
2795            &endpoint.slot_id,
2796            &endpoint.slot_token,
2797            last_event_id.as_deref(),
2798            Some(1000),
2799        ) {
2800            Ok(ev) => ev,
2801            Err(e) => {
2802                // One endpoint's failure shouldn't kill the whole pull.
2803                // The local-relay-down case in particular needs to
2804                // gracefully continue against federation.
2805                eprintln!(
2806                    "wire pull: endpoint {} ({:?}) errored: {}; continuing",
2807                    endpoint.relay_url,
2808                    endpoint.scope,
2809                    crate::relay_client::format_transport_error(&e),
2810                );
2811                continue;
2812            }
2813        };
2814        total_seen += events.len();
2815        let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
2816        all_written.extend(result.written.iter().cloned());
2817        all_rejected.extend(result.rejected.iter().cloned());
2818        if result.blocked {
2819            all_blocked = true;
2820        }
2821        // Advance per-endpoint cursor. The cursor key is scope-specific
2822        // so federation and local don't trample each other.
2823        if let Some(eid) = result.advance_cursor_to.clone() {
2824            if endpoint.scope == crate::endpoints::EndpointScope::Federation {
2825                all_advance_cursor_to = Some(eid.clone());
2826            }
2827            let key = cursor_key.clone();
2828            config::update_relay_state(|state| {
2829                if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
2830                    self_obj.insert(key, Value::String(eid));
2831                }
2832                Ok(())
2833            })?;
2834        }
2835    }
2836
2837    // Compatibility shim for the legacy single-cursor code paths below:
2838    // `result` used to come from one process_events call; we now have
2839    // per-endpoint results aggregated into the all_* accumulators.
2840    // Reconstruct a synthetic result for the remaining display logic.
2841    let result = crate::pull::PullResult {
2842        written: all_written,
2843        rejected: all_rejected,
2844        blocked: all_blocked,
2845        advance_cursor_to: all_advance_cursor_to,
2846    };
2847    let events_len = total_seen;
2848
2849    // Cursor advance happened per-endpoint above; no aggregate cursor
2850    // write needed here.
2851
2852    if as_json {
2853        println!(
2854            "{}",
2855            serde_json::to_string(&json!({
2856                "written": result.written,
2857                "rejected": result.rejected,
2858                "total_seen": events_len,
2859                "cursor_blocked": result.blocked,
2860                "cursor_advanced_to": result.advance_cursor_to,
2861            }))?
2862        );
2863    } else {
2864        let blocking = result
2865            .rejected
2866            .iter()
2867            .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
2868            .count();
2869        if blocking > 0 {
2870            println!(
2871                "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
2872                events_len,
2873                result.written.len(),
2874                result.rejected.len(),
2875                blocking,
2876            );
2877        } else {
2878            println!(
2879                "pulled {} event(s); wrote {}; rejected {}",
2880                events_len,
2881                result.written.len(),
2882                result.rejected.len(),
2883            );
2884        }
2885    }
2886    Ok(())
2887}
2888
2889/// v0.5.17: cursor key for an endpoint's per-scope read position.
2890/// Federation keeps the v0.5.16 legacy key `last_pulled_event_id` for
2891/// back-compat with on-disk relay_state files; local uses a
2892/// `_local` suffix.
2893fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
2894    match scope {
2895        crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
2896        crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
2897    }
2898}
2899
2900// ---------- rotate-slot ----------
2901
2902fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
2903    if !config::is_initialized()? {
2904        bail!("not initialized — run `wire init <handle>` first");
2905    }
2906    let mut state = config::read_relay_state()?;
2907    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
2908    if self_state.is_null() {
2909        bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
2910    }
2911    let url = self_state["relay_url"]
2912        .as_str()
2913        .ok_or_else(|| anyhow!("self.relay_url missing"))?
2914        .to_string();
2915    let old_slot_id = self_state["slot_id"]
2916        .as_str()
2917        .ok_or_else(|| anyhow!("self.slot_id missing"))?
2918        .to_string();
2919    let old_slot_token = self_state["slot_token"]
2920        .as_str()
2921        .ok_or_else(|| anyhow!("self.slot_token missing"))?
2922        .to_string();
2923
2924    // Read identity to sign the announcement.
2925    let card = config::read_agent_card()?;
2926    let did = card
2927        .get("did")
2928        .and_then(Value::as_str)
2929        .unwrap_or("")
2930        .to_string();
2931    let handle = crate::agent_card::display_handle_from_did(&did).to_string();
2932    let pk_b64 = card
2933        .get("verify_keys")
2934        .and_then(Value::as_object)
2935        .and_then(|m| m.values().next())
2936        .and_then(|v| v.get("key"))
2937        .and_then(Value::as_str)
2938        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
2939        .to_string();
2940    let pk_bytes = crate::signing::b64decode(&pk_b64)?;
2941    let sk_seed = config::read_private_key()?;
2942
2943    // Allocate new slot on the same relay.
2944    let normalized = url.trim_end_matches('/').to_string();
2945    let client = crate::relay_client::RelayClient::new(&normalized);
2946    client
2947        .check_healthz()
2948        .context("aborting rotation; old slot still valid")?;
2949    let alloc = client.allocate_slot(Some(&handle))?;
2950    let new_slot_id = alloc.slot_id.clone();
2951    let new_slot_token = alloc.slot_token.clone();
2952
2953    // Optionally announce the rotation to every paired peer via the OLD slot.
2954    // Each peer's recipient-side `wire pull` will pick up this event before
2955    // their daemon next polls the new slot — but auto-update of peer's
2956    // relay.json from a wire_close event is a v0.2 daemon feature; for now
2957    // peers see the event and an operator must manually `add-peer-slot` the
2958    // new coords, OR re-pair via SAS.
2959    let mut announced: Vec<String> = Vec::new();
2960    if !no_announce {
2961        let now = time::OffsetDateTime::now_utc()
2962            .format(&time::format_description::well_known::Rfc3339)
2963            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
2964        let body = json!({
2965            "reason": "operator-initiated slot rotation",
2966            "new_relay_url": url,
2967            "new_slot_id": new_slot_id,
2968            // NOTE: new_slot_token deliberately NOT shared in the broadcast.
2969            // In v0.1 slot tokens are bilateral-shared, so peer can post via
2970            // existing add-peer-slot flow if operator chooses to re-issue.
2971        });
2972        let peers = state["peers"].as_object().cloned().unwrap_or_default();
2973        for (peer_handle, _peer_info) in peers.iter() {
2974            let event = json!({
2975                "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
2976                "timestamp": now.clone(),
2977                "from": did,
2978                "to": format!("did:wire:{peer_handle}"),
2979                "type": "wire_close",
2980                "kind": 1201,
2981                "body": body.clone(),
2982            });
2983            let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
2984                Ok(s) => s,
2985                Err(e) => {
2986                    eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
2987                    continue;
2988                }
2989            };
2990            // Post to OUR old slot (we're announcing on our own slot, NOT
2991            // peer's slot — peer reads from us). Wait, this is wrong: peers
2992            // read from THEIR OWN slot via wire pull. To reach peer A, we
2993            // post to peer A's slot. Use the existing per-peer slot mapping.
2994            let peer_info = match state["peers"].get(peer_handle) {
2995                Some(p) => p.clone(),
2996                None => continue,
2997            };
2998            let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
2999            let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
3000            let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
3001            if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
3002                continue;
3003            }
3004            let peer_client = if peer_url == url {
3005                client.clone()
3006            } else {
3007                crate::relay_client::RelayClient::new(peer_url)
3008            };
3009            match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
3010                Ok(_) => announced.push(peer_handle.clone()),
3011                Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
3012            }
3013        }
3014    }
3015
3016    // Swap the self-slot to the new one.
3017    state["self"] = json!({
3018        "relay_url": url,
3019        "slot_id": new_slot_id,
3020        "slot_token": new_slot_token,
3021    });
3022    config::write_relay_state(&state)?;
3023
3024    if as_json {
3025        println!(
3026            "{}",
3027            serde_json::to_string(&json!({
3028                "rotated": true,
3029                "old_slot_id": old_slot_id,
3030                "new_slot_id": new_slot_id,
3031                "relay_url": url,
3032                "announced_to": announced,
3033            }))?
3034        );
3035    } else {
3036        println!("rotated slot on {url}");
3037        println!(
3038            "  old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
3039        );
3040        println!("  new slot_id: {new_slot_id}");
3041        if !announced.is_empty() {
3042            println!(
3043                "  announced wire_close (kind=1201) to: {}",
3044                announced.join(", ")
3045            );
3046        }
3047        println!();
3048        println!("next steps:");
3049        println!("  - peers see the wire_close event in their next `wire pull`");
3050        println!(
3051            "  - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
3052        );
3053        println!("    (or full re-pair via `wire pair-host`/`wire join`)");
3054        println!("  - until they do, you'll receive but they won't be able to reach you");
3055        // Suppress unused warning
3056        let _ = old_slot_token;
3057    }
3058    Ok(())
3059}
3060
3061// ---------- forget-peer ----------
3062
3063fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
3064    let mut trust = config::read_trust()?;
3065    let mut removed_from_trust = false;
3066    if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
3067        && agents.remove(handle).is_some()
3068    {
3069        removed_from_trust = true;
3070    }
3071    config::write_trust(&trust)?;
3072
3073    let mut state = config::read_relay_state()?;
3074    let mut removed_from_relay = false;
3075    if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
3076        && peers.remove(handle).is_some()
3077    {
3078        removed_from_relay = true;
3079    }
3080    config::write_relay_state(&state)?;
3081
3082    let mut purged: Vec<String> = Vec::new();
3083    if purge {
3084        for dir in [config::inbox_dir()?, config::outbox_dir()?] {
3085            let path = dir.join(format!("{handle}.jsonl"));
3086            if path.exists() {
3087                std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
3088                purged.push(path.to_string_lossy().into());
3089            }
3090        }
3091    }
3092
3093    if !removed_from_trust && !removed_from_relay {
3094        if as_json {
3095            println!(
3096                "{}",
3097                serde_json::to_string(&json!({
3098                    "removed": false,
3099                    "reason": format!("peer {handle:?} not pinned"),
3100                }))?
3101            );
3102        } else {
3103            eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
3104        }
3105        return Ok(());
3106    }
3107
3108    if as_json {
3109        println!(
3110            "{}",
3111            serde_json::to_string(&json!({
3112                "handle": handle,
3113                "removed_from_trust": removed_from_trust,
3114                "removed_from_relay_state": removed_from_relay,
3115                "purged_files": purged,
3116            }))?
3117        );
3118    } else {
3119        println!("forgot peer {handle:?}");
3120        if removed_from_trust {
3121            println!("  - removed from trust.json");
3122        }
3123        if removed_from_relay {
3124            println!("  - removed from relay.json");
3125        }
3126        if !purged.is_empty() {
3127            for p in &purged {
3128                println!("  - deleted {p}");
3129            }
3130        } else if !purge {
3131            println!("  (inbox/outbox files preserved; pass --purge to delete them)");
3132        }
3133    }
3134    Ok(())
3135}
3136
3137// ---------- daemon (long-lived push+pull sync) ----------
3138
3139fn cmd_daemon(interval_secs: u64, once: bool, as_json: bool) -> Result<()> {
3140    if !config::is_initialized()? {
3141        bail!("not initialized — run `wire init <handle>` first");
3142    }
3143    let interval = std::time::Duration::from_secs(interval_secs.max(1));
3144
3145    if !as_json {
3146        if once {
3147            eprintln!("wire daemon: single sync cycle, then exit");
3148        } else {
3149            eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
3150        }
3151    }
3152
3153    // Recover from prior crash: any pending pair in transient state had its
3154    // in-memory SPAKE2 secret lost when the previous daemon exited. Release
3155    // the relay slots and mark the files so the operator can re-issue.
3156    if let Err(e) = crate::pending_pair::cleanup_on_startup() {
3157        eprintln!("daemon: pending-pair cleanup_on_startup error: {e:#}");
3158    }
3159
3160    // R1 phase 2: spawn the SSE stream subscriber. On every event pushed
3161    // to our slot, the subscriber signals `wake_rx`; we use it as the
3162    // sleep-or-wake gate of the polling loop. Polling stays as the
3163    // safety net — stream errors fall back transparently to the existing
3164    // interval-based cadence.
3165    let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
3166    if !once {
3167        crate::daemon_stream::spawn_stream_subscriber(wake_tx);
3168    }
3169
3170    loop {
3171        let pushed = run_sync_push().unwrap_or_else(|e| {
3172            eprintln!("daemon: push error: {e:#}");
3173            json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
3174        });
3175        let pulled = run_sync_pull().unwrap_or_else(|e| {
3176            eprintln!("daemon: pull error: {e:#}");
3177            json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
3178        });
3179        let pairs = crate::pending_pair::tick().unwrap_or_else(|e| {
3180            eprintln!("daemon: pending-pair tick error: {e:#}");
3181            json!({"transitions": []})
3182        });
3183
3184        if as_json {
3185            println!(
3186                "{}",
3187                serde_json::to_string(&json!({
3188                    "ts": time::OffsetDateTime::now_utc()
3189                        .format(&time::format_description::well_known::Rfc3339)
3190                        .unwrap_or_default(),
3191                    "push": pushed,
3192                    "pull": pulled,
3193                    "pairs": pairs,
3194                }))?
3195            );
3196        } else {
3197            let pushed_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
3198            let written_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
3199            let rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
3200            let pair_transitions = pairs["transitions"]
3201                .as_array()
3202                .map(|a| a.len())
3203                .unwrap_or(0);
3204            if pushed_n > 0 || written_n > 0 || rejected_n > 0 || pair_transitions > 0 {
3205                eprintln!(
3206                    "daemon: pushed={pushed_n} pulled={written_n} rejected={rejected_n} pair-transitions={pair_transitions}"
3207                );
3208            }
3209            // Loud per-transition logging so operator sees pair progress live.
3210            if let Some(arr) = pairs["transitions"].as_array() {
3211                for t in arr {
3212                    eprintln!(
3213                        "  pair {} : {} → {}",
3214                        t.get("code").and_then(Value::as_str).unwrap_or("?"),
3215                        t.get("from").and_then(Value::as_str).unwrap_or("?"),
3216                        t.get("to").and_then(Value::as_str).unwrap_or("?")
3217                    );
3218                    if let Some(sas) = t.get("sas").and_then(Value::as_str)
3219                        && t.get("to").and_then(Value::as_str) == Some("sas_ready")
3220                    {
3221                        eprintln!("    SAS digits: {}-{}", &sas[..3], &sas[3..]);
3222                        eprintln!(
3223                            "    Run: wire pair-confirm {} {}",
3224                            t.get("code").and_then(Value::as_str).unwrap_or("?"),
3225                            sas
3226                        );
3227                    }
3228                }
3229            }
3230        }
3231
3232        if once {
3233            return Ok(());
3234        }
3235        // Wait either for the next poll-interval tick OR for a stream
3236        // wake signal — whichever comes first. Drain any additional
3237        // wake-ups that accumulated during the previous cycle since one
3238        // pull catches up everything.
3239        let _ = wake_rx.recv_timeout(interval);
3240        while wake_rx.try_recv().is_ok() {}
3241    }
3242}
3243
3244/// Programmatic push (no stdout, no exit on errors). Returns the same JSON
3245/// shape `wire push --json` emits.
3246fn run_sync_push() -> Result<Value> {
3247    let state = config::read_relay_state()?;
3248    let peers = state["peers"].as_object().cloned().unwrap_or_default();
3249    if peers.is_empty() {
3250        return Ok(json!({"pushed": [], "skipped": []}));
3251    }
3252    let outbox_dir = config::outbox_dir()?;
3253    if !outbox_dir.exists() {
3254        return Ok(json!({"pushed": [], "skipped": []}));
3255    }
3256    let mut pushed = Vec::new();
3257    let mut skipped = Vec::new();
3258    for (peer_handle, slot_info) in peers.iter() {
3259        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
3260        if !outbox.exists() {
3261            continue;
3262        }
3263        let url = slot_info["relay_url"].as_str().unwrap_or("");
3264        let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
3265        let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
3266        if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
3267            continue;
3268        }
3269        let client = crate::relay_client::RelayClient::new(url);
3270        let body = std::fs::read_to_string(&outbox)?;
3271        for line in body.lines() {
3272            let event: Value = match serde_json::from_str(line) {
3273                Ok(v) => v,
3274                Err(_) => continue,
3275            };
3276            let event_id = event
3277                .get("event_id")
3278                .and_then(Value::as_str)
3279                .unwrap_or("")
3280                .to_string();
3281            match client.post_event(slot_id, slot_token, &event) {
3282                Ok(resp) => {
3283                    if resp.status == "duplicate" {
3284                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
3285                    } else {
3286                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
3287                    }
3288                }
3289                Err(e) => {
3290                    // v0.5.13: flatten the anyhow chain so TLS / DNS / timeout
3291                    // errors aren't hidden behind the topmost-context URL string.
3292                    // Issue #6 highest-impact silent-fail fix.
3293                    let reason = crate::relay_client::format_transport_error(&e);
3294                    skipped.push(
3295                        json!({"peer": peer_handle, "event_id": event_id, "reason": reason}),
3296                    );
3297                }
3298            }
3299        }
3300    }
3301    Ok(json!({"pushed": pushed, "skipped": skipped}))
3302}
3303
3304/// Programmatic pull. Same shape as `wire pull --json`.
3305fn run_sync_pull() -> Result<Value> {
3306    let state = config::read_relay_state()?;
3307    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
3308    if self_state.is_null() {
3309        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3310    }
3311    let url = self_state["relay_url"].as_str().unwrap_or("");
3312    let slot_id = self_state["slot_id"].as_str().unwrap_or("");
3313    let slot_token = self_state["slot_token"].as_str().unwrap_or("");
3314    let last_event_id = self_state
3315        .get("last_pulled_event_id")
3316        .and_then(Value::as_str)
3317        .map(str::to_string);
3318    if url.is_empty() {
3319        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
3320    }
3321    let client = crate::relay_client::RelayClient::new(url);
3322    let events = client.list_events(slot_id, slot_token, last_event_id.as_deref(), Some(1000))?;
3323    let inbox_dir = config::inbox_dir()?;
3324    config::ensure_dirs()?;
3325
3326    // P0.1 (0.5.11): shared cursor-blocking logic. Daemon's --once path
3327    // must match the CLI's `wire pull` semantics or version-skew bugs
3328    // re-emerge by another route.
3329    let result = crate::pull::process_events(&events, last_event_id, &inbox_dir)?;
3330
3331    // P0.3 (0.5.11): same flock-protected RMW as cmd_pull.
3332    if let Some(eid) = &result.advance_cursor_to {
3333        let eid = eid.clone();
3334        config::update_relay_state(|state| {
3335            if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
3336                self_obj.insert("last_pulled_event_id".into(), Value::String(eid));
3337            }
3338            Ok(())
3339        })?;
3340    }
3341
3342    Ok(json!({
3343        "written": result.written,
3344        "rejected": result.rejected,
3345        "total_seen": events.len(),
3346        "cursor_blocked": result.blocked,
3347        "cursor_advanced_to": result.advance_cursor_to,
3348    }))
3349}
3350
3351// ---------- pin (manual out-of-band peer pairing) ----------
3352
3353fn cmd_pin(card_file: &str, as_json: bool) -> Result<()> {
3354    let body =
3355        std::fs::read_to_string(card_file).with_context(|| format!("reading {card_file}"))?;
3356    let card: Value =
3357        serde_json::from_str(&body).with_context(|| format!("parsing {card_file}"))?;
3358    crate::agent_card::verify_agent_card(&card)
3359        .map_err(|e| anyhow!("peer card signature invalid: {e}"))?;
3360
3361    let mut trust = config::read_trust()?;
3362    crate::trust::add_agent_card_pin(&mut trust, &card, Some("VERIFIED"));
3363
3364    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
3365    let handle = crate::agent_card::display_handle_from_did(did).to_string();
3366    config::write_trust(&trust)?;
3367
3368    if as_json {
3369        println!(
3370            "{}",
3371            serde_json::to_string(&json!({
3372                "handle": handle,
3373                "did": did,
3374                "tier": "VERIFIED",
3375                "pinned": true,
3376            }))?
3377        );
3378    } else {
3379        println!("pinned {handle} ({did}) at tier VERIFIED");
3380    }
3381    Ok(())
3382}
3383
3384// ---------- pair-host / pair-join (the magic-wormhole flow) ----------
3385
3386fn cmd_pair_host(relay_url: &str, auto_yes: bool, timeout_secs: u64) -> Result<()> {
3387    pair_orchestrate(relay_url, None, "host", auto_yes, timeout_secs)
3388}
3389
3390fn cmd_pair_join(
3391    code_phrase: &str,
3392    relay_url: &str,
3393    auto_yes: bool,
3394    timeout_secs: u64,
3395) -> Result<()> {
3396    pair_orchestrate(
3397        relay_url,
3398        Some(code_phrase),
3399        "guest",
3400        auto_yes,
3401        timeout_secs,
3402    )
3403}
3404
3405/// Shared orchestration for both sides of the SAS pairing.
3406///
3407/// Now thin: delegates to `pair_session::pair_session_open` / `_try_sas` /
3408/// `_finalize`. CLI keeps its interactive y/N prompt; MCP uses
3409/// `pair_session_confirm_sas` instead.
3410fn pair_orchestrate(
3411    relay_url: &str,
3412    code_in: Option<&str>,
3413    role: &str,
3414    auto_yes: bool,
3415    timeout_secs: u64,
3416) -> Result<()> {
3417    use crate::pair_session::{pair_session_finalize, pair_session_open, pair_session_try_sas};
3418
3419    let mut s = pair_session_open(role, relay_url, code_in)?;
3420
3421    if role == "host" {
3422        eprintln!();
3423        eprintln!("share this code phrase with your peer:");
3424        eprintln!();
3425        eprintln!("    {}", s.code);
3426        eprintln!();
3427        eprintln!(
3428            "waiting for peer to run `wire pair-join {} --relay {relay_url}` ...",
3429            s.code
3430        );
3431    } else {
3432        eprintln!();
3433        eprintln!("joined pair-slot on {relay_url} — waiting for host's SPAKE2 message ...");
3434    }
3435
3436    // Stage 2 — poll for SAS-ready with periodic progress heartbeat. The bare
3437    // pair_session_wait_for_sas helper is silent; the CLI wraps it in a loop
3438    // that emits a "waiting (Ns / Ts)" line every HEARTBEAT_SECS so operators
3439    // see the process is alive while the other side connects.
3440    const HEARTBEAT_SECS: u64 = 10;
3441    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3442    let started = std::time::Instant::now();
3443    let mut last_heartbeat = started;
3444    let formatted = loop {
3445        if let Some(sas) = pair_session_try_sas(&mut s)? {
3446            break sas;
3447        }
3448        let now = std::time::Instant::now();
3449        if now >= deadline {
3450            return Err(anyhow!(
3451                "timeout after {timeout_secs}s waiting for peer's SPAKE2 message"
3452            ));
3453        }
3454        if now.duration_since(last_heartbeat).as_secs() >= HEARTBEAT_SECS {
3455            let elapsed = now.duration_since(started).as_secs();
3456            eprintln!("  ... still waiting ({elapsed}s / {timeout_secs}s)");
3457            last_heartbeat = now;
3458        }
3459        std::thread::sleep(std::time::Duration::from_millis(250));
3460    };
3461
3462    eprintln!();
3463    eprintln!("SAS digits (must match peer's terminal):");
3464    eprintln!();
3465    eprintln!("    {formatted}");
3466    eprintln!();
3467
3468    // Stage 3 — operator confirmation. CLI uses interactive y/N for backward
3469    // compatibility; MCP uses pair_session_confirm_sas with the typed digits.
3470    if !auto_yes {
3471        eprint!("does this match your peer's terminal? [y/N]: ");
3472        use std::io::Write;
3473        std::io::stderr().flush().ok();
3474        let mut input = String::new();
3475        std::io::stdin().read_line(&mut input)?;
3476        let trimmed = input.trim().to_lowercase();
3477        if trimmed != "y" && trimmed != "yes" {
3478            bail!("SAS confirmation declined — aborting pairing");
3479        }
3480    }
3481    s.sas_confirmed = true;
3482
3483    // Stage 4 — seal+exchange bootstrap, pin peer.
3484    let result = pair_session_finalize(&mut s, timeout_secs)?;
3485
3486    let peer_did = result["paired_with"].as_str().unwrap_or("");
3487    let peer_role = if role == "host" { "guest" } else { "host" };
3488    eprintln!("paired with {peer_did} (peer role: {peer_role})");
3489    eprintln!("peer card pinned at tier VERIFIED");
3490    eprintln!(
3491        "peer relay slot saved to {}",
3492        config::relay_state_path()?.display()
3493    );
3494
3495    println!("{}", serde_json::to_string(&result)?);
3496    Ok(())
3497}
3498
// (poll_until helper removed — `pair_orchestrate` now polls
// `pair_session::pair_session_try_sas` in its own deadline loop and passes the
// timeout on to `pair_session_finalize`.)
3501
3502// ---------- pair — single-shot init + pair-* + setup ----------
3503
3504fn cmd_pair(
3505    handle: &str,
3506    code: Option<&str>,
3507    relay: &str,
3508    auto_yes: bool,
3509    timeout_secs: u64,
3510    no_setup: bool,
3511) -> Result<()> {
3512    // Step 1 — idempotent identity. Safe if already initialized with the SAME handle;
3513    // bails loudly if a different handle is already set (operator must explicitly delete).
3514    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3515    let did = init_result
3516        .get("did")
3517        .and_then(|v| v.as_str())
3518        .unwrap_or("(unknown)")
3519        .to_string();
3520    let already = init_result
3521        .get("already_initialized")
3522        .and_then(|v| v.as_bool())
3523        .unwrap_or(false);
3524    if already {
3525        println!("(identity {did} already initialized — reusing)");
3526    } else {
3527        println!("initialized {did}");
3528    }
3529    println!();
3530
3531    // Step 2 — pair-host or pair-join based on code presence.
3532    match code {
3533        None => {
3534            println!("hosting pair on {relay} (no code = host) ...");
3535            cmd_pair_host(relay, auto_yes, timeout_secs)?;
3536        }
3537        Some(c) => {
3538            println!("joining pair with code {c} on {relay} ...");
3539            cmd_pair_join(c, relay, auto_yes, timeout_secs)?;
3540        }
3541    }
3542
3543    // Step 3 — register wire as MCP server in detected client configs (idempotent).
3544    if !no_setup {
3545        println!();
3546        println!("registering wire as MCP server in detected client configs ...");
3547        if let Err(e) = cmd_setup(true) {
3548            // Non-fatal — pair succeeded, just print the warning.
3549            eprintln!("warn: setup --apply failed: {e}");
3550            eprintln!("      pair succeeded; you can re-run `wire setup --apply` manually.");
3551        }
3552    }
3553
3554    println!();
3555    println!("pair complete. Next steps:");
3556    println!("  wire daemon start              # background sync of inbox/outbox vs relay");
3557    println!("  wire send <peer> claim <msg>   # send your peer something");
3558    println!("  wire tail                      # watch incoming events");
3559    Ok(())
3560}
3561
3562// ---------- detached pair (daemon-orchestrated) ----------
3563
3564/// `wire pair <handle> [--code <phrase>] --detach` — wraps init + detach
3565/// pair-host/-join into a single command. The non-detached variant lives in
3566/// `cmd_pair`; this one short-circuits to the daemon-orchestrated path.
3567fn cmd_pair_detach(handle: &str, code: Option<&str>, relay: &str) -> Result<()> {
3568    let init_result = crate::pair_session::init_self_idempotent(handle, None, None)?;
3569    let did = init_result
3570        .get("did")
3571        .and_then(|v| v.as_str())
3572        .unwrap_or("(unknown)")
3573        .to_string();
3574    let already = init_result
3575        .get("already_initialized")
3576        .and_then(|v| v.as_bool())
3577        .unwrap_or(false);
3578    if already {
3579        println!("(identity {did} already initialized — reusing)");
3580    } else {
3581        println!("initialized {did}");
3582    }
3583    println!();
3584    match code {
3585        None => cmd_pair_host_detach(relay, false),
3586        Some(c) => cmd_pair_join_detach(c, relay, false),
3587    }
3588}
3589
3590fn cmd_pair_host_detach(relay_url: &str, as_json: bool) -> Result<()> {
3591    if !config::is_initialized()? {
3592        bail!("not initialized — run `wire init <handle>` first");
3593    }
3594    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3595        Ok(b) => b,
3596        Err(e) => {
3597            if !as_json {
3598                eprintln!(
3599                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3600                );
3601            }
3602            false
3603        }
3604    };
3605    let code = crate::sas::generate_code_phrase();
3606    let code_hash = crate::pair_session::derive_code_hash(&code);
3607    let now = time::OffsetDateTime::now_utc()
3608        .format(&time::format_description::well_known::Rfc3339)
3609        .unwrap_or_default();
3610    let p = crate::pending_pair::PendingPair {
3611        code: code.clone(),
3612        code_hash,
3613        role: "host".to_string(),
3614        relay_url: relay_url.to_string(),
3615        status: "request_host".to_string(),
3616        sas: None,
3617        peer_did: None,
3618        created_at: now,
3619        last_error: None,
3620        pair_id: None,
3621        our_slot_id: None,
3622        our_slot_token: None,
3623        spake2_seed_b64: None,
3624    };
3625    crate::pending_pair::write_pending(&p)?;
3626    if as_json {
3627        println!(
3628            "{}",
3629            serde_json::to_string(&json!({
3630                "state": "queued",
3631                "code_phrase": code,
3632                "relay_url": relay_url,
3633                "role": "host",
3634                "daemon_spawned": daemon_spawned,
3635            }))?
3636        );
3637    } else {
3638        if daemon_spawned {
3639            println!("(started wire daemon in background)");
3640        }
3641        println!("detached pair-host queued. Share this code with your peer:\n");
3642        println!("    {code}\n");
3643        println!("Next steps:");
3644        println!("  wire pair-list                                # check status");
3645        println!("  wire pair-confirm {code} <digits>   # when SAS shows up");
3646        println!("  wire pair-cancel  {code}            # to abort");
3647    }
3648    Ok(())
3649}
3650
3651fn cmd_pair_join_detach(code_phrase: &str, relay_url: &str, as_json: bool) -> Result<()> {
3652    if !config::is_initialized()? {
3653        bail!("not initialized — run `wire init <handle>` first");
3654    }
3655    let daemon_spawned = match crate::ensure_up::ensure_daemon_running() {
3656        Ok(b) => b,
3657        Err(e) => {
3658            if !as_json {
3659                eprintln!(
3660                    "warn: could not auto-start daemon: {e}; pair will queue but not advance"
3661                );
3662            }
3663            false
3664        }
3665    };
3666    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3667    let code_hash = crate::pair_session::derive_code_hash(&code);
3668    let now = time::OffsetDateTime::now_utc()
3669        .format(&time::format_description::well_known::Rfc3339)
3670        .unwrap_or_default();
3671    let p = crate::pending_pair::PendingPair {
3672        code: code.clone(),
3673        code_hash,
3674        role: "guest".to_string(),
3675        relay_url: relay_url.to_string(),
3676        status: "request_guest".to_string(),
3677        sas: None,
3678        peer_did: None,
3679        created_at: now,
3680        last_error: None,
3681        pair_id: None,
3682        our_slot_id: None,
3683        our_slot_token: None,
3684        spake2_seed_b64: None,
3685    };
3686    crate::pending_pair::write_pending(&p)?;
3687    if as_json {
3688        println!(
3689            "{}",
3690            serde_json::to_string(&json!({
3691                "state": "queued",
3692                "code_phrase": code,
3693                "relay_url": relay_url,
3694                "role": "guest",
3695                "daemon_spawned": daemon_spawned,
3696            }))?
3697        );
3698    } else {
3699        if daemon_spawned {
3700            println!("(started wire daemon in background)");
3701        }
3702        println!("detached pair-join queued for code {code}.");
3703        println!(
3704            "Run `wire pair-list` to watch for SAS, then `wire pair-confirm {code} <digits>`."
3705        );
3706    }
3707    Ok(())
3708}
3709
3710fn cmd_pair_confirm(code_phrase: &str, typed_digits: &str, as_json: bool) -> Result<()> {
3711    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3712    let typed: String = typed_digits
3713        .chars()
3714        .filter(|c| c.is_ascii_digit())
3715        .collect();
3716    if typed.len() != 6 {
3717        bail!(
3718            "expected 6 digits (got {} after stripping non-digits)",
3719            typed.len()
3720        );
3721    }
3722    let mut p = crate::pending_pair::read_pending(&code)?
3723        .ok_or_else(|| anyhow!("no pending pair found for code {code}"))?;
3724    if p.status != "sas_ready" {
3725        bail!(
3726            "pair {code} not in sas_ready state (current: {}). Run `wire pair-list` to see what's going on.",
3727            p.status
3728        );
3729    }
3730    let stored = p
3731        .sas
3732        .as_ref()
3733        .ok_or_else(|| anyhow!("pending file has status=sas_ready but no sas field"))?
3734        .clone();
3735    if stored == typed {
3736        p.status = "confirmed".to_string();
3737        crate::pending_pair::write_pending(&p)?;
3738        if as_json {
3739            println!(
3740                "{}",
3741                serde_json::to_string(&json!({
3742                    "state": "confirmed",
3743                    "code_phrase": code,
3744                }))?
3745            );
3746        } else {
3747            println!("digits match. Daemon will finalize the handshake on its next tick.");
3748            println!("Run `wire peers` after a few seconds to confirm.");
3749        }
3750    } else {
3751        p.status = "aborted".to_string();
3752        p.last_error = Some(format!(
3753            "SAS digit mismatch (typed {typed}, expected {stored})"
3754        ));
3755        let client = crate::relay_client::RelayClient::new(&p.relay_url);
3756        let _ = client.pair_abandon(&p.code_hash);
3757        crate::pending_pair::write_pending(&p)?;
3758        crate::os_notify::toast(
3759            &format!("wire — pair aborted ({})", p.code),
3760            p.last_error.as_deref().unwrap_or("digits mismatch"),
3761        );
3762        if as_json {
3763            println!(
3764                "{}",
3765                serde_json::to_string(&json!({
3766                    "state": "aborted",
3767                    "code_phrase": code,
3768                    "error": "digits mismatch",
3769                }))?
3770            );
3771        }
3772        bail!("digits mismatch — pair aborted. Re-issue with a fresh `wire pair-host --detach`.");
3773    }
3774    Ok(())
3775}
3776
3777fn cmd_pair_list(as_json: bool, watch: bool, watch_interval_secs: u64) -> Result<()> {
3778    if watch {
3779        return cmd_pair_list_watch(watch_interval_secs);
3780    }
3781    let spake2_items = crate::pending_pair::list_pending()?;
3782    let inbound_items = crate::pending_inbound_pair::list_pending_inbound()?;
3783    if as_json {
3784        // Backwards-compat: flat SPAKE2 array (the shape every existing
3785        // script + e2e test parses since v0.5.x). v0.5.14 inbound items
3786        // surface programmatically via `wire pair-list-inbound --json`
3787        // and via `wire status --json` `pending_pairs.inbound_*` fields.
3788        println!("{}", serde_json::to_string(&spake2_items)?);
3789        return Ok(());
3790    }
3791    if spake2_items.is_empty() && inbound_items.is_empty() {
3792        println!("no pending pair sessions.");
3793        return Ok(());
3794    }
3795    // v0.5.14: inbound section first — these need operator action right now.
3796    // SPAKE2 sessions are typically already mid-flow.
3797    if !inbound_items.is_empty() {
3798        println!("PENDING INBOUND (v0.5.14 zero-paste pair_drop awaiting your accept)");
3799        println!(
3800            "{:<20} {:<35} {:<25} NEXT STEP",
3801            "PEER", "RELAY", "RECEIVED"
3802        );
3803        for p in &inbound_items {
3804            println!(
3805                "{:<20} {:<35} {:<25} `wire pair-accept {peer}` to accept; `wire pair-reject {peer}` to refuse",
3806                p.peer_handle,
3807                p.peer_relay_url,
3808                p.received_at,
3809                peer = p.peer_handle,
3810            );
3811        }
3812        println!();
3813    }
3814    if !spake2_items.is_empty() {
3815        println!("SPAKE2 SESSIONS");
3816        println!(
3817            "{:<15} {:<8} {:<18} {:<10} NOTE",
3818            "CODE", "ROLE", "STATUS", "SAS"
3819        );
3820        for p in spake2_items {
3821            let sas = p
3822                .sas
3823                .as_ref()
3824                .map(|d| format!("{}-{}", &d[..3], &d[3..]))
3825                .unwrap_or_else(|| "—".to_string());
3826            let note = p
3827                .last_error
3828                .as_deref()
3829                .or(p.peer_did.as_deref())
3830                .unwrap_or("");
3831            println!(
3832                "{:<15} {:<8} {:<18} {:<10} {}",
3833                p.code, p.role, p.status, sas, note
3834            );
3835        }
3836    }
3837    Ok(())
3838}
3839
3840/// Stream-mode pair-list: never exits. Diffs per-code state every
3841/// `interval_secs` and prints one JSON line per transition (creation,
3842/// status flip, deletion). Useful for shell pipelines:
3843///
3844/// ```text
3845/// wire pair-list --watch | while read line; do
3846///     CODE=$(echo "$line" | jq -r .code)
3847///     STATUS=$(echo "$line" | jq -r .status)
3848///     ...
3849/// done
3850/// ```
3851fn cmd_pair_list_watch(interval_secs: u64) -> Result<()> {
3852    use std::collections::HashMap;
3853    use std::io::Write;
3854    let interval = std::time::Duration::from_secs(interval_secs.max(1));
3855    // Emit a snapshot synthetic event for every currently-pending pair on
3856    // startup so a consumer that arrives mid-flight sees the current state.
3857    let mut prev: HashMap<String, String> = HashMap::new();
3858    {
3859        let items = crate::pending_pair::list_pending()?;
3860        for p in &items {
3861            println!("{}", serde_json::to_string(&p)?);
3862            prev.insert(p.code.clone(), p.status.clone());
3863        }
3864        // Flush so the consumer's `while read` gets the snapshot promptly.
3865        let _ = std::io::stdout().flush();
3866    }
3867    loop {
3868        std::thread::sleep(interval);
3869        let items = match crate::pending_pair::list_pending() {
3870            Ok(v) => v,
3871            Err(_) => continue,
3872        };
3873        let mut cur: HashMap<String, String> = HashMap::new();
3874        for p in &items {
3875            cur.insert(p.code.clone(), p.status.clone());
3876            match prev.get(&p.code) {
3877                None => {
3878                    // New code appeared.
3879                    println!("{}", serde_json::to_string(&p)?);
3880                }
3881                Some(prev_status) if prev_status != &p.status => {
3882                    // Status flipped.
3883                    println!("{}", serde_json::to_string(&p)?);
3884                }
3885                _ => {}
3886            }
3887        }
3888        for code in prev.keys() {
3889            if !cur.contains_key(code) {
3890                // File disappeared → finalized or cancelled. Emit a synthetic
3891                // "removed" marker so the consumer sees the terminal event.
3892                println!(
3893                    "{}",
3894                    serde_json::to_string(&json!({
3895                        "code": code,
3896                        "status": "removed",
3897                        "_synthetic": true,
3898                    }))?
3899                );
3900            }
3901        }
3902        let _ = std::io::stdout().flush();
3903        prev = cur;
3904    }
3905}
3906
3907/// Block until a pending pair reaches `target_status` or terminates. Process
3908/// exit code carries the outcome (0 success, 1 terminated abnormally, 2
3909/// timeout) so shell scripts can branch directly.
3910fn cmd_pair_watch(
3911    code_phrase: &str,
3912    target_status: &str,
3913    timeout_secs: u64,
3914    as_json: bool,
3915) -> Result<()> {
3916    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3917    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
3918    let mut last_seen_status: Option<String> = None;
3919    loop {
3920        let p_opt = crate::pending_pair::read_pending(&code)?;
3921        let now = std::time::Instant::now();
3922        match p_opt {
3923            None => {
3924                // File gone — either finalized (success if target=sas_ready
3925                // since finalization implies it passed sas_ready) or never
3926                // existed. Distinguish by whether we ever saw it.
3927                if last_seen_status.is_some() {
3928                    if as_json {
3929                        println!(
3930                            "{}",
3931                            serde_json::to_string(&json!({"state": "finalized", "code": code}))?
3932                        );
3933                    } else {
3934                        println!("pair {code} finalized (file removed)");
3935                    }
3936                    return Ok(());
3937                } else {
3938                    if as_json {
3939                        println!(
3940                            "{}",
3941                            serde_json::to_string(&json!({"error": "no such pair", "code": code}))?
3942                        );
3943                    }
3944                    std::process::exit(1);
3945                }
3946            }
3947            Some(p) => {
3948                let cur = p.status.clone();
3949                if Some(cur.clone()) != last_seen_status {
3950                    if as_json {
3951                        // Emit per-transition line so scripts can stream.
3952                        println!("{}", serde_json::to_string(&p)?);
3953                    }
3954                    last_seen_status = Some(cur.clone());
3955                }
3956                if cur == target_status {
3957                    if !as_json {
3958                        let sas_str = p
3959                            .sas
3960                            .as_ref()
3961                            .map(|s| format!("{}-{}", &s[..3], &s[3..]))
3962                            .unwrap_or_else(|| "—".to_string());
3963                        println!("pair {code} reached {target_status} (SAS: {sas_str})");
3964                    }
3965                    return Ok(());
3966                }
3967                if cur == "aborted" || cur == "aborted_restart" {
3968                    if !as_json {
3969                        let err = p.last_error.as_deref().unwrap_or("(no detail)");
3970                        eprintln!("pair {code} {cur}: {err}");
3971                    }
3972                    std::process::exit(1);
3973                }
3974            }
3975        }
3976        if now >= deadline {
3977            if !as_json {
3978                eprintln!(
3979                    "timeout after {timeout_secs}s waiting for pair {code} to reach {target_status}"
3980                );
3981            }
3982            std::process::exit(2);
3983        }
3984        std::thread::sleep(std::time::Duration::from_millis(250));
3985    }
3986}
3987
3988fn cmd_pair_cancel(code_phrase: &str, as_json: bool) -> Result<()> {
3989    let code = crate::sas::parse_code_phrase(code_phrase)?.to_string();
3990    let p = crate::pending_pair::read_pending(&code)?
3991        .ok_or_else(|| anyhow!("no pending pair for code {code}"))?;
3992    let client = crate::relay_client::RelayClient::new(&p.relay_url);
3993    let _ = client.pair_abandon(&p.code_hash);
3994    crate::pending_pair::delete_pending(&code)?;
3995    if as_json {
3996        println!(
3997            "{}",
3998            serde_json::to_string(&json!({
3999                "state": "cancelled",
4000                "code_phrase": code,
4001            }))?
4002        );
4003    } else {
4004        println!("cancelled pending pair {code} (relay slot released, file removed).");
4005    }
4006    Ok(())
4007}
4008
4009// ---------- pair-abandon — release stuck pair-slot ----------
4010
4011fn cmd_pair_abandon(code_phrase: &str, relay_url: &str) -> Result<()> {
4012    // Accept either the raw phrase (e.g. "53-CKWIA5") or whatever the user
4013    // typed — normalize via the existing parser.
4014    let code = crate::sas::parse_code_phrase(code_phrase)?;
4015    let code_hash = crate::pair_session::derive_code_hash(code);
4016    let client = crate::relay_client::RelayClient::new(relay_url);
4017    client.pair_abandon(&code_hash)?;
4018    println!("abandoned pair-slot for code {code_phrase} on {relay_url}");
4019    println!("host can now issue a fresh code; guest can re-join.");
4020    Ok(())
4021}
4022
4023// ---------- invite / accept — one-paste pair (v0.4.0) ----------
4024
4025fn cmd_invite(relay: &str, ttl: u64, uses: u32, share: bool, as_json: bool) -> Result<()> {
4026    let url = crate::pair_invite::mint_invite(Some(ttl), uses, Some(relay))?;
4027
4028    // If --share, register the invite at the relay's short-URL endpoint and
4029    // build the one-curl onboarding line for the peer to paste.
4030    let share_payload: Option<Value> = if share {
4031        let client = reqwest::blocking::Client::new();
4032        let single_use = if uses == 1 { Some(1u32) } else { None };
4033        let body = json!({
4034            "invite_url": url,
4035            "ttl_seconds": ttl,
4036            "uses": single_use,
4037        });
4038        let endpoint = format!("{}/v1/invite/register", relay.trim_end_matches('/'));
4039        let resp = client.post(&endpoint).json(&body).send()?;
4040        if !resp.status().is_success() {
4041            let code = resp.status();
4042            let txt = resp.text().unwrap_or_default();
4043            bail!("relay {code} on /v1/invite/register: {txt}");
4044        }
4045        let parsed: Value = resp.json()?;
4046        let token = parsed
4047            .get("token")
4048            .and_then(Value::as_str)
4049            .ok_or_else(|| anyhow::anyhow!("relay reply missing token"))?
4050            .to_string();
4051        let share_url = format!("{}/i/{}", relay.trim_end_matches('/'), token);
4052        let curl_line = format!("curl -fsSL {share_url} | sh");
4053        Some(json!({
4054            "token": token,
4055            "share_url": share_url,
4056            "curl": curl_line,
4057            "expires_unix": parsed.get("expires_unix"),
4058        }))
4059    } else {
4060        None
4061    };
4062
4063    if as_json {
4064        let mut out = json!({
4065            "invite_url": url,
4066            "ttl_secs": ttl,
4067            "uses": uses,
4068            "relay": relay,
4069        });
4070        if let Some(s) = &share_payload {
4071            out["share"] = s.clone();
4072        }
4073        println!("{}", serde_json::to_string(&out)?);
4074    } else if let Some(s) = share_payload {
4075        let curl = s.get("curl").and_then(Value::as_str).unwrap_or("");
4076        eprintln!("# One-curl onboarding. Share this single line — installs wire if missing,");
4077        eprintln!("# accepts the invite, pairs both sides. TTL: {ttl}s. Uses: {uses}.");
4078        println!("{curl}");
4079    } else {
4080        eprintln!("# Share this URL with one peer. Pasting it = pair complete on their side.");
4081        eprintln!("# TTL: {ttl}s. Uses: {uses}.");
4082        println!("{url}");
4083    }
4084    Ok(())
4085}
4086
4087fn cmd_accept(url: &str, as_json: bool) -> Result<()> {
4088    // If the user pasted an HTTP(S) short URL (e.g. https://wireup.net/i/AB12),
4089    // resolve it to the underlying wire://pair?... URL via ?format=url before
4090    // accepting. Saves them from having to know which URL shape goes where.
4091    let resolved = if url.starts_with("http://") || url.starts_with("https://") {
4092        let sep = if url.contains('?') { '&' } else { '?' };
4093        let resolve_url = format!("{url}{sep}format=url");
4094        let client = reqwest::blocking::Client::new();
4095        let resp = client
4096            .get(&resolve_url)
4097            .send()
4098            .with_context(|| format!("GET {resolve_url}"))?;
4099        if !resp.status().is_success() {
4100            bail!("could not resolve short URL {url} (HTTP {})", resp.status());
4101        }
4102        let body = resp.text().unwrap_or_default().trim().to_string();
4103        if !body.starts_with("wire://pair?") {
4104            bail!(
4105                "short URL {url} did not resolve to a wire:// invite. \
4106                 (got: {}{})",
4107                body.chars().take(80).collect::<String>(),
4108                if body.chars().count() > 80 { "…" } else { "" }
4109            );
4110        }
4111        body
4112    } else {
4113        url.to_string()
4114    };
4115
4116    let result = crate::pair_invite::accept_invite(&resolved)?;
4117    if as_json {
4118        println!("{}", serde_json::to_string(&result)?);
4119    } else {
4120        let did = result
4121            .get("paired_with")
4122            .and_then(Value::as_str)
4123            .unwrap_or("?");
4124        println!("paired with {did}");
4125        println!(
4126            "you can now: wire send {} <kind> <body>",
4127            crate::agent_card::display_handle_from_did(did)
4128        );
4129    }
4130    Ok(())
4131}
4132
4133// ---------- whois / profile (v0.5) ----------
4134
4135fn cmd_whois(handle: Option<&str>, as_json: bool, relay_override: Option<&str>) -> Result<()> {
4136    if let Some(h) = handle {
4137        let parsed = crate::pair_profile::parse_handle(h)?;
4138        // Special-case: if the supplied handle matches our own, skip the
4139        // network round-trip and print local.
4140        if config::is_initialized()? {
4141            let card = config::read_agent_card()?;
4142            let local_handle = card
4143                .get("profile")
4144                .and_then(|p| p.get("handle"))
4145                .and_then(Value::as_str)
4146                .map(str::to_string);
4147            if local_handle.as_deref() == Some(h) {
4148                return cmd_whois(None, as_json, None);
4149            }
4150        }
4151        // Remote resolution via .well-known/wire/agent on the handle's domain.
4152        let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4153        if as_json {
4154            println!("{}", serde_json::to_string(&resolved)?);
4155        } else {
4156            print_resolved_profile(&resolved);
4157        }
4158        return Ok(());
4159    }
4160    let card = config::read_agent_card()?;
4161    if as_json {
4162        let profile = card.get("profile").cloned().unwrap_or(Value::Null);
4163        println!(
4164            "{}",
4165            serde_json::to_string(&json!({
4166                "did": card.get("did").cloned().unwrap_or(Value::Null),
4167                "profile": profile,
4168            }))?
4169        );
4170    } else {
4171        print!("{}", crate::pair_profile::render_self_summary()?);
4172    }
4173    Ok(())
4174}
4175
4176fn print_resolved_profile(resolved: &Value) {
4177    let did = resolved.get("did").and_then(Value::as_str).unwrap_or("?");
4178    let nick = resolved.get("nick").and_then(Value::as_str).unwrap_or("?");
4179    let relay = resolved
4180        .get("relay_url")
4181        .and_then(Value::as_str)
4182        .unwrap_or("");
4183    let slot = resolved
4184        .get("slot_id")
4185        .and_then(Value::as_str)
4186        .unwrap_or("");
4187    let profile = resolved
4188        .get("card")
4189        .and_then(|c| c.get("profile"))
4190        .cloned()
4191        .unwrap_or(Value::Null);
4192    println!("{did}");
4193    println!("  nick:         {nick}");
4194    if !relay.is_empty() {
4195        println!("  relay_url:    {relay}");
4196    }
4197    if !slot.is_empty() {
4198        println!("  slot_id:      {slot}");
4199    }
4200    let pick =
4201        |k: &str| -> Option<String> { profile.get(k).and_then(Value::as_str).map(str::to_string) };
4202    if let Some(s) = pick("display_name") {
4203        println!("  display_name: {s}");
4204    }
4205    if let Some(s) = pick("emoji") {
4206        println!("  emoji:        {s}");
4207    }
4208    if let Some(s) = pick("motto") {
4209        println!("  motto:        {s}");
4210    }
4211    if let Some(arr) = profile.get("vibe").and_then(Value::as_array) {
4212        let joined: Vec<String> = arr
4213            .iter()
4214            .filter_map(|v| v.as_str().map(str::to_string))
4215            .collect();
4216        println!("  vibe:         {}", joined.join(", "));
4217    }
4218    if let Some(s) = pick("pronouns") {
4219        println!("  pronouns:     {s}");
4220    }
4221}
4222
4223/// `wire add <nick@domain>` — zero-paste pair. Resolve handle, build a
4224/// signed pair_drop event with our card + slot coords, deliver via the
4225/// peer relay's `/v1/handle/intro/<nick>` endpoint (no slot_token needed).
4226/// Peer's daemon completes the bilateral pin on its next pull and emits a
4227/// pair_drop_ack carrying their slot_token so we can send back.
4228fn cmd_add(handle_arg: &str, relay_override: Option<&str>, as_json: bool) -> Result<()> {
4229    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
4230
4231    // 1. Auto-init self if needed + ensure a relay slot.
4232    let (our_did, our_relay, our_slot_id, our_slot_token) =
4233        crate::pair_invite::ensure_self_with_relay(relay_override)?;
4234    if our_did == format!("did:wire:{}", parsed.nick) {
4235        // Lazy guard — actual self-add would also be caught by FCFS later.
4236        bail!("refusing to add self (handle matches own DID)");
4237    }
4238
4239    // v0.5.14 bilateral-completion path: if a pair_drop from this peer is
4240    // already sitting in pending-inbound, the operator is now accepting it.
4241    // Pin trust, save relay coords + slot_token from the stored drop, ship
4242    // our own slot_token back via pair_drop_ack, delete the pending record.
4243    //
4244    // This branch is the OTHER half of the v0.5.14 fix to maybe_consume_pair_drop:
4245    // receiver-side auto-promote was removed there; operator consent flows
4246    // through here. After this branch returns, both sides are bilaterally
4247    // pinned and capability flows in both directions.
4248    if let Some(pending) = crate::pending_inbound_pair::read_pending_inbound(&parsed.nick)? {
4249        return cmd_add_accept_pending(
4250            handle_arg,
4251            &parsed.nick,
4252            &pending,
4253            &our_relay,
4254            &our_slot_id,
4255            &our_slot_token,
4256            as_json,
4257        );
4258    }
4259
4260    // 2. Resolve peer via .well-known on their relay.
4261    let resolved = crate::pair_profile::resolve_handle(&parsed, relay_override)?;
4262    let peer_card = resolved
4263        .get("card")
4264        .cloned()
4265        .ok_or_else(|| anyhow!("resolved missing card"))?;
4266    let peer_did = resolved
4267        .get("did")
4268        .and_then(Value::as_str)
4269        .ok_or_else(|| anyhow!("resolved missing did"))?
4270        .to_string();
4271    let peer_handle = crate::agent_card::display_handle_from_did(&peer_did).to_string();
4272    let peer_slot_id = resolved
4273        .get("slot_id")
4274        .and_then(Value::as_str)
4275        .ok_or_else(|| anyhow!("resolved missing slot_id"))?
4276        .to_string();
4277    let peer_relay = resolved
4278        .get("relay_url")
4279        .and_then(Value::as_str)
4280        .map(str::to_string)
4281        .or_else(|| relay_override.map(str::to_string))
4282        .unwrap_or_else(|| format!("https://{}", parsed.domain));
4283
4284    // 3. Pin peer in trust + relay-state. slot_token will arrive via ack.
4285    let mut trust = config::read_trust()?;
4286    crate::trust::add_agent_card_pin(&mut trust, &peer_card, Some("VERIFIED"));
4287    config::write_trust(&trust)?;
4288    let mut relay_state = config::read_relay_state()?;
4289    let existing_token = relay_state
4290        .get("peers")
4291        .and_then(|p| p.get(&peer_handle))
4292        .and_then(|p| p.get("slot_token"))
4293        .and_then(Value::as_str)
4294        .map(str::to_string)
4295        .unwrap_or_default();
4296    relay_state["peers"][&peer_handle] = json!({
4297        "relay_url": peer_relay,
4298        "slot_id": peer_slot_id,
4299        "slot_token": existing_token, // empty until pair_drop_ack lands
4300    });
4301    config::write_relay_state(&relay_state)?;
4302
4303    // 4. Build signed pair_drop with our card + coords (no pair_nonce — this
4304    // is the v0.5 zero-paste open-mode path).
4305    let our_card = config::read_agent_card()?;
4306    let sk_seed = config::read_private_key()?;
4307    let our_handle = crate::agent_card::display_handle_from_did(&our_did).to_string();
4308    let pk_b64 = our_card
4309        .get("verify_keys")
4310        .and_then(Value::as_object)
4311        .and_then(|m| m.values().next())
4312        .and_then(|v| v.get("key"))
4313        .and_then(Value::as_str)
4314        .ok_or_else(|| anyhow!("our card missing verify_keys[*].key"))?;
4315    let pk_bytes = crate::signing::b64decode(pk_b64)?;
4316    let now = time::OffsetDateTime::now_utc()
4317        .format(&time::format_description::well_known::Rfc3339)
4318        .unwrap_or_default();
4319    // v0.5.17: advertise all our endpoints (federation + optional local)
4320    // to the peer in the pair_drop body. Back-compat: top-level
4321    // relay_url/slot_id/slot_token still point at the federation
4322    // endpoint so v0.5.16-and-earlier peers ingest unchanged.
4323    let our_relay_state = config::read_relay_state().unwrap_or_else(|_| json!({}));
4324    let our_endpoints = crate::endpoints::self_endpoints(&our_relay_state);
4325    let mut body = json!({
4326        "card": our_card,
4327        "relay_url": our_relay,
4328        "slot_id": our_slot_id,
4329        "slot_token": our_slot_token,
4330    });
4331    if !our_endpoints.is_empty() {
4332        body["endpoints"] = serde_json::to_value(&our_endpoints).unwrap_or(json!([]));
4333    }
4334    let event = json!({
4335        "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
4336        "timestamp": now,
4337        "from": our_did,
4338        "to": peer_did,
4339        "type": "pair_drop",
4340        "kind": 1100u32,
4341        "body": body,
4342    });
4343    let signed = crate::signing::sign_message_v31(&event, &sk_seed, &pk_bytes, &our_handle)?;
4344
4345    // 5. Deliver via /v1/handle/intro/<nick> (auth-free; relay validates kind).
4346    let client = crate::relay_client::RelayClient::new(&peer_relay);
4347    let resp = client.handle_intro(&parsed.nick, &signed)?;
4348    let event_id = signed
4349        .get("event_id")
4350        .and_then(Value::as_str)
4351        .unwrap_or("")
4352        .to_string();
4353
4354    if as_json {
4355        println!(
4356            "{}",
4357            serde_json::to_string(&json!({
4358                "handle": handle_arg,
4359                "paired_with": peer_did,
4360                "peer_handle": peer_handle,
4361                "event_id": event_id,
4362                "drop_response": resp,
4363                "status": "drop_sent",
4364            }))?
4365        );
4366    } else {
4367        println!(
4368            "→ resolved {handle_arg} (did={peer_did})\n→ pinned peer locally\n→ intro dropped to {peer_relay}\nawaiting pair_drop_ack from {peer_handle} to complete bilateral pin."
4369        );
4370    }
4371    Ok(())
4372}
4373
4374/// v0.5.14 bilateral-completion path for `wire add`. Called when the peer's
4375/// pair_drop is already sitting in `pending-inbound`. Pin trust, write relay
4376/// coords + slot_token from the stored drop, ship our slot_token back via
4377/// `pair_drop_ack`, delete the pending record. Symmetric with the SPAKE2
4378/// invite-URL path (which is already bilateral by virtue of the pre-shared
4379/// nonce).
4380fn cmd_add_accept_pending(
4381    handle_arg: &str,
4382    peer_nick: &str,
4383    pending: &crate::pending_inbound_pair::PendingInboundPair,
4384    _our_relay: &str,
4385    _our_slot_id: &str,
4386    _our_slot_token: &str,
4387    as_json: bool,
4388) -> Result<()> {
4389    // 1. Pin peer in trust with VERIFIED — operator gestured consent by running
4390    //    `wire add` against this handle while a drop was waiting.
4391    let mut trust = config::read_trust()?;
4392    crate::trust::add_agent_card_pin(&mut trust, &pending.peer_card, Some("VERIFIED"));
4393    config::write_trust(&trust)?;
4394
4395    // 2. Record peer's relay coords + slot_token (already shipped to us in
4396    //    the original drop body; held back until now).
4397    // v0.5.17: pin all advertised endpoints (federation + optional local).
4398    // Falls back to a single federation entry when the record was written
4399    // by v0.5.16-era code that didn't carry endpoints[].
4400    let mut relay_state = config::read_relay_state()?;
4401    let endpoints_to_pin = if pending.peer_endpoints.is_empty() {
4402        vec![crate::endpoints::Endpoint::federation(
4403            pending.peer_relay_url.clone(),
4404            pending.peer_slot_id.clone(),
4405            pending.peer_slot_token.clone(),
4406        )]
4407    } else {
4408        pending.peer_endpoints.clone()
4409    };
4410    crate::endpoints::pin_peer_endpoints(
4411        &mut relay_state,
4412        &pending.peer_handle,
4413        &endpoints_to_pin,
4414    )?;
4415    config::write_relay_state(&relay_state)?;
4416
4417    // 3. Ship our slot_token to peer via pair_drop_ack so they can write back.
4418    crate::pair_invite::send_pair_drop_ack(
4419        &pending.peer_handle,
4420        &pending.peer_relay_url,
4421        &pending.peer_slot_id,
4422        &pending.peer_slot_token,
4423    )
4424    .with_context(|| {
4425        format!(
4426            "pair_drop_ack send to {} @ {} slot {} failed",
4427            pending.peer_handle, pending.peer_relay_url, pending.peer_slot_id
4428        )
4429    })?;
4430
4431    // 4. Delete the pending-inbound record now that bilateral is complete.
4432    crate::pending_inbound_pair::consume_pending_inbound(peer_nick)?;
4433
4434    if as_json {
4435        println!(
4436            "{}",
4437            serde_json::to_string(&json!({
4438                "handle": handle_arg,
4439                "paired_with": pending.peer_did,
4440                "peer_handle": pending.peer_handle,
4441                "status": "bilateral_accepted",
4442                "via": "pending_inbound",
4443            }))?
4444        );
4445    } else {
4446        println!(
4447            "→ accepted pending pair from {peer}\n→ pinned VERIFIED, slot_token recorded\n→ shipped our slot_token back via pair_drop_ack\nbilateral pair complete. Send with `wire send {peer} \"...\"`.",
4448            peer = pending.peer_handle,
4449        );
4450    }
4451    Ok(())
4452}
4453
4454/// v0.5.14: explicit `wire pair-accept <peer>` — bilateral-completion path
4455/// for a pending-inbound pair request. Pin trust, write relay_state from the
4456/// stored pair_drop, send `pair_drop_ack` with our slot_token, delete the
4457/// pending record. Equivalent to running `wire add <peer>@<their-relay>`
4458/// when a pending-inbound record exists, but without needing to remember
4459/// the peer's relay domain.
4460fn cmd_pair_accept(peer_nick: &str, as_json: bool) -> Result<()> {
4461    let nick = crate::agent_card::bare_handle(peer_nick);
4462    let pending = crate::pending_inbound_pair::read_pending_inbound(nick)?.ok_or_else(|| {
4463        anyhow!(
4464            "no pending pair request from {nick}. Run `wire pair-list-inbound` to see who is waiting, \
4465             or use `wire add <peer>@<relay>` to send a fresh outbound pair request."
4466        )
4467    })?;
4468    let (_our_did, our_relay, our_slot_id, our_slot_token) =
4469        crate::pair_invite::ensure_self_with_relay(None)?;
4470    let handle_arg = format!("{}@{}", pending.peer_handle, pending.peer_relay_url);
4471    cmd_add_accept_pending(
4472        &handle_arg,
4473        nick,
4474        &pending,
4475        &our_relay,
4476        &our_slot_id,
4477        &our_slot_token,
4478        as_json,
4479    )
4480}
4481
4482/// v0.5.14: programmatic access to pending-inbound for scripts.
4483/// `wire pair-list-inbound --json` returns a flat array of records.
4484fn cmd_pair_list_inbound(as_json: bool) -> Result<()> {
4485    let items = crate::pending_inbound_pair::list_pending_inbound()?;
4486    if as_json {
4487        println!("{}", serde_json::to_string(&items)?);
4488        return Ok(());
4489    }
4490    if items.is_empty() {
4491        println!("no pending inbound pair requests.");
4492        return Ok(());
4493    }
4494    println!("{:<20} {:<35} {:<25} DID", "PEER", "RELAY", "RECEIVED");
4495    for p in items {
4496        println!(
4497            "{:<20} {:<35} {:<25} {}",
4498            p.peer_handle, p.peer_relay_url, p.received_at, p.peer_did,
4499        );
4500    }
4501    println!(
4502        "→ accept with `wire pair-accept <peer>`; refuse with `wire pair-reject <peer>`."
4503    );
4504    Ok(())
4505}
4506
4507/// v0.5.14: `wire pair-reject <peer>` — drop a pending-inbound record
4508/// without pairing. No event is sent back to the peer; their side stays
4509/// pending until they time out or the operator-side data ages out.
4510fn cmd_pair_reject(peer_nick: &str, as_json: bool) -> Result<()> {
4511    let nick = crate::agent_card::bare_handle(peer_nick);
4512    let existed = crate::pending_inbound_pair::read_pending_inbound(nick)?;
4513    crate::pending_inbound_pair::consume_pending_inbound(nick)?;
4514
4515    if as_json {
4516        println!(
4517            "{}",
4518            serde_json::to_string(&json!({
4519                "peer": nick,
4520                "rejected": existed.is_some(),
4521                "had_pending": existed.is_some(),
4522            }))?
4523        );
4524    } else if existed.is_some() {
4525        println!("→ rejected pending pair from {nick}\n→ pending-inbound record deleted; no ack sent.");
4526    } else {
4527        println!("no pending pair from {nick} — nothing to reject");
4528    }
4529    Ok(())
4530}
4531
4532// ---------- session (v0.5.16) ----------
4533//
4534// Multi-session wire on one machine. See src/session.rs for the storage
4535// layout + naming rules. The CLI dispatcher here orchestrates child
4536// `wire` invocations with `WIRE_HOME` overridden to the session's dir;
4537// each session-local `init` / `claim` / `daemon` runs in its own world
4538// without cross-contamination via env vars in this process.
4539
4540fn cmd_session(cmd: SessionCommand) -> Result<()> {
4541    match cmd {
4542        SessionCommand::New {
4543            name,
4544            relay,
4545            with_local,
4546            local_relay,
4547            no_daemon,
4548            json,
4549        } => cmd_session_new(
4550            name.as_deref(),
4551            &relay,
4552            with_local,
4553            &local_relay,
4554            no_daemon,
4555            json,
4556        ),
4557        SessionCommand::List { json } => cmd_session_list(json),
4558        SessionCommand::Env { name, json } => cmd_session_env(name.as_deref(), json),
4559        SessionCommand::Current { json } => cmd_session_current(json),
4560        SessionCommand::Destroy { name, force, json } => cmd_session_destroy(&name, force, json),
4561    }
4562}
4563
4564fn resolve_session_name(name: Option<&str>) -> Result<String> {
4565    if let Some(n) = name {
4566        return Ok(crate::session::sanitize_name(n));
4567    }
4568    let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4569    let registry = crate::session::read_registry().unwrap_or_default();
4570    Ok(crate::session::derive_name_from_cwd(&cwd, &registry))
4571}
4572
4573fn cmd_session_new(
4574    name_arg: Option<&str>,
4575    relay: &str,
4576    with_local: bool,
4577    local_relay: &str,
4578    no_daemon: bool,
4579    as_json: bool,
4580) -> Result<()> {
4581    let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
4582    let mut registry = crate::session::read_registry().unwrap_or_default();
4583    let name = match name_arg {
4584        Some(n) => crate::session::sanitize_name(n),
4585        None => crate::session::derive_name_from_cwd(&cwd, &registry),
4586    };
4587    let session_home = crate::session::session_dir(&name)?;
4588
4589    let already_exists = session_home.exists()
4590        && session_home
4591            .join("config")
4592            .join("wire")
4593            .join("agent-card.json")
4594            .exists();
4595    if already_exists {
4596        // Idempotent: re-register the cwd (if not already), refresh the
4597        // daemon if requested, surface the env-var line. Do not re-init
4598        // identity — that would clobber the keypair.
4599        registry
4600            .by_cwd
4601            .insert(cwd.to_string_lossy().into_owned(), name.clone());
4602        crate::session::write_registry(&registry)?;
4603        let info = render_session_info(&name, &session_home, &cwd)?;
4604        emit_session_new_result(&info, "already_exists", as_json)?;
4605        if !no_daemon {
4606            ensure_session_daemon(&session_home)?;
4607        }
4608        return Ok(());
4609    }
4610
4611    std::fs::create_dir_all(&session_home)
4612        .with_context(|| format!("creating session dir {session_home:?}"))?;
4613
4614    // Phase 1: init identity in the new session's WIRE_HOME.
4615    let init_status = run_wire_with_home(
4616        &session_home,
4617        &["init", &name, "--relay", relay],
4618    )?;
4619    if !init_status.success() {
4620        bail!(
4621            "`wire init {name} --relay {relay}` failed inside session dir {session_home:?}"
4622        );
4623    }
4624
4625    // Phase 2: claim the handle on the relay. If FCFS rejects the name
4626    // (another machine has it), fall back to `<name>-<2hex>` until success
4627    // or 5 attempts exhausted. Failure here is fatal — the session is
4628    // unreachable without a claim.
4629    let mut claim_attempt = 0u32;
4630    let mut effective_handle = name.clone();
4631    loop {
4632        claim_attempt += 1;
4633        let status = run_wire_with_home(
4634            &session_home,
4635            &["claim", &effective_handle, "--relay", relay],
4636        )?;
4637        if status.success() {
4638            break;
4639        }
4640        if claim_attempt >= 5 {
4641            bail!(
4642                "5 failed attempts to claim a handle on {relay} for session {name}. \
4643                 Try `wire session destroy {name} --force` and re-run with a different name."
4644            );
4645        }
4646        // Use a fresh random-ish suffix on each retry. We piggyback on the
4647        // path-hash logic but mix in the attempt counter to avoid getting
4648        // stuck on the same colliding suffix.
4649        let attempt_path = cwd.join(format!("__attempt_{claim_attempt}"));
4650        let suffix = crate::session::derive_name_from_cwd(&attempt_path, &registry);
4651        // suffix here is the full derived name for attempt_path; we just
4652        // want a short token, so take the trailing hash if it has one,
4653        // else hash the attempt-path ourselves.
4654        let token = suffix
4655            .rsplit('-')
4656            .next()
4657            .filter(|t| t.len() == 4)
4658            .map(str::to_string)
4659            .unwrap_or_else(|| format!("{claim_attempt}"));
4660        effective_handle = format!("{name}-{token}");
4661    }
4662
4663    // Persist the cwd → name mapping NOW so subsequent invocations from
4664    // this directory short-circuit to the "already_exists" branch.
4665    registry
4666        .by_cwd
4667        .insert(cwd.to_string_lossy().into_owned(), name.clone());
4668    crate::session::write_registry(&registry)?;
4669
4670    // v0.5.17: --with-local probes the local relay and, if it's
4671    // reachable, allocates a second slot there. The session's
4672    // relay_state.json grows a `self.endpoints[]` array carrying both
4673    // endpoints; routing layer (cmd_push) prefers local for sister-
4674    // session peers that also have a local slot.
4675    if with_local {
4676        try_allocate_local_slot(&session_home, &effective_handle, relay, local_relay);
4677    }
4678
4679    if !no_daemon {
4680        ensure_session_daemon(&session_home)?;
4681    }
4682
4683    let info = render_session_info(&name, &session_home, &cwd)?;
4684    emit_session_new_result(&info, "created", as_json)
4685}
4686
4687/// v0.5.17: probe the named local relay; if `/healthz` returns ok within
4688/// a short timeout, allocate a slot there and update the session's
4689/// `relay_state.json` `self.endpoints[]` to advertise both endpoints.
4690///
4691/// Failure to reach the local relay is NOT fatal — the session stays
4692/// federation-only. Logs to stderr on failure so operators can tell
4693/// the local relay isn't running, but doesn't abort the bootstrap.
4694fn try_allocate_local_slot(
4695    session_home: &std::path::Path,
4696    handle: &str,
4697    federation_relay: &str,
4698    local_relay: &str,
4699) {
4700    // Probe healthz with a tight timeout. Use a fresh client (don't
4701    // share the daemon-wide one) so the timeout is local to this call.
4702    let probe = match crate::relay_client::build_blocking_client(Some(
4703        std::time::Duration::from_millis(500),
4704    )) {
4705        Ok(c) => c,
4706        Err(e) => {
4707            eprintln!("wire session new: cannot build probe client for {local_relay}: {e:#}");
4708            return;
4709        }
4710    };
4711    let healthz_url = format!("{}/healthz", local_relay.trim_end_matches('/'));
4712    match probe.get(&healthz_url).send() {
4713        Ok(resp) if resp.status().is_success() => {}
4714        Ok(resp) => {
4715            eprintln!(
4716                "wire session new: local relay probe at {healthz_url} returned {} — staying federation-only",
4717                resp.status()
4718            );
4719            return;
4720        }
4721        Err(e) => {
4722            eprintln!(
4723                "wire session new: local relay at {local_relay} unreachable ({}) — staying federation-only. \
4724                 Start one with `wire relay-server --bind 127.0.0.1:8771 --local-only`.",
4725                crate::relay_client::format_transport_error(&anyhow::Error::new(e))
4726            );
4727            return;
4728        }
4729    };
4730
4731    // Allocate a slot on the local relay.
4732    let local_client = crate::relay_client::RelayClient::new(local_relay);
4733    let alloc = match local_client.allocate_slot(Some(handle)) {
4734        Ok(a) => a,
4735        Err(e) => {
4736            eprintln!(
4737                "wire session new: local relay slot allocation failed: {e:#} — staying federation-only"
4738            );
4739            return;
4740        }
4741    };
4742
4743    // Merge into the session's relay_state.json. We invoke wire via
4744    // run_wire_with_home for federation calls (subprocess isolation),
4745    // but the relay_state.json is a simple file we can edit directly
4746    // — and need to, because there's no `wire bind-relay --add-local`
4747    // command yet (could add later; out of scope for v0.5.17 MVP).
4748    let state_path = session_home
4749        .join("config")
4750        .join("wire")
4751        .join("relay-state.json");
4752    let mut state: serde_json::Value = std::fs::read(&state_path)
4753        .ok()
4754        .and_then(|b| serde_json::from_slice(&b).ok())
4755        .unwrap_or_else(|| serde_json::json!({}));
4756    // Read the existing federation self info (already written by
4757    // `wire init` + `wire bind-relay` path during session bootstrap).
4758    let fed_endpoint = state
4759        .get("self")
4760        .and_then(|s| {
4761            let url = s.get("relay_url").and_then(serde_json::Value::as_str)?;
4762            let slot_id = s.get("slot_id").and_then(serde_json::Value::as_str)?;
4763            let slot_token = s.get("slot_token").and_then(serde_json::Value::as_str)?;
4764            Some(crate::endpoints::Endpoint::federation(
4765                url.to_string(),
4766                slot_id.to_string(),
4767                slot_token.to_string(),
4768            ))
4769        });
4770
4771    let local_endpoint = crate::endpoints::Endpoint::local(
4772        local_relay.trim_end_matches('/').to_string(),
4773        alloc.slot_id.clone(),
4774        alloc.slot_token.clone(),
4775    );
4776
4777    let mut endpoints: Vec<crate::endpoints::Endpoint> = Vec::new();
4778    if let Some(f) = fed_endpoint.clone() {
4779        endpoints.push(f);
4780    }
4781    endpoints.push(local_endpoint);
4782
4783    let self_obj = state
4784        .as_object_mut()
4785        .expect("relay_state root is an object")
4786        .entry("self")
4787        .or_insert_with(|| {
4788            serde_json::json!({
4789                "relay_url": federation_relay,
4790            })
4791        });
4792    if let Some(obj) = self_obj.as_object_mut() {
4793        obj.insert(
4794            "endpoints".into(),
4795            serde_json::to_value(&endpoints).unwrap_or(serde_json::Value::Null),
4796        );
4797    }
4798
4799    if let Err(e) = std::fs::write(
4800        &state_path,
4801        serde_json::to_vec_pretty(&state).unwrap_or_default(),
4802    ) {
4803        eprintln!(
4804            "wire session new: persisting dual-slot relay_state at {state_path:?} failed: {e}"
4805        );
4806        return;
4807    }
4808    eprintln!(
4809        "wire session new: local slot allocated on {local_relay} (slot_id={})",
4810        alloc.slot_id
4811    );
4812}
4813
4814fn render_session_info(
4815    name: &str,
4816    session_home: &std::path::Path,
4817    cwd: &std::path::Path,
4818) -> Result<serde_json::Value> {
4819    let card_path = session_home.join("config").join("wire").join("agent-card.json");
4820    let (did, handle) = if card_path.exists() {
4821        let card: Value = serde_json::from_slice(&std::fs::read(&card_path)?)?;
4822        let did = card
4823            .get("did")
4824            .and_then(Value::as_str)
4825            .unwrap_or("")
4826            .to_string();
4827        let handle = card
4828            .get("handle")
4829            .and_then(Value::as_str)
4830            .map(str::to_string)
4831            .unwrap_or_else(|| {
4832                crate::agent_card::display_handle_from_did(&did).to_string()
4833            });
4834        (did, handle)
4835    } else {
4836        (String::new(), String::new())
4837    };
4838    Ok(json!({
4839        "name": name,
4840        "home_dir": session_home.to_string_lossy(),
4841        "cwd": cwd.to_string_lossy(),
4842        "did": did,
4843        "handle": handle,
4844        "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
4845    }))
4846}
4847
4848fn emit_session_new_result(
4849    info: &serde_json::Value,
4850    status: &str,
4851    as_json: bool,
4852) -> Result<()> {
4853    if as_json {
4854        let mut obj = info.clone();
4855        obj["status"] = json!(status);
4856        println!("{}", serde_json::to_string(&obj)?);
4857    } else {
4858        let name = info["name"].as_str().unwrap_or("?");
4859        let handle = info["handle"].as_str().unwrap_or("?");
4860        let home = info["home_dir"].as_str().unwrap_or("?");
4861        let did = info["did"].as_str().unwrap_or("?");
4862        let export = info["export"].as_str().unwrap_or("?");
4863        let prefix = if status == "already_exists" {
4864            "session already exists (re-registered cwd)"
4865        } else {
4866            "session created"
4867        };
4868        println!(
4869            "{prefix}\n  name:   {name}\n  handle: {handle}\n  did:    {did}\n  home:   {home}\n\nactivate with:\n  {export}"
4870        );
4871    }
4872    Ok(())
4873}
4874
4875fn run_wire_with_home(
4876    session_home: &std::path::Path,
4877    args: &[&str],
4878) -> Result<std::process::ExitStatus> {
4879    let bin = std::env::current_exe().with_context(|| "locating self exe")?;
4880    let status = std::process::Command::new(&bin)
4881        .env("WIRE_HOME", session_home)
4882        .env_remove("RUST_LOG")
4883        .args(args)
4884        .status()
4885        .with_context(|| format!("spawning `wire {}`", args.join(" ")))?;
4886    Ok(status)
4887}
4888
4889fn ensure_session_daemon(session_home: &std::path::Path) -> Result<()> {
4890    // Check if a daemon is already alive in this session's WIRE_HOME.
4891    // If so, no-op (let the existing process keep running).
4892    let pidfile = session_home
4893        .join("state")
4894        .join("wire")
4895        .join("daemon.pid");
4896    if pidfile.exists() {
4897        let bytes = std::fs::read(&pidfile).unwrap_or_default();
4898        let pid: Option<u32> =
4899            if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
4900                v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
4901            } else {
4902                String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
4903            };
4904        if let Some(p) = pid {
4905            let alive = {
4906                #[cfg(target_os = "linux")]
4907                {
4908                    std::path::Path::new(&format!("/proc/{p}")).exists()
4909                }
4910                #[cfg(not(target_os = "linux"))]
4911                {
4912                    std::process::Command::new("kill")
4913                        .args(["-0", &p.to_string()])
4914                        .output()
4915                        .map(|o| o.status.success())
4916                        .unwrap_or(false)
4917                }
4918            };
4919            if alive {
4920                return Ok(());
4921            }
4922        }
4923    }
4924
4925    // Spawn `wire daemon` detached. The existing `cmd_daemon` writes the
4926    // versioned pidfile; we just kick it off and return.
4927    let bin = std::env::current_exe().with_context(|| "locating self exe")?;
4928    let log_path = session_home.join("state").join("wire").join("daemon.log");
4929    if let Some(parent) = log_path.parent() {
4930        std::fs::create_dir_all(parent).ok();
4931    }
4932    let log_file = std::fs::OpenOptions::new()
4933        .create(true)
4934        .append(true)
4935        .open(&log_path)
4936        .with_context(|| format!("opening daemon log {log_path:?}"))?;
4937    let log_err = log_file.try_clone()?;
4938    std::process::Command::new(&bin)
4939        .env("WIRE_HOME", session_home)
4940        .env_remove("RUST_LOG")
4941        .args(["daemon", "--interval", "5"])
4942        .stdout(log_file)
4943        .stderr(log_err)
4944        .stdin(std::process::Stdio::null())
4945        .spawn()
4946        .with_context(|| "spawning session-local `wire daemon`")?;
4947    Ok(())
4948}
4949
4950fn cmd_session_list(as_json: bool) -> Result<()> {
4951    let items = crate::session::list_sessions()?;
4952    if as_json {
4953        println!("{}", serde_json::to_string(&items)?);
4954        return Ok(());
4955    }
4956    if items.is_empty() {
4957        println!("no sessions on this machine. `wire session new` to create one.");
4958        return Ok(());
4959    }
4960    println!(
4961        "{:<24} {:<24} {:<10} CWD",
4962        "NAME", "HANDLE", "DAEMON"
4963    );
4964    for s in items {
4965        println!(
4966            "{:<24} {:<24} {:<10} {}",
4967            s.name,
4968            s.handle.as_deref().unwrap_or("?"),
4969            if s.daemon_running { "running" } else { "down" },
4970            s.cwd.as_deref().unwrap_or("(no cwd registered)"),
4971        );
4972    }
4973    Ok(())
4974}
4975
4976fn cmd_session_env(name_arg: Option<&str>, as_json: bool) -> Result<()> {
4977    let name = resolve_session_name(name_arg)?;
4978    let session_home = crate::session::session_dir(&name)?;
4979    if !session_home.exists() {
4980        bail!(
4981            "no session named {name:?} on this machine. `wire session list` to enumerate, \
4982             `wire session new {name}` to create."
4983        );
4984    }
4985    if as_json {
4986        println!(
4987            "{}",
4988            serde_json::to_string(&json!({
4989                "name": name,
4990                "home_dir": session_home.to_string_lossy(),
4991                "export": format!("export WIRE_HOME={}", session_home.to_string_lossy()),
4992            }))?
4993        );
4994    } else {
4995        println!("export WIRE_HOME={}", session_home.to_string_lossy());
4996    }
4997    Ok(())
4998}
4999
5000fn cmd_session_current(as_json: bool) -> Result<()> {
5001    let cwd = std::env::current_dir().with_context(|| "reading cwd")?;
5002    let registry = crate::session::read_registry().unwrap_or_default();
5003    let cwd_key = cwd.to_string_lossy().into_owned();
5004    let name = registry.by_cwd.get(&cwd_key).cloned();
5005    if as_json {
5006        println!(
5007            "{}",
5008            serde_json::to_string(&json!({
5009                "cwd": cwd_key,
5010                "session": name,
5011            }))?
5012        );
5013    } else if let Some(n) = name {
5014        println!("{n}");
5015    } else {
5016        println!("(no session registered for this cwd)");
5017    }
5018    Ok(())
5019}
5020
5021fn cmd_session_destroy(name_arg: &str, force: bool, as_json: bool) -> Result<()> {
5022    let name = crate::session::sanitize_name(name_arg);
5023    let session_home = crate::session::session_dir(&name)?;
5024    if !session_home.exists() {
5025        if as_json {
5026            println!(
5027                "{}",
5028                serde_json::to_string(&json!({
5029                    "name": name,
5030                    "destroyed": false,
5031                    "reason": "no such session",
5032                }))?
5033            );
5034        } else {
5035            println!("no session named {name:?} — nothing to destroy.");
5036        }
5037        return Ok(());
5038    }
5039    if !force {
5040        bail!(
5041            "destroying session {name:?} would delete its keypair + state irrecoverably. \
5042             Pass --force to confirm."
5043        );
5044    }
5045
5046    // Kill the session-local daemon if alive.
5047    let pidfile = session_home
5048        .join("state")
5049        .join("wire")
5050        .join("daemon.pid");
5051    if let Ok(bytes) = std::fs::read(&pidfile) {
5052        let pid: Option<u32> =
5053            if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
5054                v.get("pid").and_then(|p| p.as_u64()).map(|p| p as u32)
5055            } else {
5056                String::from_utf8_lossy(&bytes).trim().parse::<u32>().ok()
5057            };
5058        if let Some(p) = pid {
5059            let _ = std::process::Command::new("kill")
5060                .args(["-TERM", &p.to_string()])
5061                .output();
5062        }
5063    }
5064
5065    std::fs::remove_dir_all(&session_home)
5066        .with_context(|| format!("removing session dir {session_home:?}"))?;
5067
5068    // Strip from registry.
5069    let mut registry = crate::session::read_registry().unwrap_or_default();
5070    registry.by_cwd.retain(|_, v| v != &name);
5071    crate::session::write_registry(&registry)?;
5072
5073    if as_json {
5074        println!(
5075            "{}",
5076            serde_json::to_string(&json!({
5077                "name": name,
5078                "destroyed": true,
5079            }))?
5080        );
5081    } else {
5082        println!("destroyed session {name:?}.");
5083    }
5084    Ok(())
5085}
5086
// ---------- diag (structured trace) ----------
5088
5089fn cmd_diag(action: DiagAction) -> Result<()> {
5090    let state = config::state_dir()?;
5091    let knob = state.join("diag.enabled");
5092    let log_path = state.join("diag.jsonl");
5093    match action {
5094        DiagAction::Tail { limit, json } => {
5095            let entries = crate::diag::tail(limit);
5096            if json {
5097                for e in entries {
5098                    println!("{}", serde_json::to_string(&e)?);
5099                }
5100            } else if entries.is_empty() {
5101                println!("wire diag: no entries (diag may be disabled — `wire diag enable`)");
5102            } else {
5103                for e in entries {
5104                    let ts = e["ts"].as_u64().unwrap_or(0);
5105                    let ty = e["type"].as_str().unwrap_or("?");
5106                    let pid = e["pid"].as_u64().unwrap_or(0);
5107                    let payload = e["payload"].to_string();
5108                    println!("[{ts}] pid={pid} {ty} {payload}");
5109                }
5110            }
5111        }
5112        DiagAction::Enable => {
5113            config::ensure_dirs()?;
5114            std::fs::write(&knob, "1")?;
5115            println!("wire diag: enabled at {knob:?}");
5116        }
5117        DiagAction::Disable => {
5118            if knob.exists() {
5119                std::fs::remove_file(&knob)?;
5120            }
5121            println!("wire diag: disabled (env WIRE_DIAG may still flip it on per-process)");
5122        }
5123        DiagAction::Status { json } => {
5124            let enabled = crate::diag::is_enabled();
5125            let size = std::fs::metadata(&log_path)
5126                .map(|m| m.len())
5127                .unwrap_or(0);
5128            if json {
5129                println!(
5130                    "{}",
5131                    serde_json::to_string(&serde_json::json!({
5132                        "enabled": enabled,
5133                        "log_path": log_path,
5134                        "log_size_bytes": size,
5135                    }))?
5136                );
5137            } else {
5138                println!("wire diag status");
5139                println!("  enabled:    {enabled}");
5140                println!("  log:        {log_path:?}");
5141                println!("  log size:   {size} bytes");
5142            }
5143        }
5144    }
5145    Ok(())
5146}
5147
// ---------- service (install / uninstall / status) ----------
5149
5150fn cmd_service(action: ServiceAction) -> Result<()> {
5151    let (report, as_json) = match action {
5152        ServiceAction::Install { json } => (crate::service::install()?, json),
5153        ServiceAction::Uninstall { json } => (crate::service::uninstall()?, json),
5154        ServiceAction::Status { json } => (crate::service::status()?, json),
5155    };
5156    if as_json {
5157        println!("{}", serde_json::to_string(&report)?);
5158    } else {
5159        println!("wire service {}", report.action);
5160        println!("  platform:  {}", report.platform);
5161        println!("  unit:      {}", report.unit_path);
5162        println!("  status:    {}", report.status);
5163        println!("  detail:    {}", report.detail);
5164    }
5165    Ok(())
5166}
5167
// ---------- upgrade (atomic daemon swap) ----------
5169
5170/// `wire upgrade` — kill all running `wire daemon` processes, spawn a
5171/// fresh one from the currently-installed binary, write a new versioned
5172/// pidfile. The fix for today's exact failure mode: a daemon process that
5173/// kept running OLD binary text in memory under a symlink that had since
5174/// been repointed at a NEW binary on disk.
5175///
5176/// Idempotent. If no stale daemon is running, just starts a fresh one
5177/// (same as `wire daemon &` but with the wait-until-alive guard from
5178/// ensure_up::ensure_daemon_running).
5179///
5180/// `--check` mode reports drift without acting — lists the processes
5181/// that WOULD be killed and the binary version of each.
5182fn cmd_upgrade(check_only: bool, as_json: bool) -> Result<()> {
5183    // 1. Identify all `wire daemon` processes.
5184    let pgrep_out = std::process::Command::new("pgrep")
5185        .args(["-f", "wire daemon"])
5186        .output();
5187    let running_pids: Vec<u32> = match pgrep_out {
5188        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
5189            .split_whitespace()
5190            .filter_map(|s| s.parse::<u32>().ok())
5191            .collect(),
5192        _ => Vec::new(),
5193    };
5194
5195    // 2. Read pidfile to surface what the daemon THINKS it is.
5196    let record = crate::ensure_up::read_pid_record("daemon");
5197    let recorded_version: Option<String> = match &record {
5198        crate::ensure_up::PidRecord::Json(d) => Some(d.version.clone()),
5199        crate::ensure_up::PidRecord::LegacyInt(_) => Some("<pre-0.5.11>".to_string()),
5200        _ => None,
5201    };
5202    let cli_version = env!("CARGO_PKG_VERSION").to_string();
5203
5204    if check_only {
5205        let report = json!({
5206            "running_pids": running_pids,
5207            "pidfile_version": recorded_version,
5208            "cli_version": cli_version,
5209            "would_kill": running_pids,
5210        });
5211        if as_json {
5212            println!("{}", serde_json::to_string(&report)?);
5213        } else {
5214            println!("wire upgrade --check");
5215            println!("  cli version:      {cli_version}");
5216            println!("  pidfile version:  {}", recorded_version.as_deref().unwrap_or("(missing)"));
5217            if running_pids.is_empty() {
5218                println!("  running daemons:  none");
5219            } else {
5220                let pids: Vec<String> = running_pids.iter().map(|p| p.to_string()).collect();
5221                println!("  running daemons:  pids {}", pids.join(", "));
5222                println!("  would kill all + spawn fresh");
5223            }
5224        }
5225        return Ok(());
5226    }
5227
5228    // 3. Kill every running wire daemon. Use SIGTERM first, then SIGKILL
5229    // after a brief grace period.
5230    let mut killed: Vec<u32> = Vec::new();
5231    for pid in &running_pids {
5232        // SIGTERM (15).
5233        let _ = std::process::Command::new("kill")
5234            .args(["-15", &pid.to_string()])
5235            .status();
5236        killed.push(*pid);
5237    }
5238    // Wait up to ~2s for graceful exit.
5239    if !killed.is_empty() {
5240        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
5241        loop {
5242            let still_alive: Vec<u32> = killed
5243                .iter()
5244                .copied()
5245                .filter(|p| process_alive_pid(*p))
5246                .collect();
5247            if still_alive.is_empty() {
5248                break;
5249            }
5250            if std::time::Instant::now() >= deadline {
5251                // SIGKILL hold-outs.
5252                for pid in still_alive {
5253                    let _ = std::process::Command::new("kill")
5254                        .args(["-9", &pid.to_string()])
5255                        .status();
5256                }
5257                break;
5258            }
5259            std::thread::sleep(std::time::Duration::from_millis(50));
5260        }
5261    }
5262
5263    // 4. Remove stale pidfile so ensure_daemon_running doesn't think the
5264    //    old daemon is still owning it.
5265    let pidfile = config::state_dir()?.join("daemon.pid");
5266    if pidfile.exists() {
5267        let _ = std::fs::remove_file(&pidfile);
5268    }
5269
5270    // 5. Spawn fresh daemon via ensure_up — atomically waits for
5271    //    process_alive + writes the versioned pidfile.
5272    let spawned = crate::ensure_up::ensure_daemon_running()?;
5273
5274    let new_record = crate::ensure_up::read_pid_record("daemon");
5275    let new_pid = new_record.pid();
5276    let new_version: Option<String> = if let crate::ensure_up::PidRecord::Json(d) = &new_record {
5277        Some(d.version.clone())
5278    } else {
5279        None
5280    };
5281
5282    if as_json {
5283        println!(
5284            "{}",
5285            serde_json::to_string(&json!({
5286                "killed": killed,
5287                "spawned_fresh_daemon": spawned,
5288                "new_pid": new_pid,
5289                "new_version": new_version,
5290                "cli_version": cli_version,
5291            }))?
5292        );
5293    } else {
5294        if killed.is_empty() {
5295            println!("wire upgrade: no stale daemons running");
5296        } else {
5297            println!("wire upgrade: killed {} daemon(s) (pids {})",
5298                killed.len(),
5299                killed.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", "));
5300        }
5301        if spawned {
5302            println!(
5303                "wire upgrade: spawned fresh daemon (pid {} v{})",
5304                new_pid.map(|p| p.to_string()).unwrap_or_else(|| "?".to_string()),
5305                new_version.as_deref().unwrap_or(&cli_version),
5306            );
5307        } else {
5308            println!("wire upgrade: daemon was already running on current binary");
5309        }
5310    }
5311    Ok(())
5312}
5313
/// Report whether a process with the given pid currently exists.
///
/// On Linux this checks for the `/proc/<pid>` entry. Elsewhere it runs
/// `kill -0 <pid>` with all stdio detached and treats a successful exit as
/// "alive"; failure to run `kill` at all counts as "not alive".
fn process_alive_pid(pid: u32) -> bool {
    #[cfg(target_os = "linux")]
    {
        let proc_entry = format!("/proc/{pid}");
        std::path::Path::new(&proc_entry).exists()
    }
    #[cfg(not(target_os = "linux"))]
    {
        use std::process::{Command, Stdio};

        let probe = Command::new("kill")
            .args(["-0", &pid.to_string()])
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        match probe {
            Ok(status) => status.success(),
            Err(_) => false,
        }
    }
}
5331
// ---------- doctor (single-command diagnostic) ----------
5333
5334/// One DoctorCheck = one verdict on one health dimension.
5335#[derive(Clone, Debug, serde::Serialize)]
5336pub struct DoctorCheck {
5337    /// Short stable identifier (`daemon`, `relay`, `pair_rejections`, ...).
5338    /// Stable across versions for tooling consumption.
5339    pub id: String,
5340    /// PASS / WARN / FAIL.
5341    pub status: String,
5342    /// One-line human summary.
5343    pub detail: String,
5344    /// Optional remediation hint shown after the failing line.
5345    #[serde(skip_serializing_if = "Option::is_none")]
5346    pub fix: Option<String>,
5347}
5348
5349impl DoctorCheck {
5350    fn pass(id: &str, detail: impl Into<String>) -> Self {
5351        Self {
5352            id: id.into(),
5353            status: "PASS".into(),
5354            detail: detail.into(),
5355            fix: None,
5356        }
5357    }
5358    fn warn(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5359        Self {
5360            id: id.into(),
5361            status: "WARN".into(),
5362            detail: detail.into(),
5363            fix: Some(fix.into()),
5364        }
5365    }
5366    fn fail(id: &str, detail: impl Into<String>, fix: impl Into<String>) -> Self {
5367        Self {
5368            id: id.into(),
5369            status: "FAIL".into(),
5370            detail: detail.into(),
5371            fix: Some(fix.into()),
5372        }
5373    }
5374}
5375
5376/// `wire doctor` — single-command diagnostic for the silent-fail classes
5377/// 0.5.11 ships fixes for. Surfaces what each fix produces (P0.1 cursor
5378/// blocks, P0.2 pair-rejection logs, P0.4 daemon version mismatch, etc.)
5379/// so operators don't have to know where each lives.
5380fn cmd_doctor(as_json: bool, recent_rejections: usize) -> Result<()> {
5381    let mut checks: Vec<DoctorCheck> = Vec::new();
5382
5383    checks.push(check_daemon_health());
5384    checks.push(check_daemon_pid_consistency());
5385    checks.push(check_relay_reachable());
5386    checks.push(check_pair_rejections(recent_rejections));
5387    checks.push(check_cursor_progress());
5388
5389    let fails = checks.iter().filter(|c| c.status == "FAIL").count();
5390    let warns = checks.iter().filter(|c| c.status == "WARN").count();
5391
5392    if as_json {
5393        println!(
5394            "{}",
5395            serde_json::to_string(&json!({
5396                "checks": checks,
5397                "fail_count": fails,
5398                "warn_count": warns,
5399                "ok": fails == 0,
5400            }))?
5401        );
5402    } else {
5403        println!("wire doctor — {} checks", checks.len());
5404        for c in &checks {
5405            let bullet = match c.status.as_str() {
5406                "PASS" => "✓",
5407                "WARN" => "!",
5408                "FAIL" => "✗",
5409                _ => "?",
5410            };
5411            println!("  {bullet} [{}] {}: {}", c.status, c.id, c.detail);
5412            if let Some(fix) = &c.fix {
5413                println!("      fix: {fix}");
5414            }
5415        }
5416        println!();
5417        if fails == 0 && warns == 0 {
5418            println!("ALL GREEN");
5419        } else {
5420            println!("{fails} FAIL, {warns} WARN");
5421        }
5422    }
5423
5424    if fails > 0 {
5425        std::process::exit(1);
5426    }
5427    Ok(())
5428}
5429
5430/// Check: daemon running, exactly one instance, no orphans.
5431///
5432/// Today's debug surfaced PID 54017 (old-binary wire daemon running for 4
5433/// days, advancing cursor without pinning). `wire status` lied about it.
5434/// `wire doctor` must catch THIS class: multiple daemons running, OR
5435/// pid-file claims daemon down while a process is actually up.
5436fn check_daemon_health() -> DoctorCheck {
5437    // v0.5.13 (issue #2 bug A): doctor PASSed on orphan-only state while
5438    // `wire status` reported DOWN, disagreeing for 25 min. Doctor used
5439    // pgrep alone; status cross-checked the pidfile. Doctor now consults
5440    // BOTH so the two surfaces never disagree.
5441    let output = std::process::Command::new("pgrep")
5442        .args(["-f", "wire daemon"])
5443        .output();
5444    let pgrep_pids: Vec<u32> = match output {
5445        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout)
5446            .split_whitespace()
5447            .filter_map(|s| s.parse::<u32>().ok())
5448            .collect(),
5449        _ => Vec::new(),
5450    };
5451    let pidfile_pid = crate::ensure_up::read_pid_record("daemon").pid();
5452    // Is the pidfile-claimed daemon actually alive?
5453    let pidfile_alive = pidfile_pid
5454        .map(|pid| {
5455            #[cfg(target_os = "linux")]
5456            {
5457                std::path::Path::new(&format!("/proc/{pid}")).exists()
5458            }
5459            #[cfg(not(target_os = "linux"))]
5460            {
5461                std::process::Command::new("kill")
5462                    .args(["-0", &pid.to_string()])
5463                    .output()
5464                    .map(|o| o.status.success())
5465                    .unwrap_or(false)
5466            }
5467        })
5468        .unwrap_or(false);
5469    let orphan_pids: Vec<u32> = pgrep_pids
5470        .iter()
5471        .filter(|p| Some(**p) != pidfile_pid)
5472        .copied()
5473        .collect();
5474
5475    let fmt_pids = |xs: &[u32]| -> String {
5476        xs.iter()
5477            .map(|p| p.to_string())
5478            .collect::<Vec<_>>()
5479            .join(", ")
5480    };
5481
5482    match (pgrep_pids.len(), pidfile_alive, orphan_pids.is_empty()) {
5483        (0, _, _) => DoctorCheck::fail(
5484            "daemon",
5485            "no `wire daemon` process running — nothing pulling inbox or pushing outbox",
5486            "`wire daemon &` to start, or re-run `wire up <handle>@<relay>` to bootstrap",
5487        ),
5488        // Single daemon AND it matches the pidfile → healthy.
5489        (1, true, true) => DoctorCheck::pass(
5490            "daemon",
5491            format!(
5492                "one daemon running (pid {}, matches pidfile)",
5493                pgrep_pids[0]
5494            ),
5495        ),
5496        // Pidfile is alive but pgrep ALSO sees orphan processes.
5497        (n, true, false) => DoctorCheck::fail(
5498            "daemon",
5499            format!(
5500                "{n} `wire daemon` processes running (pids: {}); pidfile claims pid {} but pgrep also sees orphan(s): {}. \
5501                 The orphans race the relay cursor — they advance past events your current binary can't process. \
5502                 (Issue #2 exact class.)",
5503                fmt_pids(&pgrep_pids),
5504                pidfile_pid.unwrap(),
5505                fmt_pids(&orphan_pids),
5506            ),
5507            "`wire upgrade` kills all orphans and spawns a fresh daemon with a clean pidfile",
5508        ),
5509        // Pidfile is dead but processes ARE running → all are orphans.
5510        (n, false, _) => DoctorCheck::fail(
5511            "daemon",
5512            format!(
5513                "{n} `wire daemon` process(es) running (pids: {}) but pidfile {} — \
5514                 every running daemon is an orphan, advancing the cursor without coordinating with the current CLI. \
5515                 (Issue #2 exact class: doctor previously PASSed this state while `wire status` said DOWN.)",
5516                fmt_pids(&pgrep_pids),
5517                match pidfile_pid {
5518                    Some(p) => format!("claims pid {p} which is dead"),
5519                    None => "is missing".to_string(),
5520                },
5521            ),
5522            "`wire upgrade` to kill the orphan(s) and spawn a fresh daemon",
5523        ),
5524        // Multiple daemons all matching … impossible by construction; fall back to warn.
5525        (n, true, true) => DoctorCheck::warn(
5526            "daemon",
5527            format!(
5528                "{n} `wire daemon` processes running (pids: {}). Multiple daemons race the relay cursor.",
5529                fmt_pids(&pgrep_pids)
5530            ),
5531            "kill all-but-one: `pkill -f \"wire daemon\"; wire daemon &`",
5532        ),
5533    }
5534}
5535
5536/// Check: structured pidfile matches running daemon. Spark's P0.4 5th
5537/// check. Surfaces version mismatch (daemon running old binary text in
5538/// memory under a current symlink — today's exact bug class), schema
5539/// drift (future format bumps), and identity contamination (daemon's
5540/// recorded DID doesn't match this box's configured DID).
5541fn check_daemon_pid_consistency() -> DoctorCheck {
5542    let record = crate::ensure_up::read_pid_record("daemon");
5543    match record {
5544        crate::ensure_up::PidRecord::Missing => DoctorCheck::pass(
5545            "daemon_pid_consistency",
5546            "no daemon.pid yet — fresh box or daemon never started",
5547        ),
5548        crate::ensure_up::PidRecord::Corrupt(reason) => DoctorCheck::warn(
5549            "daemon_pid_consistency",
5550            format!("daemon.pid is corrupt: {reason}"),
5551            "delete state/wire/daemon.pid; next `wire daemon &` will rewrite",
5552        ),
5553        crate::ensure_up::PidRecord::LegacyInt(pid) => DoctorCheck::warn(
5554            "daemon_pid_consistency",
5555            format!(
5556                "daemon.pid is legacy-int form (pid={pid}, no version/bin_path metadata). \
5557                 Daemon was started by a pre-0.5.11 binary."
5558            ),
5559            "run `wire upgrade` to kill the old daemon and start a fresh one with the JSON pidfile",
5560        ),
5561        crate::ensure_up::PidRecord::Json(d) => {
5562            let mut issues: Vec<String> = Vec::new();
5563            if d.schema != crate::ensure_up::DAEMON_PID_SCHEMA {
5564                issues.push(format!(
5565                    "schema={} (expected {})",
5566                    d.schema,
5567                    crate::ensure_up::DAEMON_PID_SCHEMA
5568                ));
5569            }
5570            let cli_version = env!("CARGO_PKG_VERSION");
5571            if d.version != cli_version {
5572                issues.push(format!(
5573                    "version daemon={} cli={cli_version}",
5574                    d.version
5575                ));
5576            }
5577            if !std::path::Path::new(&d.bin_path).exists() {
5578                issues.push(format!("bin_path {} missing on disk", d.bin_path));
5579            }
5580            // Cross-check DID + relay against current config (best-effort).
5581            if let Ok(card) = config::read_agent_card()
5582                && let Some(current_did) = card.get("did").and_then(Value::as_str)
5583                && let Some(recorded_did) = &d.did
5584                && recorded_did != current_did
5585            {
5586                issues.push(format!(
5587                    "did daemon={recorded_did} config={current_did} — identity drift"
5588                ));
5589            }
5590            if let Ok(state) = config::read_relay_state()
5591                && let Some(current_relay) = state
5592                    .get("self")
5593                    .and_then(|s| s.get("relay_url"))
5594                    .and_then(Value::as_str)
5595                && let Some(recorded_relay) = &d.relay_url
5596                && recorded_relay != current_relay
5597            {
5598                issues.push(format!(
5599                    "relay_url daemon={recorded_relay} config={current_relay} — relay-migration drift"
5600                ));
5601            }
5602            if issues.is_empty() {
5603                DoctorCheck::pass(
5604                    "daemon_pid_consistency",
5605                    format!(
5606                        "daemon v{} bound to {} as {}",
5607                        d.version,
5608                        d.relay_url.as_deref().unwrap_or("?"),
5609                        d.did.as_deref().unwrap_or("?")
5610                    ),
5611                )
5612            } else {
5613                DoctorCheck::warn(
5614                    "daemon_pid_consistency",
5615                    format!("daemon pidfile drift: {}", issues.join("; ")),
5616                    "`wire upgrade` to atomically restart daemon with current config".to_string(),
5617                )
5618            }
5619        }
5620    }
5621}
5622
5623/// Check: bound relay's /healthz returns 200.
5624fn check_relay_reachable() -> DoctorCheck {
5625    let state = match config::read_relay_state() {
5626        Ok(s) => s,
5627        Err(e) => return DoctorCheck::fail(
5628            "relay",
5629            format!("could not read relay state: {e}"),
5630            "run `wire up <handle>@<relay>` to bootstrap",
5631        ),
5632    };
5633    let url = state
5634        .get("self")
5635        .and_then(|s| s.get("relay_url"))
5636        .and_then(Value::as_str)
5637        .unwrap_or("");
5638    if url.is_empty() {
5639        return DoctorCheck::warn(
5640            "relay",
5641            "no relay bound — wire send/pull will not work",
5642            "run `wire bind-relay <url>` or `wire up <handle>@<relay>`",
5643        );
5644    }
5645    let client = crate::relay_client::RelayClient::new(url);
5646    match client.check_healthz() {
5647        Ok(()) => DoctorCheck::pass("relay", format!("{url} healthz=200")),
5648        Err(e) => DoctorCheck::fail(
5649            "relay",
5650            format!("{url} unreachable: {e}"),
5651            format!("network reachable to {url}? relay running? check `curl {url}/healthz`"),
5652        ),
5653    }
5654}
5655
5656/// Check: count recent entries in pair-rejected.jsonl (P0.2 output). Every
5657/// entry there is a silent failure that, pre-0.5.11, would have left the
5658/// operator wondering why pairing didn't complete.
5659fn check_pair_rejections(recent_n: usize) -> DoctorCheck {
5660    let path = match config::state_dir() {
5661        Ok(d) => d.join("pair-rejected.jsonl"),
5662        Err(e) => return DoctorCheck::warn(
5663            "pair_rejections",
5664            format!("could not resolve state dir: {e}"),
5665            "set WIRE_HOME or fix XDG_STATE_HOME",
5666        ),
5667    };
5668    if !path.exists() {
5669        return DoctorCheck::pass(
5670            "pair_rejections",
5671            "no pair-rejected.jsonl — no recorded pair failures",
5672        );
5673    }
5674    let body = match std::fs::read_to_string(&path) {
5675        Ok(b) => b,
5676        Err(e) => return DoctorCheck::warn(
5677            "pair_rejections",
5678            format!("could not read {path:?}: {e}"),
5679            "check file permissions",
5680        ),
5681    };
5682    let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
5683    if lines.is_empty() {
5684        return DoctorCheck::pass(
5685            "pair_rejections",
5686            "pair-rejected.jsonl present but empty",
5687        );
5688    }
5689    let total = lines.len();
5690    let recent: Vec<&str> = lines.iter().rev().take(recent_n).rev().copied().collect();
5691    let mut summary: Vec<String> = Vec::new();
5692    for line in &recent {
5693        if let Ok(rec) = serde_json::from_str::<Value>(line) {
5694            let peer = rec.get("peer").and_then(Value::as_str).unwrap_or("?");
5695            let code = rec.get("code").and_then(Value::as_str).unwrap_or("?");
5696            summary.push(format!("{peer}/{code}"));
5697        }
5698    }
5699    DoctorCheck::warn(
5700        "pair_rejections",
5701        format!(
5702            "{total} pair failures recorded. recent: [{}]",
5703            summary.join(", ")
5704        ),
5705        format!(
5706            "inspect {path:?} for full details. Each entry is a pair-flow error that previously silently dropped — re-run `wire pair <handle>@<relay>` to retry."
5707        ),
5708    )
5709}
5710
5711/// Check: cursor isn't stuck. We can't tell without polling — but we can
5712/// report the current cursor position so operators see if it changes.
5713/// Real "stuck" detection needs two pulls separated in time; defer that
5714/// behaviour to a `wire doctor --watch` mode.
5715fn check_cursor_progress() -> DoctorCheck {
5716    let state = match config::read_relay_state() {
5717        Ok(s) => s,
5718        Err(e) => return DoctorCheck::warn(
5719            "cursor",
5720            format!("could not read relay state: {e}"),
5721            "check ~/Library/Application Support/wire/relay.json",
5722        ),
5723    };
5724    let cursor = state
5725        .get("self")
5726        .and_then(|s| s.get("last_pulled_event_id"))
5727        .and_then(Value::as_str)
5728        .map(|s| s.chars().take(16).collect::<String>())
5729        .unwrap_or_else(|| "<none>".to_string());
5730    DoctorCheck::pass(
5731        "cursor",
5732        format!(
5733            "current cursor: {cursor}. P0.1 cursor blocking is active — see `wire pull --json` for cursor_blocked / rejected[].blocks_cursor entries."
5734        ),
5735    )
5736}
5737
#[cfg(test)]
mod doctor_tests {
    use super::*;

    /// Each constructor must stamp its own status string and carry (or omit)
    /// the fix hint, so operators can tell PASS / WARN / FAIL apart.
    #[test]
    fn doctor_check_constructors_set_status_correctly() {
        let passing = DoctorCheck::pass("x", "ok");
        assert_eq!(passing.status, "PASS");
        assert!(passing.fix.is_none());

        let warning = DoctorCheck::warn("x", "watch out", "do this");
        assert_eq!(warning.status, "WARN");
        assert_eq!(warning.fix.as_deref(), Some("do this"));

        let failing = DoctorCheck::fail("x", "broken", "fix it");
        assert_eq!(failing.status, "FAIL");
        assert_eq!(failing.fix.as_deref(), Some("fix it"));
    }

    /// A fresh box has no pair-rejected.jsonl; that must not be reported as
    /// a problem.
    #[test]
    fn check_pair_rejections_no_file_is_pass() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            let c = check_pair_rejections(5);
            assert_eq!(c.status, "PASS", "no file should be PASS, got {c:?}");
        });
    }

    /// Any recorded rejection is worth surfacing: one entry must produce a
    /// WARN whose detail names the count and the `peer/code` pair.
    #[test]
    fn check_pair_rejections_with_entries_warns() {
        config::test_support::with_temp_home(|| {
            config::ensure_dirs().unwrap();
            crate::pair_invite::record_pair_rejection(
                "willard",
                "pair_drop_ack_send_failed",
                "POST 502",
            );
            let outcome = check_pair_rejections(5);
            assert_eq!(outcome.status, "WARN");
            assert!(outcome.detail.contains("1 pair failures"));
            assert!(outcome.detail.contains("willard/pair_drop_ack_send_failed"));
        });
    }
}
5791
5792// ---------- up megacommand (full bootstrap) ----------
5793
5794/// `wire up <nick@relay-host>` — single command from fresh box to ready-to-
5795/// pair. Composes the steps that today's onboarding walks operators through
5796/// one by one (init / bind-relay / claim / background daemon / arm monitor
5797/// recipe). Idempotent: every step checks current state and skips if done.
5798///
5799/// Argument parsing accepts:
5800///   - `<nick>@<relay-host>` — explicit relay
5801///   - `<nick>`              — defaults to wireup.net (the configured public
5802///                             relay)
5803fn cmd_up(handle_arg: &str, name: Option<&str>, as_json: bool) -> Result<()> {
5804    let (nick, relay_url) = match handle_arg.split_once('@') {
5805        Some((n, host)) => {
5806            let url = if host.starts_with("http://") || host.starts_with("https://") {
5807                host.to_string()
5808            } else {
5809                format!("https://{host}")
5810            };
5811            (n.to_string(), url)
5812        }
5813        None => (handle_arg.to_string(), crate::pair_invite::DEFAULT_RELAY.to_string()),
5814    };
5815
5816    let mut report: Vec<(String, String)> = Vec::new();
5817    let mut step = |stage: &str, detail: String| {
5818        report.push((stage.to_string(), detail.clone()));
5819        if !as_json {
5820            eprintln!("wire up: {stage} — {detail}");
5821        }
5822    };
5823
5824    // 1. init (or verify existing identity matches the requested nick).
5825    if config::is_initialized()? {
5826        let card = config::read_agent_card()?;
5827        let existing_did = card.get("did").and_then(Value::as_str).unwrap_or("");
5828        let existing_handle =
5829            crate::agent_card::display_handle_from_did(existing_did).to_string();
5830        if existing_handle != nick {
5831            bail!(
5832                "wire up: already initialized as {existing_handle:?} but you asked for {nick:?}. \
5833                 Either run with the existing handle (`wire up {existing_handle}@<relay>`) or \
5834                 delete `{:?}` to start fresh.",
5835                config::config_dir()?
5836            );
5837        }
5838        step("init", format!("already initialized as {existing_handle}"));
5839    } else {
5840        cmd_init(&nick, name, Some(&relay_url), /* as_json */ false)?;
5841        step("init", format!("created identity {nick} bound to {relay_url}"));
5842    }
5843
5844    // 2. Ensure relay binding matches. cmd_init with --relay binds it; if
5845    // already initialized we may need to bind to the requested relay
5846    // separately (operator switched relays).
5847    let relay_state = config::read_relay_state()?;
5848    let bound_relay = relay_state
5849        .get("self")
5850        .and_then(|s| s.get("relay_url"))
5851        .and_then(Value::as_str)
5852        .unwrap_or("")
5853        .to_string();
5854    if bound_relay.is_empty() {
5855        // Identity exists but never bound to a relay — bind now.
5856        cmd_bind_relay(&relay_url, /* as_json */ false)?;
5857        step("bind-relay", format!("bound to {relay_url}"));
5858    } else if bound_relay != relay_url {
5859        step(
5860            "bind-relay",
5861            format!(
5862                "WARNING: identity bound to {bound_relay} but you specified {relay_url}. \
5863                 Keeping existing binding. Run `wire bind-relay {relay_url}` to switch."
5864            ),
5865        );
5866    } else {
5867        step("bind-relay", format!("already bound to {bound_relay}"));
5868    }
5869
5870    // 3. Claim nick on the relay's handle directory. Idempotent — same-DID
5871    // re-claims are accepted by the relay.
5872    match cmd_claim(&nick, Some(&relay_url), None, /* as_json */ false) {
5873        Ok(()) => step("claim", format!("{nick}@{} claimed", strip_proto(&relay_url))),
5874        Err(e) => step(
5875            "claim",
5876            format!("WARNING: claim failed: {e}. You can retry `wire claim {nick}`."),
5877        ),
5878    }
5879
5880    // 4. Background daemon — must be running for pull/push/ack to flow.
5881    match crate::ensure_up::ensure_daemon_running() {
5882        Ok(true) => step("daemon", "started fresh background daemon".to_string()),
5883        Ok(false) => step("daemon", "already running".to_string()),
5884        Err(e) => step(
5885            "daemon",
5886            format!("WARNING: could not start daemon: {e}. Run `wire daemon &` manually."),
5887        ),
5888    }
5889
5890    // 5. Final summary — point operator at the next commands.
5891    let summary = format!(
5892        "ready. `wire pair <peer>@<relay>` to pair, `wire send <peer> \"<msg>\"` to send, \
5893         `wire monitor` to watch incoming events."
5894    );
5895    step("ready", summary.clone());
5896
5897    if as_json {
5898        let steps_json: Vec<_> = report
5899            .iter()
5900            .map(|(k, v)| json!({"stage": k, "detail": v}))
5901            .collect();
5902        println!(
5903            "{}",
5904            serde_json::to_string(&json!({
5905                "nick": nick,
5906                "relay": relay_url,
5907                "steps": steps_json,
5908            }))?
5909        );
5910    }
5911    Ok(())
5912}
5913
/// Strip a single leading `https://` or `http://` scheme for display in
/// `wire up` step output.
///
/// Exactly one scheme prefix is removed, and only at the very start of
/// `url`; input without a scheme is returned unchanged. Anything after the
/// scheme is preserved verbatim, even if it happens to look like another
/// scheme.
fn strip_proto(url: &str) -> String {
    url.strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
        .unwrap_or(url)
        .to_string()
}
5920
5921// ---------- pair megacommand (zero-paste handle-based) ----------
5922
5923/// `wire pair <nick@domain>` zero-shot. Dispatched from Command::Pair when
5924/// the handle is in `nick@domain` form. Wraps:
5925///
5926///   1. cmd_add — resolve, pin, drop intro
5927///   2. Wait up to `timeout_secs` for the peer's `pair_drop_ack` to arrive
5928///      (signalled by `peers.<handle>.slot_token` populating in relay state)
5929///   3. Verify bilateral pin: trust contains peer + relay state has token
5930///   4. Print final state — both sides VERIFIED + can `wire send`
5931///
5932/// On timeout: hard-errors with the specific stuck step so the operator
5933/// knows which side to chase. No silent partial success.
5934fn cmd_pair_megacommand(
5935    handle_arg: &str,
5936    relay_override: Option<&str>,
5937    timeout_secs: u64,
5938    _as_json: bool,
5939) -> Result<()> {
5940    let parsed = crate::pair_profile::parse_handle(handle_arg)?;
5941    let peer_handle = parsed.nick.clone();
5942
5943    eprintln!("wire pair: resolving {handle_arg}...");
5944    cmd_add(handle_arg, relay_override, /* as_json */ false)?;
5945
5946    eprintln!(
5947        "wire pair: intro delivered. waiting up to {timeout_secs}s for {peer_handle} \
5948         to ack (their daemon must be running + pulling)..."
5949    );
5950
5951    // Trigger an immediate daemon-style pull so we don't wait the full daemon
5952    // interval. Best-effort — if it fails, we still fall through to the
5953    // polling loop.
5954    let _ = run_sync_pull();
5955
5956    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
5957    let poll_interval = std::time::Duration::from_millis(500);
5958
5959    loop {
5960        // Drain anything new from the relay (e.g. our pair_drop_ack landing).
5961        let _ = run_sync_pull();
5962        let relay_state = config::read_relay_state()?;
5963        let peer_entry = relay_state
5964            .get("peers")
5965            .and_then(|p| p.get(&peer_handle))
5966            .cloned();
5967        let token = peer_entry
5968            .as_ref()
5969            .and_then(|e| e.get("slot_token"))
5970            .and_then(Value::as_str)
5971            .unwrap_or("");
5972
5973        if !token.is_empty() {
5974            // Bilateral pin complete — we have their slot_token, we can send.
5975            let trust = config::read_trust()?;
5976            let pinned_in_trust = trust
5977                .get("agents")
5978                .and_then(|a| a.get(&peer_handle))
5979                .is_some();
5980            println!(
5981                "wire pair: paired with {peer_handle}.\n  trust: {}  bilateral: yes (slot_token recorded)\n  next: `wire send {peer_handle} \"<msg>\"`",
5982                if pinned_in_trust { "VERIFIED" } else { "MISSING (bug)" }
5983            );
5984            return Ok(());
5985        }
5986
5987        if std::time::Instant::now() >= deadline {
5988            // Timeout — surface the EXACT stuck step. Likely culprits:
5989            //   - peer daemon not running on their box
5990            //   - peer's relay slot is offline
5991            //   - their daemon is on an older binary that doesn't know
5992            //     pair_drop kind=1100 (the P0.1 class — now visible via
5993            //     wire pull --json on their side as a blocking rejection)
5994            bail!(
5995                "wire pair: timed out after {timeout_secs}s. \
5996                 peer {peer_handle} never sent pair_drop_ack. \
5997                 likely causes: (a) their daemon is down — ask them to run \
5998                 `wire status` and `wire daemon &`; (b) their binary is older \
5999                 than 0.5.x and doesn't understand pair_drop events — ask \
6000                 them to `wire upgrade`; (c) network / relay blip — re-run \
6001                 `wire pair {handle_arg}` to retry."
6002            );
6003        }
6004
6005        std::thread::sleep(poll_interval);
6006    }
6007}
6008
6009fn cmd_claim(
6010    nick: &str,
6011    relay_override: Option<&str>,
6012    public_url: Option<&str>,
6013    as_json: bool,
6014) -> Result<()> {
6015    if !crate::pair_profile::is_valid_nick(nick) {
6016        bail!(
6017            "phyllis: {nick:?} won't fit in the books — handles need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list"
6018        );
6019    }
6020    // `wire claim` is the one-step bootstrap: auto-init + auto-allocate slot
6021    // + claim handle. Operator should never have to run init/bind-relay first.
6022    let (_did, relay_url, slot_id, slot_token) =
6023        crate::pair_invite::ensure_self_with_relay(relay_override)?;
6024    let card = config::read_agent_card()?;
6025
6026    let client = crate::relay_client::RelayClient::new(&relay_url);
6027    let resp = client.handle_claim(nick, &slot_id, &slot_token, public_url, &card)?;
6028
6029    if as_json {
6030        println!(
6031            "{}",
6032            serde_json::to_string(&json!({
6033                "nick": nick,
6034                "relay": relay_url,
6035                "response": resp,
6036            }))?
6037        );
6038    } else {
6039        // Best-effort: derive the public domain from the relay URL. If
6040        // operator passed --public-url that's the canonical address; else
6041        // the relay URL itself. Falls back to a placeholder if both miss.
6042        let domain = public_url
6043            .unwrap_or(&relay_url)
6044            .trim_start_matches("https://")
6045            .trim_start_matches("http://")
6046            .trim_end_matches('/')
6047            .split('/')
6048            .next()
6049            .unwrap_or("<this-relay-domain>")
6050            .to_string();
6051        println!("claimed {nick} on {relay_url} — others can reach you at: {nick}@{domain}");
6052        println!("verify with: wire whois {nick}@{domain}");
6053    }
6054    Ok(())
6055}
6056
6057fn cmd_profile(action: ProfileAction) -> Result<()> {
6058    match action {
6059        ProfileAction::Set { field, value, json } => {
6060            // Try parsing the value as JSON; if that fails, treat it as a
6061            // bare string. Lets operators pass either `42` or `"hello"` or
6062            // `["rust","late-night"]` without quoting hell.
6063            let parsed: Value =
6064                serde_json::from_str(&value).unwrap_or(Value::String(value.clone()));
6065            let new_profile = crate::pair_profile::write_profile_field(&field, parsed)?;
6066            if json {
6067                println!(
6068                    "{}",
6069                    serde_json::to_string(&json!({
6070                        "field": field,
6071                        "profile": new_profile,
6072                    }))?
6073                );
6074            } else {
6075                println!("profile.{field} set");
6076            }
6077        }
6078        ProfileAction::Get { json } => return cmd_whois(None, json, None),
6079        ProfileAction::Clear { field, json } => {
6080            let new_profile = crate::pair_profile::write_profile_field(&field, Value::Null)?;
6081            if json {
6082                println!(
6083                    "{}",
6084                    serde_json::to_string(&json!({
6085                        "field": field,
6086                        "cleared": true,
6087                        "profile": new_profile,
6088                    }))?
6089                );
6090            } else {
6091                println!("profile.{field} cleared");
6092            }
6093        }
6094    }
6095    Ok(())
6096}
6097
6098// ---------- setup — one-shot MCP host registration ----------
6099
6100fn cmd_setup(apply: bool) -> Result<()> {
6101    use std::path::PathBuf;
6102
6103    let entry = json!({"command": "wire", "args": ["mcp"]});
6104    let entry_pretty = serde_json::to_string_pretty(&json!({"wire": &entry}))?;
6105
6106    // Detect probable MCP host config locations. Cross-platform — we only
6107    // touch the file if it already exists OR --apply was passed.
6108    let mut targets: Vec<(&str, PathBuf)> = Vec::new();
6109    if let Some(home) = dirs::home_dir() {
6110        // Claude Code (CLI) — real config path is ~/.claude.json on all platforms (Linux/macOS/Windows).
6111        // The mcpServers map lives at the top level of that file.
6112        targets.push(("Claude Code", home.join(".claude.json")));
6113        // Legacy / alternate Claude Code XDG path — still try, harmless if absent.
6114        targets.push(("Claude Code (alt)", home.join(".config/claude/mcp.json")));
6115        // Claude Desktop macOS
6116        #[cfg(target_os = "macos")]
6117        targets.push((
6118            "Claude Desktop (macOS)",
6119            home.join("Library/Application Support/Claude/claude_desktop_config.json"),
6120        ));
6121        // Claude Desktop Windows
6122        #[cfg(target_os = "windows")]
6123        if let Ok(appdata) = std::env::var("APPDATA") {
6124            targets.push((
6125                "Claude Desktop (Windows)",
6126                PathBuf::from(appdata).join("Claude/claude_desktop_config.json"),
6127            ));
6128        }
6129        // Cursor
6130        targets.push(("Cursor", home.join(".cursor/mcp.json")));
6131    }
6132    // Project-local — works for several MCP-aware tools
6133    targets.push(("project-local (.mcp.json)", PathBuf::from(".mcp.json")));
6134
6135    println!("wire setup\n");
6136    println!("MCP server snippet (add this to your client's mcpServers):");
6137    println!();
6138    println!("{entry_pretty}");
6139    println!();
6140
6141    if !apply {
6142        println!("Probable MCP host config locations on this machine:");
6143        for (name, path) in &targets {
6144            let marker = if path.exists() {
6145                "✓ found"
6146            } else {
6147                "  (would create)"
6148            };
6149            println!("  {marker:14}  {name}: {}", path.display());
6150        }
6151        println!();
6152        println!("Run `wire setup --apply` to merge wire into each config above.");
6153        println!(
6154            "Existing entries with a different command keep yours unchanged unless wire's exact entry is missing."
6155        );
6156        return Ok(());
6157    }
6158
6159    let mut modified: Vec<String> = Vec::new();
6160    let mut skipped: Vec<String> = Vec::new();
6161    for (name, path) in &targets {
6162        match upsert_mcp_entry(path, "wire", &entry) {
6163            Ok(true) => modified.push(format!("✓ {name} ({})", path.display())),
6164            Ok(false) => skipped.push(format!("  {name} ({}): already configured", path.display())),
6165            Err(e) => skipped.push(format!("✗ {name} ({}): {e}", path.display())),
6166        }
6167    }
6168    if !modified.is_empty() {
6169        println!("Modified:");
6170        for line in &modified {
6171            println!("  {line}");
6172        }
6173        println!();
6174        println!("Restart the app(s) above to load wire MCP.");
6175    }
6176    if !skipped.is_empty() {
6177        println!();
6178        println!("Skipped:");
6179        for line in &skipped {
6180            println!("  {line}");
6181        }
6182    }
6183    Ok(())
6184}
6185
6186/// Idempotent merge of an `mcpServers.<name>` entry into a JSON config file.
6187/// Returns Ok(true) if file was changed, Ok(false) if entry already matched.
6188fn upsert_mcp_entry(path: &std::path::Path, server_name: &str, entry: &Value) -> Result<bool> {
6189    let mut cfg: Value = if path.exists() {
6190        let body = std::fs::read_to_string(path).context("reading config")?;
6191        serde_json::from_str(&body).unwrap_or_else(|_| json!({}))
6192    } else {
6193        json!({})
6194    };
6195    if !cfg.is_object() {
6196        cfg = json!({});
6197    }
6198    let root = cfg.as_object_mut().unwrap();
6199    let servers = root
6200        .entry("mcpServers".to_string())
6201        .or_insert_with(|| json!({}));
6202    if !servers.is_object() {
6203        *servers = json!({});
6204    }
6205    let map = servers.as_object_mut().unwrap();
6206    if map.get(server_name) == Some(entry) {
6207        return Ok(false);
6208    }
6209    map.insert(server_name.to_string(), entry.clone());
6210    if let Some(parent) = path.parent()
6211        && !parent.as_os_str().is_empty()
6212    {
6213        std::fs::create_dir_all(parent).context("creating parent dir")?;
6214    }
6215    let out = serde_json::to_string_pretty(&cfg)? + "\n";
6216    std::fs::write(path, out).context("writing config")?;
6217    Ok(true)
6218}
6219
6220// ---------- reactor — event-handler dispatch loop ----------
6221
6222#[allow(clippy::too_many_arguments)]
6223fn cmd_reactor(
6224    on_event: &str,
6225    peer_filter: Option<&str>,
6226    kind_filter: Option<&str>,
6227    verified_only: bool,
6228    interval_secs: u64,
6229    once: bool,
6230    dry_run: bool,
6231    max_per_minute: u32,
6232    max_chain_depth: u32,
6233) -> Result<()> {
6234    use crate::inbox_watch::{InboxEvent, InboxWatcher};
6235    use std::collections::{HashMap, HashSet, VecDeque};
6236    use std::io::Write;
6237    use std::process::{Command, Stdio};
6238    use std::time::{Duration, Instant};
6239
6240    let cursor_path = config::state_dir()?.join("reactor.cursor");
6241    // event_ids THIS reactor's handler has caused to be sent (via wire send).
6242    // Used by chain-depth check — an incoming `(re:X)` where X is in this set
6243    // means peer is replying to something we just said → don't reply back.
6244    //
6245    // Persisted across restarts so a reactor that crashes mid-conversation
6246    // doesn't re-enter the loop. Reads on startup, writes after each
6247    // outbox-grow detection. Capped at 500 entries (LRU-ish — old entries
6248    // dropped from front of file).
6249    let emitted_path = config::state_dir()?.join("reactor-emitted.log");
6250    let mut emitted_ids: HashSet<String> = HashSet::new();
6251    if emitted_path.exists()
6252        && let Ok(body) = std::fs::read_to_string(&emitted_path)
6253    {
6254        for line in body.lines() {
6255            let t = line.trim();
6256            if !t.is_empty() {
6257                emitted_ids.insert(t.to_string());
6258            }
6259        }
6260    }
6261    // Outbox file paths the reactor watches for new sent-event_ids.
6262    let outbox_dir = config::outbox_dir()?;
6263    // (peer → file size we've already scanned). Lets us notice new outbox
6264    // appends without re-reading the whole file each sweep.
6265    let mut outbox_cursors: HashMap<String, u64> = HashMap::new();
6266
6267    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6268
6269    let kind_num: Option<u32> = match kind_filter {
6270        Some(k) => Some(parse_kind(k)?),
6271        None => None,
6272    };
6273
6274    // Per-peer sliding window of dispatch instants for rate-limit check.
6275    let mut peer_dispatch_log: HashMap<String, VecDeque<Instant>> = HashMap::new();
6276
6277    let dispatch = |ev: &InboxEvent,
6278                    peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>,
6279                    emitted_ids: &HashSet<String>|
6280     -> Result<bool> {
6281        if let Some(p) = peer_filter
6282            && ev.peer != p
6283        {
6284            return Ok(false);
6285        }
6286        if verified_only && !ev.verified {
6287            return Ok(false);
6288        }
6289        if let Some(want) = kind_num {
6290            let ev_kind = ev.raw.get("kind").and_then(Value::as_u64).map(|n| n as u32);
6291            if ev_kind != Some(want) {
6292                return Ok(false);
6293            }
6294        }
6295
6296        // Chain-depth check: if the body contains `(re:<event_id>)` and that
6297        // event_id is in our emitted set, this is a reply to one of our
6298        // replies → loop suspected, skip.
6299        if max_chain_depth > 0 {
6300            let body_str = match &ev.raw["body"] {
6301                Value::String(s) => s.clone(),
6302                other => serde_json::to_string(other).unwrap_or_default(),
6303            };
6304            if let Some(referenced) = parse_re_marker(&body_str) {
6305                // Handler scripts usually truncate event_id (e.g. ${ID:0:12}).
6306                // Match emitted set by prefix to catch both full + truncated.
6307                let matched = emitted_ids.contains(&referenced)
6308                    || emitted_ids.iter().any(|full| full.starts_with(&referenced));
6309                if matched {
6310                    eprintln!(
6311                        "wire reactor: skip {} from {} — chain-depth (reply to our re:{})",
6312                        ev.event_id, ev.peer, referenced
6313                    );
6314                    return Ok(false);
6315                }
6316            }
6317        }
6318
6319        // Per-peer rate-limit check (sliding 60s window).
6320        if max_per_minute > 0 {
6321            let now = Instant::now();
6322            let win = peer_dispatch_log.entry(ev.peer.clone()).or_default();
6323            while let Some(&front) = win.front() {
6324                if now.duration_since(front) > Duration::from_secs(60) {
6325                    win.pop_front();
6326                } else {
6327                    break;
6328                }
6329            }
6330            if win.len() as u32 >= max_per_minute {
6331                eprintln!(
6332                    "wire reactor: skip {} from {} — rate-limit ({}/min reached)",
6333                    ev.event_id, ev.peer, max_per_minute
6334                );
6335                return Ok(false);
6336            }
6337            win.push_back(now);
6338        }
6339
6340        if dry_run {
6341            println!("{}", serde_json::to_string(&ev.raw)?);
6342            return Ok(true);
6343        }
6344
6345        let mut child = Command::new("sh")
6346            .arg("-c")
6347            .arg(on_event)
6348            .stdin(Stdio::piped())
6349            .stdout(Stdio::inherit())
6350            .stderr(Stdio::inherit())
6351            .env("WIRE_EVENT_PEER", &ev.peer)
6352            .env("WIRE_EVENT_ID", &ev.event_id)
6353            .env("WIRE_EVENT_KIND", &ev.kind)
6354            .spawn()
6355            .with_context(|| format!("spawning reactor handler: {on_event}"))?;
6356        if let Some(mut stdin) = child.stdin.take() {
6357            let body = serde_json::to_vec(&ev.raw)?;
6358            let _ = stdin.write_all(&body);
6359            let _ = stdin.write_all(b"\n");
6360        }
6361        std::mem::drop(child);
6362        Ok(true)
6363    };
6364
6365    // Scan outbox files for newly-appended event_ids and add to emitted set.
6366    let scan_outbox = |emitted_ids: &mut HashSet<String>,
6367                       outbox_cursors: &mut HashMap<String, u64>|
6368     -> Result<usize> {
6369        if !outbox_dir.exists() {
6370            return Ok(0);
6371        }
6372        let mut added = 0;
6373        let mut new_ids: Vec<String> = Vec::new();
6374        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
6375            let path = entry.path();
6376            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
6377                continue;
6378            }
6379            let peer = match path.file_stem().and_then(|s| s.to_str()) {
6380                Some(s) => s.to_string(),
6381                None => continue,
6382            };
6383            let cur_len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
6384            let start = *outbox_cursors.get(&peer).unwrap_or(&0);
6385            if cur_len <= start {
6386                outbox_cursors.insert(peer, start);
6387                continue;
6388            }
6389            let body = std::fs::read_to_string(&path).unwrap_or_default();
6390            let tail = &body[start as usize..];
6391            for line in tail.lines() {
6392                if let Ok(v) = serde_json::from_str::<Value>(line)
6393                    && let Some(eid) = v.get("event_id").and_then(Value::as_str)
6394                    && emitted_ids.insert(eid.to_string())
6395                {
6396                    new_ids.push(eid.to_string());
6397                    added += 1;
6398                }
6399            }
6400            outbox_cursors.insert(peer, cur_len);
6401        }
6402        if !new_ids.is_empty() {
6403            // Append new ids to disk, cap on-disk file at 500 entries.
6404            let mut all: Vec<String> = emitted_ids.iter().cloned().collect();
6405            if all.len() > 500 {
6406                all.sort();
6407                let drop_n = all.len() - 500;
6408                let dropped: HashSet<String> = all.iter().take(drop_n).cloned().collect();
6409                emitted_ids.retain(|x| !dropped.contains(x));
6410                all = emitted_ids.iter().cloned().collect();
6411            }
6412            let _ = std::fs::write(&emitted_path, all.join("\n") + "\n");
6413        }
6414        Ok(added)
6415    };
6416
6417    let sweep = |watcher: &mut InboxWatcher,
6418                 emitted_ids: &mut HashSet<String>,
6419                 outbox_cursors: &mut HashMap<String, u64>,
6420                 peer_dispatch_log: &mut HashMap<String, VecDeque<Instant>>|
6421     -> Result<usize> {
6422        // Pick up any event_ids we sent since last sweep.
6423        let _ = scan_outbox(emitted_ids, outbox_cursors);
6424
6425        let events = watcher.poll()?;
6426        let mut fired = 0usize;
6427        for ev in &events {
6428            match dispatch(ev, peer_dispatch_log, emitted_ids) {
6429                Ok(true) => fired += 1,
6430                Ok(false) => {}
6431                Err(e) => eprintln!("wire reactor: handler error for {}: {e}", ev.event_id),
6432            }
6433        }
6434        watcher.save_cursors(&cursor_path)?;
6435        Ok(fired)
6436    };
6437
6438    if once {
6439        sweep(
6440            &mut watcher,
6441            &mut emitted_ids,
6442            &mut outbox_cursors,
6443            &mut peer_dispatch_log,
6444        )?;
6445        return Ok(());
6446    }
6447    let interval = std::time::Duration::from_secs(interval_secs.max(1));
6448    loop {
6449        if let Err(e) = sweep(
6450            &mut watcher,
6451            &mut emitted_ids,
6452            &mut outbox_cursors,
6453            &mut peer_dispatch_log,
6454        ) {
6455            eprintln!("wire reactor: sweep error: {e}");
6456        }
6457        std::thread::sleep(interval);
6458    }
6459}
6460
/// Parse a `(re:<event_id>)` marker out of an event body.
///
/// Returns the first non-empty referenced event_id (full or prefix), with
/// surrounding whitespace inside the parentheses trimmed. Markers that are
/// empty or whitespace-only — `(re:)`, `(re:  )` — are skipped so that they
/// cannot hide a real marker later in the same body. Returns `None` when no
/// closed, non-empty marker exists.
fn parse_re_marker(body: &str) -> Option<String> {
    const NEEDLE: &str = "(re:";
    let mut rest = body;
    while let Some(start) = rest.find(NEEDLE) {
        let after = &rest[start + NEEDLE.len()..];
        // No closing paren after this marker means none after any later one.
        let end = after.find(')')?;
        let id = after[..end].trim();
        if !id.is_empty() {
            return Some(id.to_string());
        }
        // Empty marker: keep scanning past its closing paren.
        rest = &after[end + 1..];
    }
    None
}
6474
6475// ---------- notify (Goal 2) ----------
6476
6477fn cmd_notify(
6478    interval_secs: u64,
6479    peer_filter: Option<&str>,
6480    once: bool,
6481    as_json: bool,
6482) -> Result<()> {
6483    use crate::inbox_watch::InboxWatcher;
6484    let cursor_path = config::state_dir()?.join("notify.cursor");
6485    let mut watcher = InboxWatcher::from_cursor_file(&cursor_path)?;
6486
6487    let sweep = |watcher: &mut InboxWatcher| -> Result<()> {
6488        let events = watcher.poll()?;
6489        for ev in events {
6490            if let Some(p) = peer_filter
6491                && ev.peer != p
6492            {
6493                continue;
6494            }
6495            if as_json {
6496                println!("{}", serde_json::to_string(&ev)?);
6497            } else {
6498                os_notify_inbox_event(&ev);
6499            }
6500        }
6501        watcher.save_cursors(&cursor_path)?;
6502        Ok(())
6503    };
6504
6505    if once {
6506        return sweep(&mut watcher);
6507    }
6508
6509    let interval = std::time::Duration::from_secs(interval_secs.max(1));
6510    loop {
6511        if let Err(e) = sweep(&mut watcher) {
6512            eprintln!("wire notify: sweep error: {e}");
6513        }
6514        std::thread::sleep(interval);
6515    }
6516}
6517
6518fn os_notify_inbox_event(ev: &crate::inbox_watch::InboxEvent) {
6519    let title = if ev.verified {
6520        format!("wire ← {}", ev.peer)
6521    } else {
6522        format!("wire ← {} (UNVERIFIED)", ev.peer)
6523    };
6524    let body = format!("{}: {}", ev.kind, ev.body_preview);
6525    crate::os_notify::toast(&title, &body);
6526}
6527
/// Fallback "toast" for targets other than Linux, macOS and Windows: writes
/// the title and the indented body to stderr.
#[cfg(all(
    not(target_os = "linux"),
    not(target_os = "macos"),
    not(target_os = "windows")
))]
fn os_toast(title: &str, body: &str) {
    eprintln!("[wire notify] {title}\n  {body}");
}
6532
6533// Integration tests for the CLI live in `tests/cli.rs` (cargo's tests/ dir).