Skip to main content

wire/
relay_server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3// Copyright (C) 2026  wire contributors.
4//
5// This file (and only this file in the wire repository) is licensed under
6// the GNU Affero General Public License v3.0 or later. The protocol crate
7// (signing, agent_card, trust, canonical, cli, mcp) is Apache-2.0; the CLI
8// binary entry point is MIT. See LICENSE.md for the trio explanation.
9//
10// AGPL on the relay specifically discourages forks that operate `wire-relay`
11// as a closed-source SaaS offering — those forks must publish their changes
12// under AGPL too. Self-hosting your own relay for your own org or running
13// the public-good relay we operate is fully permitted.
14//
15//! HTTP mailbox relay — minimal, persistent, bearer-authenticated.
16//!
17//! Design (v0.1):
18//!   - One process serves N slots; each slot is a per-peer FIFO of signed events.
19//!   - Slot allocation returns `(slot_id, slot_token)`. Holder of the token
20//!     can post + read that slot. Tokens never expire in v0.1 (rotate by
21//!     allocating a new slot).
22//!   - Events are stored verbatim — the relay does NOT verify Ed25519 signatures
23//!     itself. Verification happens client-side (`wire tail` + `verify_message_v31`).
24//!     The relay is dumb on purpose: it is a content-addressed mailbox, not a
25//!     trust authority.
26//!   - 256 KiB max body per event.
27//!   - Persistence: each slot's events are appended to
28//!     `<state_dir>/slots/<slot_id>.jsonl` on every `POST /events`. Tokens
29//!     are persisted to `<state_dir>/tokens.json` on allocation.
30//!   - On startup, slots + tokens are reloaded from disk.
31
32use anyhow::{Context, Result};
33use axum::{
34    Json, Router,
35    extract::{Path, Query, State},
36    http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37    response::IntoResponse,
38    routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50    GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Largest serialized event accepted by a single post: 256 KiB.
const MAX_EVENT_BYTES: usize = 256 << 10;
/// Total bytes one slot may accumulate (64 MiB) before further POSTs are
/// rejected with 413. This bounds how much relay disk an abusive
/// bearer-holder can consume (T11): 64 MiB is 256 maximum-size events, about
/// 25 s at 10 posts/s, after which the slot only ever answers 413.
/// NOTE(review): `POST /v1/events/:slot_id` is not in the governor-limited
/// route group built by `router_with_mode`, so that 10/s pace is not actually
/// enforced on event posts — confirm whether a limit exists elsewhere.
const MAX_SLOT_BYTES: usize = 64 << 20;
59
60#[derive(Clone)]
61pub struct Relay {
62    inner: Arc<Mutex<Inner>>,
63    state_dir: PathBuf,
64    counters: Arc<RelayCounters>,
65}
66
/// Usage counters behind `GET /stats`, updated with atomics so handlers never
/// need the relay lock to bump them.
///
/// The `*_total` values are seeded from `<state_dir>/counters.json` at startup
/// and written back every 30 s by `spawn_counter_persister`, so they survive
/// deploys and restarts. `boot_unix` is captured per process, which makes the
/// reported uptime process-local.
struct RelayCounters {
    /// Unix seconds at which this process constructed the relay.
    boot_unix: u64,
    /// Cumulative handle claims.
    handle_claims_total: AtomicU64,
    /// Cumulative first-time handle claims.
    handle_first_claims_total: AtomicU64,
    /// Cumulative slot allocations.
    slot_allocations_total: AtomicU64,
    /// Cumulative pair-session opens.
    pair_opens_total: AtomicU64,
    /// Cumulative events posted to slots.
    events_posted_total: AtomicU64,
}
79
80#[derive(serde::Serialize, serde::Deserialize, Default)]
81struct CountersSnapshot {
82    handle_claims_total: u64,
83    handle_first_claims_total: u64,
84    slot_allocations_total: u64,
85    pair_opens_total: u64,
86    events_posted_total: u64,
87}
88
89/// One row in `<state_dir>/stats-history.jsonl` — written every 30s by
90/// `spawn_counter_persister` so /stats.html can draw sparklines. Live-state
91/// fields (`*_active`) are point-in-time; *_total fields are the cumulative
92/// counters at that timestamp. Field names mirror the /stats endpoint.
93#[derive(serde::Serialize, serde::Deserialize)]
94struct HistoryEntry {
95    ts: u64,
96    handles_active: usize,
97    slots_active: usize,
98    pair_slots_open: usize,
99    streams_active: usize,
100    handle_claims_total: u64,
101    handle_first_claims_total: u64,
102    slot_allocations_total: u64,
103    pair_opens_total: u64,
104    events_posted_total: u64,
105}
106
107#[derive(Deserialize)]
108pub struct StatsHistoryQuery {
109    /// How many hours of history to return, default 24, max 168 (7 days).
110    pub hours: Option<u64>,
111}
112
113struct Inner {
114    /// slot_id -> ordered list of stored events (parsed JSON Values).
115    slots: HashMap<String, Vec<Value>>,
116    /// slot_id -> bearer token. Token holder may read + write that slot.
117    tokens: HashMap<String, String>,
118    /// slot_id -> total bytes stored. Enforced against MAX_SLOT_BYTES.
119    slot_bytes: HashMap<String, usize>,
120    /// slot_id -> wall-clock unix seconds of the slot owner's last `list_events`
121    /// call. Used by `GET /v1/slot/:slot_id/state` so a remote sender can
122    /// gauge whether the slot's owner is still polling (i.e., still attentive).
123    /// `None` means the slot has never been pulled since the relay restarted.
124    last_pull_at_unix: HashMap<String, u64>,
125    /// slot_id -> active SSE subscribers (R1 push). Each `UnboundedSender`
126    /// belongs to one open `GET /v1/events/:slot_id/stream` connection.
127    /// On every successful `post_event` to a slot we walk the slot's list
128    /// and broadcast the event; closed channels are pruned lazily on send-
129    /// error. Auth: subscribers presented a valid slot_token at stream open.
130    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
131    /// code_hash -> pair_id (lookup so guests find the host).
132    pair_lookup: HashMap<String, String>,
133    /// pair_id -> ephemeral pairing state.
134    pair_slots: HashMap<String, PairSlot>,
135    /// nick -> registered handle directory entry (v0.5).
136    handles: HashMap<String, HandleRecord>,
137    /// slot_id -> latest operator-published auto-responder health record (R3).
138    responder_health: HashMap<String, ResponderHealthRecord>,
139    /// token -> short-URL invite record (v0.5.10 — one-curl onboarding).
140    /// Token is the path segment in `GET /i/{token}`. Record holds the
141    /// underlying `wire://pair?...` URL plus TTL/uses bookkeeping.
142    invites: HashMap<String, InviteRecord>,
143}
144
145/// One entry in the short-URL invite map. Persisted to
146/// `<state_dir>/invites.jsonl` so deploys don't drop active invites.
147#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
148struct InviteRecord {
149    token: String,
150    invite_url: String,
151    expires_unix: u64,
152    /// `None` = unlimited until TTL hits. `Some(n)` = decrement each fetch.
153    uses_remaining: Option<u32>,
154    created_unix: u64,
155}
156
157/// One entry in the relay's handle directory (v0.5 — agentic hotline).
158/// FCFS on nick: first claimant binds the nick to their DID. Same-DID re-claims
159/// are allowed (used for profile updates + slot rotation).
160#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
161struct HandleRecord {
162    pub nick: String,
163    pub did: String,
164    pub card: Value,
165    pub slot_id: String,
166    pub relay_url: Option<String>,
167    pub claimed_at: String,
168    /// v0.5.19 (#9.1): if false, this handle is omitted from the
169    /// `/v1/handles` directory listing — operator opted out of bulk
170    /// discovery. The `.well-known/wire/agent?handle=X` direct lookup
171    /// still resolves so existing peers + out-of-band sharing continue
172    /// to work. Default `None` = discoverable (back-compat for records
173    /// claimed pre-v0.5.19).
174    #[serde(default, skip_serializing_if = "Option::is_none")]
175    pub discoverable: Option<bool>,
176}
177
178impl HandleRecord {
179    /// Effective discoverability: defaults to true when the field is
180    /// absent (pre-v0.5.19 records).
181    fn is_discoverable(&self) -> bool {
182        self.discoverable.unwrap_or(true)
183    }
184}
185
186#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
187pub struct ResponderHealthRecord {
188    pub status: String,
189    #[serde(default, skip_serializing_if = "Option::is_none")]
190    pub reason: Option<String>,
191    #[serde(default, skip_serializing_if = "Option::is_none")]
192    pub last_success_at: Option<String>,
193    pub set_at: String,
194}
195
/// In-memory state of one SPAKE2 pairing session between a host and a guest.
/// Never persisted; evicted after `PAIR_SLOT_TTL_SECS` of inactivity.
#[derive(Debug, Clone)]
struct PairSlot {
    /// Host's SPAKE2 message, once posted.
    host_msg: Option<String>,
    /// Guest's SPAKE2 message, once posted.
    guest_msg: Option<String>,
    /// Host's sealed bootstrap payload (sent after SAS confirmation).
    host_bootstrap: Option<String>,
    /// Guest's sealed bootstrap payload.
    guest_bootstrap: Option<String>,
    /// Monotonic time of the last activity; drives TTL eviction.
    last_touched: std::time::Instant,
}
209
210impl Default for PairSlot {
211    fn default() -> Self {
212        Self {
213            host_msg: None,
214            guest_msg: None,
215            host_bootstrap: None,
216            guest_bootstrap: None,
217            last_touched: std::time::Instant::now(),
218        }
219    }
220}
221
/// Idle lifetime of a pairing session, in seconds (five minutes). A session
/// untouched for longer is evicted, freeing memory and bounding the window for
/// brute-forcing a pairing code (PENTEST.md §code-review #3).
const PAIR_SLOT_TTL_SECS: u64 = 5 * 60;
225
226#[derive(Deserialize)]
227pub struct AllocateRequest {
228    /// Optional handle hint — purely informational, server doesn't enforce.
229    #[serde(default)]
230    pub handle: Option<String>,
231}
232
233#[derive(Deserialize)]
234pub struct PostEventRequest {
235    pub event: Value,
236}
237
238#[derive(Deserialize)]
239pub struct ListEventsQuery {
240    /// Resume from after this event_id (exclusive). Omit for full slot read.
241    pub since: Option<String>,
242    /// Max events to return. Default 100, max 1000.
243    pub limit: Option<usize>,
244}
245
246impl Relay {
247    pub async fn new(state_dir: PathBuf) -> Result<Self> {
248        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
249        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
250        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
251        let mut inner = Inner {
252            slots: HashMap::new(),
253            tokens: HashMap::new(),
254            slot_bytes: HashMap::new(),
255            last_pull_at_unix: HashMap::new(),
256            streams: HashMap::new(),
257            pair_lookup: HashMap::new(),
258            pair_slots: HashMap::new(),
259            handles: HashMap::new(),
260            responder_health: HashMap::new(),
261            invites: HashMap::new(),
262        };
263        // Reload tokens
264        let token_path = state_dir.join("tokens.json");
265        if token_path.exists() {
266            let body = tokio::fs::read_to_string(&token_path).await?;
267            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
268        }
269        // Reload slots from JSONL
270        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
271        while let Some(entry) = slots_dir.next_entry().await? {
272            let path = entry.path();
273            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
274                continue;
275            }
276            let stem = match path.file_stem().and_then(|s| s.to_str()) {
277                Some(s) => s.to_string(),
278                None => continue,
279            };
280            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
281            let mut events = Vec::new();
282            for line in body.lines() {
283                if let Ok(v) = serde_json::from_str::<Value>(line) {
284                    events.push(v);
285                }
286            }
287            // Recompute byte usage for the slot from its persisted events.
288            let bytes: usize = events
289                .iter()
290                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
291                .sum();
292            inner.slot_bytes.insert(stem.clone(), bytes);
293            inner.slots.insert(stem, events);
294        }
295        // Reload handle directory (v0.5).
296        let handles_dir = state_dir.join("handles");
297        if handles_dir.exists() {
298            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
299            while let Some(entry) = rd.next_entry().await? {
300                let path = entry.path();
301                if path.extension().and_then(|x| x.to_str()) != Some("json") {
302                    continue;
303                }
304                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
305                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
306                    inner.handles.insert(rec.nick.clone(), rec);
307                }
308            }
309        }
310        // Reload responder health records (R3).
311        let responder_health_dir = state_dir.join("responder-health");
312        if responder_health_dir.exists() {
313            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
314            while let Some(entry) = rd.next_entry().await? {
315                let path = entry.path();
316                if path.extension().and_then(|x| x.to_str()) != Some("json") {
317                    continue;
318                }
319                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
320                    continue;
321                };
322                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
323                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
324                    inner.responder_health.insert(slot_id.to_string(), rec);
325                }
326            }
327        }
328        // Reload short-URL invites. JSONL append-only; later entries with
329        // the same token overwrite earlier (won't happen — tokens are
330        // unique by construction — but coded defensively).
331        let invites_path = state_dir.join("invites.jsonl");
332        if invites_path.exists() {
333            let now_unix = SystemTime::now()
334                .duration_since(UNIX_EPOCH)
335                .map(|d| d.as_secs())
336                .unwrap_or(0);
337            let body = tokio::fs::read_to_string(&invites_path)
338                .await
339                .unwrap_or_default();
340            for line in body.lines() {
341                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
342                    && rec.expires_unix > now_unix
343                {
344                    inner.invites.insert(rec.token.clone(), rec);
345                }
346            }
347        }
348        let boot_unix = SystemTime::now()
349            .duration_since(UNIX_EPOCH)
350            .map(|d| d.as_secs())
351            .unwrap_or(0);
352        // Reload counter snapshot. Missing/corrupt file → start at zero.
353        let snap: CountersSnapshot =
354            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
355                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
356                Err(_) => CountersSnapshot::default(),
357            };
358        Ok(Self {
359            inner: Arc::new(Mutex::new(inner)),
360            state_dir,
361            counters: Arc::new(RelayCounters {
362                boot_unix,
363                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
364                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
365                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
366                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
367                events_posted_total: AtomicU64::new(snap.events_posted_total),
368            }),
369        })
370    }
371
372    pub fn router(self) -> Router {
373        self.router_with_mode(ServerMode::default())
374    }
375
376    pub fn router_with_mode(self, mode: ServerMode) -> Router {
377        // Rate limit applied to write endpoints that create new state (slots,
378        // pair-sessions, bootstraps). 10 req/sec sustained, 50 req burst.
379        // v0.1 uses the GLOBAL key extractor (single bucket for all callers) —
380        // per-IP needs ConnectInfo middleware which axum 0.7 wires differently.
381        // Per-IP keying is a v0.2 hardening; for now Cloudflare WAF + this
382        // global cap shoulder DDoS protection in series.
383        let governor_conf = std::sync::Arc::new(
384            GovernorConfigBuilder::default()
385                .per_second(10)
386                .burst_size(50)
387                .key_extractor(GlobalKeyExtractor)
388                .finish()
389                .expect("valid governor config"),
390        );
391        let governor_layer = GovernorLayer {
392            config: governor_conf,
393        };
394
395        // Hot writes group — rate limited.
396        let hot_writes = Router::new()
397            .route("/v1/slot/allocate", post(allocate_slot))
398            .route("/v1/pair", post(pair_open))
399            .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
400            .route("/v1/pair/abandon", post(pair_abandon))
401            .layer(governor_layer);
402
403        // Core data-plane routes: events, pair, slots, healthz. Always
404        // present, in both federation and local-only modes.
405        let mut router = Router::new()
406            .route("/healthz", get(healthz))
407            .route("/v1/events/:slot_id", post(post_event).get(list_events))
408            .route("/v1/slot/:slot_id/state", get(slot_state))
409            .route(
410                "/v1/slot/:slot_id/responder-health",
411                post(responder_health_set),
412            )
413            .route("/v1/events/:slot_id/stream", get(stream_events))
414            .route("/v1/pair/:pair_id", get(pair_get))
415            .route("/v1/handle/intro/:nick", post(handle_intro));
416
417        // Discovery + landing surfaces: phonebook, well-known agent cards,
418        // landing page, stats, invite landing. In `--local-only` mode we
419        // skip all of these — the relay becomes invisible from the outside
420        // (and from other agents on the same box that might enumerate it).
421        // This is the v0.5.17 within-machine privacy guarantee.
422        if !mode.local_only {
423            router = router
424                .route("/", get(landing_index))
425                .route("/favicon.svg", get(landing_favicon))
426                .route("/og.png", get(landing_og))
427                .route("/demo.cast", get(landing_demo_cast))
428                .route("/install", get(landing_install_sh))
429                .route("/install.sh", get(landing_install_sh))
430                .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
431                .route("/stats", get(stats_root))
432                .route("/stats.json", get(stats_json))
433                .route("/stats.html", get(landing_stats_html))
434                .route("/stats.history", get(stats_history))
435                .route("/phonebook", get(landing_phonebook_html))
436                .route("/phonebook.html", get(landing_phonebook_html))
437                .route("/v1/handle/claim", post(handle_claim))
438                .route("/v1/handles", get(handles_directory))
439                .route("/v1/invite/register", post(invite_register))
440                .route("/i/:token", get(invite_script))
441                .route("/.well-known/wire/agent", get(well_known_agent))
442                .route(
443                    "/.well-known/agent-card.json",
444                    get(well_known_agent_card_a2a),
445                );
446        } else {
447            // Local-only mode still needs handle_claim for in-process
448            // session bootstrap (`wire session new` allocates a local
449            // slot AND claims a handle on the local relay so peers can
450            // resolve it). The claim is gated to loopback-only callers
451            // by the bind, not by the route.
452            router = router.route("/v1/handle/claim", post(handle_claim));
453        }
454
455        router.merge(hot_writes).with_state(self)
456    }
457
458    /// Evict pair-slots that have been idle past `PAIR_SLOT_TTL_SECS`.
459    /// Called inline on every pair-slot mutation; a background sweeper task
460    /// (see `Self::start_sweeper`) covers the long-idle case.
461    async fn evict_expired_pair_slots(&self) {
462        let now = std::time::Instant::now();
463        let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
464        let mut inner = self.inner.lock().await;
465        let mut to_remove = Vec::new();
466        for (id, slot) in inner.pair_slots.iter() {
467            if now.duration_since(slot.last_touched) > ttl {
468                to_remove.push(id.clone());
469            }
470        }
471        for id in to_remove {
472            inner.pair_slots.remove(&id);
473            inner.pair_lookup.retain(|_, v| v != &id);
474        }
475    }
476
477    /// Spawn a background tokio task that runs `evict_expired_pair_slots` every
478    /// 60 seconds. Call once after `Relay::new`; the handle is leaked deliberately
479    /// — process exit reaps it. Safe to skip in tests where you'd rather test
480    /// eviction inline.
481    pub fn spawn_pair_sweeper(&self) {
482        let me = self.clone();
483        tokio::spawn(async move {
484            let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
485            loop {
486                tick.tick().await;
487                me.evict_expired_pair_slots().await;
488            }
489        });
490    }
491
492    /// Snapshot the in-process counters to `<state_dir>/counters.json`. Called
493    /// every 30s by `spawn_counter_persister` and once during graceful
494    /// shutdown so a deploy doesn't reset the running totals.
495    pub async fn persist_counters(&self) -> Result<()> {
496        let snap = CountersSnapshot {
497            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
498            handle_first_claims_total: self
499                .counters
500                .handle_first_claims_total
501                .load(Ordering::Relaxed),
502            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
503            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
504            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
505        };
506        let body = serde_json::to_vec_pretty(&snap)?;
507        let path = self.state_dir.join("counters.json");
508        tokio::fs::write(path, body).await?;
509        Ok(())
510    }
511
512    /// Append one row to `<state_dir>/stats-history.jsonl` mirroring the
513    /// /stats endpoint at this instant. Used by /stats.html for sparklines.
514    /// File grows ~250 B per call → ~720 KB/day. A future prune wave can
515    /// roll old entries off once the history exceeds 90 days.
516    pub async fn append_history(&self) -> Result<()> {
517        use tokio::io::AsyncWriteExt;
518        let now = SystemTime::now()
519            .duration_since(UNIX_EPOCH)
520            .map(|d| d.as_secs())
521            .unwrap_or(0);
522        let (handles_active, slots_active, pair_slots_open, streams_active) = {
523            let inner = self.inner.lock().await;
524            (
525                inner.handles.len(),
526                inner.slots.len(),
527                inner.pair_slots.len(),
528                inner.streams.values().map(Vec::len).sum::<usize>(),
529            )
530        };
531        let entry = HistoryEntry {
532            ts: now,
533            handles_active,
534            slots_active,
535            pair_slots_open,
536            streams_active,
537            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
538            handle_first_claims_total: self
539                .counters
540                .handle_first_claims_total
541                .load(Ordering::Relaxed),
542            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
543            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
544            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
545        };
546        let line = serde_json::to_vec(&entry)?;
547        let path = self.state_dir.join("stats-history.jsonl");
548        let mut f = tokio::fs::OpenOptions::new()
549            .create(true)
550            .append(true)
551            .open(&path)
552            .await?;
553        f.write_all(&line).await?;
554        f.write_all(b"\n").await?;
555        f.flush().await?;
556        Ok(())
557    }
558
559    /// Spawn a background tokio task that calls `persist_counters` every 30s
560    /// + appends a history row on the same tick. Loss bound: counters can
561    ///   drift back up to 30s on crash, history can drop one row.
562    pub fn spawn_counter_persister(&self) {
563        let me = self.clone();
564        tokio::spawn(async move {
565            let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
566            // First tick fires immediately; skip it so we don't write the
567            // freshly-loaded snapshot back unchanged.
568            tick.tick().await;
569            loop {
570                tick.tick().await;
571                if let Err(e) = me.persist_counters().await {
572                    eprintln!("counter persist failed: {e}");
573                }
574                if let Err(e) = me.append_history().await {
575                    eprintln!("history append failed: {e}");
576                }
577            }
578        });
579    }
580
581    async fn persist_tokens(&self) -> Result<()> {
582        let body = {
583            let inner = self.inner.lock().await;
584            serde_json::to_string_pretty(&inner.tokens)?
585        };
586        let path = self.state_dir.join("tokens.json");
587        tokio::fs::write(path, body).await?;
588        Ok(())
589    }
590
591    async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
592        // Defense in depth: only allow lowercase hex slot_ids of the exact length
593        // we ever produce ourselves (16 random bytes -> 32 hex chars). Blocks
594        // any future code path that might let attacker-controlled slot_ids reach
595        // disk operations. allocate_slot() always meets this; this assert is
596        // belt-and-suspenders against future regressions.
597        if !is_valid_slot_id(slot_id) {
598            return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
599        }
600        let path = self
601            .state_dir
602            .join("slots")
603            .join(format!("{slot_id}.jsonl"));
604        let mut line = serde_json::to_vec(event)?;
605        line.push(b'\n');
606        use tokio::io::AsyncWriteExt;
607        let mut f = tokio::fs::OpenOptions::new()
608            .create(true)
609            .append(true)
610            .open(&path)
611            .await
612            .with_context(|| format!("opening {path:?}"))?;
613        f.write_all(&line).await?;
614        f.flush().await?;
615        Ok(())
616    }
617}
618
619async fn healthz() -> impl IntoResponse {
620    (StatusCode::OK, "ok\n")
621}
622
623// Public aggregate-usage snapshot. Counter fields (`*_total`) reset on
624// process restart; state fields (`handles_active`, `slots_active`) survive
625// on the persistent volume. No DIDs / handles / IPs leaked — counts only.
626async fn stats_history(
627    State(relay): State<Relay>,
628    Query(q): Query<StatsHistoryQuery>,
629) -> impl IntoResponse {
630    let hours = q.hours.unwrap_or(24).min(168);
631    let now = SystemTime::now()
632        .duration_since(UNIX_EPOCH)
633        .map(|d| d.as_secs())
634        .unwrap_or(0);
635    let cutoff = now.saturating_sub(hours * 3600);
636    let path = relay.state_dir.join("stats-history.jsonl");
637    let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
638    let entries: Vec<Value> = body
639        .lines()
640        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
641        .filter(|v| {
642            v.get("ts")
643                .and_then(Value::as_u64)
644                .map(|t| t >= cutoff)
645                .unwrap_or(false)
646        })
647        .collect();
648    (
649        StatusCode::OK,
650        Json(json!({
651            "hours": hours,
652            "now_unix": now,
653            "count": entries.len(),
654            "entries": entries,
655        })),
656    )
657}
658
659async fn landing_stats_html() -> impl IntoResponse {
660    static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
661    (
662        StatusCode::OK,
663        [
664            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
665            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
666        ],
667        STATS_HTML,
668    )
669}
670
671async fn landing_phonebook_html() -> impl IntoResponse {
672    static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
673    (
674        StatusCode::OK,
675        [
676            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
677            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
678        ],
679        PHONEBOOK_HTML,
680    )
681}
682
683/// `/stats` dispatch: serve the pretty HTML dashboard to browsers (Accept
684/// includes text/html) and JSON to everything else (curl, scripts, scrapers).
685/// Keeps the JSON contract intact while letting humans land on the page at
686/// the short URL.
687async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
688    let wants_html = headers
689        .get(axum::http::header::ACCEPT)
690        .and_then(|v| v.to_str().ok())
691        .unwrap_or("")
692        .contains("text/html");
693    if wants_html {
694        landing_stats_html().await.into_response()
695    } else {
696        stats_json(State(relay)).await.into_response()
697    }
698}
699
700async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
701    let now = SystemTime::now()
702        .duration_since(UNIX_EPOCH)
703        .map(|d| d.as_secs())
704        .unwrap_or(0);
705    let inner = relay.inner.lock().await;
706    let streams_active: usize = inner.streams.values().map(Vec::len).sum();
707    let body = json!({
708        "version": env!("CARGO_PKG_VERSION"),
709        "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
710        "handles_active": inner.handles.len(),
711        "slots_active": inner.slots.len(),
712        "pair_slots_open": inner.pair_slots.len(),
713        "streams_active": streams_active,
714        "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
715        "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
716        "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
717        "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
718        "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
719    });
720    (StatusCode::OK, Json(body))
721}
722
723// Static landing site baked into the binary so apex (wireup.net) can flip
724// straight to Fly without a separate static-host. ~37 KB total — negligible
725// against the release binary size, and keeps the relay self-contained.
726async fn landing_index() -> impl IntoResponse {
727    static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
728    (
729        StatusCode::OK,
730        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
731        INDEX_HTML,
732    )
733}
734
735async fn landing_favicon() -> impl IntoResponse {
736    static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
737    (
738        StatusCode::OK,
739        [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
740        FAVICON_SVG,
741    )
742}
743
744async fn landing_og() -> impl IntoResponse {
745    static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
746    (
747        StatusCode::OK,
748        [
749            (axum::http::header::CONTENT_TYPE, "image/png"),
750            (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
751        ],
752        OG_PNG,
753    )
754}
755
756async fn landing_demo_cast() -> impl IntoResponse {
757    static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
758    (
759        StatusCode::OK,
760        [
761            (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
762            (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
763        ],
764        DEMO_CAST,
765    )
766}
767
768async fn landing_install_sh() -> impl IntoResponse {
769    static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
770    (
771        StatusCode::OK,
772        [
773            (
774                axum::http::header::CONTENT_TYPE,
775                "text/x-shellscript; charset=utf-8",
776            ),
777            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
778        ],
779        INSTALL_SH,
780    )
781}
782
783async fn landing_openshell_policy_sh() -> impl IntoResponse {
784    static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
785    (
786        StatusCode::OK,
787        [
788            (
789                axum::http::header::CONTENT_TYPE,
790                "text/x-shellscript; charset=utf-8",
791            ),
792            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
793        ],
794        POLICY_SH,
795    )
796}
797
798async fn allocate_slot(
799    State(relay): State<Relay>,
800    Json(_req): Json<AllocateRequest>,
801) -> impl IntoResponse {
802    let slot_id = random_hex(16);
803    let slot_token = random_hex(32);
804    {
805        let mut inner = relay.inner.lock().await;
806        inner.slots.insert(slot_id.clone(), Vec::new());
807        inner.tokens.insert(slot_id.clone(), slot_token.clone());
808    }
809    if let Err(e) = relay.persist_tokens().await {
810        return (
811            StatusCode::INTERNAL_SERVER_ERROR,
812            Json(json!({"error": format!("persist failed: {e}")})),
813        )
814            .into_response();
815    }
816    relay
817        .counters
818        .slot_allocations_total
819        .fetch_add(1, Ordering::Relaxed);
820    (
821        StatusCode::CREATED,
822        Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
823    )
824        .into_response()
825}
826
827async fn post_event(
828    State(relay): State<Relay>,
829    Path(slot_id): Path<String>,
830    headers: HeaderMap,
831    Json(req): Json<PostEventRequest>,
832) -> impl IntoResponse {
833    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
834        return resp;
835    }
836    // Body size cap (rough; serialize and check).
837    let body_bytes = match serde_json::to_vec(&req.event) {
838        Ok(b) => b,
839        Err(e) => {
840            return (
841                StatusCode::BAD_REQUEST,
842                Json(json!({"error": format!("event not serializable: {e}")})),
843            )
844                .into_response();
845        }
846    };
847    if body_bytes.len() > MAX_EVENT_BYTES {
848        return (
849            StatusCode::PAYLOAD_TOO_LARGE,
850            Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
851        )
852            .into_response();
853    }
854    // Per-slot quota: cap accumulated bytes per slot at MAX_SLOT_BYTES.
855    {
856        let inner = relay.inner.lock().await;
857        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
858        if used + body_bytes.len() > MAX_SLOT_BYTES {
859            return (
860                StatusCode::PAYLOAD_TOO_LARGE,
861                Json(json!({
862                    "error": "slot quota exceeded",
863                    "slot_bytes_used": used,
864                    "slot_bytes_max": MAX_SLOT_BYTES,
865                    "remediation": "operator should `wire rotate-slot` to drain old slot",
866                })),
867            )
868                .into_response();
869        }
870    }
871    let event_id = req
872        .event
873        .get("event_id")
874        .and_then(Value::as_str)
875        .map(str::to_string);
876
877    // Dedupe by event_id if present.
878    let dup = {
879        let inner = relay.inner.lock().await;
880        let slot = inner.slots.get(&slot_id);
881        if let (Some(eid), Some(slot)) = (&event_id, slot) {
882            slot.iter()
883                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
884        } else {
885            false
886        }
887    };
888    if dup {
889        return (
890            StatusCode::OK,
891            Json(json!({"event_id": event_id, "status": "duplicate"})),
892        )
893            .into_response();
894    }
895
896    {
897        let mut inner = relay.inner.lock().await;
898        let event_size = body_bytes.len();
899        let slot = inner.slots.entry(slot_id.clone()).or_default();
900        slot.push(req.event.clone());
901        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
902    }
903    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
904        return (
905            StatusCode::INTERNAL_SERVER_ERROR,
906            Json(json!({"error": format!("persist failed: {e}")})),
907        )
908            .into_response();
909    }
910    relay
911        .counters
912        .events_posted_total
913        .fetch_add(1, Ordering::Relaxed);
914    // R1 push: broadcast the new event to every active SSE subscriber on
915    // this slot. Dead channels are pruned in-place. The broadcast happens
916    // AFTER the disk persist so subscribers and disk readers see the same
917    // events; on persist failure we already returned 500 above.
918    {
919        let mut inner = relay.inner.lock().await;
920        if let Some(subs) = inner.streams.get_mut(&slot_id) {
921            subs.retain(|tx| tx.send(req.event.clone()).is_ok());
922        }
923    }
924    (
925        StatusCode::CREATED,
926        Json(json!({"event_id": event_id, "status": "stored"})),
927    )
928        .into_response()
929}
930
931/// R1 — server-sent-events push stream for a slot. Auth'd by slot_token
932/// (same as `list_events`). The connection registers an `UnboundedSender`
933/// on the slot's subscriber list; every subsequent `post_event` to the slot
934/// fans out to all subscribers as `data: <event-json>\n\n` lines. The
935/// connection stays open until the client disconnects.
936///
937/// A 30-second keepalive ping is emitted automatically so reverse proxies
938/// (Cloudflare tunnel, nginx) don't time out the upstream.
939///
940/// Note: the subscriber sees events posted AFTER it subscribed. To catch
941/// up on history first, the client should call `GET /v1/events/:slot_id`
942/// with `since=` before opening the stream.
943async fn stream_events(
944    State(relay): State<Relay>,
945    Path(slot_id): Path<String>,
946    headers: HeaderMap,
947) -> axum::response::Response {
948    use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
949    use futures::stream::StreamExt;
950
951    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
952        return resp;
953    }
954
955    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
956    {
957        let mut inner = relay.inner.lock().await;
958        inner.streams.entry(slot_id.clone()).or_default().push(tx);
959    }
960
961    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
962        SseEvent::default()
963            .json_data(&ev)
964            .map_err(|e| std::io::Error::other(e.to_string()))
965    });
966
967    Sse::new(stream)
968        .keep_alive(
969            KeepAlive::new()
970                .interval(std::time::Duration::from_secs(30))
971                .text("phyllis: still on the line"),
972        )
973        .into_response()
974}
975
976// ---------- pair-slot handlers ----------
977
978#[derive(Deserialize)]
979pub struct PairOpenRequest {
980    pub code_hash: String,
981    /// SPAKE2 message (base64).
982    pub msg: String,
983    pub role: String, // "host" or "guest"
984}
985
986#[derive(Deserialize)]
987pub struct PairBootstrapRequest {
988    pub role: String,
989    pub sealed: String,
990}
991
992#[derive(Deserialize)]
993pub struct PairAbandonRequest {
994    /// SHA-256 hex digest of the code phrase. Same value the caller posts to
995    /// /v1/pair as `code_hash` — knowing the code IS the auth here.
996    pub code_hash: String,
997}
998
999/// Forget the pair-slot associated with this code_hash. Either side can call;
1000/// no auth beyond knowledge of the code (which is the shared secret of this
1001/// handshake anyway). Idempotent: returns 204 whether or not the slot exists.
1002/// Used by clients to recover after a crash mid-handshake, so the host doesn't
1003/// stay locked out until the 5-minute TTL.
1004async fn pair_abandon(
1005    State(relay): State<Relay>,
1006    Json(req): Json<PairAbandonRequest>,
1007) -> impl IntoResponse {
1008    let mut inner = relay.inner.lock().await;
1009    if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
1010        inner.pair_slots.remove(&pair_id);
1011    }
1012    StatusCode::NO_CONTENT.into_response()
1013}
1014
1015async fn pair_open(
1016    State(relay): State<Relay>,
1017    Json(req): Json<PairOpenRequest>,
1018) -> impl IntoResponse {
1019    if req.role != "host" && req.role != "guest" {
1020        return (
1021            StatusCode::BAD_REQUEST,
1022            Json(json!({"error": "role must be 'host' or 'guest'"})),
1023        )
1024            .into_response();
1025    }
1026    relay.evict_expired_pair_slots().await;
1027    let mut inner = relay.inner.lock().await;
1028    let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1029        Some(id) => id,
1030        None => {
1031            let new_id = random_hex(16);
1032            inner
1033                .pair_lookup
1034                .insert(req.code_hash.clone(), new_id.clone());
1035            inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1036            relay
1037                .counters
1038                .pair_opens_total
1039                .fetch_add(1, Ordering::Relaxed);
1040            new_id
1041        }
1042    };
1043    let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1044    slot.last_touched = std::time::Instant::now();
1045    if req.role == "host" {
1046        if slot.host_msg.is_some() {
1047            return (
1048                StatusCode::CONFLICT,
1049                Json(json!({"error": "host already registered for this code"})),
1050            )
1051                .into_response();
1052        }
1053        slot.host_msg = Some(req.msg);
1054    } else {
1055        if slot.guest_msg.is_some() {
1056            return (
1057                StatusCode::CONFLICT,
1058                Json(json!({"error": "guest already registered for this code"})),
1059            )
1060                .into_response();
1061        }
1062        slot.guest_msg = Some(req.msg);
1063    }
1064    (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1065}
1066
1067#[derive(Deserialize)]
1068pub struct PairGetQuery {
1069    /// "host" or "guest" — caller's role; we return the OTHER side's data.
1070    pub as_role: String,
1071}
1072
1073async fn pair_get(
1074    State(relay): State<Relay>,
1075    Path(pair_id): Path<String>,
1076    Query(q): Query<PairGetQuery>,
1077) -> impl IntoResponse {
1078    relay.evict_expired_pair_slots().await;
1079    let mut inner = relay.inner.lock().await;
1080    let slot = match inner.pair_slots.get_mut(&pair_id) {
1081        Some(s) => {
1082            s.last_touched = std::time::Instant::now();
1083            s.clone()
1084        }
1085        None => {
1086            return (
1087                StatusCode::NOT_FOUND,
1088                Json(json!({"error": "unknown pair_id"})),
1089            )
1090                .into_response();
1091        }
1092    };
1093    let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1094        "host" => (slot.guest_msg, slot.guest_bootstrap),
1095        "guest" => (slot.host_msg, slot.host_bootstrap),
1096        _ => {
1097            return (
1098                StatusCode::BAD_REQUEST,
1099                Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1100            )
1101                .into_response();
1102        }
1103    };
1104    (
1105        StatusCode::OK,
1106        Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1107    )
1108        .into_response()
1109}
1110
1111async fn pair_bootstrap(
1112    State(relay): State<Relay>,
1113    Path(pair_id): Path<String>,
1114    Json(req): Json<PairBootstrapRequest>,
1115) -> impl IntoResponse {
1116    relay.evict_expired_pair_slots().await;
1117    let mut inner = relay.inner.lock().await;
1118    let slot = match inner.pair_slots.get_mut(&pair_id) {
1119        Some(s) => s,
1120        None => {
1121            return (
1122                StatusCode::NOT_FOUND,
1123                Json(json!({"error": "unknown pair_id"})),
1124            )
1125                .into_response();
1126        }
1127    };
1128    slot.last_touched = std::time::Instant::now();
1129    match req.role.as_str() {
1130        "host" => slot.host_bootstrap = Some(req.sealed),
1131        "guest" => slot.guest_bootstrap = Some(req.sealed),
1132        _ => {
1133            return (
1134                StatusCode::BAD_REQUEST,
1135                Json(json!({"error": "role must be 'host' or 'guest'"})),
1136            )
1137                .into_response();
1138        }
1139    }
1140    (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1141}
1142
1143// ---------- handle directory (v0.5) ----------
1144
1145#[derive(Deserialize)]
1146pub struct HandleClaimRequest {
1147    /// Nick the claimant wants (case-folded). Domain part is implicit: the
1148    /// domain the relay's `.well-known` is served from.
1149    pub nick: String,
1150    /// Slot id the claimant owns on this relay (proves they allocated here).
1151    pub slot_id: String,
1152    /// Optional public-facing relay URL the relay should advertise back in
1153    /// `.well-known/wire/agent` responses. If omitted, callers will need to
1154    /// know the relay URL out-of-band.
1155    pub relay_url: Option<String>,
1156    /// Claimant's full signed agent-card (includes DID + verify_keys +
1157    /// optional profile).
1158    pub card: Value,
1159    /// v0.5.19 (#9.1): set false to opt out of `/v1/handles` bulk listing.
1160    /// Direct `.well-known/wire/agent` lookup by handle still works. The
1161    /// default (None / absent) is discoverable, for back-compat with
1162    /// pre-v0.5.19 clients.
1163    #[serde(default, skip_serializing_if = "Option::is_none")]
1164    pub discoverable: Option<bool>,
1165}
1166
1167/// `POST /v1/handle/claim` — claim or update a `nick@<relay-domain>` handle.
1168///
1169/// FCFS on nick. Same-DID re-claims allowed (used for profile updates +
1170/// slot rotation). Different-DID claims on a taken nick return 409.
1171/// Caller must (a) own the `slot_id` they reference (verified by token
1172/// being present), and (b) submit a card with a valid self-signature.
1173async fn handle_claim(
1174    State(relay): State<Relay>,
1175    headers: HeaderMap,
1176    Json(req): Json<HandleClaimRequest>,
1177) -> impl IntoResponse {
1178    // Bearer auth: claimant must hold the slot_token for the slot they
1179    // reference. Prevents nick-squatting from an unauthenticated POSTer.
1180    if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1181        return resp;
1182    }
1183    // Validate nick (same rules as the client-side parser).
1184    if !crate::pair_profile::is_valid_nick(&req.nick) {
1185        return (
1186            StatusCode::BAD_REQUEST,
1187            Json(json!({
1188                "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1189                "nick": req.nick,
1190            })),
1191        )
1192            .into_response();
1193    }
1194    // Verify the card signature using the public verify_agent_card helper.
1195    if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1196        return (
1197            StatusCode::BAD_REQUEST,
1198            Json(json!({"error": format!("card signature invalid: {e}")})),
1199        )
1200            .into_response();
1201    }
1202    let did = match req.card.get("did").and_then(Value::as_str) {
1203        Some(d) => d.to_string(),
1204        None => {
1205            return (
1206                StatusCode::BAD_REQUEST,
1207                Json(json!({"error": "card missing 'did' field"})),
1208            )
1209                .into_response();
1210        }
1211    };
1212
1213    // FCFS check. Also snapshot the existing record (clone) so the
1214    // re-claim path below can preserve fields the client didn't include
1215    // in the request (notably `discoverable` from v0.5.19, so an old
1216    // client doing a profile-update re-claim doesn't accidentally
1217    // re-publish a hidden handle).
1218    let prior_record: Option<HandleRecord>;
1219    let first_claim = {
1220        let inner = relay.inner.lock().await;
1221        match inner.handles.get(&req.nick) {
1222            Some(existing) if existing.did != did => {
1223                return (
1224                    StatusCode::CONFLICT,
1225                    Json(json!({
1226                        "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1227                        "nick": req.nick,
1228                        "claimed_by": existing.did,
1229                    })),
1230                )
1231                    .into_response();
1232            }
1233            Some(prev) => {
1234                prior_record = Some(prev.clone());
1235                false
1236            }
1237            None => {
1238                prior_record = None;
1239                true
1240            }
1241        }
1242    };
1243
1244    // v0.5.19 (#9.5): round claimed_at to whole seconds. Nanosecond
1245    // precision served no client purpose (display only) and acted as a
1246    // cross-tab fingerprint correlating one operator's multiple handles
1247    // claimed in the same session. Truncate before formatting.
1248    let now = time::OffsetDateTime::now_utc()
1249        .replace_nanosecond(0)
1250        .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1251        .format(&time::format_description::well_known::Rfc3339)
1252        .unwrap_or_default();
1253    // v0.5.19 (#9.1): preserve `discoverable` across re-claims. If the
1254    // request doesn't set it explicitly, keep whatever the existing
1255    // record had so a profile-update re-claim doesn't accidentally
1256    // re-publish a hidden handle. Default for first-time claim is None
1257    // (= discoverable, back-compat).
1258    let discoverable = match (req.discoverable, &prior_record) {
1259        (Some(d), _) => Some(d),
1260        (None, Some(prev)) => prev.discoverable,
1261        (None, None) => None,
1262    };
1263    let record = HandleRecord {
1264        nick: req.nick.clone(),
1265        did: did.clone(),
1266        card: req.card.clone(),
1267        slot_id: req.slot_id.clone(),
1268        relay_url: req.relay_url.clone(),
1269        claimed_at: now,
1270        discoverable,
1271    };
1272
1273    // Persist to disk first (durable), then update in-memory.
1274    let path = relay
1275        .state_dir
1276        .join("handles")
1277        .join(format!("{}.json", req.nick));
1278    let body = match serde_json::to_vec_pretty(&record) {
1279        Ok(b) => b,
1280        Err(e) => {
1281            return (
1282                StatusCode::INTERNAL_SERVER_ERROR,
1283                Json(json!({"error": format!("serialize failed: {e}")})),
1284            )
1285                .into_response();
1286        }
1287    };
1288    if let Err(e) = tokio::fs::write(&path, &body).await {
1289        return (
1290            StatusCode::INTERNAL_SERVER_ERROR,
1291            Json(json!({"error": format!("persist failed: {e}")})),
1292        )
1293            .into_response();
1294    }
1295    {
1296        let mut inner = relay.inner.lock().await;
1297        inner.handles.insert(req.nick.clone(), record);
1298    }
1299    relay
1300        .counters
1301        .handle_claims_total
1302        .fetch_add(1, Ordering::Relaxed);
1303    if first_claim {
1304        relay
1305            .counters
1306            .handle_first_claims_total
1307            .fetch_add(1, Ordering::Relaxed);
1308    }
1309    (
1310        StatusCode::CREATED,
1311        Json(json!({
1312            "nick": req.nick,
1313            "did": did,
1314            "status": if first_claim { "claimed" } else { "re-claimed" },
1315        })),
1316    )
1317        .into_response()
1318}
1319
1320#[derive(Deserialize)]
1321pub struct WellKnownAgentQuery {
1322    pub handle: String,
1323}
1324
1325#[derive(Deserialize)]
1326pub struct HandlesDirectoryQuery {
1327    pub cursor: Option<String>,
1328    pub limit: Option<usize>,
1329    pub vibe: Option<String>,
1330}
1331
// ─── short-URL invites (v0.5.10) ──────────────────────────────────────────
// One-curl onboarding: the inviter registers their `wire://pair?...` URL
// here and gets back a 6-hex-digit token. Anyone who runs
//   curl -fsSL https://wireup.net/i/<token> | sh
// gets wire installed (if needed) and the invite accepted, in one shot.
//
// Possession of the short URL grants pair authorization, exactly like the
// underlying wire:// invite: the short URL is only an indirection to it.
1340
1341#[derive(Deserialize)]
1342pub struct InviteRegisterRequest {
1343    /// The wire://pair?... URL produced by `wire invite`. Required.
1344    pub invite_url: String,
1345    /// Lifetime in seconds. Default 86400 (24h). Capped at 7 days.
1346    #[serde(default)]
1347    pub ttl_seconds: Option<u64>,
1348    /// If `Some(n)`, the short URL can be fetched N times before 410s.
1349    /// `None` = unlimited until TTL hits.
1350    #[serde(default)]
1351    pub uses: Option<u32>,
1352}
1353
1354impl Relay {
1355    /// Append one InviteRecord to `<state_dir>/invites.jsonl`.
1356    async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1357        use tokio::io::AsyncWriteExt;
1358        let mut line = serde_json::to_vec(rec)?;
1359        line.push(b'\n');
1360        let path = self.state_dir.join("invites.jsonl");
1361        let mut f = tokio::fs::OpenOptions::new()
1362            .create(true)
1363            .append(true)
1364            .open(&path)
1365            .await?;
1366        f.write_all(&line).await?;
1367        f.flush().await?;
1368        Ok(())
1369    }
1370}
1371
1372async fn invite_register(
1373    State(relay): State<Relay>,
1374    Json(req): Json<InviteRegisterRequest>,
1375) -> impl IntoResponse {
1376    if req.invite_url.is_empty() {
1377        return (
1378            StatusCode::BAD_REQUEST,
1379            Json(json!({"error": "invite_url required"})),
1380        )
1381            .into_response();
1382    }
1383    // Length cap on the embedded URL to keep persisted records bounded.
1384    if req.invite_url.len() > 8_192 {
1385        return (
1386            StatusCode::PAYLOAD_TOO_LARGE,
1387            Json(json!({"error": "invite_url > 8 KiB"})),
1388        )
1389            .into_response();
1390    }
1391    let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1392    let now = SystemTime::now()
1393        .duration_since(UNIX_EPOCH)
1394        .map(|d| d.as_secs())
1395        .unwrap_or(0);
1396    // 6-hex token → 16.7M space. Collision probability negligible at v0.5
1397    // scale; if a collision happens (1 in 16M) we 409 and the caller retries.
1398    let token = random_hex(3);
1399    let rec = InviteRecord {
1400        token: token.clone(),
1401        invite_url: req.invite_url,
1402        expires_unix: now + ttl,
1403        uses_remaining: req.uses,
1404        created_unix: now,
1405    };
1406    {
1407        let mut inner = relay.inner.lock().await;
1408        if inner.invites.contains_key(&token) {
1409            return (
1410                StatusCode::CONFLICT,
1411                Json(json!({"error": "token collision, retry"})),
1412            )
1413                .into_response();
1414        }
1415        inner.invites.insert(token.clone(), rec.clone());
1416    }
1417    if let Err(e) = relay.persist_invite(&rec).await {
1418        return (
1419            StatusCode::INTERNAL_SERVER_ERROR,
1420            Json(json!({"error": format!("persist failed: {e}")})),
1421        )
1422            .into_response();
1423    }
1424    (
1425        StatusCode::CREATED,
1426        Json(json!({
1427            "token": token,
1428            "path": format!("/i/{token}"),
1429            "expires_unix": rec.expires_unix,
1430            "uses_remaining": rec.uses_remaining,
1431        })),
1432    )
1433        .into_response()
1434}
1435
1436#[derive(Deserialize)]
1437pub struct InviteScriptQuery {
1438    /// `format=url` returns the raw `wire://pair?...` URL as text/plain
1439    /// (used by `wire accept https://wireup.net/i/<token>` to resolve a
1440    /// short URL programmatically). Default: shell-script template.
1441    /// Note: ?format=url does NOT decrement `uses_remaining` — it's a
1442    /// resolution lookup, not an acceptance. The actual accept happens
1443    /// when the wire:// URL is consumed by `pair_invite::accept_invite`.
1444    pub format: Option<String>,
1445}
1446
1447async fn invite_script(
1448    State(relay): State<Relay>,
1449    Path(token): Path<String>,
1450    Query(q): Query<InviteScriptQuery>,
1451) -> impl IntoResponse {
1452    // Token shape: 6 lowercase hex. Reject anything else immediately so a
1453    // path-traversal try never reaches the map lookup.
1454    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1455        return (StatusCode::NOT_FOUND, "not found\n").into_response();
1456    }
1457    let want_raw_url = q.format.as_deref() == Some("url");
1458    let now = SystemTime::now()
1459        .duration_since(UNIX_EPOCH)
1460        .map(|d| d.as_secs())
1461        .unwrap_or(0);
1462    let invite_url = {
1463        let mut inner = relay.inner.lock().await;
1464        let Some(rec) = inner.invites.get_mut(&token) else {
1465            return (StatusCode::NOT_FOUND, "not found\n").into_response();
1466        };
1467        if rec.expires_unix <= now {
1468            return (StatusCode::GONE, "this invite has expired\n").into_response();
1469        }
1470        if let Some(n) = rec.uses_remaining {
1471            if n == 0 {
1472                return (StatusCode::GONE, "this invite has been used up\n").into_response();
1473            }
1474            // Only decrement on script-template fetch (the one that's
1475            // actually doing the pair). The raw-URL resolution path is a
1476            // lookup, not an accept.
1477            if !want_raw_url {
1478                rec.uses_remaining = Some(n - 1);
1479            }
1480        }
1481        rec.invite_url.clone()
1482    };
1483    if want_raw_url {
1484        return (
1485            StatusCode::OK,
1486            [
1487                (
1488                    axum::http::header::CONTENT_TYPE,
1489                    "text/plain; charset=utf-8",
1490                ),
1491                (
1492                    axum::http::header::CACHE_CONTROL,
1493                    "private, no-store, max-age=0",
1494                ),
1495            ],
1496            invite_url,
1497        )
1498            .into_response();
1499    }
1500    let escaped = invite_url.replace('\'', "'\\''");
1501    let script = format!(
1502        "#!/bin/sh\n\
1503         # wire — one-curl onboarding (install + pair in one shot)\n\
1504         # source: https://github.com/SlanchaAi/wire\n\
1505         set -eu\n\
1506         INVITE='{escaped}'\n\
1507         echo \"\u{2192} checking for wire CLI...\"\n\
1508         if ! command -v wire >/dev/null 2>&1; then\n  \
1509           echo \"\u{2192} wire not installed; installing first...\"\n  \
1510           curl -fsSL https://wireup.net/install.sh | sh\n  \
1511           case \":$PATH:\" in\n    \
1512             *:\"$HOME/.local/bin\":*) ;;\n    \
1513             *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n  \
1514           esac\n  \
1515           if ! command -v wire >/dev/null 2>&1; then\n    \
1516             echo \"\"\n    \
1517             echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n    \
1518             echo \"Open a new shell, then run:\"\n    \
1519             echo \"  wire accept '$INVITE'\"\n    \
1520             exit 0\n  \
1521           fi\n\
1522         fi\n\
1523         echo \"\u{2192} accepting invite...\"\n\
1524         wire accept \"$INVITE\"\n"
1525    );
1526    (
1527        StatusCode::OK,
1528        [
1529            (
1530                axum::http::header::CONTENT_TYPE,
1531                "text/x-shellscript; charset=utf-8",
1532            ),
1533            (
1534                axum::http::header::CACHE_CONTROL,
1535                "private, no-store, max-age=0",
1536            ),
1537        ],
1538        script,
1539    )
1540        .into_response()
1541}
1542
1543async fn handles_directory(
1544    State(relay): State<Relay>,
1545    Query(q): Query<HandlesDirectoryQuery>,
1546) -> impl IntoResponse {
1547    let limit = q.limit.unwrap_or(100).clamp(1, 500);
1548    let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1549    let inner = relay.inner.lock().await;
1550    let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1551    drop(inner);
1552    records.sort_by(|a, b| a.nick.cmp(&b.nick));
1553
1554    let cursor = q.cursor.as_deref();
1555    let mut eligible = Vec::new();
1556    for rec in records {
1557        if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1558            continue;
1559        }
1560        // Hygiene: hide test-shaped nicks from the public directory. Records
1561        // remain claimed (FCFS protection persists), they just don't surface
1562        // in the phone book. `demo-` is reserved for asciinema-cast handles,
1563        // `test-` for integration runs.
1564        if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1565            continue;
1566        }
1567        // v0.5.19 (#9.1): operator opt-out — hidden handles skip the
1568        // bulk directory but still resolve via `.well-known/wire/agent?
1569        // handle=X` for out-of-band sharing.
1570        if !rec.is_discoverable() {
1571            continue;
1572        }
1573        let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1574        if profile
1575            .get("listed")
1576            .and_then(Value::as_bool)
1577            .is_some_and(|listed| !listed)
1578        {
1579            continue;
1580        }
1581        if let Some(want) = &vibe_filter {
1582            let matched = profile
1583                .get("vibe")
1584                .and_then(Value::as_array)
1585                .map(|arr| {
1586                    arr.iter().any(|v| {
1587                        v.as_str()
1588                            .map(|s| s.eq_ignore_ascii_case(want))
1589                            .unwrap_or(false)
1590                    })
1591                })
1592                .unwrap_or(false);
1593            if !matched {
1594                continue;
1595            }
1596        }
1597        eligible.push((rec, profile));
1598    }
1599
1600    let has_more = eligible.len() > limit;
1601    let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1602    let next_cursor = if has_more {
1603        page.last().map(|(rec, _)| rec.nick.clone())
1604    } else {
1605        None
1606    };
1607    let handles: Vec<Value> = page
1608        .into_iter()
1609        .map(|(rec, profile)| {
1610            // v0.12.1: fall back to the DID-derived persona emoji when the
1611            // card carries no explicit profile emoji, so every phonebook
1612            // line shows a face next to the name. The persona is a
1613            // deterministic function of the DID, so the relay can compute it
1614            // without the claimant having set anything.
1615            let emoji = profile
1616                .get("emoji")
1617                .and_then(Value::as_str)
1618                .filter(|s| !s.is_empty())
1619                .map(str::to_string)
1620                .unwrap_or_else(|| crate::character::Character::from_did(&rec.did).emoji);
1621            json!({
1622                "nick": rec.nick,
1623                "did": rec.did,
1624                "profile": {
1625                    "emoji": emoji,
1626                    "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1627                    "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1628                    "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1629                    "now": profile.get("now").cloned().unwrap_or(Value::Null),
1630                },
1631                "claimed_at": rec.claimed_at,
1632            })
1633        })
1634        .collect();
1635    (
1636        StatusCode::OK,
1637        Json(json!({
1638            "handles": handles,
1639            "next_cursor": next_cursor,
1640        })),
1641    )
1642        .into_response()
1643}
1644
1645/// `POST /v1/handle/intro/:nick` — drop a signed pair-introduction event
1646/// into a known nick's slot WITHOUT needing that slot's bearer token.
1647///
1648/// Why this exists: `.well-known/wire/agent` returns a nick's `slot_id` for
1649/// reachability, but NEVER its `slot_token` (that would leak read+write
1650/// authority to any handle-resolver). To zero-paste-pair, we need a way for
1651/// a stranger to deliver their signed agent-card to the nick's owner. This
1652/// endpoint provides exactly that, and ONLY that: the event must be `kind=1100`
1653/// (pair_drop / agent_card), self-signed, and the carrying agent-card embedded
1654/// in the body must verify-OK on its own.
1655///
1656/// Rate-limiting is the same governor that gates the other write endpoints.
1657/// Slot quota still applies — a flood of intros hits the standard 64MB cap.
1658async fn handle_intro(
1659    State(relay): State<Relay>,
1660    Path(nick): Path<String>,
1661    Json(req): Json<PostEventRequest>,
1662) -> impl IntoResponse {
1663    // Look up the nick. Must already be claimed.
1664    let slot_id = {
1665        let inner = relay.inner.lock().await;
1666        match inner.handles.get(&nick) {
1667            Some(rec) => rec.slot_id.clone(),
1668            None => {
1669                return (
1670                    StatusCode::NOT_FOUND,
1671                    Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1672                )
1673                    .into_response();
1674            }
1675        }
1676    };
1677
1678    // Only allow kind=1100 pair_drop / agent_card here. Anything else routes
1679    // to the standard /v1/events/:slot_id with bearer auth.
1680    let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1681    let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1682    if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1683        return (
1684            StatusCode::BAD_REQUEST,
1685            Json(json!({
1686                "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1687                "got_kind": kind,
1688                "got_type": type_str,
1689            })),
1690        )
1691            .into_response();
1692    }
1693
1694    // Body must embed a signed agent-card (so the receiver can pin from it).
1695    let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1696        Some(c) => c.clone(),
1697        None => {
1698            return (
1699                StatusCode::BAD_REQUEST,
1700                Json(json!({"error": "intro event body must embed 'card' field"})),
1701            )
1702                .into_response();
1703        }
1704    };
1705    if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1706        return (
1707            StatusCode::BAD_REQUEST,
1708            Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1709        )
1710            .into_response();
1711    }
1712
1713    // Size + quota checks (same as post_event).
1714    let body_bytes = match serde_json::to_vec(&req.event) {
1715        Ok(b) => b,
1716        Err(e) => {
1717            return (
1718                StatusCode::BAD_REQUEST,
1719                Json(json!({"error": format!("event not serializable: {e}")})),
1720            )
1721                .into_response();
1722        }
1723    };
1724    if body_bytes.len() > MAX_EVENT_BYTES {
1725        return (
1726            StatusCode::PAYLOAD_TOO_LARGE,
1727            Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1728        )
1729            .into_response();
1730    }
1731    {
1732        let inner = relay.inner.lock().await;
1733        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1734        if used + body_bytes.len() > MAX_SLOT_BYTES {
1735            return (
1736                StatusCode::PAYLOAD_TOO_LARGE,
1737                Json(json!({
1738                    "error": "target slot quota exceeded",
1739                    "slot_bytes_used": used,
1740                    "slot_bytes_max": MAX_SLOT_BYTES,
1741                })),
1742            )
1743                .into_response();
1744        }
1745    }
1746
1747    let event_id = req
1748        .event
1749        .get("event_id")
1750        .and_then(Value::as_str)
1751        .map(str::to_string);
1752
1753    // Dedupe by event_id if present.
1754    let dup = {
1755        let inner = relay.inner.lock().await;
1756        let slot = inner.slots.get(&slot_id);
1757        if let (Some(eid), Some(slot)) = (&event_id, slot) {
1758            slot.iter()
1759                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1760        } else {
1761            false
1762        }
1763    };
1764    if dup {
1765        return (
1766            StatusCode::OK,
1767            Json(json!({"event_id": event_id, "status": "duplicate"})),
1768        )
1769            .into_response();
1770    }
1771
1772    {
1773        let mut inner = relay.inner.lock().await;
1774        let event_size = body_bytes.len();
1775        let slot = inner.slots.entry(slot_id.clone()).or_default();
1776        slot.push(req.event.clone());
1777        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1778    }
1779    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1780        return (
1781            StatusCode::INTERNAL_SERVER_ERROR,
1782            Json(json!({"error": format!("persist failed: {e}")})),
1783        )
1784            .into_response();
1785    }
1786    (
1787        StatusCode::CREATED,
1788        Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1789    )
1790        .into_response()
1791}
1792
1793/// `GET /.well-known/wire/agent?handle=<nick>` — WebFinger-style resolver
1794/// for `nick@<this-relay-domain>` handles. Returns the signed agent-card +
1795/// slot coords if claimed; 404 if not.
1796///
1797/// The `handle` query parameter may be just `<nick>` or `<nick>@<domain>`.
1798/// Domain part is ignored (the relay only serves nicks it has on file).
1799/// `GET /.well-known/agent-card.json?handle=<nick>` — A2A v1.0-compatible
1800/// AgentCard serving wire's handle directory. Same data as `well_known_agent`
1801/// but in the schema A2A clients (MSFT/AWS/Salesforce/SAP/ServiceNow tooling,
1802/// agent-card-go, agent-card-python, A2A .NET SDK) already speak.
1803///
1804/// Wire-specific fields (DID, slot_id, profile blob, raw signed card) live
1805/// under the standard A2A `extensions` array using the wire extension URI.
1806/// A2A-only clients can pair to wire agents knowing only A2A vocabulary;
1807/// wire-native clients get the full richer card by following the extension.
1808async fn well_known_agent_card_a2a(
1809    State(relay): State<Relay>,
1810    Query(q): Query<WellKnownAgentQuery>,
1811) -> impl IntoResponse {
1812    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1813    if nick.is_empty() {
1814        return (
1815            StatusCode::BAD_REQUEST,
1816            Json(json!({"error": "handle missing nick"})),
1817        )
1818            .into_response();
1819    }
1820    let inner = relay.inner.lock().await;
1821    let rec = match inner.handles.get(&nick) {
1822        Some(r) => r.clone(),
1823        None => {
1824            return (
1825                StatusCode::NOT_FOUND,
1826                Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1827            )
1828                .into_response();
1829        }
1830    };
1831    drop(inner);
1832
1833    let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1834    let description = profile
1835        .get("motto")
1836        .and_then(Value::as_str)
1837        .unwrap_or("")
1838        .to_string();
1839    let display_name = profile
1840        .get("display_name")
1841        .and_then(Value::as_str)
1842        .unwrap_or(&rec.nick)
1843        .to_string();
1844    let relay_url = rec.relay_url.clone().unwrap_or_default();
1845    // Intro endpoint = where any A2A or wire client posts a signed pair-drop.
1846    let endpoint = if !relay_url.is_empty() {
1847        format!(
1848            "{}/v1/handle/intro/{}",
1849            relay_url.trim_end_matches('/'),
1850            rec.nick
1851        )
1852    } else {
1853        format!("/v1/handle/intro/{}", rec.nick)
1854    };
1855    let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1856
1857    // Build A2A v1.0 AgentCard shape with wire extension. Fields named to
1858    // match the A2A spec exactly so downstream tooling (agent-card-go etc.)
1859    // parses without custom code.
1860    let a2a_card = json!({
1861        "id": rec.did,
1862        "name": display_name,
1863        "description": description,
1864        "version": "wire/0.5",
1865        "endpoint": endpoint,
1866        "provider": {
1867            "name": "wire",
1868            "url": "https://github.com/SlanchaAi/wire"
1869        },
1870        "capabilities": {
1871            "streaming": false,
1872            "pushNotifications": false,
1873            "extendedAgentCard": true
1874        },
1875        "securitySchemes": {
1876            "ed25519-event-sig": {
1877                "type": "signature",
1878                "alg": "EdDSA",
1879                "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1880            }
1881        },
1882        "security": [{"ed25519-event-sig": []}],
1883        "skills": [],
1884        "extensions": [{
1885            // A2A extension URIs are opaque namespace identifiers, not
1886            // forwardable URLs. Changing this string is a coordinated
1887            // federation-spec bump because peers match it exactly.
1888            "uri": "https://slancha.ai/wire/ext/v0.5",
1889            "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1890            "required": false,
1891            "params": {
1892                "did": rec.did,
1893                "handle": rec.nick,
1894                "slot_id": rec.slot_id,
1895                "relay_url": rec.relay_url,
1896                "card": rec.card,
1897                "profile": profile,
1898                "claimed_at": rec.claimed_at,
1899            }
1900        }],
1901        "signature": card_sig,
1902    });
1903    (StatusCode::OK, Json(a2a_card)).into_response()
1904}
1905
1906async fn well_known_agent(
1907    State(relay): State<Relay>,
1908    Query(q): Query<WellKnownAgentQuery>,
1909) -> impl IntoResponse {
1910    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1911    if nick.is_empty() {
1912        return (
1913            StatusCode::BAD_REQUEST,
1914            Json(json!({"error": "handle missing nick"})),
1915        )
1916            .into_response();
1917    }
1918    let inner = relay.inner.lock().await;
1919    match inner.handles.get(&nick) {
1920        Some(rec) => (
1921            StatusCode::OK,
1922            Json(json!({
1923                "nick": rec.nick,
1924                "did": rec.did,
1925                "card": rec.card,
1926                "slot_id": rec.slot_id,
1927                "relay_url": rec.relay_url,
1928                "claimed_at": rec.claimed_at,
1929            })),
1930        )
1931            .into_response(),
1932        None => (
1933            StatusCode::NOT_FOUND,
1934            Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1935        )
1936            .into_response(),
1937    }
1938}
1939
1940async fn list_events(
1941    State(relay): State<Relay>,
1942    Path(slot_id): Path<String>,
1943    Query(q): Query<ListEventsQuery>,
1944    headers: HeaderMap,
1945) -> impl IntoResponse {
1946    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1947        return resp;
1948    }
1949    let limit = q.limit.unwrap_or(100).min(1000);
1950    let mut inner = relay.inner.lock().await;
1951    // R4: record this pull as proof that the slot owner is still polling.
1952    // Anyone holding the slot_token (i.e., a paired peer) can later read
1953    // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
1954    let now_unix = std::time::SystemTime::now()
1955        .duration_since(std::time::UNIX_EPOCH)
1956        .map(|d| d.as_secs())
1957        .unwrap_or(0);
1958    inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1959    let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1960    let start = match q.since {
1961        Some(eid) => events
1962            .iter()
1963            .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1964            .map(|i| i + 1)
1965            .unwrap_or(0),
1966        None => 0,
1967    };
1968    let end = (start + limit).min(events.len());
1969    let slice = events[start..end].to_vec();
1970    (StatusCode::OK, Json(slice)).into_response()
1971}
1972
1973/// R4 — slot-attentiveness probe. Authenticated by slot_token (so only
1974/// paired peers can ask). Returns `last_pull_at_unix` (the slot owner's most
1975/// recent `list_events` call, in unix seconds) and `event_count` (total
1976/// stored). A remote sender uses this before `wire send <peer>` to warn the
1977/// operator if the peer hasn't polled recently.
1978async fn slot_state(
1979    State(relay): State<Relay>,
1980    Path(slot_id): Path<String>,
1981    headers: HeaderMap,
1982) -> impl IntoResponse {
1983    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1984        return resp;
1985    }
1986    let inner = relay.inner.lock().await;
1987    let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1988    let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1989    let responder_health = inner.responder_health.get(&slot_id).cloned();
1990    (
1991        StatusCode::OK,
1992        Json(json!({
1993            "slot_id": slot_id,
1994            "event_count": event_count,
1995            "last_pull_at_unix": last_pull_at_unix,
1996            "responder_health": responder_health,
1997        })),
1998    )
1999        .into_response()
2000}
2001
2002async fn responder_health_set(
2003    State(relay): State<Relay>,
2004    Path(slot_id): Path<String>,
2005    headers: HeaderMap,
2006    Json(record): Json<ResponderHealthRecord>,
2007) -> impl IntoResponse {
2008    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
2009        return resp;
2010    }
2011    let path = relay
2012        .state_dir
2013        .join("responder-health")
2014        .join(format!("{slot_id}.json"));
2015    let body = match serde_json::to_vec_pretty(&record) {
2016        Ok(b) => b,
2017        Err(e) => {
2018            return (
2019                StatusCode::INTERNAL_SERVER_ERROR,
2020                Json(json!({"error": format!("serialize failed: {e}")})),
2021            )
2022                .into_response();
2023        }
2024    };
2025    if let Err(e) = tokio::fs::write(&path, body).await {
2026        return (
2027            StatusCode::INTERNAL_SERVER_ERROR,
2028            Json(json!({"error": format!("persist failed: {e}")})),
2029        )
2030            .into_response();
2031    }
2032    {
2033        let mut inner = relay.inner.lock().await;
2034        inner
2035            .responder_health
2036            .insert(slot_id.clone(), record.clone());
2037    }
2038    (StatusCode::OK, Json(record)).into_response()
2039}
2040
2041async fn check_token(
2042    relay: &Relay,
2043    headers: &HeaderMap,
2044    slot_id: &str,
2045) -> std::result::Result<(), axum::response::Response> {
2046    let auth = headers
2047        .get(AUTHORIZATION)
2048        .and_then(|h| h.to_str().ok())
2049        .and_then(|s| s.strip_prefix("Bearer "))
2050        .map(str::to_string);
2051    let presented = match auth {
2052        Some(t) => t,
2053        None => {
2054            return Err((
2055                StatusCode::UNAUTHORIZED,
2056                Json(json!({"error": "missing Bearer token"})),
2057            )
2058                .into_response());
2059        }
2060    };
2061    let inner = relay.inner.lock().await;
2062    let expected = match inner.tokens.get(slot_id) {
2063        Some(t) => t.clone(),
2064        None => {
2065            return Err((
2066                StatusCode::NOT_FOUND,
2067                Json(json!({"error": "unknown slot"})),
2068            )
2069                .into_response());
2070        }
2071    };
2072    drop(inner);
2073    if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2074        return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2075    }
2076    Ok(())
2077}
2078
/// Compare two byte strings without stopping at the first differing byte.
///
/// A length mismatch returns `false` immediately, so only the content
/// comparison is timing-independent. `check_token` uses this to compare
/// bearer tokens.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && a.iter().zip(b).fold(0u8, |diff, (x, y)| diff | (x ^ y)) == 0
}
2089
/// True when `s` is exactly 32 lowercase hexadecimal characters
/// (`[0-9a-f]`), i.e. the shape `random_hex(16)` produces. Everything else
/// is rejected, including uppercase hex, path separators, `..` and NUL bytes,
/// so an accepted id is safe to use as a filename component.
fn is_valid_slot_id(s: &str) -> bool {
    let bytes = s.as_bytes();
    bytes.len() == 32
        && bytes
            .iter()
            .all(|&b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2095
2096fn random_hex(n_bytes: usize) -> String {
2097    let mut buf = vec![0u8; n_bytes];
2098    rand::thread_rng().fill_bytes(&mut buf);
2099    hex::encode(buf)
2100}
2101
2102/// Run the relay until SIGINT/SIGTERM.
2103pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2104    serve_with_mode(bind, state_dir, ServerMode::default()).await
2105}
2106
2107/// v0.5.17: server-mode-aware entry point. Same as `serve` but with the
2108/// `--local-only` toggle exposed so the binary can refuse to publish
2109/// phonebook + well-known surfaces on within-machine relays.
2110pub async fn serve_with_mode(bind: &str, state_dir: PathBuf, mode: ServerMode) -> Result<()> {
2111    let relay = Relay::new(state_dir).await?;
2112    relay.spawn_pair_sweeper();
2113    relay.spawn_counter_persister();
2114    let app = relay.clone().router_with_mode(mode);
2115    let listener = tokio::net::TcpListener::bind(bind)
2116        .await
2117        .with_context(|| format!("binding {bind}"))?;
2118    if mode.local_only {
2119        eprintln!(
2120            "wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled"
2121        );
2122    } else {
2123        eprintln!("wire relay-server listening on {bind}");
2124    }
2125    let shutdown_relay = relay.clone();
2126    axum::serve(listener, app)
2127        .with_graceful_shutdown(async move {
2128            let _ = tokio::signal::ctrl_c().await;
2129            eprintln!("\nshutting down — final counter snapshot");
2130            if let Err(e) = shutdown_relay.persist_counters().await {
2131                eprintln!("final counter persist failed: {e}");
2132            }
2133        })
2134        .await?;
2135    Ok(())
2136}
2137
2138/// v0.7.0-alpha.16: Unix Domain Socket entry point. Mirrors `serve_with_mode`
2139/// but binds to a UDS path instead of a TCP host:port. Implicitly forces
2140/// `mode.local_only = true` because UDS has no concept of "publish to a
2141/// public phonebook." Chmods the socket 0600 so only the owner uid can
2142/// connect — the SO_PEERCRED-equivalent trust anchor for sister sessions.
2143///
2144/// Removes any stale socket file at `path` first (otherwise bind fails
2145/// with EADDRINUSE). Removes the socket on graceful shutdown.
2146///
2147/// Implementation note: axum 0.7's `serve` is TcpListener-only, so we
2148/// run the manual hyper accept loop (per-connection
2149/// `hyper::server::conn::http1::Builder`). When axum 0.8 ships with
2150/// generic `Listener` support, this can collapse to one line.
2151#[cfg(unix)]
2152pub async fn serve_uds(socket_path: PathBuf, state_dir: PathBuf) -> Result<()> {
2153    use hyper::server::conn::http1;
2154    use hyper_util::rt::TokioIo;
2155    use tower_service::Service;
2156
2157    // Best-effort cleanup of stale socket file.
2158    if socket_path.exists() {
2159        std::fs::remove_file(&socket_path)
2160            .with_context(|| format!("removing stale socket at {socket_path:?}"))?;
2161    }
2162    if let Some(parent) = socket_path.parent() {
2163        std::fs::create_dir_all(parent)
2164            .with_context(|| format!("creating socket parent {parent:?}"))?;
2165    }
2166    let relay = Relay::new(state_dir).await?;
2167    relay.spawn_pair_sweeper();
2168    relay.spawn_counter_persister();
2169    let app: axum::Router = relay
2170        .clone()
2171        .router_with_mode(ServerMode { local_only: true });
2172    let listener = tokio::net::UnixListener::bind(&socket_path)
2173        .with_context(|| format!("binding UDS at {socket_path:?}"))?;
2174
2175    // 0600: owner-rw only. Trust anchor is the kernel-attested peer uid
2176    // (SO_PEERCRED equivalent); chmod is defense-in-depth.
2177    use std::os::unix::fs::PermissionsExt;
2178    if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
2179        eprintln!(
2180            "wire relay-server (UDS): chmod 0600 on {socket_path:?} failed: {e} — \
2181             socket may be accessible to other uids. Investigate."
2182        );
2183    }
2184    eprintln!(
2185        "wire relay-server (UDS) listening on unix://{} — same-host, owner-uid only",
2186        socket_path.display()
2187    );
2188
2189    // Manual accept loop + per-conn hyper serve (axum 0.7's serve is
2190    // TcpListener-only). On SIGINT, persist counters + remove socket.
2191    let shutdown_relay = relay.clone();
2192    let socket_path_for_cleanup = socket_path.clone();
2193    let mut make_service = app.into_make_service();
2194
2195    let serve_loop = async {
2196        loop {
2197            let (stream, _peer_addr) = match listener.accept().await {
2198                Ok(p) => p,
2199                Err(e) => {
2200                    eprintln!("wire relay-server (UDS): accept failed: {e}");
2201                    continue;
2202                }
2203            };
2204            let tower_service = match make_service.call(&stream).await {
2205                Ok(s) => s,
2206                Err(infallible) => match infallible {},
2207            };
2208            let io = TokioIo::new(stream);
2209            let hyper_service =
2210                hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
2211                    let mut svc = tower_service.clone();
2212                    async move { Service::call(&mut svc, req).await }
2213                });
2214            tokio::task::spawn(async move {
2215                if let Err(e) = http1::Builder::new()
2216                    .serve_connection(io, hyper_service)
2217                    .await
2218                {
2219                    // Connection-level error; not fatal to the listener.
2220                    if !e.is_incomplete_message() {
2221                        eprintln!("wire relay-server (UDS): conn error: {e}");
2222                    }
2223                }
2224            });
2225        }
2226    };
2227
2228    let shutdown = async {
2229        let _ = tokio::signal::ctrl_c().await;
2230        eprintln!("\nshutting down — final counter snapshot");
2231        if let Err(e) = shutdown_relay.persist_counters().await {
2232            eprintln!("final counter persist failed: {e}");
2233        }
2234        let _ = std::fs::remove_file(&socket_path_for_cleanup);
2235    };
2236
2237    tokio::select! {
2238        _ = serve_loop => {},
2239        _ = shutdown => {},
2240    };
2241    Ok(())
2242}
2243
/// Non-Unix stub: Unix Domain Sockets are unavailable, so this always
/// returns an error pointing the operator at the loopback-HTTP alternative.
#[cfg(not(unix))]
pub async fn serve_uds(_socket_path: PathBuf, _state_dir: PathBuf) -> Result<()> {
    anyhow::bail!(
        "UDS transport is Unix-only; Windows falls back to loopback HTTP. \
         Use `wire relay-server --bind 127.0.0.1:8771 --local-only` on Windows."
    )
}
2251
/// v0.5.17: relay-server mode toggles.
///
/// `ServerMode::default()` is full federation (the original behavior).
/// Setting `local_only` strips the phonebook and well-known surfaces, so the
/// relay is invisible from off-box and to any directory-scraping agent on the
/// same box.
#[derive(Clone, Copy, Debug, Default)]
pub struct ServerMode {
    /// When true, skip phonebook listing, `.well-known/wire/agent`,
    /// `.well-known/agent-card.json` and the landing/stats pages. Pair this
    /// with a loopback bind (`--local-only` enforces this at the CLI
    /// layer) for genuinely within-machine traffic.
    pub local_only: bool,
}
2264
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn constant_time_eq_basic() {
        let cases: [(&[u8], &[u8], bool); 3] = [
            (b"abc", b"abc", true),
            (b"abc", b"abd", false),
            (b"abc", b"abcd", false), // length mismatch
        ];
        for (lhs, rhs, want) in cases {
            assert_eq!(constant_time_eq(lhs, rhs), want, "{lhs:?} vs {rhs:?}");
        }
    }

    #[test]
    fn random_hex_length() {
        let encoded = random_hex(16);
        // 16 bytes -> 32 hex chars
        assert_eq!(encoded.len(), 32);
        assert!(encoded.bytes().all(|b| b.is_ascii_hexdigit()));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        let state_dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&state_dir);
        let relay = Relay::new(state_dir.clone()).await.unwrap();

        {
            let mut inner = relay.inner.lock().await;

            // Expired: idle for longer than the TTL.
            let stale_since = std::time::Instant::now()
                - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60);
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            inner.pair_slots.insert(
                "id-A".to_string(),
                PairSlot {
                    last_touched: stale_since,
                    ..PairSlot::default()
                },
            );

            // Fresh: must survive the sweep.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        {
            let inner = relay.inner.lock().await;
            assert_eq!(
                inner.pair_slots.len(),
                1,
                "expired slot should have been evicted"
            );
            assert!(inner.pair_slots.contains_key("id-B"));
            assert_eq!(inner.pair_lookup.len(), 1);
            assert!(inner.pair_lookup.contains_key("hash-B"));
        }
        let _ = std::fs::remove_dir_all(&state_dir);
    }

    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        let fresh = random_hex(16);
        for good in ["0123456789abcdef0123456789abcdef", fresh.as_str()] {
            assert!(is_valid_slot_id(good), "should accept {good:?}");
        }

        let rejected = [
            // wrong length
            "abc",
            "0123456789abcdef0123456789abcde",   // 31
            "0123456789abcdef0123456789abcdef0", // 33
            // uppercase
            "0123456789ABCDEF0123456789abcdef",
            // path traversal attempts
            "../etc/passwd0123456789abcdef0000",
            "..%2Fetc%2Fpasswd00000000000000000",
            "/absolute/path/that/looks/like/key",
            // null bytes
            "0123456789abcdef\0\x31\x32\x33456789abcdef",
        ];
        for bad in rejected {
            assert!(!is_valid_slot_id(bad), "should reject {bad:?}");
        }
    }
}