Skip to main content

wire/
relay_server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3// Copyright (C) 2026  wire contributors.
4//
5// This file (and only this file in the wire repository) is licensed under
6// the GNU Affero General Public License v3.0 or later. The protocol crate
7// (signing, agent_card, trust, canonical, cli, mcp) is Apache-2.0; the CLI
8// binary entry point is MIT. See LICENSE.md for the trio explanation.
9//
10// AGPL on the relay specifically discourages forks that operate `wire-relay`
11// as a closed-source SaaS offering — those forks must publish their changes
12// under AGPL too. Self-hosting your own relay for your own org or running
13// the public-good relay we operate is fully permitted.
14//
15//! HTTP mailbox relay — minimal, persistent, bearer-authenticated.
16//!
17//! Design (v0.1):
18//!   - One process serves N slots; each slot is a per-peer FIFO of signed events.
19//!   - Slot allocation returns `(slot_id, slot_token)`. Holder of the token
20//!     can post + read that slot. Tokens never expire in v0.1 (rotate by
21//!     allocating a new slot).
22//!   - Events are stored verbatim — the relay does NOT verify Ed25519 signatures
23//!     itself. Verification happens client-side (`wire tail` + `verify_message_v31`).
//!     The relay is dumb on purpose: it is a slot-addressed mailbox, not a
25//!     trust authority.
26//!   - 256 KiB max body per event.
27//!   - Persistence: each slot's events are appended to
28//!     `<state_dir>/slots/<slot_id>.jsonl` on every `POST /events`. Tokens
29//!     are persisted to `<state_dir>/tokens.json` on allocation.
30//!   - On startup, slots + tokens are reloaded from disk.
31
32use anyhow::{Context, Result};
33use axum::{
34    Json, Router,
35    extract::{Path, Query, State},
36    http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37    response::IntoResponse,
38    routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50    GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Maximum size in bytes of a single posted event body (256 KiB).
const MAX_EVENT_BYTES: usize = 256 * 1024;
/// Total bytes a single slot can hold before further POSTs are rejected (413).
/// Defends against an abusive bearer-holder filling relay disk (T11). At 64 MB
/// per slot, an attacker pushing the rate-limit ceiling fills their own slot
/// in ~25 seconds, then gets 413 forever — disk impact bounded.
///
/// NOTE(review): the ~25 s figure matches the 10 req/s governor configured in
/// `Relay::router_with_mode`, but that governor only wraps the hot-writes
/// group (slot allocation + pair routes); `POST /v1/events/:slot_id` is not in
/// it. Confirm which limit actually bounds event posting. Also note the cap is
/// per slot, so total disk use still scales with the number of slots.
const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
/// Shared relay handle, installed as axum router state via `with_state`.
///
/// Cloning is cheap: mutable state and counters live behind `Arc`s, so every
/// clone observes the same slots, tokens, pair sessions and totals.
#[derive(Clone)]
pub struct Relay {
    /// All mutable relay state, guarded by a single async (tokio) mutex.
    inner: Arc<Mutex<Inner>>,
    /// Root directory for persisted state (`slots/`, `handles/`,
    /// `responder-health/`, `tokens.json`, `counters.json`, ...).
    state_dir: PathBuf,
    /// Lock-free usage counters served by the stats endpoints.
    counters: Arc<RelayCounters>,
}
66
/// Lock-free usage counters served by GET /stats. Counter values are
/// loaded from `<state_dir>/counters.json` on startup and snapshotted back
/// to disk every 30s by `spawn_counter_persister`, so deploys + restarts
/// don't reset them. `boot_unix` is per-process — uptime is process-local.
struct RelayCounters {
    /// Unix seconds at which this process constructed the relay.
    boot_unix: u64,
    /// Cumulative handle claims (including same-DID re-claims).
    handle_claims_total: AtomicU64,
    /// Cumulative claims that bound a previously unclaimed nick.
    handle_first_claims_total: AtomicU64,
    /// Cumulative slot allocations.
    slot_allocations_total: AtomicU64,
    /// Cumulative pair sessions opened.
    pair_opens_total: AtomicU64,
    /// Cumulative events accepted by `POST /v1/events/:slot_id`.
    events_posted_total: AtomicU64,
}
79
/// On-disk form of `RelayCounters` (`<state_dir>/counters.json`), minus the
/// process-local `boot_unix`. `Default` (all zeros) is used when the file is
/// missing or unparseable.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct CountersSnapshot {
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
88
/// One row in `<state_dir>/stats-history.jsonl` — written every 30s by
/// `spawn_counter_persister` so /stats.html can draw sparklines. Live-state
/// fields (`*_active`, `pair_slots_open`) are point-in-time; `*_total` fields
/// are the cumulative counters at that timestamp. Field names mirror the
/// /stats endpoint.
#[derive(serde::Serialize, serde::Deserialize)]
struct HistoryEntry {
    /// Unix seconds at which the row was sampled.
    ts: u64,
    /// Number of registered handles.
    handles_active: usize,
    /// Number of slots held in memory.
    slots_active: usize,
    /// Number of open pairing sessions.
    pair_slots_open: usize,
    /// Number of SSE subscriber channels across all slots.
    streams_active: usize,
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
106
/// Query string for `GET /stats.history`.
#[derive(Deserialize)]
pub struct StatsHistoryQuery {
    /// How many hours of history to return, default 24, max 168 (7 days).
    pub hours: Option<u64>,
}
112
/// All mutable relay state. It lives behind `Relay::inner` (one async mutex),
/// so every field is read and written under that same lock.
struct Inner {
    /// slot_id -> ordered list of stored events (parsed JSON Values).
    slots: HashMap<String, Vec<Value>>,
    /// slot_id -> bearer token. Token holder may read + write that slot.
    tokens: HashMap<String, String>,
    /// slot_id -> total bytes stored. Enforced against MAX_SLOT_BYTES.
    slot_bytes: HashMap<String, usize>,
    /// slot_id -> wall-clock unix seconds of the slot owner's last `list_events`
    /// call. Used by `GET /v1/slot/:slot_id/state` so a remote sender can
    /// gauge whether the slot's owner is still polling (i.e., still attentive).
    /// A missing entry means the slot has never been pulled since the relay
    /// restarted (`Relay::new` starts this map empty; it is not persisted).
    last_pull_at_unix: HashMap<String, u64>,
    /// slot_id -> active SSE subscribers (R1 push). Each `UnboundedSender`
    /// belongs to one open `GET /v1/events/:slot_id/stream` connection.
    /// On every successful `post_event` to a slot we walk the slot's list
    /// and broadcast the event; closed channels are pruned lazily on send-
    /// error. Auth: subscribers presented a valid slot_token at stream open.
    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
    /// code_hash -> pair_id (lookup so guests find the host).
    pair_lookup: HashMap<String, String>,
    /// pair_id -> ephemeral pairing state (in memory only; TTL-evicted).
    pair_slots: HashMap<String, PairSlot>,
    /// nick -> registered handle directory entry (v0.5).
    handles: HashMap<String, HandleRecord>,
    /// slot_id -> latest operator-published auto-responder health record (R3).
    responder_health: HashMap<String, ResponderHealthRecord>,
    /// token -> short-URL invite record (v0.5.10 — one-curl onboarding).
    /// Token is the path segment in `GET /i/{token}`. Record holds the
    /// underlying `wire://pair?...` URL plus TTL/uses bookkeeping.
    invites: HashMap<String, InviteRecord>,
}
144
/// One entry in the short-URL invite map. Persisted to
/// `<state_dir>/invites.jsonl` so deploys don't drop active invites.
/// On reload, records whose `expires_unix` is not in the future are dropped.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InviteRecord {
    /// Path segment of `GET /i/{token}`; also the map key.
    token: String,
    /// The underlying `wire://pair?...` invite URL.
    invite_url: String,
    /// Unix seconds after which the invite is no longer valid.
    expires_unix: u64,
    /// `None` = unlimited until TTL hits. `Some(n)` = decrement each fetch.
    uses_remaining: Option<u32>,
    /// Unix seconds at which the invite was registered.
    created_unix: u64,
}
156
/// One entry in the relay's handle directory (v0.5 — agentic hotline).
/// FCFS on nick: first claimant binds the nick to their DID. Same-DID re-claims
/// are allowed (used for profile updates + slot rotation).
/// Persisted as `<state_dir>/handles/*.json` and reloaded by `Relay::new`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct HandleRecord {
    /// The claimed nick; also the key in `Inner::handles`.
    pub nick: String,
    /// DID the nick is bound to.
    pub did: String,
    /// Agent card JSON as stored at claim time.
    /// NOTE(review): any validation happens in `handle_claim` (not visible here).
    pub card: Value,
    /// Slot the handle owner receives messages on.
    pub slot_id: String,
    /// Optional relay URL advertised for the slot.
    pub relay_url: Option<String>,
    /// Claim timestamp string; format is set by the claim handler (confirm).
    pub claimed_at: String,
    /// v0.5.19 (#9.1): if false, this handle is omitted from the
    /// `/v1/handles` directory listing — operator opted out of bulk
    /// discovery. The `.well-known/wire/agent?handle=X` direct lookup
    /// still resolves so existing peers + out-of-band sharing continue
    /// to work. Default `None` = discoverable (back-compat for records
    /// claimed pre-v0.5.19).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
177
178impl HandleRecord {
179    /// Effective discoverability: defaults to true when the field is
180    /// absent (pre-v0.5.19 records).
181    fn is_discoverable(&self) -> bool {
182        self.discoverable.unwrap_or(true)
183    }
184}
185
/// Latest auto-responder health record an operator published for a slot (R3).
/// Stored in `Inner::responder_health` and persisted as
/// `<state_dir>/responder-health/<slot_id>.json`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ResponderHealthRecord {
    /// Operator-supplied status string (allowed values are decided by the
    /// `responder_health_set` handler — not visible here).
    pub status: String,
    /// Optional free-form explanation of the status.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional timestamp string of the responder's last success.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_success_at: Option<String>,
    /// Timestamp string at which this record was set.
    pub set_at: String,
}
195
/// Ephemeral state of one SPAKE2 pairing session, keyed by pair_id in
/// `Inner::pair_slots`. Never persisted; evicted after `PAIR_SLOT_TTL_SECS`
/// of inactivity.
#[derive(Clone, Debug)]
struct PairSlot {
    /// SPAKE2 message from the host side.
    host_msg: Option<String>,
    /// SPAKE2 message from the guest side.
    guest_msg: Option<String>,
    /// Sealed bootstrap payload from host (after SAS confirm).
    host_bootstrap: Option<String>,
    /// Sealed bootstrap payload from guest.
    guest_bootstrap: Option<String>,
    /// Last activity time (monotonic) — used for TTL eviction.
    last_touched: std::time::Instant,
}
209
210impl Default for PairSlot {
211    fn default() -> Self {
212        Self {
213            host_msg: None,
214            guest_msg: None,
215            host_bootstrap: None,
216            guest_bootstrap: None,
217            last_touched: std::time::Instant::now(),
218        }
219    }
220}
221
/// Pair-slot idle TTL in seconds (5 minutes). After this many seconds without
/// activity, the slot is evicted to free memory + bound brute-force surface
/// (PENTEST.md §code-review #3).
const PAIR_SLOT_TTL_SECS: u64 = 300;
225
/// JSON body of `POST /v1/slot/allocate`.
#[derive(Deserialize)]
pub struct AllocateRequest {
    /// Optional handle hint — purely informational, server doesn't enforce.
    #[serde(default)]
    pub handle: Option<String>,
}
232
/// JSON body of `POST /v1/events/:slot_id`.
#[derive(Deserialize)]
pub struct PostEventRequest {
    /// The signed event, stored verbatim (the relay does not verify it).
    pub event: Value,
}
237
/// Query string for `GET /v1/events/:slot_id`.
#[derive(Deserialize)]
pub struct ListEventsQuery {
    /// Resume from after this event_id (exclusive). Omit for full slot read.
    pub since: Option<String>,
    /// Max events to return. Default 100, max 1000.
    pub limit: Option<usize>,
}
245
246impl Relay {
247    pub async fn new(state_dir: PathBuf) -> Result<Self> {
248        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
249        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
250        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
251        let mut inner = Inner {
252            slots: HashMap::new(),
253            tokens: HashMap::new(),
254            slot_bytes: HashMap::new(),
255            last_pull_at_unix: HashMap::new(),
256            streams: HashMap::new(),
257            pair_lookup: HashMap::new(),
258            pair_slots: HashMap::new(),
259            handles: HashMap::new(),
260            responder_health: HashMap::new(),
261            invites: HashMap::new(),
262        };
263        // Reload tokens
264        let token_path = state_dir.join("tokens.json");
265        if token_path.exists() {
266            let body = tokio::fs::read_to_string(&token_path).await?;
267            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
268        }
269        // Reload slots from JSONL
270        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
271        while let Some(entry) = slots_dir.next_entry().await? {
272            let path = entry.path();
273            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
274                continue;
275            }
276            let stem = match path.file_stem().and_then(|s| s.to_str()) {
277                Some(s) => s.to_string(),
278                None => continue,
279            };
280            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
281            let mut events = Vec::new();
282            for line in body.lines() {
283                if let Ok(v) = serde_json::from_str::<Value>(line) {
284                    events.push(v);
285                }
286            }
287            // Recompute byte usage for the slot from its persisted events.
288            let bytes: usize = events
289                .iter()
290                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
291                .sum();
292            inner.slot_bytes.insert(stem.clone(), bytes);
293            inner.slots.insert(stem, events);
294        }
295        // Reload handle directory (v0.5).
296        let handles_dir = state_dir.join("handles");
297        if handles_dir.exists() {
298            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
299            while let Some(entry) = rd.next_entry().await? {
300                let path = entry.path();
301                if path.extension().and_then(|x| x.to_str()) != Some("json") {
302                    continue;
303                }
304                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
305                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
306                    inner.handles.insert(rec.nick.clone(), rec);
307                }
308            }
309        }
310        // Reload responder health records (R3).
311        let responder_health_dir = state_dir.join("responder-health");
312        if responder_health_dir.exists() {
313            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
314            while let Some(entry) = rd.next_entry().await? {
315                let path = entry.path();
316                if path.extension().and_then(|x| x.to_str()) != Some("json") {
317                    continue;
318                }
319                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
320                    continue;
321                };
322                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
323                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
324                    inner.responder_health.insert(slot_id.to_string(), rec);
325                }
326            }
327        }
328        // Reload short-URL invites. JSONL append-only; later entries with
329        // the same token overwrite earlier (won't happen — tokens are
330        // unique by construction — but coded defensively).
331        let invites_path = state_dir.join("invites.jsonl");
332        if invites_path.exists() {
333            let now_unix = SystemTime::now()
334                .duration_since(UNIX_EPOCH)
335                .map(|d| d.as_secs())
336                .unwrap_or(0);
337            let body = tokio::fs::read_to_string(&invites_path)
338                .await
339                .unwrap_or_default();
340            for line in body.lines() {
341                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
342                    && rec.expires_unix > now_unix
343                {
344                    inner.invites.insert(rec.token.clone(), rec);
345                }
346            }
347        }
348        let boot_unix = SystemTime::now()
349            .duration_since(UNIX_EPOCH)
350            .map(|d| d.as_secs())
351            .unwrap_or(0);
352        // Reload counter snapshot. Missing/corrupt file → start at zero.
353        let snap: CountersSnapshot =
354            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
355                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
356                Err(_) => CountersSnapshot::default(),
357            };
358        Ok(Self {
359            inner: Arc::new(Mutex::new(inner)),
360            state_dir,
361            counters: Arc::new(RelayCounters {
362                boot_unix,
363                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
364                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
365                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
366                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
367                events_posted_total: AtomicU64::new(snap.events_posted_total),
368            }),
369        })
370    }
371
372    pub fn router(self) -> Router {
373        self.router_with_mode(ServerMode::default())
374    }
375
376    pub fn router_with_mode(self, mode: ServerMode) -> Router {
377        // Rate limit applied to write endpoints that create new state (slots,
378        // pair-sessions, bootstraps). 10 req/sec sustained, 50 req burst.
379        // v0.1 uses the GLOBAL key extractor (single bucket for all callers) —
380        // per-IP needs ConnectInfo middleware which axum 0.7 wires differently.
381        // Per-IP keying is a v0.2 hardening; for now Cloudflare WAF + this
382        // global cap shoulder DDoS protection in series.
383        let governor_conf = std::sync::Arc::new(
384            GovernorConfigBuilder::default()
385                .per_second(10)
386                .burst_size(50)
387                .key_extractor(GlobalKeyExtractor)
388                .finish()
389                .expect("valid governor config"),
390        );
391        let governor_layer = GovernorLayer {
392            config: governor_conf,
393        };
394
395        // Hot writes group — rate limited.
396        let hot_writes = Router::new()
397            .route("/v1/slot/allocate", post(allocate_slot))
398            .route("/v1/pair", post(pair_open))
399            .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
400            .route("/v1/pair/abandon", post(pair_abandon))
401            .layer(governor_layer);
402
403        // Core data-plane routes: events, pair, slots, healthz. Always
404        // present, in both federation and local-only modes.
405        let mut router = Router::new()
406            .route("/healthz", get(healthz))
407            .route("/v1/events/:slot_id", post(post_event).get(list_events))
408            .route("/v1/slot/:slot_id/state", get(slot_state))
409            .route(
410                "/v1/slot/:slot_id/responder-health",
411                post(responder_health_set),
412            )
413            .route("/v1/events/:slot_id/stream", get(stream_events))
414            .route("/v1/pair/:pair_id", get(pair_get))
415            .route("/v1/handle/intro/:nick", post(handle_intro));
416
417        // Discovery + landing surfaces: phonebook, well-known agent cards,
418        // landing page, stats, invite landing. In `--local-only` mode we
419        // skip all of these — the relay becomes invisible from the outside
420        // (and from other agents on the same box that might enumerate it).
421        // This is the v0.5.17 within-machine privacy guarantee.
422        if !mode.local_only {
423            router = router
424                .route("/", get(landing_index))
425                .route("/favicon.svg", get(landing_favicon))
426                .route("/og.png", get(landing_og))
427                .route("/install", get(landing_install_sh))
428                .route("/install.sh", get(landing_install_sh))
429                .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
430                .route("/stats", get(stats_root))
431                .route("/stats.json", get(stats_json))
432                .route("/stats.html", get(landing_stats_html))
433                .route("/stats.history", get(stats_history))
434                .route("/phonebook", get(landing_phonebook_html))
435                .route("/phonebook.html", get(landing_phonebook_html))
436                .route("/v1/handle/claim", post(handle_claim))
437                .route("/v1/handles", get(handles_directory))
438                .route("/v1/invite/register", post(invite_register))
439                .route("/i/:token", get(invite_script))
440                .route("/.well-known/wire/agent", get(well_known_agent))
441                .route(
442                    "/.well-known/agent-card.json",
443                    get(well_known_agent_card_a2a),
444                );
445        } else {
446            // Local-only mode still needs handle_claim for in-process
447            // session bootstrap (`wire session new` allocates a local
448            // slot AND claims a handle on the local relay so peers can
449            // resolve it). The claim is gated to loopback-only callers
450            // by the bind, not by the route.
451            router = router.route("/v1/handle/claim", post(handle_claim));
452        }
453
454        router.merge(hot_writes).with_state(self)
455    }
456
457    /// Evict pair-slots that have been idle past `PAIR_SLOT_TTL_SECS`.
458    /// Called inline on every pair-slot mutation; a background sweeper task
459    /// (see `Self::start_sweeper`) covers the long-idle case.
460    async fn evict_expired_pair_slots(&self) {
461        let now = std::time::Instant::now();
462        let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
463        let mut inner = self.inner.lock().await;
464        let mut to_remove = Vec::new();
465        for (id, slot) in inner.pair_slots.iter() {
466            if now.duration_since(slot.last_touched) > ttl {
467                to_remove.push(id.clone());
468            }
469        }
470        for id in to_remove {
471            inner.pair_slots.remove(&id);
472            inner.pair_lookup.retain(|_, v| v != &id);
473        }
474    }
475
476    /// Spawn a background tokio task that runs `evict_expired_pair_slots` every
477    /// 60 seconds. Call once after `Relay::new`; the handle is leaked deliberately
478    /// — process exit reaps it. Safe to skip in tests where you'd rather test
479    /// eviction inline.
480    pub fn spawn_pair_sweeper(&self) {
481        let me = self.clone();
482        tokio::spawn(async move {
483            let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
484            loop {
485                tick.tick().await;
486                me.evict_expired_pair_slots().await;
487            }
488        });
489    }
490
491    /// Snapshot the in-process counters to `<state_dir>/counters.json`. Called
492    /// every 30s by `spawn_counter_persister` and once during graceful
493    /// shutdown so a deploy doesn't reset the running totals.
494    pub async fn persist_counters(&self) -> Result<()> {
495        let snap = CountersSnapshot {
496            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
497            handle_first_claims_total: self
498                .counters
499                .handle_first_claims_total
500                .load(Ordering::Relaxed),
501            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
502            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
503            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
504        };
505        let body = serde_json::to_vec_pretty(&snap)?;
506        let path = self.state_dir.join("counters.json");
507        tokio::fs::write(path, body).await?;
508        Ok(())
509    }
510
511    /// Append one row to `<state_dir>/stats-history.jsonl` mirroring the
512    /// /stats endpoint at this instant. Used by /stats.html for sparklines.
513    /// File grows ~250 B per call → ~720 KB/day. A future prune wave can
514    /// roll old entries off once the history exceeds 90 days.
515    pub async fn append_history(&self) -> Result<()> {
516        use tokio::io::AsyncWriteExt;
517        let now = SystemTime::now()
518            .duration_since(UNIX_EPOCH)
519            .map(|d| d.as_secs())
520            .unwrap_or(0);
521        let (handles_active, slots_active, pair_slots_open, streams_active) = {
522            let inner = self.inner.lock().await;
523            (
524                inner.handles.len(),
525                inner.slots.len(),
526                inner.pair_slots.len(),
527                inner.streams.values().map(Vec::len).sum::<usize>(),
528            )
529        };
530        let entry = HistoryEntry {
531            ts: now,
532            handles_active,
533            slots_active,
534            pair_slots_open,
535            streams_active,
536            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
537            handle_first_claims_total: self
538                .counters
539                .handle_first_claims_total
540                .load(Ordering::Relaxed),
541            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
542            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
543            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
544        };
545        let line = serde_json::to_vec(&entry)?;
546        let path = self.state_dir.join("stats-history.jsonl");
547        let mut f = tokio::fs::OpenOptions::new()
548            .create(true)
549            .append(true)
550            .open(&path)
551            .await?;
552        f.write_all(&line).await?;
553        f.write_all(b"\n").await?;
554        f.flush().await?;
555        Ok(())
556    }
557
558    /// Spawn a background tokio task that calls `persist_counters` every 30s
559    /// + appends a history row on the same tick. Loss bound: counters can
560    ///   drift back up to 30s on crash, history can drop one row.
561    pub fn spawn_counter_persister(&self) {
562        let me = self.clone();
563        tokio::spawn(async move {
564            let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
565            // First tick fires immediately; skip it so we don't write the
566            // freshly-loaded snapshot back unchanged.
567            tick.tick().await;
568            loop {
569                tick.tick().await;
570                if let Err(e) = me.persist_counters().await {
571                    eprintln!("counter persist failed: {e}");
572                }
573                if let Err(e) = me.append_history().await {
574                    eprintln!("history append failed: {e}");
575                }
576            }
577        });
578    }
579
580    async fn persist_tokens(&self) -> Result<()> {
581        let body = {
582            let inner = self.inner.lock().await;
583            serde_json::to_string_pretty(&inner.tokens)?
584        };
585        let path = self.state_dir.join("tokens.json");
586        tokio::fs::write(path, body).await?;
587        Ok(())
588    }
589
590    async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
591        // Defense in depth: only allow lowercase hex slot_ids of the exact length
592        // we ever produce ourselves (16 random bytes -> 32 hex chars). Blocks
593        // any future code path that might let attacker-controlled slot_ids reach
594        // disk operations. allocate_slot() always meets this; this assert is
595        // belt-and-suspenders against future regressions.
596        if !is_valid_slot_id(slot_id) {
597            return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
598        }
599        let path = self
600            .state_dir
601            .join("slots")
602            .join(format!("{slot_id}.jsonl"));
603        let mut line = serde_json::to_vec(event)?;
604        line.push(b'\n');
605        use tokio::io::AsyncWriteExt;
606        let mut f = tokio::fs::OpenOptions::new()
607            .create(true)
608            .append(true)
609            .open(&path)
610            .await
611            .with_context(|| format!("opening {path:?}"))?;
612        f.write_all(&line).await?;
613        f.flush().await?;
614        Ok(())
615    }
616}
617
618async fn healthz() -> impl IntoResponse {
619    (StatusCode::OK, "ok\n")
620}
621
622// Public aggregate-usage snapshot. Counter fields (`*_total`) reset on
623// process restart; state fields (`handles_active`, `slots_active`) survive
624// on the persistent volume. No DIDs / handles / IPs leaked — counts only.
625async fn stats_history(
626    State(relay): State<Relay>,
627    Query(q): Query<StatsHistoryQuery>,
628) -> impl IntoResponse {
629    let hours = q.hours.unwrap_or(24).min(168);
630    let now = SystemTime::now()
631        .duration_since(UNIX_EPOCH)
632        .map(|d| d.as_secs())
633        .unwrap_or(0);
634    let cutoff = now.saturating_sub(hours * 3600);
635    let path = relay.state_dir.join("stats-history.jsonl");
636    let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
637    let entries: Vec<Value> = body
638        .lines()
639        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
640        .filter(|v| {
641            v.get("ts")
642                .and_then(Value::as_u64)
643                .map(|t| t >= cutoff)
644                .unwrap_or(false)
645        })
646        .collect();
647    (
648        StatusCode::OK,
649        Json(json!({
650            "hours": hours,
651            "now_unix": now,
652            "count": entries.len(),
653            "entries": entries,
654        })),
655    )
656}
657
658async fn landing_stats_html() -> impl IntoResponse {
659    static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
660    (
661        StatusCode::OK,
662        [
663            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
664            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
665        ],
666        STATS_HTML,
667    )
668}
669
670async fn landing_phonebook_html() -> impl IntoResponse {
671    static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
672    (
673        StatusCode::OK,
674        [
675            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
676            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
677        ],
678        PHONEBOOK_HTML,
679    )
680}
681
682/// `/stats` dispatch: serve the pretty HTML dashboard to browsers (Accept
683/// includes text/html) and JSON to everything else (curl, scripts, scrapers).
684/// Keeps the JSON contract intact while letting humans land on the page at
685/// the short URL.
686async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
687    let wants_html = headers
688        .get(axum::http::header::ACCEPT)
689        .and_then(|v| v.to_str().ok())
690        .unwrap_or("")
691        .contains("text/html");
692    if wants_html {
693        landing_stats_html().await.into_response()
694    } else {
695        stats_json(State(relay)).await.into_response()
696    }
697}
698
699async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
700    let now = SystemTime::now()
701        .duration_since(UNIX_EPOCH)
702        .map(|d| d.as_secs())
703        .unwrap_or(0);
704    let inner = relay.inner.lock().await;
705    let streams_active: usize = inner.streams.values().map(Vec::len).sum();
706    let body = json!({
707        "version": env!("CARGO_PKG_VERSION"),
708        "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
709        "handles_active": inner.handles.len(),
710        "slots_active": inner.slots.len(),
711        "pair_slots_open": inner.pair_slots.len(),
712        "streams_active": streams_active,
713        "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
714        "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
715        "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
716        "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
717        "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
718    });
719    (StatusCode::OK, Json(body))
720}
721
722// Static landing site baked into the binary so apex (wireup.net) can flip
723// straight to Fly without a separate static-host. ~37 KB total — negligible
724// against the release binary size, and keeps the relay self-contained.
725async fn landing_index() -> impl IntoResponse {
726    static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
727    (
728        StatusCode::OK,
729        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
730        INDEX_HTML,
731    )
732}
733
734async fn landing_favicon() -> impl IntoResponse {
735    static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
736    (
737        StatusCode::OK,
738        [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
739        FAVICON_SVG,
740    )
741}
742
743async fn landing_og() -> impl IntoResponse {
744    static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
745    (
746        StatusCode::OK,
747        [
748            (axum::http::header::CONTENT_TYPE, "image/png"),
749            (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
750        ],
751        OG_PNG,
752    )
753}
754
755async fn landing_install_sh() -> impl IntoResponse {
756    static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
757    (
758        StatusCode::OK,
759        [
760            (
761                axum::http::header::CONTENT_TYPE,
762                "text/x-shellscript; charset=utf-8",
763            ),
764            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
765        ],
766        INSTALL_SH,
767    )
768}
769
770async fn landing_openshell_policy_sh() -> impl IntoResponse {
771    static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
772    (
773        StatusCode::OK,
774        [
775            (
776                axum::http::header::CONTENT_TYPE,
777                "text/x-shellscript; charset=utf-8",
778            ),
779            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
780        ],
781        POLICY_SH,
782    )
783}
784
785async fn allocate_slot(
786    State(relay): State<Relay>,
787    Json(_req): Json<AllocateRequest>,
788) -> impl IntoResponse {
789    let slot_id = random_hex(16);
790    let slot_token = random_hex(32);
791    {
792        let mut inner = relay.inner.lock().await;
793        inner.slots.insert(slot_id.clone(), Vec::new());
794        inner.tokens.insert(slot_id.clone(), slot_token.clone());
795    }
796    if let Err(e) = relay.persist_tokens().await {
797        return (
798            StatusCode::INTERNAL_SERVER_ERROR,
799            Json(json!({"error": format!("persist failed: {e}")})),
800        )
801            .into_response();
802    }
803    relay
804        .counters
805        .slot_allocations_total
806        .fetch_add(1, Ordering::Relaxed);
807    (
808        StatusCode::CREATED,
809        Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
810    )
811        .into_response()
812}
813
814async fn post_event(
815    State(relay): State<Relay>,
816    Path(slot_id): Path<String>,
817    headers: HeaderMap,
818    Json(req): Json<PostEventRequest>,
819) -> impl IntoResponse {
820    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
821        return resp;
822    }
823    // Body size cap (rough; serialize and check).
824    let body_bytes = match serde_json::to_vec(&req.event) {
825        Ok(b) => b,
826        Err(e) => {
827            return (
828                StatusCode::BAD_REQUEST,
829                Json(json!({"error": format!("event not serializable: {e}")})),
830            )
831                .into_response();
832        }
833    };
834    if body_bytes.len() > MAX_EVENT_BYTES {
835        return (
836            StatusCode::PAYLOAD_TOO_LARGE,
837            Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
838        )
839            .into_response();
840    }
841    // Per-slot quota: cap accumulated bytes per slot at MAX_SLOT_BYTES.
842    {
843        let inner = relay.inner.lock().await;
844        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
845        if used + body_bytes.len() > MAX_SLOT_BYTES {
846            return (
847                StatusCode::PAYLOAD_TOO_LARGE,
848                Json(json!({
849                    "error": "slot quota exceeded",
850                    "slot_bytes_used": used,
851                    "slot_bytes_max": MAX_SLOT_BYTES,
852                    "remediation": "operator should `wire rotate-slot` to drain old slot",
853                })),
854            )
855                .into_response();
856        }
857    }
858    let event_id = req
859        .event
860        .get("event_id")
861        .and_then(Value::as_str)
862        .map(str::to_string);
863
864    // Dedupe by event_id if present.
865    let dup = {
866        let inner = relay.inner.lock().await;
867        let slot = inner.slots.get(&slot_id);
868        if let (Some(eid), Some(slot)) = (&event_id, slot) {
869            slot.iter()
870                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
871        } else {
872            false
873        }
874    };
875    if dup {
876        return (
877            StatusCode::OK,
878            Json(json!({"event_id": event_id, "status": "duplicate"})),
879        )
880            .into_response();
881    }
882
883    {
884        let mut inner = relay.inner.lock().await;
885        let event_size = body_bytes.len();
886        let slot = inner.slots.entry(slot_id.clone()).or_default();
887        slot.push(req.event.clone());
888        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
889    }
890    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
891        return (
892            StatusCode::INTERNAL_SERVER_ERROR,
893            Json(json!({"error": format!("persist failed: {e}")})),
894        )
895            .into_response();
896    }
897    relay
898        .counters
899        .events_posted_total
900        .fetch_add(1, Ordering::Relaxed);
901    // R1 push: broadcast the new event to every active SSE subscriber on
902    // this slot. Dead channels are pruned in-place. The broadcast happens
903    // AFTER the disk persist so subscribers and disk readers see the same
904    // events; on persist failure we already returned 500 above.
905    {
906        let mut inner = relay.inner.lock().await;
907        if let Some(subs) = inner.streams.get_mut(&slot_id) {
908            subs.retain(|tx| tx.send(req.event.clone()).is_ok());
909        }
910    }
911    (
912        StatusCode::CREATED,
913        Json(json!({"event_id": event_id, "status": "stored"})),
914    )
915        .into_response()
916}
917
918/// R1 — server-sent-events push stream for a slot. Auth'd by slot_token
919/// (same as `list_events`). The connection registers an `UnboundedSender`
920/// on the slot's subscriber list; every subsequent `post_event` to the slot
921/// fans out to all subscribers as `data: <event-json>\n\n` lines. The
922/// connection stays open until the client disconnects.
923///
924/// A 30-second keepalive ping is emitted automatically so reverse proxies
925/// (Cloudflare tunnel, nginx) don't time out the upstream.
926///
927/// Note: the subscriber sees events posted AFTER it subscribed. To catch
928/// up on history first, the client should call `GET /v1/events/:slot_id`
929/// with `since=` before opening the stream.
930async fn stream_events(
931    State(relay): State<Relay>,
932    Path(slot_id): Path<String>,
933    headers: HeaderMap,
934) -> axum::response::Response {
935    use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
936    use futures::stream::StreamExt;
937
938    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
939        return resp;
940    }
941
942    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
943    {
944        let mut inner = relay.inner.lock().await;
945        inner.streams.entry(slot_id.clone()).or_default().push(tx);
946    }
947
948    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
949        SseEvent::default()
950            .json_data(&ev)
951            .map_err(|e| std::io::Error::other(e.to_string()))
952    });
953
954    Sse::new(stream)
955        .keep_alive(
956            KeepAlive::new()
957                .interval(std::time::Duration::from_secs(30))
958                .text("phyllis: still on the line"),
959        )
960        .into_response()
961}
962
963// ---------- pair-slot handlers ----------
964
/// Body for `pair_open`: one side of a code-phrase pairing registers its
/// SPAKE2 message under the hash of the shared code.
#[derive(Deserialize)]
pub struct PairOpenRequest {
    /// Hash of the code phrase. `pair_open` uses it as the key into
    /// `pair_lookup`, so both sides of one pairing land in the same pair slot.
    /// Presumably the SHA-256 hex digest documented on `PairAbandonRequest`.
    pub code_hash: String,
    /// SPAKE2 message (base64).
    pub msg: String,
    /// Caller's side of the handshake. `pair_open` rejects any other value
    /// with 400.
    pub role: String, // "host" or "guest"
}
972
/// Body for `pair_bootstrap`: one side deposits its sealed bootstrap
/// payload, which the peer later collects via `pair_get` (`peer_bootstrap`).
#[derive(Deserialize)]
pub struct PairBootstrapRequest {
    /// "host" or "guest". `pair_bootstrap` rejects any other value with 400.
    pub role: String,
    /// Sealed payload, stored verbatim by the relay. Presumably encrypted
    /// under the SPAKE2-derived key; TODO confirm against the client.
    pub sealed: String,
}
978
/// Body for `pair_abandon`: identifies the pair slot to forget by its code hash.
#[derive(Deserialize)]
pub struct PairAbandonRequest {
    /// SHA-256 hex digest of the code phrase. Same value the caller posts to
    /// /v1/pair as `code_hash` — knowing the code IS the auth here.
    pub code_hash: String,
}
985
986/// Forget the pair-slot associated with this code_hash. Either side can call;
987/// no auth beyond knowledge of the code (which is the shared secret of this
988/// handshake anyway). Idempotent: returns 204 whether or not the slot exists.
989/// Used by clients to recover after a crash mid-handshake, so the host doesn't
990/// stay locked out until the 5-minute TTL.
991async fn pair_abandon(
992    State(relay): State<Relay>,
993    Json(req): Json<PairAbandonRequest>,
994) -> impl IntoResponse {
995    let mut inner = relay.inner.lock().await;
996    if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
997        inner.pair_slots.remove(&pair_id);
998    }
999    StatusCode::NO_CONTENT.into_response()
1000}
1001
1002async fn pair_open(
1003    State(relay): State<Relay>,
1004    Json(req): Json<PairOpenRequest>,
1005) -> impl IntoResponse {
1006    if req.role != "host" && req.role != "guest" {
1007        return (
1008            StatusCode::BAD_REQUEST,
1009            Json(json!({"error": "role must be 'host' or 'guest'"})),
1010        )
1011            .into_response();
1012    }
1013    relay.evict_expired_pair_slots().await;
1014    let mut inner = relay.inner.lock().await;
1015    let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1016        Some(id) => id,
1017        None => {
1018            let new_id = random_hex(16);
1019            inner
1020                .pair_lookup
1021                .insert(req.code_hash.clone(), new_id.clone());
1022            inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1023            relay
1024                .counters
1025                .pair_opens_total
1026                .fetch_add(1, Ordering::Relaxed);
1027            new_id
1028        }
1029    };
1030    let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1031    slot.last_touched = std::time::Instant::now();
1032    if req.role == "host" {
1033        if slot.host_msg.is_some() {
1034            return (
1035                StatusCode::CONFLICT,
1036                Json(json!({"error": "host already registered for this code"})),
1037            )
1038                .into_response();
1039        }
1040        slot.host_msg = Some(req.msg);
1041    } else {
1042        if slot.guest_msg.is_some() {
1043            return (
1044                StatusCode::CONFLICT,
1045                Json(json!({"error": "guest already registered for this code"})),
1046            )
1047                .into_response();
1048        }
1049        slot.guest_msg = Some(req.msg);
1050    }
1051    (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1052}
1053
/// Query string for `pair_get` (`?as_role=host|guest`).
#[derive(Deserialize)]
pub struct PairGetQuery {
    /// "host" or "guest" — caller's role; we return the OTHER side's data.
    /// `pair_get` answers 400 for any other value.
    pub as_role: String,
}
1059
1060async fn pair_get(
1061    State(relay): State<Relay>,
1062    Path(pair_id): Path<String>,
1063    Query(q): Query<PairGetQuery>,
1064) -> impl IntoResponse {
1065    relay.evict_expired_pair_slots().await;
1066    let mut inner = relay.inner.lock().await;
1067    let slot = match inner.pair_slots.get_mut(&pair_id) {
1068        Some(s) => {
1069            s.last_touched = std::time::Instant::now();
1070            s.clone()
1071        }
1072        None => {
1073            return (
1074                StatusCode::NOT_FOUND,
1075                Json(json!({"error": "unknown pair_id"})),
1076            )
1077                .into_response();
1078        }
1079    };
1080    let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1081        "host" => (slot.guest_msg, slot.guest_bootstrap),
1082        "guest" => (slot.host_msg, slot.host_bootstrap),
1083        _ => {
1084            return (
1085                StatusCode::BAD_REQUEST,
1086                Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1087            )
1088                .into_response();
1089        }
1090    };
1091    (
1092        StatusCode::OK,
1093        Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1094    )
1095        .into_response()
1096}
1097
1098async fn pair_bootstrap(
1099    State(relay): State<Relay>,
1100    Path(pair_id): Path<String>,
1101    Json(req): Json<PairBootstrapRequest>,
1102) -> impl IntoResponse {
1103    relay.evict_expired_pair_slots().await;
1104    let mut inner = relay.inner.lock().await;
1105    let slot = match inner.pair_slots.get_mut(&pair_id) {
1106        Some(s) => s,
1107        None => {
1108            return (
1109                StatusCode::NOT_FOUND,
1110                Json(json!({"error": "unknown pair_id"})),
1111            )
1112                .into_response();
1113        }
1114    };
1115    slot.last_touched = std::time::Instant::now();
1116    match req.role.as_str() {
1117        "host" => slot.host_bootstrap = Some(req.sealed),
1118        "guest" => slot.guest_bootstrap = Some(req.sealed),
1119        _ => {
1120            return (
1121                StatusCode::BAD_REQUEST,
1122                Json(json!({"error": "role must be 'host' or 'guest'"})),
1123            )
1124                .into_response();
1125        }
1126    }
1127    (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1128}
1129
1130// ---------- handle directory (v0.5) ----------
1131
/// Body of `POST /v1/handle/claim`, handled by `handle_claim`.
#[derive(Deserialize)]
pub struct HandleClaimRequest {
    /// Nick the claimant wants (case-folded). Domain part is implicit: the
    /// domain the relay's `.well-known` is served from.
    /// NOTE(review): `handle_claim` does not fold case itself. It rejects
    /// anything `is_valid_nick` refuses; that error text says lowercase only.
    pub nick: String,
    /// Slot id the claimant owns on this relay (proves they allocated here).
    /// `handle_claim` requires the bearer token for this slot.
    pub slot_id: String,
    /// Optional public-facing relay URL the relay should advertise back in
    /// `.well-known/wire/agent` responses. If omitted, callers will need to
    /// know the relay URL out-of-band.
    pub relay_url: Option<String>,
    /// Claimant's full signed agent-card (includes DID + verify_keys +
    /// optional profile).
    pub card: Value,
    /// v0.5.19 (#9.1): set false to opt out of `/v1/handles` bulk listing.
    /// Direct `.well-known/wire/agent` lookup by handle still works. The
    /// default (None / absent) is discoverable, for back-compat with
    /// pre-v0.5.19 clients.
    // NOTE(review): `skip_serializing_if` only affects `Serialize`. This
    // struct derives `Deserialize` only, so that part of the attribute is inert.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
1153
1154/// `POST /v1/handle/claim` — claim or update a `nick@<relay-domain>` handle.
1155///
1156/// FCFS on nick. Same-DID re-claims allowed (used for profile updates +
1157/// slot rotation). Different-DID claims on a taken nick return 409.
1158/// Caller must (a) own the `slot_id` they reference (verified by token
1159/// being present), and (b) submit a card with a valid self-signature.
1160async fn handle_claim(
1161    State(relay): State<Relay>,
1162    headers: HeaderMap,
1163    Json(req): Json<HandleClaimRequest>,
1164) -> impl IntoResponse {
1165    // Bearer auth: claimant must hold the slot_token for the slot they
1166    // reference. Prevents nick-squatting from an unauthenticated POSTer.
1167    if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1168        return resp;
1169    }
1170    // Validate nick (same rules as the client-side parser).
1171    if !crate::pair_profile::is_valid_nick(&req.nick) {
1172        return (
1173            StatusCode::BAD_REQUEST,
1174            Json(json!({
1175                "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1176                "nick": req.nick,
1177            })),
1178        )
1179            .into_response();
1180    }
1181    // Verify the card signature using the public verify_agent_card helper.
1182    if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1183        return (
1184            StatusCode::BAD_REQUEST,
1185            Json(json!({"error": format!("card signature invalid: {e}")})),
1186        )
1187            .into_response();
1188    }
1189    let did = match req.card.get("did").and_then(Value::as_str) {
1190        Some(d) => d.to_string(),
1191        None => {
1192            return (
1193                StatusCode::BAD_REQUEST,
1194                Json(json!({"error": "card missing 'did' field"})),
1195            )
1196                .into_response();
1197        }
1198    };
1199
1200    // FCFS check. Also snapshot the existing record (clone) so the
1201    // re-claim path below can preserve fields the client didn't include
1202    // in the request (notably `discoverable` from v0.5.19, so an old
1203    // client doing a profile-update re-claim doesn't accidentally
1204    // re-publish a hidden handle).
1205    let prior_record: Option<HandleRecord>;
1206    let first_claim = {
1207        let inner = relay.inner.lock().await;
1208        match inner.handles.get(&req.nick) {
1209            Some(existing) if existing.did != did => {
1210                return (
1211                    StatusCode::CONFLICT,
1212                    Json(json!({
1213                        "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1214                        "nick": req.nick,
1215                        "claimed_by": existing.did,
1216                    })),
1217                )
1218                    .into_response();
1219            }
1220            Some(prev) => {
1221                prior_record = Some(prev.clone());
1222                false
1223            }
1224            None => {
1225                prior_record = None;
1226                true
1227            }
1228        }
1229    };
1230
1231    // v0.5.19 (#9.5): round claimed_at to whole seconds. Nanosecond
1232    // precision served no client purpose (display only) and acted as a
1233    // cross-tab fingerprint correlating one operator's multiple handles
1234    // claimed in the same session. Truncate before formatting.
1235    let now = time::OffsetDateTime::now_utc()
1236        .replace_nanosecond(0)
1237        .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1238        .format(&time::format_description::well_known::Rfc3339)
1239        .unwrap_or_default();
1240    // v0.5.19 (#9.1): preserve `discoverable` across re-claims. If the
1241    // request doesn't set it explicitly, keep whatever the existing
1242    // record had so a profile-update re-claim doesn't accidentally
1243    // re-publish a hidden handle. Default for first-time claim is None
1244    // (= discoverable, back-compat).
1245    let discoverable = match (req.discoverable, &prior_record) {
1246        (Some(d), _) => Some(d),
1247        (None, Some(prev)) => prev.discoverable,
1248        (None, None) => None,
1249    };
1250    let record = HandleRecord {
1251        nick: req.nick.clone(),
1252        did: did.clone(),
1253        card: req.card.clone(),
1254        slot_id: req.slot_id.clone(),
1255        relay_url: req.relay_url.clone(),
1256        claimed_at: now,
1257        discoverable,
1258    };
1259
1260    // Persist to disk first (durable), then update in-memory.
1261    let path = relay
1262        .state_dir
1263        .join("handles")
1264        .join(format!("{}.json", req.nick));
1265    let body = match serde_json::to_vec_pretty(&record) {
1266        Ok(b) => b,
1267        Err(e) => {
1268            return (
1269                StatusCode::INTERNAL_SERVER_ERROR,
1270                Json(json!({"error": format!("serialize failed: {e}")})),
1271            )
1272                .into_response();
1273        }
1274    };
1275    if let Err(e) = tokio::fs::write(&path, &body).await {
1276        return (
1277            StatusCode::INTERNAL_SERVER_ERROR,
1278            Json(json!({"error": format!("persist failed: {e}")})),
1279        )
1280            .into_response();
1281    }
1282    {
1283        let mut inner = relay.inner.lock().await;
1284        inner.handles.insert(req.nick.clone(), record);
1285    }
1286    relay
1287        .counters
1288        .handle_claims_total
1289        .fetch_add(1, Ordering::Relaxed);
1290    if first_claim {
1291        relay
1292            .counters
1293            .handle_first_claims_total
1294            .fetch_add(1, Ordering::Relaxed);
1295    }
1296    (
1297        StatusCode::CREATED,
1298        Json(json!({
1299            "nick": req.nick,
1300            "did": did,
1301            "status": if first_claim { "claimed" } else { "re-claimed" },
1302        })),
1303    )
1304        .into_response()
1305}
1306
/// Query string for the `.well-known/wire/agent?handle=<nick>` lookup.
#[derive(Deserialize)]
pub struct WellKnownAgentQuery {
    /// Handle to resolve. The exact accepted form (bare nick vs
    /// `nick@domain`) is decided by the handler, which is outside this view.
    pub handle: String,
}
1311
/// Query string for the paginated public handle directory (`handles_directory`).
#[derive(Deserialize)]
pub struct HandlesDirectoryQuery {
    /// Exclusive lower bound: only nicks sorting strictly after this are
    /// returned. Pass the previous page's `next_cursor`.
    pub cursor: Option<String>,
    /// Page size. Defaults to 100 and is clamped to 1..=500.
    pub limit: Option<usize>,
    /// Optional filter: keep only handles whose profile `vibe` list contains
    /// this value (ASCII case-insensitive).
    pub vibe: Option<String>,
}
1318
1319// ─── short-URL invites (v0.5.10) ──────────────────────────────────────────
1320// One-curl onboarding: the invitor registers their `wire://pair?...` URL
1321// here, gets back a 6-hex token. Anyone who does
1322//   curl -fsSL https://wireup.net/i/<token> | sh
1323// gets wire installed (if needed) + the invite accepted, in one shot.
1324//
1325// Possession of the short URL = pair authorization (same shape as the
1326// underlying wire:// invite — it's just a redirector).
1327
/// Body for `invite_register`: the wire invite URL to put behind a short URL.
#[derive(Deserialize)]
pub struct InviteRegisterRequest {
    /// The wire://pair?... URL produced by `wire invite`. Required:
    /// `invite_register` rejects an empty string (400) and anything longer
    /// than 8 KiB (413).
    pub invite_url: String,
    /// Lifetime in seconds. Default 86400 (24h). Clamped by `invite_register`
    /// to the range 60 seconds ..= 7 days.
    #[serde(default)]
    pub ttl_seconds: Option<u64>,
    /// If `Some(n)`, the short URL's shell-script form can be fetched N times
    /// before it returns 410. `?format=url` lookups do not consume a use.
    /// `None` = unlimited until TTL hits.
    #[serde(default)]
    pub uses: Option<u32>,
}
1340
1341impl Relay {
1342    /// Append one InviteRecord to `<state_dir>/invites.jsonl`.
1343    async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1344        use tokio::io::AsyncWriteExt;
1345        let mut line = serde_json::to_vec(rec)?;
1346        line.push(b'\n');
1347        let path = self.state_dir.join("invites.jsonl");
1348        let mut f = tokio::fs::OpenOptions::new()
1349            .create(true)
1350            .append(true)
1351            .open(&path)
1352            .await?;
1353        f.write_all(&line).await?;
1354        f.flush().await?;
1355        Ok(())
1356    }
1357}
1358
1359async fn invite_register(
1360    State(relay): State<Relay>,
1361    Json(req): Json<InviteRegisterRequest>,
1362) -> impl IntoResponse {
1363    if req.invite_url.is_empty() {
1364        return (
1365            StatusCode::BAD_REQUEST,
1366            Json(json!({"error": "invite_url required"})),
1367        )
1368            .into_response();
1369    }
1370    // Length cap on the embedded URL to keep persisted records bounded.
1371    if req.invite_url.len() > 8_192 {
1372        return (
1373            StatusCode::PAYLOAD_TOO_LARGE,
1374            Json(json!({"error": "invite_url > 8 KiB"})),
1375        )
1376            .into_response();
1377    }
1378    let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1379    let now = SystemTime::now()
1380        .duration_since(UNIX_EPOCH)
1381        .map(|d| d.as_secs())
1382        .unwrap_or(0);
1383    // 6-hex token → 16.7M space. Collision probability negligible at v0.5
1384    // scale; if a collision happens (1 in 16M) we 409 and the caller retries.
1385    let token = random_hex(3);
1386    let rec = InviteRecord {
1387        token: token.clone(),
1388        invite_url: req.invite_url,
1389        expires_unix: now + ttl,
1390        uses_remaining: req.uses,
1391        created_unix: now,
1392    };
1393    {
1394        let mut inner = relay.inner.lock().await;
1395        if inner.invites.contains_key(&token) {
1396            return (
1397                StatusCode::CONFLICT,
1398                Json(json!({"error": "token collision, retry"})),
1399            )
1400                .into_response();
1401        }
1402        inner.invites.insert(token.clone(), rec.clone());
1403    }
1404    if let Err(e) = relay.persist_invite(&rec).await {
1405        return (
1406            StatusCode::INTERNAL_SERVER_ERROR,
1407            Json(json!({"error": format!("persist failed: {e}")})),
1408        )
1409            .into_response();
1410    }
1411    (
1412        StatusCode::CREATED,
1413        Json(json!({
1414            "token": token,
1415            "path": format!("/i/{token}"),
1416            "expires_unix": rec.expires_unix,
1417            "uses_remaining": rec.uses_remaining,
1418        })),
1419    )
1420        .into_response()
1421}
1422
/// Query string for `invite_script` (`GET /i/<token>`).
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// `format=url` returns the raw `wire://pair?...` URL as text/plain
    /// (used by `wire accept https://wireup.net/i/<token>` to resolve a
    /// short URL programmatically). Default: shell-script template.
    /// Any value other than exactly `url` also yields the script.
    /// Note: ?format=url does NOT decrement `uses_remaining` — it's a
    /// resolution lookup, not an acceptance. The actual accept happens
    /// when the wire:// URL is consumed by `pair_invite::accept_invite`.
    pub format: Option<String>,
}
1433
/// Serve a short-URL invite (`/i/<token>`).
///
/// By default the response is a `sh` script. It installs the wire CLI if it
/// is missing and then runs `wire accept` on the stored invite URL. The URL
/// is embedded single-quoted, with each `'` escaped as `'\''`, so it cannot
/// break out of the shell string. With `?format=url` the handler returns the
/// raw stored URL as `text/plain` instead.
///
/// Both forms are sent with `Cache-Control: private, no-store, max-age=0`.
///
/// Status codes:
/// - 404 for a malformed or unknown token;
/// - 410 once the invite has expired or `uses_remaining` has reached 0;
/// - 200 otherwise.
///
/// Only the script form decrements `uses_remaining`.
///
/// NOTE(review): the decrement only mutates the in-memory record. Nothing in
/// this handler persists it, so if startup reloads invites from
/// `invites.jsonl`, a restart would restore the original use count. Confirm.
async fn invite_script(
    State(relay): State<Relay>,
    Path(token): Path<String>,
    Query(q): Query<InviteScriptQuery>,
) -> impl IntoResponse {
    // Token shape: 6 lowercase hex. Reject anything else immediately so a
    // path-traversal try never reaches the map lookup.
    // (`is_ascii_hexdigit` also admits uppercase A-F.)
    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
        return (StatusCode::NOT_FOUND, "not found\n").into_response();
    }
    let want_raw_url = q.format.as_deref() == Some("url");
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    // Check expiry/uses and (for the script form) consume a use, all under
    // one lock so two concurrent fetches cannot both take the last use.
    let invite_url = {
        let mut inner = relay.inner.lock().await;
        let Some(rec) = inner.invites.get_mut(&token) else {
            return (StatusCode::NOT_FOUND, "not found\n").into_response();
        };
        if rec.expires_unix <= now {
            return (StatusCode::GONE, "this invite has expired\n").into_response();
        }
        if let Some(n) = rec.uses_remaining {
            if n == 0 {
                return (StatusCode::GONE, "this invite has been used up\n").into_response();
            }
            // Only decrement on script-template fetch (the one that's
            // actually doing the pair). The raw-URL resolution path is a
            // lookup, not an accept.
            if !want_raw_url {
                rec.uses_remaining = Some(n - 1);
            }
        }
        rec.invite_url.clone()
    };
    if want_raw_url {
        return (
            StatusCode::OK,
            [
                (
                    axum::http::header::CONTENT_TYPE,
                    "text/plain; charset=utf-8",
                ),
                (
                    axum::http::header::CACHE_CONTROL,
                    "private, no-store, max-age=0",
                ),
            ],
            invite_url,
        )
            .into_response();
    }
    // POSIX single-quote escaping: close the quote, emit an escaped quote,
    // reopen. The result is spliced into INVITE='...' below.
    let escaped = invite_url.replace('\'', "'\\''");
    // Each trailing `\` is a Rust line continuation: it drops the newline and
    // the next line's leading whitespace, so only the explicit `\n` and the
    // spaces written before the `\` end up in the script.
    let script = format!(
        "#!/bin/sh\n\
         # wire — one-curl onboarding (install + pair in one shot)\n\
         # source: https://github.com/SlanchaAi/wire\n\
         set -eu\n\
         INVITE='{escaped}'\n\
         echo \"\u{2192} checking for wire CLI...\"\n\
         if ! command -v wire >/dev/null 2>&1; then\n  \
           echo \"\u{2192} wire not installed; installing first...\"\n  \
           curl -fsSL https://wireup.net/install.sh | sh\n  \
           case \":$PATH:\" in\n    \
             *:\"$HOME/.local/bin\":*) ;;\n    \
             *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n  \
           esac\n  \
           if ! command -v wire >/dev/null 2>&1; then\n    \
             echo \"\"\n    \
             echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n    \
             echo \"Open a new shell, then run:\"\n    \
             echo \"  wire accept '$INVITE'\"\n    \
             exit 0\n  \
           fi\n\
         fi\n\
         echo \"\u{2192} accepting invite...\"\n\
         wire accept \"$INVITE\"\n"
    );
    (
        StatusCode::OK,
        [
            (
                axum::http::header::CONTENT_TYPE,
                "text/x-shellscript; charset=utf-8",
            ),
            (
                axum::http::header::CACHE_CONTROL,
                "private, no-store, max-age=0",
            ),
        ],
        script,
    )
        .into_response()
}
1529
1530async fn handles_directory(
1531    State(relay): State<Relay>,
1532    Query(q): Query<HandlesDirectoryQuery>,
1533) -> impl IntoResponse {
1534    let limit = q.limit.unwrap_or(100).clamp(1, 500);
1535    let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1536    let inner = relay.inner.lock().await;
1537    let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1538    drop(inner);
1539    records.sort_by(|a, b| a.nick.cmp(&b.nick));
1540
1541    let cursor = q.cursor.as_deref();
1542    let mut eligible = Vec::new();
1543    for rec in records {
1544        if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1545            continue;
1546        }
1547        // Hygiene: hide test-shaped nicks from the public directory. Records
1548        // remain claimed (FCFS protection persists), they just don't surface
1549        // in the phone book. `demo-` is reserved for asciinema-cast handles,
1550        // `test-` for integration runs.
1551        if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1552            continue;
1553        }
1554        // v0.5.19 (#9.1): operator opt-out — hidden handles skip the
1555        // bulk directory but still resolve via `.well-known/wire/agent?
1556        // handle=X` for out-of-band sharing.
1557        if !rec.is_discoverable() {
1558            continue;
1559        }
1560        let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1561        if profile
1562            .get("listed")
1563            .and_then(Value::as_bool)
1564            .is_some_and(|listed| !listed)
1565        {
1566            continue;
1567        }
1568        if let Some(want) = &vibe_filter {
1569            let matched = profile
1570                .get("vibe")
1571                .and_then(Value::as_array)
1572                .map(|arr| {
1573                    arr.iter().any(|v| {
1574                        v.as_str()
1575                            .map(|s| s.eq_ignore_ascii_case(want))
1576                            .unwrap_or(false)
1577                    })
1578                })
1579                .unwrap_or(false);
1580            if !matched {
1581                continue;
1582            }
1583        }
1584        eligible.push((rec, profile));
1585    }
1586
1587    let has_more = eligible.len() > limit;
1588    let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1589    let next_cursor = if has_more {
1590        page.last().map(|(rec, _)| rec.nick.clone())
1591    } else {
1592        None
1593    };
1594    let handles: Vec<Value> = page
1595        .into_iter()
1596        .map(|(rec, profile)| {
1597            // v0.12.1: fall back to the DID-derived persona emoji when the
1598            // card carries no explicit profile emoji, so every phonebook
1599            // line shows a face next to the name. The persona is a
1600            // deterministic function of the DID, so the relay can compute it
1601            // without the claimant having set anything.
1602            let emoji = profile
1603                .get("emoji")
1604                .and_then(Value::as_str)
1605                .filter(|s| !s.is_empty())
1606                .map(str::to_string)
1607                .unwrap_or_else(|| crate::character::Character::from_did(&rec.did).emoji);
1608            json!({
1609                "nick": rec.nick,
1610                "did": rec.did,
1611                "profile": {
1612                    "emoji": emoji,
1613                    "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1614                    "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1615                    "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1616                    "now": profile.get("now").cloned().unwrap_or(Value::Null),
1617                },
1618                "claimed_at": rec.claimed_at,
1619            })
1620        })
1621        .collect();
1622    (
1623        StatusCode::OK,
1624        Json(json!({
1625            "handles": handles,
1626            "next_cursor": next_cursor,
1627        })),
1628    )
1629        .into_response()
1630}
1631
1632/// `POST /v1/handle/intro/:nick` — drop a signed pair-introduction event
1633/// into a known nick's slot WITHOUT needing that slot's bearer token.
1634///
1635/// Why this exists: `.well-known/wire/agent` returns a nick's `slot_id` for
1636/// reachability, but NEVER its `slot_token` (that would leak read+write
1637/// authority to any handle-resolver). To zero-paste-pair, we need a way for
1638/// a stranger to deliver their signed agent-card to the nick's owner. This
1639/// endpoint provides exactly that, and ONLY that: the event must be `kind=1100`
1640/// (pair_drop / agent_card), self-signed, and the carrying agent-card embedded
1641/// in the body must verify-OK on its own.
1642///
1643/// Rate-limiting is the same governor that gates the other write endpoints.
1644/// Slot quota still applies — a flood of intros hits the standard 64MB cap.
1645async fn handle_intro(
1646    State(relay): State<Relay>,
1647    Path(nick): Path<String>,
1648    Json(req): Json<PostEventRequest>,
1649) -> impl IntoResponse {
1650    // Look up the nick. Must already be claimed.
1651    let slot_id = {
1652        let inner = relay.inner.lock().await;
1653        match inner.handles.get(&nick) {
1654            Some(rec) => rec.slot_id.clone(),
1655            None => {
1656                return (
1657                    StatusCode::NOT_FOUND,
1658                    Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1659                )
1660                    .into_response();
1661            }
1662        }
1663    };
1664
1665    // Only allow kind=1100 pair_drop / agent_card here. Anything else routes
1666    // to the standard /v1/events/:slot_id with bearer auth.
1667    let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1668    let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1669    if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1670        return (
1671            StatusCode::BAD_REQUEST,
1672            Json(json!({
1673                "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1674                "got_kind": kind,
1675                "got_type": type_str,
1676            })),
1677        )
1678            .into_response();
1679    }
1680
1681    // Body must embed a signed agent-card (so the receiver can pin from it).
1682    let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1683        Some(c) => c.clone(),
1684        None => {
1685            return (
1686                StatusCode::BAD_REQUEST,
1687                Json(json!({"error": "intro event body must embed 'card' field"})),
1688            )
1689                .into_response();
1690        }
1691    };
1692    if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1693        return (
1694            StatusCode::BAD_REQUEST,
1695            Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1696        )
1697            .into_response();
1698    }
1699
1700    // Size + quota checks (same as post_event).
1701    let body_bytes = match serde_json::to_vec(&req.event) {
1702        Ok(b) => b,
1703        Err(e) => {
1704            return (
1705                StatusCode::BAD_REQUEST,
1706                Json(json!({"error": format!("event not serializable: {e}")})),
1707            )
1708                .into_response();
1709        }
1710    };
1711    if body_bytes.len() > MAX_EVENT_BYTES {
1712        return (
1713            StatusCode::PAYLOAD_TOO_LARGE,
1714            Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1715        )
1716            .into_response();
1717    }
1718    {
1719        let inner = relay.inner.lock().await;
1720        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1721        if used + body_bytes.len() > MAX_SLOT_BYTES {
1722            return (
1723                StatusCode::PAYLOAD_TOO_LARGE,
1724                Json(json!({
1725                    "error": "target slot quota exceeded",
1726                    "slot_bytes_used": used,
1727                    "slot_bytes_max": MAX_SLOT_BYTES,
1728                })),
1729            )
1730                .into_response();
1731        }
1732    }
1733
1734    let event_id = req
1735        .event
1736        .get("event_id")
1737        .and_then(Value::as_str)
1738        .map(str::to_string);
1739
1740    // Dedupe by event_id if present.
1741    let dup = {
1742        let inner = relay.inner.lock().await;
1743        let slot = inner.slots.get(&slot_id);
1744        if let (Some(eid), Some(slot)) = (&event_id, slot) {
1745            slot.iter()
1746                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1747        } else {
1748            false
1749        }
1750    };
1751    if dup {
1752        return (
1753            StatusCode::OK,
1754            Json(json!({"event_id": event_id, "status": "duplicate"})),
1755        )
1756            .into_response();
1757    }
1758
1759    {
1760        let mut inner = relay.inner.lock().await;
1761        let event_size = body_bytes.len();
1762        let slot = inner.slots.entry(slot_id.clone()).or_default();
1763        slot.push(req.event.clone());
1764        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1765    }
1766    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1767        return (
1768            StatusCode::INTERNAL_SERVER_ERROR,
1769            Json(json!({"error": format!("persist failed: {e}")})),
1770        )
1771            .into_response();
1772    }
1773    (
1774        StatusCode::CREATED,
1775        Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1776    )
1777        .into_response()
1778}
1779
1780/// `GET /.well-known/wire/agent?handle=<nick>` — WebFinger-style resolver
1781/// for `nick@<this-relay-domain>` handles. Returns the signed agent-card +
1782/// slot coords if claimed; 404 if not.
1783///
1784/// The `handle` query parameter may be just `<nick>` or `<nick>@<domain>`.
1785/// Domain part is ignored (the relay only serves nicks it has on file).
1786/// `GET /.well-known/agent-card.json?handle=<nick>` — A2A v1.0-compatible
1787/// AgentCard serving wire's handle directory. Same data as `well_known_agent`
1788/// but in the schema A2A clients (MSFT/AWS/Salesforce/SAP/ServiceNow tooling,
1789/// agent-card-go, agent-card-python, A2A .NET SDK) already speak.
1790///
1791/// Wire-specific fields (DID, slot_id, profile blob, raw signed card) live
1792/// under the standard A2A `extensions` array using the wire extension URI.
1793/// A2A-only clients can pair to wire agents knowing only A2A vocabulary;
1794/// wire-native clients get the full richer card by following the extension.
1795async fn well_known_agent_card_a2a(
1796    State(relay): State<Relay>,
1797    Query(q): Query<WellKnownAgentQuery>,
1798) -> impl IntoResponse {
1799    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1800    if nick.is_empty() {
1801        return (
1802            StatusCode::BAD_REQUEST,
1803            Json(json!({"error": "handle missing nick"})),
1804        )
1805            .into_response();
1806    }
1807    let inner = relay.inner.lock().await;
1808    let rec = match inner.handles.get(&nick) {
1809        Some(r) => r.clone(),
1810        None => {
1811            return (
1812                StatusCode::NOT_FOUND,
1813                Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1814            )
1815                .into_response();
1816        }
1817    };
1818    drop(inner);
1819
1820    let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1821    let description = profile
1822        .get("motto")
1823        .and_then(Value::as_str)
1824        .unwrap_or("")
1825        .to_string();
1826    let display_name = profile
1827        .get("display_name")
1828        .and_then(Value::as_str)
1829        .unwrap_or(&rec.nick)
1830        .to_string();
1831    let relay_url = rec.relay_url.clone().unwrap_or_default();
1832    // Intro endpoint = where any A2A or wire client posts a signed pair-drop.
1833    let endpoint = if !relay_url.is_empty() {
1834        format!(
1835            "{}/v1/handle/intro/{}",
1836            relay_url.trim_end_matches('/'),
1837            rec.nick
1838        )
1839    } else {
1840        format!("/v1/handle/intro/{}", rec.nick)
1841    };
1842    let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1843
1844    // Build A2A v1.0 AgentCard shape with wire extension. Fields named to
1845    // match the A2A spec exactly so downstream tooling (agent-card-go etc.)
1846    // parses without custom code.
1847    let a2a_card = json!({
1848        "id": rec.did,
1849        "name": display_name,
1850        "description": description,
1851        "version": "wire/0.5",
1852        "endpoint": endpoint,
1853        "provider": {
1854            "name": "wire",
1855            "url": "https://github.com/SlanchaAi/wire"
1856        },
1857        "capabilities": {
1858            "streaming": false,
1859            "pushNotifications": false,
1860            "extendedAgentCard": true
1861        },
1862        "securitySchemes": {
1863            "ed25519-event-sig": {
1864                "type": "signature",
1865                "alg": "EdDSA",
1866                "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1867            }
1868        },
1869        "security": [{"ed25519-event-sig": []}],
1870        "skills": [],
1871        "extensions": [{
1872            // A2A extension URIs are opaque namespace identifiers, not
1873            // forwardable URLs. Changing this string is a coordinated
1874            // federation-spec bump because peers match it exactly.
1875            "uri": "https://slancha.ai/wire/ext/v0.5",
1876            "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1877            "required": false,
1878            "params": {
1879                "did": rec.did,
1880                "handle": rec.nick,
1881                "slot_id": rec.slot_id,
1882                "relay_url": rec.relay_url,
1883                "card": rec.card,
1884                "profile": profile,
1885                "claimed_at": rec.claimed_at,
1886            }
1887        }],
1888        "signature": card_sig,
1889    });
1890    (StatusCode::OK, Json(a2a_card)).into_response()
1891}
1892
1893async fn well_known_agent(
1894    State(relay): State<Relay>,
1895    Query(q): Query<WellKnownAgentQuery>,
1896) -> impl IntoResponse {
1897    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1898    if nick.is_empty() {
1899        return (
1900            StatusCode::BAD_REQUEST,
1901            Json(json!({"error": "handle missing nick"})),
1902        )
1903            .into_response();
1904    }
1905    let inner = relay.inner.lock().await;
1906    match inner.handles.get(&nick) {
1907        Some(rec) => (
1908            StatusCode::OK,
1909            Json(json!({
1910                "nick": rec.nick,
1911                "did": rec.did,
1912                "card": rec.card,
1913                "slot_id": rec.slot_id,
1914                "relay_url": rec.relay_url,
1915                "claimed_at": rec.claimed_at,
1916            })),
1917        )
1918            .into_response(),
1919        None => (
1920            StatusCode::NOT_FOUND,
1921            Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1922        )
1923            .into_response(),
1924    }
1925}
1926
1927async fn list_events(
1928    State(relay): State<Relay>,
1929    Path(slot_id): Path<String>,
1930    Query(q): Query<ListEventsQuery>,
1931    headers: HeaderMap,
1932) -> impl IntoResponse {
1933    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1934        return resp;
1935    }
1936    let limit = q.limit.unwrap_or(100).min(1000);
1937    let mut inner = relay.inner.lock().await;
1938    // R4: record this pull as proof that the slot owner is still polling.
1939    // Anyone holding the slot_token (i.e., a paired peer) can later read
1940    // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
1941    let now_unix = std::time::SystemTime::now()
1942        .duration_since(std::time::UNIX_EPOCH)
1943        .map(|d| d.as_secs())
1944        .unwrap_or(0);
1945    inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1946    let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1947    let start = match q.since {
1948        Some(eid) => events
1949            .iter()
1950            .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1951            .map(|i| i + 1)
1952            .unwrap_or(0),
1953        None => 0,
1954    };
1955    let end = (start + limit).min(events.len());
1956    let slice = events[start..end].to_vec();
1957    (StatusCode::OK, Json(slice)).into_response()
1958}
1959
1960/// R4 — slot-attentiveness probe. Authenticated by slot_token (so only
1961/// paired peers can ask). Returns `last_pull_at_unix` (the slot owner's most
1962/// recent `list_events` call, in unix seconds) and `event_count` (total
1963/// stored). A remote sender uses this before `wire send <peer>` to warn the
1964/// operator if the peer hasn't polled recently.
1965async fn slot_state(
1966    State(relay): State<Relay>,
1967    Path(slot_id): Path<String>,
1968    headers: HeaderMap,
1969) -> impl IntoResponse {
1970    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1971        return resp;
1972    }
1973    let inner = relay.inner.lock().await;
1974    let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1975    let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1976    let responder_health = inner.responder_health.get(&slot_id).cloned();
1977    (
1978        StatusCode::OK,
1979        Json(json!({
1980            "slot_id": slot_id,
1981            "event_count": event_count,
1982            "last_pull_at_unix": last_pull_at_unix,
1983            "responder_health": responder_health,
1984        })),
1985    )
1986        .into_response()
1987}
1988
1989async fn responder_health_set(
1990    State(relay): State<Relay>,
1991    Path(slot_id): Path<String>,
1992    headers: HeaderMap,
1993    Json(record): Json<ResponderHealthRecord>,
1994) -> impl IntoResponse {
1995    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1996        return resp;
1997    }
1998    let path = relay
1999        .state_dir
2000        .join("responder-health")
2001        .join(format!("{slot_id}.json"));
2002    let body = match serde_json::to_vec_pretty(&record) {
2003        Ok(b) => b,
2004        Err(e) => {
2005            return (
2006                StatusCode::INTERNAL_SERVER_ERROR,
2007                Json(json!({"error": format!("serialize failed: {e}")})),
2008            )
2009                .into_response();
2010        }
2011    };
2012    if let Err(e) = tokio::fs::write(&path, body).await {
2013        return (
2014            StatusCode::INTERNAL_SERVER_ERROR,
2015            Json(json!({"error": format!("persist failed: {e}")})),
2016        )
2017            .into_response();
2018    }
2019    {
2020        let mut inner = relay.inner.lock().await;
2021        inner
2022            .responder_health
2023            .insert(slot_id.clone(), record.clone());
2024    }
2025    (StatusCode::OK, Json(record)).into_response()
2026}
2027
2028async fn check_token(
2029    relay: &Relay,
2030    headers: &HeaderMap,
2031    slot_id: &str,
2032) -> std::result::Result<(), axum::response::Response> {
2033    let auth = headers
2034        .get(AUTHORIZATION)
2035        .and_then(|h| h.to_str().ok())
2036        .and_then(|s| s.strip_prefix("Bearer "))
2037        .map(str::to_string);
2038    let presented = match auth {
2039        Some(t) => t,
2040        None => {
2041            return Err((
2042                StatusCode::UNAUTHORIZED,
2043                Json(json!({"error": "missing Bearer token"})),
2044            )
2045                .into_response());
2046        }
2047    };
2048    let inner = relay.inner.lock().await;
2049    let expected = match inner.tokens.get(slot_id) {
2050        Some(t) => t.clone(),
2051        None => {
2052            return Err((
2053                StatusCode::NOT_FOUND,
2054                Json(json!({"error": "unknown slot"})),
2055            )
2056                .into_response());
2057        }
2058    };
2059    drop(inner);
2060    if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2061        return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2062    }
2063    Ok(())
2064}
2065
/// Compare two byte strings without short-circuiting on the first differing
/// byte. Every position is XOR-ed and OR-folded into one accumulator.
///
/// Differing lengths return `false` immediately, so only length (not
/// content) is observable through timing.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && a.iter().zip(b).fold(0u8, |diff, (x, y)| diff | (x ^ y)) == 0
}
2076
/// True when `s` is exactly 32 lowercase hexadecimal characters
/// (`0-9`, `a-f`), which is the shape `random_hex(16)` produces.
///
/// Anything else is rejected: uppercase hex, path separators, dots, NUL,
/// non-ASCII.
fn is_valid_slot_id(s: &str) -> bool {
    const SLOT_ID_LEN: usize = 32;
    if s.len() != SLOT_ID_LEN {
        return false;
    }
    s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2082
2083fn random_hex(n_bytes: usize) -> String {
2084    let mut buf = vec![0u8; n_bytes];
2085    rand::thread_rng().fill_bytes(&mut buf);
2086    hex::encode(buf)
2087}
2088
2089/// Run the relay until SIGINT/SIGTERM.
2090pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2091    serve_with_mode(bind, state_dir, ServerMode::default()).await
2092}
2093
2094/// v0.5.17: server-mode-aware entry point. Same as `serve` but with the
2095/// `--local-only` toggle exposed so the binary can refuse to publish
2096/// phonebook + well-known surfaces on within-machine relays.
2097pub async fn serve_with_mode(bind: &str, state_dir: PathBuf, mode: ServerMode) -> Result<()> {
2098    let relay = Relay::new(state_dir).await?;
2099    relay.spawn_pair_sweeper();
2100    relay.spawn_counter_persister();
2101    let app = relay.clone().router_with_mode(mode);
2102    let listener = tokio::net::TcpListener::bind(bind)
2103        .await
2104        .with_context(|| format!("binding {bind}"))?;
2105    if mode.local_only {
2106        eprintln!(
2107            "wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled"
2108        );
2109    } else {
2110        eprintln!("wire relay-server listening on {bind}");
2111    }
2112    let shutdown_relay = relay.clone();
2113    axum::serve(listener, app)
2114        .with_graceful_shutdown(async move {
2115            let _ = tokio::signal::ctrl_c().await;
2116            eprintln!("\nshutting down — final counter snapshot");
2117            if let Err(e) = shutdown_relay.persist_counters().await {
2118                eprintln!("final counter persist failed: {e}");
2119            }
2120        })
2121        .await?;
2122    Ok(())
2123}
2124
2125/// v0.7.0-alpha.16: Unix Domain Socket entry point. Mirrors `serve_with_mode`
2126/// but binds to a UDS path instead of a TCP host:port. Implicitly forces
2127/// `mode.local_only = true` because UDS has no concept of "publish to a
2128/// public phonebook." Chmods the socket 0600 so only the owner uid can
2129/// connect — the SO_PEERCRED-equivalent trust anchor for sister sessions.
2130///
2131/// Removes any stale socket file at `path` first (otherwise bind fails
2132/// with EADDRINUSE). Removes the socket on graceful shutdown.
2133///
2134/// Implementation note: axum 0.7's `serve` is TcpListener-only, so we
2135/// run the manual hyper accept loop (per-connection
2136/// `hyper::server::conn::http1::Builder`). When axum 0.8 ships with
2137/// generic `Listener` support, this can collapse to one line.
2138#[cfg(unix)]
2139pub async fn serve_uds(socket_path: PathBuf, state_dir: PathBuf) -> Result<()> {
2140    use hyper::server::conn::http1;
2141    use hyper_util::rt::TokioIo;
2142    use tower_service::Service;
2143
2144    // Best-effort cleanup of stale socket file.
2145    if socket_path.exists() {
2146        std::fs::remove_file(&socket_path)
2147            .with_context(|| format!("removing stale socket at {socket_path:?}"))?;
2148    }
2149    if let Some(parent) = socket_path.parent() {
2150        std::fs::create_dir_all(parent)
2151            .with_context(|| format!("creating socket parent {parent:?}"))?;
2152    }
2153    let relay = Relay::new(state_dir).await?;
2154    relay.spawn_pair_sweeper();
2155    relay.spawn_counter_persister();
2156    let app: axum::Router = relay
2157        .clone()
2158        .router_with_mode(ServerMode { local_only: true });
2159    let listener = tokio::net::UnixListener::bind(&socket_path)
2160        .with_context(|| format!("binding UDS at {socket_path:?}"))?;
2161
2162    // 0600: owner-rw only. Trust anchor is the kernel-attested peer uid
2163    // (SO_PEERCRED equivalent); chmod is defense-in-depth.
2164    use std::os::unix::fs::PermissionsExt;
2165    if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
2166        eprintln!(
2167            "wire relay-server (UDS): chmod 0600 on {socket_path:?} failed: {e} — \
2168             socket may be accessible to other uids. Investigate."
2169        );
2170    }
2171    eprintln!(
2172        "wire relay-server (UDS) listening on unix://{} — same-host, owner-uid only",
2173        socket_path.display()
2174    );
2175
2176    // Manual accept loop + per-conn hyper serve (axum 0.7's serve is
2177    // TcpListener-only). On SIGINT, persist counters + remove socket.
2178    let shutdown_relay = relay.clone();
2179    let socket_path_for_cleanup = socket_path.clone();
2180    let mut make_service = app.into_make_service();
2181
2182    let serve_loop = async {
2183        loop {
2184            let (stream, _peer_addr) = match listener.accept().await {
2185                Ok(p) => p,
2186                Err(e) => {
2187                    eprintln!("wire relay-server (UDS): accept failed: {e}");
2188                    continue;
2189                }
2190            };
2191            let tower_service = match make_service.call(&stream).await {
2192                Ok(s) => s,
2193                Err(infallible) => match infallible {},
2194            };
2195            let io = TokioIo::new(stream);
2196            let hyper_service =
2197                hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
2198                    let mut svc = tower_service.clone();
2199                    async move { Service::call(&mut svc, req).await }
2200                });
2201            tokio::task::spawn(async move {
2202                if let Err(e) = http1::Builder::new()
2203                    .serve_connection(io, hyper_service)
2204                    .await
2205                {
2206                    // Connection-level error; not fatal to the listener.
2207                    if !e.is_incomplete_message() {
2208                        eprintln!("wire relay-server (UDS): conn error: {e}");
2209                    }
2210                }
2211            });
2212        }
2213    };
2214
2215    let shutdown = async {
2216        let _ = tokio::signal::ctrl_c().await;
2217        eprintln!("\nshutting down — final counter snapshot");
2218        if let Err(e) = shutdown_relay.persist_counters().await {
2219            eprintln!("final counter persist failed: {e}");
2220        }
2221        let _ = std::fs::remove_file(&socket_path_for_cleanup);
2222    };
2223
2224    tokio::select! {
2225        _ = serve_loop => {},
2226        _ = shutdown => {},
2227    };
2228    Ok(())
2229}
2230
/// Non-Unix stub: Unix Domain Sockets are not supported here, so this
/// always returns an error pointing at the loopback-HTTP alternative.
#[cfg(not(unix))]
pub async fn serve_uds(_socket_path: PathBuf, _state_dir: PathBuf) -> Result<()> {
    anyhow::bail!(
        "UDS transport is Unix-only; Windows falls back to loopback HTTP. Use `wire relay-server --bind 127.0.0.1:8771 --local-only` on Windows."
    )
}
2238
/// v0.5.17: switches that shape which HTTP surfaces the relay exposes.
///
/// `ServerMode::default()` is full federation (`local_only == false`). The
/// local-only variant hides the phonebook and well-known surfaces, so the
/// relay is invisible off-box and to directory-scraping agents on the same
/// box.
#[derive(Default, Clone, Copy, Debug)]
pub struct ServerMode {
    /// When set, the phonebook listing, `.well-known/wire/agent`,
    /// `.well-known/agent-card.json` and the landing/stats pages are not
    /// served. Intended for loopback binds (the CLI's `--local-only` enforces
    /// that pairing) and for genuinely within-machine traffic.
    pub local_only: bool,
}
2251
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn constant_time_eq_basic() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        assert!(!constant_time_eq(b"abc", b"abcd")); // length mismatch
        assert!(constant_time_eq(b"", b"")); // empty inputs are equal
    }

    #[test]
    fn random_hex_length() {
        let s = random_hex(16);
        assert_eq!(s.len(), 32); // 16 bytes -> 32 hex chars
        assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&dir);
        let relay = Relay::new(dir.clone()).await.unwrap();

        // Seed a pair-slot manually with a past last_touched.
        {
            let mut inner = relay.inner.lock().await;
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            inner.pair_slots.insert(
                "id-A".to_string(),
                PairSlot {
                    last_touched: std::time::Instant::now()
                        - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
                    ..PairSlot::default()
                },
            );

            // And a fresh one — should survive.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let inner = relay.inner.lock().await;
        assert_eq!(
            inner.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(inner.pair_slots.contains_key("id-B"));
        assert_eq!(inner.pair_lookup.len(), 1);
        assert!(inner.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));
        // wrong length
        assert!(!is_valid_slot_id("abc"));
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde")); // 31
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0")); // 33
        // uppercase
        assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
        // Path traversal and NUL attempts. Each fixture is exactly 32 bytes,
        // so it is rejected by the CHARSET check. A wrong-length fixture
        // would only re-test the length check above.
        for attempt in [
            "../etc/passwd0123456789abcdef000",
            "..%2Fetc%2Fpasswd000000000000000",
            "/absolute/path/that/looks/like/k",
            "0123456789abcdef\0\x31\x32\x33456789abcdef",
        ] {
            assert_eq!(attempt.len(), 32, "fixture {attempt:?} must be 32 bytes");
            assert!(!is_valid_slot_id(attempt), "accepted {attempt:?}");
        }
    }
}