Skip to main content

wire/
relay_server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3// Copyright (C) 2026  wire contributors.
4//
5// This file (and only this file in the wire repository) is licensed under
6// the GNU Affero General Public License v3.0 or later. The protocol crate
7// (signing, agent_card, trust, canonical, cli, mcp) is Apache-2.0; the CLI
8// binary entry point is MIT. See LICENSE.md for the trio explanation.
9//
10// AGPL on the relay specifically discourages forks that operate `wire-relay`
11// as a closed-source SaaS offering — those forks must publish their changes
12// under AGPL too. Self-hosting your own relay for your own org or running
13// the public-good relay we operate is fully permitted.
14//
15//! HTTP mailbox relay — minimal, persistent, bearer-authenticated.
16//!
17//! Design (v0.1):
18//!   - One process serves N slots; each slot is a per-peer FIFO of signed events.
19//!   - Slot allocation returns `(slot_id, slot_token)`. Holder of the token
20//!     can post + read that slot. Tokens never expire in v0.1 (rotate by
21//!     allocating a new slot).
22//!   - Events are stored verbatim — the relay does NOT verify Ed25519 signatures
23//!     itself. Verification happens client-side (`wire tail` + `verify_message_v31`).
24//!     The relay is dumb on purpose: it is a content-addressed mailbox, not a
25//!     trust authority.
26//!   - 256 KiB max body per event.
27//!   - Persistence: each slot's events are appended to
28//!     `<state_dir>/slots/<slot_id>.jsonl` on every `POST /events`. Tokens
29//!     are persisted to `<state_dir>/tokens.json` on allocation.
30//!   - On startup, slots + tokens are reloaded from disk.
31
32use anyhow::{Context, Result};
33use axum::{
34    Json, Router,
35    extract::{Path, Query, State},
36    http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37    response::IntoResponse,
38    routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50    GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Maximum size of a single event body in bytes (256 KiB, matching the
/// "256 KiB max body per event" rule in the module docs).
const MAX_EVENT_BYTES: usize = 256 * 1024;
/// Total bytes a single slot can hold before further POSTs are rejected (413).
/// Defends against an abusive bearer-holder filling relay disk (T11). At 64 MB
/// per slot, an attacker pushing the rate-limit ceiling fills their own slot
/// in ~25 seconds, then gets 413 forever — disk impact bounded.
///
/// The ~25 s figure corresponds to 10 max-size (256 KiB) events per second.
/// NOTE(review): in `router_with_mode` the `/v1/events/:slot_id` route is not
/// part of the governor-limited group, so the "rate-limit ceiling" presumably
/// refers to an upstream limit — confirm.
const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
/// Handle to the relay's state, used as the axum router state.
///
/// `inner` and `counters` are `Arc`s, so every clone shares the same
/// in-memory state and counters; `state_dir` is copied per clone.
#[derive(Clone)]
pub struct Relay {
    /// All mutable in-memory state, guarded by a single async mutex.
    inner: Arc<Mutex<Inner>>,
    /// Root directory of persisted state (`slots/`, `handles/`,
    /// `responder-health/`, `tokens.json`, `counters.json`, ...).
    state_dir: PathBuf,
    /// Usage counters reported by the stats endpoints.
    counters: Arc<RelayCounters>,
}
66
/// Lock-free usage counters served by GET /stats. Counter values are
/// loaded from `<state_dir>/counters.json` on startup and snapshotted back
/// to disk every 30s by `spawn_counter_persister`, so deploys + restarts
/// don't reset them. `boot_unix` is per-process — uptime is process-local.
struct RelayCounters {
    /// Unix seconds captured when `Relay::new` ran; the stats endpoint
    /// reports `uptime_seconds` as `now - boot_unix`.
    boot_unix: u64,
    // The `*_total` fields below are cumulative counters. They are only
    // read in this part of the file; the increments live in the request
    // handlers (presumably one bump per matching request — confirm there).
    handle_claims_total: AtomicU64,
    handle_first_claims_total: AtomicU64,
    slot_allocations_total: AtomicU64,
    pair_opens_total: AtomicU64,
    events_posted_total: AtomicU64,
}
79
/// On-disk form of the cumulative counters in `RelayCounters`.
///
/// Serialized to `<state_dir>/counters.json` by `Relay::persist_counters`
/// and read back in `Relay::new`; the all-zero `Default` is used when the
/// file is missing or cannot be parsed.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct CountersSnapshot {
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
88
/// One row in `<state_dir>/stats-history.jsonl` — written every 30s by
/// `spawn_counter_persister` so /stats.html can draw sparklines. Live-state
/// fields (`*_active`) are point-in-time; *_total fields are the cumulative
/// counters at that timestamp. Field names mirror the /stats endpoint.
#[derive(serde::Serialize, serde::Deserialize)]
struct HistoryEntry {
    /// Unix seconds at which the row was captured; `stats_history` filters
    /// rows on this field.
    ts: u64,
    /// Number of entries in the handle directory.
    handles_active: usize,
    /// Number of slots held in memory.
    slots_active: usize,
    /// Number of pairing sessions currently tracked.
    pair_slots_open: usize,
    /// Total SSE subscribers summed over all slots.
    streams_active: usize,
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
106
/// Query-string parameters accepted by the `/stats.history` endpoint.
#[derive(Deserialize)]
pub struct StatsHistoryQuery {
    /// How many hours of history to return, default 24, max 168 (7 days).
    /// Larger values are clamped to 168 by the handler.
    pub hours: Option<u64>,
}
112
/// The relay's mutable in-memory state. A single instance lives behind
/// `Relay::inner` (`Arc<Mutex<Inner>>`), so all access is serialized by that
/// one lock.
struct Inner {
    /// slot_id -> ordered list of stored events (parsed JSON Values).
    slots: HashMap<String, Vec<Value>>,
    /// slot_id -> bearer token. Token holder may read + write that slot.
    tokens: HashMap<String, String>,
    /// slot_id -> total bytes stored. Enforced against MAX_SLOT_BYTES.
    slot_bytes: HashMap<String, usize>,
    /// slot_id -> wall-clock unix seconds of the slot owner's last `list_events`
    /// call. Used by `GET /v1/slot/:slot_id/state` so a remote sender can
    /// gauge whether the slot's owner is still polling (i.e., still attentive).
    /// An absent entry means the slot has never been pulled since the relay
    /// restarted (this map is not reloaded from disk).
    last_pull_at_unix: HashMap<String, u64>,
    /// slot_id -> active SSE subscribers (R1 push). Each `UnboundedSender`
    /// belongs to one open `GET /v1/events/:slot_id/stream` connection.
    /// On every successful `post_event` to a slot we walk the slot's list
    /// and broadcast the event; closed channels are pruned lazily on send-
    /// error. Auth: subscribers presented a valid slot_token at stream open.
    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
    /// code_hash -> pair_id (lookup so guests find the host).
    pair_lookup: HashMap<String, String>,
    /// pair_id -> ephemeral pairing state.
    pair_slots: HashMap<String, PairSlot>,
    /// nick -> registered handle directory entry (v0.5).
    handles: HashMap<String, HandleRecord>,
    /// slot_id -> latest operator-published auto-responder health record (R3).
    responder_health: HashMap<String, ResponderHealthRecord>,
    /// token -> short-URL invite record (v0.5.10 — one-curl onboarding).
    /// Token is the path segment in `GET /i/{token}`. Record holds the
    /// underlying `wire://pair?...` URL plus TTL/uses bookkeeping.
    invites: HashMap<String, InviteRecord>,
}
144
/// One entry in the short-URL invite map. Persisted to
/// `<state_dir>/invites.jsonl` so deploys don't drop active invites.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InviteRecord {
    /// Key of this record in the invite map; the path segment in `/i/{token}`.
    token: String,
    /// The underlying `wire://pair?...` URL the short link stands for.
    invite_url: String,
    /// Expiry as unix seconds. On startup, records whose `expires_unix` is
    /// not strictly greater than the current time are not reloaded.
    expires_unix: u64,
    /// `None` = unlimited until TTL hits. `Some(n)` = decrement each fetch.
    uses_remaining: Option<u32>,
    /// Presumably the unix-seconds time the invite was registered — it is
    /// not read in this part of the file; confirm against the writer.
    created_unix: u64,
}
156
/// One entry in the relay's handle directory (v0.5 — agentic hotline).
/// FCFS on nick: first claimant binds the nick to their DID. Same-DID re-claims
/// are allowed (used for profile updates + slot rotation).
///
/// Each record is reloaded at startup from a `*.json` file under
/// `<state_dir>/handles/` and indexed in memory by `nick`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct HandleRecord {
    /// The claimed nick; also the key of this record in the handle map.
    pub nick: String,
    /// DID the nick is bound to.
    pub did: String,
    /// Arbitrary JSON stored with the claim (presumably the agent card —
    /// its schema is not checked in this part of the file).
    pub card: Value,
    /// Slot associated with the handle (presumably where the owner
    /// receives events — confirm against the claim handler).
    pub slot_id: String,
    /// Optional relay URL recorded with the claim.
    pub relay_url: Option<String>,
    /// Claim timestamp as a string; the format is set by the writer, which
    /// is outside this part of the file.
    pub claimed_at: String,
    /// v0.5.19 (#9.1): if false, this handle is omitted from the
    /// `/v1/handles` directory listing — operator opted out of bulk
    /// discovery. The `.well-known/wire/agent?handle=X` direct lookup
    /// still resolves so existing peers + out-of-band sharing continue
    /// to work. Default `None` = discoverable (back-compat for records
    /// claimed pre-v0.5.19).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
177
178impl HandleRecord {
179    /// Effective discoverability: defaults to true when the field is
180    /// absent (pre-v0.5.19 records).
181    fn is_discoverable(&self) -> bool {
182        self.discoverable.unwrap_or(true)
183    }
184}
185
/// Latest operator-published auto-responder health record for a slot (R3).
///
/// Reloaded at startup from `<state_dir>/responder-health/<slot_id>.json`;
/// the file stem is used as the slot id. `reason` and `last_success_at` are
/// omitted from the serialized form when `None`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ResponderHealthRecord {
    /// Free-form status string; allowed values are not constrained here.
    pub status: String,
    /// Optional explanation accompanying `status`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional timestamp string of the last success; the format is set by
    /// the publisher.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_success_at: Option<String>,
    /// Timestamp string recording when this record was set.
    pub set_at: String,
}
195
/// Ephemeral state of one pairing session, stored per `pair_id`.
///
/// Every payload starts out absent and is filled in as the two sides post
/// their messages.
#[derive(Clone, Debug)]
struct PairSlot {
    /// SPAKE2 message posted by the host.
    host_msg: Option<String>,
    /// SPAKE2 message posted by the guest.
    guest_msg: Option<String>,
    /// Host's sealed bootstrap payload (sent after SAS confirm).
    host_bootstrap: Option<String>,
    /// Guest's sealed bootstrap payload.
    guest_bootstrap: Option<String>,
    /// Monotonic timestamp of the most recent activity; drives TTL eviction.
    last_touched: std::time::Instant,
}

impl Default for PairSlot {
    /// An empty slot whose idle clock starts at the moment of construction.
    /// (`Default` cannot be derived because `Instant` has no default.)
    fn default() -> Self {
        let created = std::time::Instant::now();
        PairSlot {
            last_touched: created,
            host_msg: None,
            guest_msg: None,
            host_bootstrap: None,
            guest_bootstrap: None,
        }
    }
}
221
/// Pair-slot idle TTL in seconds (300 s = 5 minutes). A pair slot whose
/// `last_touched` is older than this is removed by
/// `evict_expired_pair_slots`, to free memory + bound brute-force surface
/// (PENTEST.md §code-review #3).
const PAIR_SLOT_TTL_SECS: u64 = 300;
225
/// JSON request body for slot allocation.
#[derive(Deserialize)]
pub struct AllocateRequest {
    /// Optional handle hint — purely informational, server doesn't enforce.
    /// Treated as `None` when the field is absent from the body.
    #[serde(default)]
    pub handle: Option<String>,
}
232
/// JSON request body for posting an event to a slot.
#[derive(Deserialize)]
pub struct PostEventRequest {
    /// The event as arbitrary JSON. Per the module docs the relay stores
    /// events verbatim and does not verify signatures itself.
    pub event: Value,
}
237
/// Query-string parameters for listing a slot's events.
#[derive(Deserialize)]
pub struct ListEventsQuery {
    /// Resume from after this event_id (exclusive). Omit for full slot read.
    pub since: Option<String>,
    /// Max events to return. Default 100, max 1000.
    /// (The default and the cap are applied by the handler, not by this type.)
    pub limit: Option<usize>,
}
245
246impl Relay {
247    pub async fn new(state_dir: PathBuf) -> Result<Self> {
248        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
249        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
250        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
251        let mut inner = Inner {
252            slots: HashMap::new(),
253            tokens: HashMap::new(),
254            slot_bytes: HashMap::new(),
255            last_pull_at_unix: HashMap::new(),
256            streams: HashMap::new(),
257            pair_lookup: HashMap::new(),
258            pair_slots: HashMap::new(),
259            handles: HashMap::new(),
260            responder_health: HashMap::new(),
261            invites: HashMap::new(),
262        };
263        // Reload tokens
264        let token_path = state_dir.join("tokens.json");
265        if token_path.exists() {
266            let body = tokio::fs::read_to_string(&token_path).await?;
267            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
268        }
269        // Reload slots from JSONL
270        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
271        while let Some(entry) = slots_dir.next_entry().await? {
272            let path = entry.path();
273            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
274                continue;
275            }
276            let stem = match path.file_stem().and_then(|s| s.to_str()) {
277                Some(s) => s.to_string(),
278                None => continue,
279            };
280            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
281            let mut events = Vec::new();
282            for line in body.lines() {
283                if let Ok(v) = serde_json::from_str::<Value>(line) {
284                    events.push(v);
285                }
286            }
287            // Recompute byte usage for the slot from its persisted events.
288            let bytes: usize = events
289                .iter()
290                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
291                .sum();
292            inner.slot_bytes.insert(stem.clone(), bytes);
293            inner.slots.insert(stem, events);
294        }
295        // Reload handle directory (v0.5).
296        let handles_dir = state_dir.join("handles");
297        if handles_dir.exists() {
298            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
299            while let Some(entry) = rd.next_entry().await? {
300                let path = entry.path();
301                if path.extension().and_then(|x| x.to_str()) != Some("json") {
302                    continue;
303                }
304                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
305                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
306                    inner.handles.insert(rec.nick.clone(), rec);
307                }
308            }
309        }
310        // Reload responder health records (R3).
311        let responder_health_dir = state_dir.join("responder-health");
312        if responder_health_dir.exists() {
313            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
314            while let Some(entry) = rd.next_entry().await? {
315                let path = entry.path();
316                if path.extension().and_then(|x| x.to_str()) != Some("json") {
317                    continue;
318                }
319                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
320                    continue;
321                };
322                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
323                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
324                    inner.responder_health.insert(slot_id.to_string(), rec);
325                }
326            }
327        }
328        // Reload short-URL invites. JSONL append-only; later entries with
329        // the same token overwrite earlier (won't happen — tokens are
330        // unique by construction — but coded defensively).
331        let invites_path = state_dir.join("invites.jsonl");
332        if invites_path.exists() {
333            let now_unix = SystemTime::now()
334                .duration_since(UNIX_EPOCH)
335                .map(|d| d.as_secs())
336                .unwrap_or(0);
337            let body = tokio::fs::read_to_string(&invites_path)
338                .await
339                .unwrap_or_default();
340            for line in body.lines() {
341                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
342                    && rec.expires_unix > now_unix
343                {
344                    inner.invites.insert(rec.token.clone(), rec);
345                }
346            }
347        }
348        let boot_unix = SystemTime::now()
349            .duration_since(UNIX_EPOCH)
350            .map(|d| d.as_secs())
351            .unwrap_or(0);
352        // Reload counter snapshot. Missing/corrupt file → start at zero.
353        let snap: CountersSnapshot =
354            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
355                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
356                Err(_) => CountersSnapshot::default(),
357            };
358        Ok(Self {
359            inner: Arc::new(Mutex::new(inner)),
360            state_dir,
361            counters: Arc::new(RelayCounters {
362                boot_unix,
363                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
364                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
365                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
366                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
367                events_posted_total: AtomicU64::new(snap.events_posted_total),
368            }),
369        })
370    }
371
372    pub fn router(self) -> Router {
373        self.router_with_mode(ServerMode::default())
374    }
375
376    pub fn router_with_mode(self, mode: ServerMode) -> Router {
377        // Rate limit applied to write endpoints that create new state (slots,
378        // pair-sessions, bootstraps). 10 req/sec sustained, 50 req burst.
379        // v0.1 uses the GLOBAL key extractor (single bucket for all callers) —
380        // per-IP needs ConnectInfo middleware which axum 0.7 wires differently.
381        // Per-IP keying is a v0.2 hardening; for now Cloudflare WAF + this
382        // global cap shoulder DDoS protection in series.
383        let governor_conf = std::sync::Arc::new(
384            GovernorConfigBuilder::default()
385                .per_second(10)
386                .burst_size(50)
387                .key_extractor(GlobalKeyExtractor)
388                .finish()
389                .expect("valid governor config"),
390        );
391        let governor_layer = GovernorLayer {
392            config: governor_conf,
393        };
394
395        // Hot writes group — rate limited.
396        let hot_writes = Router::new()
397            .route("/v1/slot/allocate", post(allocate_slot))
398            .route("/v1/pair", post(pair_open))
399            .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
400            .route("/v1/pair/abandon", post(pair_abandon))
401            .layer(governor_layer);
402
403        // Core data-plane routes: events, pair, slots, healthz. Always
404        // present, in both federation and local-only modes.
405        let mut router = Router::new()
406            .route("/healthz", get(healthz))
407            .route("/v1/events/:slot_id", post(post_event).get(list_events))
408            .route("/v1/slot/:slot_id/state", get(slot_state))
409            .route(
410                "/v1/slot/:slot_id/responder-health",
411                post(responder_health_set),
412            )
413            .route("/v1/events/:slot_id/stream", get(stream_events))
414            .route("/v1/pair/:pair_id", get(pair_get))
415            .route("/v1/handle/intro/:nick", post(handle_intro));
416
417        // Discovery + landing surfaces: phonebook, well-known agent cards,
418        // landing page, stats, invite landing. In `--local-only` mode we
419        // skip all of these — the relay becomes invisible from the outside
420        // (and from other agents on the same box that might enumerate it).
421        // This is the v0.5.17 within-machine privacy guarantee.
422        if !mode.local_only {
423            router = router
424                .route("/", get(landing_index))
425                .route("/favicon.svg", get(landing_favicon))
426                .route("/og.png", get(landing_og))
427                .route("/demo.cast", get(landing_demo_cast))
428                .route("/install", get(landing_install_sh))
429                .route("/install.sh", get(landing_install_sh))
430                .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
431                .route("/stats", get(stats_root))
432                .route("/stats.json", get(stats_json))
433                .route("/stats.html", get(landing_stats_html))
434                .route("/stats.history", get(stats_history))
435                .route("/phonebook", get(landing_phonebook_html))
436                .route("/phonebook.html", get(landing_phonebook_html))
437                .route("/v1/handle/claim", post(handle_claim))
438                .route("/v1/handles", get(handles_directory))
439                .route("/v1/invite/register", post(invite_register))
440                .route("/i/:token", get(invite_script))
441                .route("/.well-known/wire/agent", get(well_known_agent))
442                .route(
443                    "/.well-known/agent-card.json",
444                    get(well_known_agent_card_a2a),
445                );
446        } else {
447            // Local-only mode still needs handle_claim for in-process
448            // session bootstrap (`wire session new` allocates a local
449            // slot AND claims a handle on the local relay so peers can
450            // resolve it). The claim is gated to loopback-only callers
451            // by the bind, not by the route.
452            router = router.route("/v1/handle/claim", post(handle_claim));
453        }
454
455        router.merge(hot_writes).with_state(self)
456    }
457
458    /// Evict pair-slots that have been idle past `PAIR_SLOT_TTL_SECS`.
459    /// Called inline on every pair-slot mutation; a background sweeper task
460    /// (see `Self::start_sweeper`) covers the long-idle case.
461    async fn evict_expired_pair_slots(&self) {
462        let now = std::time::Instant::now();
463        let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
464        let mut inner = self.inner.lock().await;
465        let mut to_remove = Vec::new();
466        for (id, slot) in inner.pair_slots.iter() {
467            if now.duration_since(slot.last_touched) > ttl {
468                to_remove.push(id.clone());
469            }
470        }
471        for id in to_remove {
472            inner.pair_slots.remove(&id);
473            inner.pair_lookup.retain(|_, v| v != &id);
474        }
475    }
476
477    /// Spawn a background tokio task that runs `evict_expired_pair_slots` every
478    /// 60 seconds. Call once after `Relay::new`; the handle is leaked deliberately
479    /// — process exit reaps it. Safe to skip in tests where you'd rather test
480    /// eviction inline.
481    pub fn spawn_pair_sweeper(&self) {
482        let me = self.clone();
483        tokio::spawn(async move {
484            let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
485            loop {
486                tick.tick().await;
487                me.evict_expired_pair_slots().await;
488            }
489        });
490    }
491
492    /// Snapshot the in-process counters to `<state_dir>/counters.json`. Called
493    /// every 30s by `spawn_counter_persister` and once during graceful
494    /// shutdown so a deploy doesn't reset the running totals.
495    pub async fn persist_counters(&self) -> Result<()> {
496        let snap = CountersSnapshot {
497            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
498            handle_first_claims_total: self
499                .counters
500                .handle_first_claims_total
501                .load(Ordering::Relaxed),
502            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
503            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
504            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
505        };
506        let body = serde_json::to_vec_pretty(&snap)?;
507        let path = self.state_dir.join("counters.json");
508        tokio::fs::write(path, body).await?;
509        Ok(())
510    }
511
512    /// Append one row to `<state_dir>/stats-history.jsonl` mirroring the
513    /// /stats endpoint at this instant. Used by /stats.html for sparklines.
514    /// File grows ~250 B per call → ~720 KB/day. A future prune wave can
515    /// roll old entries off once the history exceeds 90 days.
516    pub async fn append_history(&self) -> Result<()> {
517        use tokio::io::AsyncWriteExt;
518        let now = SystemTime::now()
519            .duration_since(UNIX_EPOCH)
520            .map(|d| d.as_secs())
521            .unwrap_or(0);
522        let (handles_active, slots_active, pair_slots_open, streams_active) = {
523            let inner = self.inner.lock().await;
524            (
525                inner.handles.len(),
526                inner.slots.len(),
527                inner.pair_slots.len(),
528                inner.streams.values().map(Vec::len).sum::<usize>(),
529            )
530        };
531        let entry = HistoryEntry {
532            ts: now,
533            handles_active,
534            slots_active,
535            pair_slots_open,
536            streams_active,
537            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
538            handle_first_claims_total: self
539                .counters
540                .handle_first_claims_total
541                .load(Ordering::Relaxed),
542            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
543            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
544            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
545        };
546        let line = serde_json::to_vec(&entry)?;
547        let path = self.state_dir.join("stats-history.jsonl");
548        let mut f = tokio::fs::OpenOptions::new()
549            .create(true)
550            .append(true)
551            .open(&path)
552            .await?;
553        f.write_all(&line).await?;
554        f.write_all(b"\n").await?;
555        f.flush().await?;
556        Ok(())
557    }
558
559    /// Spawn a background tokio task that calls `persist_counters` every 30s
560    /// + appends a history row on the same tick. Loss bound: counters can
561    ///   drift back up to 30s on crash, history can drop one row.
562    pub fn spawn_counter_persister(&self) {
563        let me = self.clone();
564        tokio::spawn(async move {
565            let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
566            // First tick fires immediately; skip it so we don't write the
567            // freshly-loaded snapshot back unchanged.
568            tick.tick().await;
569            loop {
570                tick.tick().await;
571                if let Err(e) = me.persist_counters().await {
572                    eprintln!("counter persist failed: {e}");
573                }
574                if let Err(e) = me.append_history().await {
575                    eprintln!("history append failed: {e}");
576                }
577            }
578        });
579    }
580
581    async fn persist_tokens(&self) -> Result<()> {
582        let body = {
583            let inner = self.inner.lock().await;
584            serde_json::to_string_pretty(&inner.tokens)?
585        };
586        let path = self.state_dir.join("tokens.json");
587        tokio::fs::write(path, body).await?;
588        Ok(())
589    }
590
591    async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
592        // Defense in depth: only allow lowercase hex slot_ids of the exact length
593        // we ever produce ourselves (16 random bytes -> 32 hex chars). Blocks
594        // any future code path that might let attacker-controlled slot_ids reach
595        // disk operations. allocate_slot() always meets this; this assert is
596        // belt-and-suspenders against future regressions.
597        if !is_valid_slot_id(slot_id) {
598            return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
599        }
600        let path = self
601            .state_dir
602            .join("slots")
603            .join(format!("{slot_id}.jsonl"));
604        let mut line = serde_json::to_vec(event)?;
605        line.push(b'\n');
606        use tokio::io::AsyncWriteExt;
607        let mut f = tokio::fs::OpenOptions::new()
608            .create(true)
609            .append(true)
610            .open(&path)
611            .await
612            .with_context(|| format!("opening {path:?}"))?;
613        f.write_all(&line).await?;
614        f.flush().await?;
615        Ok(())
616    }
617}
618
619async fn healthz() -> impl IntoResponse {
620    (StatusCode::OK, "ok\n")
621}
622
623// Public aggregate-usage snapshot. Counter fields (`*_total`) reset on
624// process restart; state fields (`handles_active`, `slots_active`) survive
625// on the persistent volume. No DIDs / handles / IPs leaked — counts only.
626async fn stats_history(
627    State(relay): State<Relay>,
628    Query(q): Query<StatsHistoryQuery>,
629) -> impl IntoResponse {
630    let hours = q.hours.unwrap_or(24).min(168);
631    let now = SystemTime::now()
632        .duration_since(UNIX_EPOCH)
633        .map(|d| d.as_secs())
634        .unwrap_or(0);
635    let cutoff = now.saturating_sub(hours * 3600);
636    let path = relay.state_dir.join("stats-history.jsonl");
637    let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
638    let entries: Vec<Value> = body
639        .lines()
640        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
641        .filter(|v| {
642            v.get("ts")
643                .and_then(Value::as_u64)
644                .map(|t| t >= cutoff)
645                .unwrap_or(false)
646        })
647        .collect();
648    (
649        StatusCode::OK,
650        Json(json!({
651            "hours": hours,
652            "now_unix": now,
653            "count": entries.len(),
654            "entries": entries,
655        })),
656    )
657}
658
659async fn landing_stats_html() -> impl IntoResponse {
660    static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
661    (
662        StatusCode::OK,
663        [
664            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
665            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
666        ],
667        STATS_HTML,
668    )
669}
670
671async fn landing_phonebook_html() -> impl IntoResponse {
672    static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
673    (
674        StatusCode::OK,
675        [
676            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
677            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
678        ],
679        PHONEBOOK_HTML,
680    )
681}
682
683/// `/stats` dispatch: serve the pretty HTML dashboard to browsers (Accept
684/// includes text/html) and JSON to everything else (curl, scripts, scrapers).
685/// Keeps the JSON contract intact while letting humans land on the page at
686/// the short URL.
687async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
688    let wants_html = headers
689        .get(axum::http::header::ACCEPT)
690        .and_then(|v| v.to_str().ok())
691        .unwrap_or("")
692        .contains("text/html");
693    if wants_html {
694        landing_stats_html().await.into_response()
695    } else {
696        stats_json(State(relay)).await.into_response()
697    }
698}
699
700async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
701    let now = SystemTime::now()
702        .duration_since(UNIX_EPOCH)
703        .map(|d| d.as_secs())
704        .unwrap_or(0);
705    let inner = relay.inner.lock().await;
706    let streams_active: usize = inner.streams.values().map(Vec::len).sum();
707    let body = json!({
708        "version": env!("CARGO_PKG_VERSION"),
709        "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
710        "handles_active": inner.handles.len(),
711        "slots_active": inner.slots.len(),
712        "pair_slots_open": inner.pair_slots.len(),
713        "streams_active": streams_active,
714        "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
715        "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
716        "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
717        "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
718        "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
719    });
720    (StatusCode::OK, Json(body))
721}
722
723// Static landing site baked into the binary so apex (wireup.net) can flip
724// straight to Fly without a separate static-host. ~37 KB total — negligible
725// against the release binary size, and keeps the relay self-contained.
726async fn landing_index() -> impl IntoResponse {
727    static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
728    (
729        StatusCode::OK,
730        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
731        INDEX_HTML,
732    )
733}
734
735async fn landing_favicon() -> impl IntoResponse {
736    static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
737    (
738        StatusCode::OK,
739        [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
740        FAVICON_SVG,
741    )
742}
743
744async fn landing_og() -> impl IntoResponse {
745    static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
746    (
747        StatusCode::OK,
748        [
749            (axum::http::header::CONTENT_TYPE, "image/png"),
750            (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
751        ],
752        OG_PNG,
753    )
754}
755
756async fn landing_demo_cast() -> impl IntoResponse {
757    static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
758    (
759        StatusCode::OK,
760        [
761            (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
762            (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
763        ],
764        DEMO_CAST,
765    )
766}
767
768async fn landing_install_sh() -> impl IntoResponse {
769    static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
770    (
771        StatusCode::OK,
772        [
773            (
774                axum::http::header::CONTENT_TYPE,
775                "text/x-shellscript; charset=utf-8",
776            ),
777            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
778        ],
779        INSTALL_SH,
780    )
781}
782
783async fn landing_openshell_policy_sh() -> impl IntoResponse {
784    static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
785    (
786        StatusCode::OK,
787        [
788            (
789                axum::http::header::CONTENT_TYPE,
790                "text/x-shellscript; charset=utf-8",
791            ),
792            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
793        ],
794        POLICY_SH,
795    )
796}
797
798async fn allocate_slot(
799    State(relay): State<Relay>,
800    Json(_req): Json<AllocateRequest>,
801) -> impl IntoResponse {
802    let slot_id = random_hex(16);
803    let slot_token = random_hex(32);
804    {
805        let mut inner = relay.inner.lock().await;
806        inner.slots.insert(slot_id.clone(), Vec::new());
807        inner.tokens.insert(slot_id.clone(), slot_token.clone());
808    }
809    if let Err(e) = relay.persist_tokens().await {
810        return (
811            StatusCode::INTERNAL_SERVER_ERROR,
812            Json(json!({"error": format!("persist failed: {e}")})),
813        )
814            .into_response();
815    }
816    relay
817        .counters
818        .slot_allocations_total
819        .fetch_add(1, Ordering::Relaxed);
820    (
821        StatusCode::CREATED,
822        Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
823    )
824        .into_response()
825}
826
827async fn post_event(
828    State(relay): State<Relay>,
829    Path(slot_id): Path<String>,
830    headers: HeaderMap,
831    Json(req): Json<PostEventRequest>,
832) -> impl IntoResponse {
833    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
834        return resp;
835    }
836    // Body size cap (rough; serialize and check).
837    let body_bytes = match serde_json::to_vec(&req.event) {
838        Ok(b) => b,
839        Err(e) => {
840            return (
841                StatusCode::BAD_REQUEST,
842                Json(json!({"error": format!("event not serializable: {e}")})),
843            )
844                .into_response();
845        }
846    };
847    if body_bytes.len() > MAX_EVENT_BYTES {
848        return (
849            StatusCode::PAYLOAD_TOO_LARGE,
850            Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
851        )
852            .into_response();
853    }
854    // Per-slot quota: cap accumulated bytes per slot at MAX_SLOT_BYTES.
855    {
856        let inner = relay.inner.lock().await;
857        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
858        if used + body_bytes.len() > MAX_SLOT_BYTES {
859            return (
860                StatusCode::PAYLOAD_TOO_LARGE,
861                Json(json!({
862                    "error": "slot quota exceeded",
863                    "slot_bytes_used": used,
864                    "slot_bytes_max": MAX_SLOT_BYTES,
865                    "remediation": "operator should `wire rotate-slot` to drain old slot",
866                })),
867            )
868                .into_response();
869        }
870    }
871    let event_id = req
872        .event
873        .get("event_id")
874        .and_then(Value::as_str)
875        .map(str::to_string);
876
877    // Dedupe by event_id if present.
878    let dup = {
879        let inner = relay.inner.lock().await;
880        let slot = inner.slots.get(&slot_id);
881        if let (Some(eid), Some(slot)) = (&event_id, slot) {
882            slot.iter()
883                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
884        } else {
885            false
886        }
887    };
888    if dup {
889        return (
890            StatusCode::OK,
891            Json(json!({"event_id": event_id, "status": "duplicate"})),
892        )
893            .into_response();
894    }
895
896    {
897        let mut inner = relay.inner.lock().await;
898        let event_size = body_bytes.len();
899        let slot = inner.slots.entry(slot_id.clone()).or_default();
900        slot.push(req.event.clone());
901        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
902    }
903    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
904        return (
905            StatusCode::INTERNAL_SERVER_ERROR,
906            Json(json!({"error": format!("persist failed: {e}")})),
907        )
908            .into_response();
909    }
910    relay
911        .counters
912        .events_posted_total
913        .fetch_add(1, Ordering::Relaxed);
914    // R1 push: broadcast the new event to every active SSE subscriber on
915    // this slot. Dead channels are pruned in-place. The broadcast happens
916    // AFTER the disk persist so subscribers and disk readers see the same
917    // events; on persist failure we already returned 500 above.
918    {
919        let mut inner = relay.inner.lock().await;
920        if let Some(subs) = inner.streams.get_mut(&slot_id) {
921            subs.retain(|tx| tx.send(req.event.clone()).is_ok());
922        }
923    }
924    (
925        StatusCode::CREATED,
926        Json(json!({"event_id": event_id, "status": "stored"})),
927    )
928        .into_response()
929}
930
931/// R1 — server-sent-events push stream for a slot. Auth'd by slot_token
932/// (same as `list_events`). The connection registers an `UnboundedSender`
933/// on the slot's subscriber list; every subsequent `post_event` to the slot
934/// fans out to all subscribers as `data: <event-json>\n\n` lines. The
935/// connection stays open until the client disconnects.
936///
937/// A 30-second keepalive ping is emitted automatically so reverse proxies
938/// (Cloudflare tunnel, nginx) don't time out the upstream.
939///
940/// Note: the subscriber sees events posted AFTER it subscribed. To catch
941/// up on history first, the client should call `GET /v1/events/:slot_id`
942/// with `since=` before opening the stream.
943async fn stream_events(
944    State(relay): State<Relay>,
945    Path(slot_id): Path<String>,
946    headers: HeaderMap,
947) -> axum::response::Response {
948    use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
949    use futures::stream::StreamExt;
950
951    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
952        return resp;
953    }
954
955    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
956    {
957        let mut inner = relay.inner.lock().await;
958        inner.streams.entry(slot_id.clone()).or_default().push(tx);
959    }
960
961    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
962        SseEvent::default()
963            .json_data(&ev)
964            .map_err(|e| std::io::Error::other(e.to_string()))
965    });
966
967    Sse::new(stream)
968        .keep_alive(
969            KeepAlive::new()
970                .interval(std::time::Duration::from_secs(30))
971                .text("phyllis: still on the line"),
972        )
973        .into_response()
974}
975
// ---------- pair-slot handlers ----------
977
978#[derive(Deserialize)]
979pub struct PairOpenRequest {
980    pub code_hash: String,
981    /// SPAKE2 message (base64).
982    pub msg: String,
983    pub role: String, // "host" or "guest"
984}
985
986#[derive(Deserialize)]
987pub struct PairBootstrapRequest {
988    pub role: String,
989    pub sealed: String,
990}
991
992#[derive(Deserialize)]
993pub struct PairAbandonRequest {
994    /// SHA-256 hex digest of the code phrase. Same value the caller posts to
995    /// /v1/pair as `code_hash` — knowing the code IS the auth here.
996    pub code_hash: String,
997}
998
999/// Forget the pair-slot associated with this code_hash. Either side can call;
1000/// no auth beyond knowledge of the code (which is the shared secret of this
1001/// handshake anyway). Idempotent: returns 204 whether or not the slot exists.
1002/// Used by clients to recover after a crash mid-handshake, so the host doesn't
1003/// stay locked out until the 5-minute TTL.
1004async fn pair_abandon(
1005    State(relay): State<Relay>,
1006    Json(req): Json<PairAbandonRequest>,
1007) -> impl IntoResponse {
1008    let mut inner = relay.inner.lock().await;
1009    if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
1010        inner.pair_slots.remove(&pair_id);
1011    }
1012    StatusCode::NO_CONTENT.into_response()
1013}
1014
1015async fn pair_open(
1016    State(relay): State<Relay>,
1017    Json(req): Json<PairOpenRequest>,
1018) -> impl IntoResponse {
1019    if req.role != "host" && req.role != "guest" {
1020        return (
1021            StatusCode::BAD_REQUEST,
1022            Json(json!({"error": "role must be 'host' or 'guest'"})),
1023        )
1024            .into_response();
1025    }
1026    relay.evict_expired_pair_slots().await;
1027    let mut inner = relay.inner.lock().await;
1028    let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1029        Some(id) => id,
1030        None => {
1031            let new_id = random_hex(16);
1032            inner
1033                .pair_lookup
1034                .insert(req.code_hash.clone(), new_id.clone());
1035            inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1036            relay
1037                .counters
1038                .pair_opens_total
1039                .fetch_add(1, Ordering::Relaxed);
1040            new_id
1041        }
1042    };
1043    let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1044    slot.last_touched = std::time::Instant::now();
1045    if req.role == "host" {
1046        if slot.host_msg.is_some() {
1047            return (
1048                StatusCode::CONFLICT,
1049                Json(json!({"error": "host already registered for this code"})),
1050            )
1051                .into_response();
1052        }
1053        slot.host_msg = Some(req.msg);
1054    } else {
1055        if slot.guest_msg.is_some() {
1056            return (
1057                StatusCode::CONFLICT,
1058                Json(json!({"error": "guest already registered for this code"})),
1059            )
1060                .into_response();
1061        }
1062        slot.guest_msg = Some(req.msg);
1063    }
1064    (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1065}
1066
1067#[derive(Deserialize)]
1068pub struct PairGetQuery {
1069    /// "host" or "guest" — caller's role; we return the OTHER side's data.
1070    pub as_role: String,
1071}
1072
1073async fn pair_get(
1074    State(relay): State<Relay>,
1075    Path(pair_id): Path<String>,
1076    Query(q): Query<PairGetQuery>,
1077) -> impl IntoResponse {
1078    relay.evict_expired_pair_slots().await;
1079    let mut inner = relay.inner.lock().await;
1080    let slot = match inner.pair_slots.get_mut(&pair_id) {
1081        Some(s) => {
1082            s.last_touched = std::time::Instant::now();
1083            s.clone()
1084        }
1085        None => {
1086            return (
1087                StatusCode::NOT_FOUND,
1088                Json(json!({"error": "unknown pair_id"})),
1089            )
1090                .into_response();
1091        }
1092    };
1093    let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1094        "host" => (slot.guest_msg, slot.guest_bootstrap),
1095        "guest" => (slot.host_msg, slot.host_bootstrap),
1096        _ => {
1097            return (
1098                StatusCode::BAD_REQUEST,
1099                Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1100            )
1101                .into_response();
1102        }
1103    };
1104    (
1105        StatusCode::OK,
1106        Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1107    )
1108        .into_response()
1109}
1110
1111async fn pair_bootstrap(
1112    State(relay): State<Relay>,
1113    Path(pair_id): Path<String>,
1114    Json(req): Json<PairBootstrapRequest>,
1115) -> impl IntoResponse {
1116    relay.evict_expired_pair_slots().await;
1117    let mut inner = relay.inner.lock().await;
1118    let slot = match inner.pair_slots.get_mut(&pair_id) {
1119        Some(s) => s,
1120        None => {
1121            return (
1122                StatusCode::NOT_FOUND,
1123                Json(json!({"error": "unknown pair_id"})),
1124            )
1125                .into_response();
1126        }
1127    };
1128    slot.last_touched = std::time::Instant::now();
1129    match req.role.as_str() {
1130        "host" => slot.host_bootstrap = Some(req.sealed),
1131        "guest" => slot.guest_bootstrap = Some(req.sealed),
1132        _ => {
1133            return (
1134                StatusCode::BAD_REQUEST,
1135                Json(json!({"error": "role must be 'host' or 'guest'"})),
1136            )
1137                .into_response();
1138        }
1139    }
1140    (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1141}
1142
// ---------- handle directory (v0.5) ----------
1144
1145#[derive(Deserialize)]
1146pub struct HandleClaimRequest {
1147    /// Nick the claimant wants (case-folded). Domain part is implicit: the
1148    /// domain the relay's `.well-known` is served from.
1149    pub nick: String,
1150    /// Slot id the claimant owns on this relay (proves they allocated here).
1151    pub slot_id: String,
1152    /// Optional public-facing relay URL the relay should advertise back in
1153    /// `.well-known/wire/agent` responses. If omitted, callers will need to
1154    /// know the relay URL out-of-band.
1155    pub relay_url: Option<String>,
1156    /// Claimant's full signed agent-card (includes DID + verify_keys +
1157    /// optional profile).
1158    pub card: Value,
1159    /// v0.5.19 (#9.1): set false to opt out of `/v1/handles` bulk listing.
1160    /// Direct `.well-known/wire/agent` lookup by handle still works. The
1161    /// default (None / absent) is discoverable, for back-compat with
1162    /// pre-v0.5.19 clients.
1163    #[serde(default, skip_serializing_if = "Option::is_none")]
1164    pub discoverable: Option<bool>,
1165}
1166
1167/// `POST /v1/handle/claim` — claim or update a `nick@<relay-domain>` handle.
1168///
1169/// FCFS on nick. Same-DID re-claims allowed (used for profile updates +
1170/// slot rotation). Different-DID claims on a taken nick return 409.
1171/// Caller must (a) own the `slot_id` they reference (verified by token
1172/// being present), and (b) submit a card with a valid self-signature.
1173async fn handle_claim(
1174    State(relay): State<Relay>,
1175    headers: HeaderMap,
1176    Json(req): Json<HandleClaimRequest>,
1177) -> impl IntoResponse {
1178    // Bearer auth: claimant must hold the slot_token for the slot they
1179    // reference. Prevents nick-squatting from an unauthenticated POSTer.
1180    if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1181        return resp;
1182    }
1183    // Validate nick (same rules as the client-side parser).
1184    if !crate::pair_profile::is_valid_nick(&req.nick) {
1185        return (
1186            StatusCode::BAD_REQUEST,
1187            Json(json!({
1188                "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1189                "nick": req.nick,
1190            })),
1191        )
1192            .into_response();
1193    }
1194    // Verify the card signature using the public verify_agent_card helper.
1195    if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1196        return (
1197            StatusCode::BAD_REQUEST,
1198            Json(json!({"error": format!("card signature invalid: {e}")})),
1199        )
1200            .into_response();
1201    }
1202    let did = match req.card.get("did").and_then(Value::as_str) {
1203        Some(d) => d.to_string(),
1204        None => {
1205            return (
1206                StatusCode::BAD_REQUEST,
1207                Json(json!({"error": "card missing 'did' field"})),
1208            )
1209                .into_response();
1210        }
1211    };
1212
1213    // FCFS check. Also snapshot the existing record (clone) so the
1214    // re-claim path below can preserve fields the client didn't include
1215    // in the request (notably `discoverable` from v0.5.19, so an old
1216    // client doing a profile-update re-claim doesn't accidentally
1217    // re-publish a hidden handle).
1218    let prior_record: Option<HandleRecord>;
1219    let first_claim = {
1220        let inner = relay.inner.lock().await;
1221        match inner.handles.get(&req.nick) {
1222            Some(existing) if existing.did != did => {
1223                return (
1224                    StatusCode::CONFLICT,
1225                    Json(json!({
1226                        "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1227                        "nick": req.nick,
1228                        "claimed_by": existing.did,
1229                    })),
1230                )
1231                    .into_response();
1232            }
1233            Some(prev) => {
1234                prior_record = Some(prev.clone());
1235                false
1236            }
1237            None => {
1238                prior_record = None;
1239                true
1240            }
1241        }
1242    };
1243
1244    // v0.5.19 (#9.5): round claimed_at to whole seconds. Nanosecond
1245    // precision served no client purpose (display only) and acted as a
1246    // cross-tab fingerprint correlating one operator's multiple handles
1247    // claimed in the same session. Truncate before formatting.
1248    let now = time::OffsetDateTime::now_utc()
1249        .replace_nanosecond(0)
1250        .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1251        .format(&time::format_description::well_known::Rfc3339)
1252        .unwrap_or_default();
1253    // v0.5.19 (#9.1): preserve `discoverable` across re-claims. If the
1254    // request doesn't set it explicitly, keep whatever the existing
1255    // record had so a profile-update re-claim doesn't accidentally
1256    // re-publish a hidden handle. Default for first-time claim is None
1257    // (= discoverable, back-compat).
1258    let discoverable = match (req.discoverable, &prior_record) {
1259        (Some(d), _) => Some(d),
1260        (None, Some(prev)) => prev.discoverable,
1261        (None, None) => None,
1262    };
1263    let record = HandleRecord {
1264        nick: req.nick.clone(),
1265        did: did.clone(),
1266        card: req.card.clone(),
1267        slot_id: req.slot_id.clone(),
1268        relay_url: req.relay_url.clone(),
1269        claimed_at: now,
1270        discoverable,
1271    };
1272
1273    // Persist to disk first (durable), then update in-memory.
1274    let path = relay
1275        .state_dir
1276        .join("handles")
1277        .join(format!("{}.json", req.nick));
1278    let body = match serde_json::to_vec_pretty(&record) {
1279        Ok(b) => b,
1280        Err(e) => {
1281            return (
1282                StatusCode::INTERNAL_SERVER_ERROR,
1283                Json(json!({"error": format!("serialize failed: {e}")})),
1284            )
1285                .into_response();
1286        }
1287    };
1288    if let Err(e) = tokio::fs::write(&path, &body).await {
1289        return (
1290            StatusCode::INTERNAL_SERVER_ERROR,
1291            Json(json!({"error": format!("persist failed: {e}")})),
1292        )
1293            .into_response();
1294    }
1295    {
1296        let mut inner = relay.inner.lock().await;
1297        inner.handles.insert(req.nick.clone(), record);
1298    }
1299    relay
1300        .counters
1301        .handle_claims_total
1302        .fetch_add(1, Ordering::Relaxed);
1303    if first_claim {
1304        relay
1305            .counters
1306            .handle_first_claims_total
1307            .fetch_add(1, Ordering::Relaxed);
1308    }
1309    (
1310        StatusCode::CREATED,
1311        Json(json!({
1312            "nick": req.nick,
1313            "did": did,
1314            "status": if first_claim { "claimed" } else { "re-claimed" },
1315        })),
1316    )
1317        .into_response()
1318}
1319
1320#[derive(Deserialize)]
1321pub struct WellKnownAgentQuery {
1322    pub handle: String,
1323}
1324
1325#[derive(Deserialize)]
1326pub struct HandlesDirectoryQuery {
1327    pub cursor: Option<String>,
1328    pub limit: Option<usize>,
1329    pub vibe: Option<String>,
1330}
1331
// ─── short-URL invites (v0.5.10) ──────────────────────────────────────────
// One-curl onboarding: the inviter registers their `wire://pair?...` URL
// here and gets back a 6-hex token. Anyone who runs
//   curl -fsSL https://wireup.net/i/<token> | sh
// gets wire installed (if needed) and the invite accepted, in one shot.
//
// Possession of the short URL = pair authorization (same shape as the
// underlying wire:// invite — it's just a redirector).
1340
1341#[derive(Deserialize)]
1342pub struct InviteRegisterRequest {
1343    /// The wire://pair?... URL produced by `wire invite`. Required.
1344    pub invite_url: String,
1345    /// Lifetime in seconds. Default 86400 (24h). Capped at 7 days.
1346    #[serde(default)]
1347    pub ttl_seconds: Option<u64>,
1348    /// If `Some(n)`, the short URL can be fetched N times before 410s.
1349    /// `None` = unlimited until TTL hits.
1350    #[serde(default)]
1351    pub uses: Option<u32>,
1352}
1353
1354impl Relay {
1355    /// Append one InviteRecord to `<state_dir>/invites.jsonl`.
1356    async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1357        use tokio::io::AsyncWriteExt;
1358        let mut line = serde_json::to_vec(rec)?;
1359        line.push(b'\n');
1360        let path = self.state_dir.join("invites.jsonl");
1361        let mut f = tokio::fs::OpenOptions::new()
1362            .create(true)
1363            .append(true)
1364            .open(&path)
1365            .await?;
1366        f.write_all(&line).await?;
1367        f.flush().await?;
1368        Ok(())
1369    }
1370}
1371
1372async fn invite_register(
1373    State(relay): State<Relay>,
1374    Json(req): Json<InviteRegisterRequest>,
1375) -> impl IntoResponse {
1376    if req.invite_url.is_empty() {
1377        return (
1378            StatusCode::BAD_REQUEST,
1379            Json(json!({"error": "invite_url required"})),
1380        )
1381            .into_response();
1382    }
1383    // Length cap on the embedded URL to keep persisted records bounded.
1384    if req.invite_url.len() > 8_192 {
1385        return (
1386            StatusCode::PAYLOAD_TOO_LARGE,
1387            Json(json!({"error": "invite_url > 8 KiB"})),
1388        )
1389            .into_response();
1390    }
1391    let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1392    let now = SystemTime::now()
1393        .duration_since(UNIX_EPOCH)
1394        .map(|d| d.as_secs())
1395        .unwrap_or(0);
1396    // 6-hex token → 16.7M space. Collision probability negligible at v0.5
1397    // scale; if a collision happens (1 in 16M) we 409 and the caller retries.
1398    let token = random_hex(3);
1399    let rec = InviteRecord {
1400        token: token.clone(),
1401        invite_url: req.invite_url,
1402        expires_unix: now + ttl,
1403        uses_remaining: req.uses,
1404        created_unix: now,
1405    };
1406    {
1407        let mut inner = relay.inner.lock().await;
1408        if inner.invites.contains_key(&token) {
1409            return (
1410                StatusCode::CONFLICT,
1411                Json(json!({"error": "token collision, retry"})),
1412            )
1413                .into_response();
1414        }
1415        inner.invites.insert(token.clone(), rec.clone());
1416    }
1417    if let Err(e) = relay.persist_invite(&rec).await {
1418        return (
1419            StatusCode::INTERNAL_SERVER_ERROR,
1420            Json(json!({"error": format!("persist failed: {e}")})),
1421        )
1422            .into_response();
1423    }
1424    (
1425        StatusCode::CREATED,
1426        Json(json!({
1427            "token": token,
1428            "path": format!("/i/{token}"),
1429            "expires_unix": rec.expires_unix,
1430            "uses_remaining": rec.uses_remaining,
1431        })),
1432    )
1433        .into_response()
1434}
1435
1436#[derive(Deserialize)]
1437pub struct InviteScriptQuery {
1438    /// `format=url` returns the raw `wire://pair?...` URL as text/plain
1439    /// (used by `wire accept https://wireup.net/i/<token>` to resolve a
1440    /// short URL programmatically). Default: shell-script template.
1441    /// Note: ?format=url does NOT decrement `uses_remaining` — it's a
1442    /// resolution lookup, not an acceptance. The actual accept happens
1443    /// when the wire:// URL is consumed by `pair_invite::accept_invite`.
1444    pub format: Option<String>,
1445}
1446
1447async fn invite_script(
1448    State(relay): State<Relay>,
1449    Path(token): Path<String>,
1450    Query(q): Query<InviteScriptQuery>,
1451) -> impl IntoResponse {
1452    // Token shape: 6 lowercase hex. Reject anything else immediately so a
1453    // path-traversal try never reaches the map lookup.
1454    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1455        return (StatusCode::NOT_FOUND, "not found\n").into_response();
1456    }
1457    let want_raw_url = q.format.as_deref() == Some("url");
1458    let now = SystemTime::now()
1459        .duration_since(UNIX_EPOCH)
1460        .map(|d| d.as_secs())
1461        .unwrap_or(0);
1462    let invite_url = {
1463        let mut inner = relay.inner.lock().await;
1464        let Some(rec) = inner.invites.get_mut(&token) else {
1465            return (StatusCode::NOT_FOUND, "not found\n").into_response();
1466        };
1467        if rec.expires_unix <= now {
1468            return (StatusCode::GONE, "this invite has expired\n").into_response();
1469        }
1470        if let Some(n) = rec.uses_remaining {
1471            if n == 0 {
1472                return (StatusCode::GONE, "this invite has been used up\n").into_response();
1473            }
1474            // Only decrement on script-template fetch (the one that's
1475            // actually doing the pair). The raw-URL resolution path is a
1476            // lookup, not an accept.
1477            if !want_raw_url {
1478                rec.uses_remaining = Some(n - 1);
1479            }
1480        }
1481        rec.invite_url.clone()
1482    };
1483    if want_raw_url {
1484        return (
1485            StatusCode::OK,
1486            [
1487                (
1488                    axum::http::header::CONTENT_TYPE,
1489                    "text/plain; charset=utf-8",
1490                ),
1491                (
1492                    axum::http::header::CACHE_CONTROL,
1493                    "private, no-store, max-age=0",
1494                ),
1495            ],
1496            invite_url,
1497        )
1498            .into_response();
1499    }
1500    let escaped = invite_url.replace('\'', "'\\''");
1501    let script = format!(
1502        "#!/bin/sh\n\
1503         # wire — one-curl onboarding (install + pair in one shot)\n\
1504         # source: https://github.com/SlanchaAi/wire\n\
1505         set -eu\n\
1506         INVITE='{escaped}'\n\
1507         echo \"\u{2192} checking for wire CLI...\"\n\
1508         if ! command -v wire >/dev/null 2>&1; then\n  \
1509           echo \"\u{2192} wire not installed; installing first...\"\n  \
1510           curl -fsSL https://wireup.net/install.sh | sh\n  \
1511           case \":$PATH:\" in\n    \
1512             *:\"$HOME/.local/bin\":*) ;;\n    \
1513             *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n  \
1514           esac\n  \
1515           if ! command -v wire >/dev/null 2>&1; then\n    \
1516             echo \"\"\n    \
1517             echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n    \
1518             echo \"Open a new shell, then run:\"\n    \
1519             echo \"  wire accept '$INVITE'\"\n    \
1520             exit 0\n  \
1521           fi\n\
1522         fi\n\
1523         echo \"\u{2192} accepting invite...\"\n\
1524         wire accept \"$INVITE\"\n"
1525    );
1526    (
1527        StatusCode::OK,
1528        [
1529            (
1530                axum::http::header::CONTENT_TYPE,
1531                "text/x-shellscript; charset=utf-8",
1532            ),
1533            (
1534                axum::http::header::CACHE_CONTROL,
1535                "private, no-store, max-age=0",
1536            ),
1537        ],
1538        script,
1539    )
1540        .into_response()
1541}
1542
1543async fn handles_directory(
1544    State(relay): State<Relay>,
1545    Query(q): Query<HandlesDirectoryQuery>,
1546) -> impl IntoResponse {
1547    let limit = q.limit.unwrap_or(100).clamp(1, 500);
1548    let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1549    let inner = relay.inner.lock().await;
1550    let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1551    drop(inner);
1552    records.sort_by(|a, b| a.nick.cmp(&b.nick));
1553
1554    let cursor = q.cursor.as_deref();
1555    let mut eligible = Vec::new();
1556    for rec in records {
1557        if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1558            continue;
1559        }
1560        // Hygiene: hide test-shaped nicks from the public directory. Records
1561        // remain claimed (FCFS protection persists), they just don't surface
1562        // in the phone book. `demo-` is reserved for asciinema-cast handles,
1563        // `test-` for integration runs.
1564        if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1565            continue;
1566        }
1567        // v0.5.19 (#9.1): operator opt-out — hidden handles skip the
1568        // bulk directory but still resolve via `.well-known/wire/agent?
1569        // handle=X` for out-of-band sharing.
1570        if !rec.is_discoverable() {
1571            continue;
1572        }
1573        let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1574        if profile
1575            .get("listed")
1576            .and_then(Value::as_bool)
1577            .is_some_and(|listed| !listed)
1578        {
1579            continue;
1580        }
1581        if let Some(want) = &vibe_filter {
1582            let matched = profile
1583                .get("vibe")
1584                .and_then(Value::as_array)
1585                .map(|arr| {
1586                    arr.iter().any(|v| {
1587                        v.as_str()
1588                            .map(|s| s.eq_ignore_ascii_case(want))
1589                            .unwrap_or(false)
1590                    })
1591                })
1592                .unwrap_or(false);
1593            if !matched {
1594                continue;
1595            }
1596        }
1597        eligible.push((rec, profile));
1598    }
1599
1600    let has_more = eligible.len() > limit;
1601    let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1602    let next_cursor = if has_more {
1603        page.last().map(|(rec, _)| rec.nick.clone())
1604    } else {
1605        None
1606    };
1607    let handles: Vec<Value> = page
1608        .into_iter()
1609        .map(|(rec, profile)| {
1610            json!({
1611                "nick": rec.nick,
1612                "did": rec.did,
1613                "profile": {
1614                    "emoji": profile.get("emoji").cloned().unwrap_or(Value::Null),
1615                    "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1616                    "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1617                    "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1618                    "now": profile.get("now").cloned().unwrap_or(Value::Null),
1619                },
1620                "claimed_at": rec.claimed_at,
1621            })
1622        })
1623        .collect();
1624    (
1625        StatusCode::OK,
1626        Json(json!({
1627            "handles": handles,
1628            "next_cursor": next_cursor,
1629        })),
1630    )
1631        .into_response()
1632}
1633
1634/// `POST /v1/handle/intro/:nick` — drop a signed pair-introduction event
1635/// into a known nick's slot WITHOUT needing that slot's bearer token.
1636///
1637/// Why this exists: `.well-known/wire/agent` returns a nick's `slot_id` for
1638/// reachability, but NEVER its `slot_token` (that would leak read+write
1639/// authority to any handle-resolver). To zero-paste-pair, we need a way for
1640/// a stranger to deliver their signed agent-card to the nick's owner. This
1641/// endpoint provides exactly that, and ONLY that: the event must be `kind=1100`
1642/// (pair_drop / agent_card), self-signed, and the carrying agent-card embedded
1643/// in the body must verify-OK on its own.
1644///
1645/// Rate-limiting is the same governor that gates the other write endpoints.
1646/// Slot quota still applies — a flood of intros hits the standard 64MB cap.
1647async fn handle_intro(
1648    State(relay): State<Relay>,
1649    Path(nick): Path<String>,
1650    Json(req): Json<PostEventRequest>,
1651) -> impl IntoResponse {
1652    // Look up the nick. Must already be claimed.
1653    let slot_id = {
1654        let inner = relay.inner.lock().await;
1655        match inner.handles.get(&nick) {
1656            Some(rec) => rec.slot_id.clone(),
1657            None => {
1658                return (
1659                    StatusCode::NOT_FOUND,
1660                    Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1661                )
1662                    .into_response();
1663            }
1664        }
1665    };
1666
1667    // Only allow kind=1100 pair_drop / agent_card here. Anything else routes
1668    // to the standard /v1/events/:slot_id with bearer auth.
1669    let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1670    let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1671    if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1672        return (
1673            StatusCode::BAD_REQUEST,
1674            Json(json!({
1675                "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1676                "got_kind": kind,
1677                "got_type": type_str,
1678            })),
1679        )
1680            .into_response();
1681    }
1682
1683    // Body must embed a signed agent-card (so the receiver can pin from it).
1684    let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1685        Some(c) => c.clone(),
1686        None => {
1687            return (
1688                StatusCode::BAD_REQUEST,
1689                Json(json!({"error": "intro event body must embed 'card' field"})),
1690            )
1691                .into_response();
1692        }
1693    };
1694    if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1695        return (
1696            StatusCode::BAD_REQUEST,
1697            Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1698        )
1699            .into_response();
1700    }
1701
1702    // Size + quota checks (same as post_event).
1703    let body_bytes = match serde_json::to_vec(&req.event) {
1704        Ok(b) => b,
1705        Err(e) => {
1706            return (
1707                StatusCode::BAD_REQUEST,
1708                Json(json!({"error": format!("event not serializable: {e}")})),
1709            )
1710                .into_response();
1711        }
1712    };
1713    if body_bytes.len() > MAX_EVENT_BYTES {
1714        return (
1715            StatusCode::PAYLOAD_TOO_LARGE,
1716            Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1717        )
1718            .into_response();
1719    }
1720    {
1721        let inner = relay.inner.lock().await;
1722        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1723        if used + body_bytes.len() > MAX_SLOT_BYTES {
1724            return (
1725                StatusCode::PAYLOAD_TOO_LARGE,
1726                Json(json!({
1727                    "error": "target slot quota exceeded",
1728                    "slot_bytes_used": used,
1729                    "slot_bytes_max": MAX_SLOT_BYTES,
1730                })),
1731            )
1732                .into_response();
1733        }
1734    }
1735
1736    let event_id = req
1737        .event
1738        .get("event_id")
1739        .and_then(Value::as_str)
1740        .map(str::to_string);
1741
1742    // Dedupe by event_id if present.
1743    let dup = {
1744        let inner = relay.inner.lock().await;
1745        let slot = inner.slots.get(&slot_id);
1746        if let (Some(eid), Some(slot)) = (&event_id, slot) {
1747            slot.iter()
1748                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1749        } else {
1750            false
1751        }
1752    };
1753    if dup {
1754        return (
1755            StatusCode::OK,
1756            Json(json!({"event_id": event_id, "status": "duplicate"})),
1757        )
1758            .into_response();
1759    }
1760
1761    {
1762        let mut inner = relay.inner.lock().await;
1763        let event_size = body_bytes.len();
1764        let slot = inner.slots.entry(slot_id.clone()).or_default();
1765        slot.push(req.event.clone());
1766        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1767    }
1768    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1769        return (
1770            StatusCode::INTERNAL_SERVER_ERROR,
1771            Json(json!({"error": format!("persist failed: {e}")})),
1772        )
1773            .into_response();
1774    }
1775    (
1776        StatusCode::CREATED,
1777        Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1778    )
1779        .into_response()
1780}
1781
1782/// `GET /.well-known/wire/agent?handle=<nick>` — WebFinger-style resolver
1783/// for `nick@<this-relay-domain>` handles. Returns the signed agent-card +
1784/// slot coords if claimed; 404 if not.
1785///
1786/// The `handle` query parameter may be just `<nick>` or `<nick>@<domain>`.
1787/// Domain part is ignored (the relay only serves nicks it has on file).
1788/// `GET /.well-known/agent-card.json?handle=<nick>` — A2A v1.0-compatible
1789/// AgentCard serving wire's handle directory. Same data as `well_known_agent`
1790/// but in the schema A2A clients (MSFT/AWS/Salesforce/SAP/ServiceNow tooling,
1791/// agent-card-go, agent-card-python, A2A .NET SDK) already speak.
1792///
1793/// Wire-specific fields (DID, slot_id, profile blob, raw signed card) live
1794/// under the standard A2A `extensions` array using the wire extension URI.
1795/// A2A-only clients can pair to wire agents knowing only A2A vocabulary;
1796/// wire-native clients get the full richer card by following the extension.
1797async fn well_known_agent_card_a2a(
1798    State(relay): State<Relay>,
1799    Query(q): Query<WellKnownAgentQuery>,
1800) -> impl IntoResponse {
1801    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1802    if nick.is_empty() {
1803        return (
1804            StatusCode::BAD_REQUEST,
1805            Json(json!({"error": "handle missing nick"})),
1806        )
1807            .into_response();
1808    }
1809    let inner = relay.inner.lock().await;
1810    let rec = match inner.handles.get(&nick) {
1811        Some(r) => r.clone(),
1812        None => {
1813            return (
1814                StatusCode::NOT_FOUND,
1815                Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1816            )
1817                .into_response();
1818        }
1819    };
1820    drop(inner);
1821
1822    let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1823    let description = profile
1824        .get("motto")
1825        .and_then(Value::as_str)
1826        .unwrap_or("")
1827        .to_string();
1828    let display_name = profile
1829        .get("display_name")
1830        .and_then(Value::as_str)
1831        .unwrap_or(&rec.nick)
1832        .to_string();
1833    let relay_url = rec.relay_url.clone().unwrap_or_default();
1834    // Intro endpoint = where any A2A or wire client posts a signed pair-drop.
1835    let endpoint = if !relay_url.is_empty() {
1836        format!(
1837            "{}/v1/handle/intro/{}",
1838            relay_url.trim_end_matches('/'),
1839            rec.nick
1840        )
1841    } else {
1842        format!("/v1/handle/intro/{}", rec.nick)
1843    };
1844    let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1845
1846    // Build A2A v1.0 AgentCard shape with wire extension. Fields named to
1847    // match the A2A spec exactly so downstream tooling (agent-card-go etc.)
1848    // parses without custom code.
1849    let a2a_card = json!({
1850        "id": rec.did,
1851        "name": display_name,
1852        "description": description,
1853        "version": "wire/0.5",
1854        "endpoint": endpoint,
1855        "provider": {
1856            "name": "wire",
1857            "url": "https://github.com/SlanchaAi/wire"
1858        },
1859        "capabilities": {
1860            "streaming": false,
1861            "pushNotifications": false,
1862            "extendedAgentCard": true
1863        },
1864        "securitySchemes": {
1865            "ed25519-event-sig": {
1866                "type": "signature",
1867                "alg": "EdDSA",
1868                "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1869            }
1870        },
1871        "security": [{"ed25519-event-sig": []}],
1872        "skills": [],
1873        "extensions": [{
1874            // A2A extension URIs are opaque namespace identifiers, not
1875            // forwardable URLs. Changing this string is a coordinated
1876            // federation-spec bump because peers match it exactly.
1877            "uri": "https://slancha.ai/wire/ext/v0.5",
1878            "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1879            "required": false,
1880            "params": {
1881                "did": rec.did,
1882                "handle": rec.nick,
1883                "slot_id": rec.slot_id,
1884                "relay_url": rec.relay_url,
1885                "card": rec.card,
1886                "profile": profile,
1887                "claimed_at": rec.claimed_at,
1888            }
1889        }],
1890        "signature": card_sig,
1891    });
1892    (StatusCode::OK, Json(a2a_card)).into_response()
1893}
1894
1895async fn well_known_agent(
1896    State(relay): State<Relay>,
1897    Query(q): Query<WellKnownAgentQuery>,
1898) -> impl IntoResponse {
1899    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1900    if nick.is_empty() {
1901        return (
1902            StatusCode::BAD_REQUEST,
1903            Json(json!({"error": "handle missing nick"})),
1904        )
1905            .into_response();
1906    }
1907    let inner = relay.inner.lock().await;
1908    match inner.handles.get(&nick) {
1909        Some(rec) => (
1910            StatusCode::OK,
1911            Json(json!({
1912                "nick": rec.nick,
1913                "did": rec.did,
1914                "card": rec.card,
1915                "slot_id": rec.slot_id,
1916                "relay_url": rec.relay_url,
1917                "claimed_at": rec.claimed_at,
1918            })),
1919        )
1920            .into_response(),
1921        None => (
1922            StatusCode::NOT_FOUND,
1923            Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1924        )
1925            .into_response(),
1926    }
1927}
1928
1929async fn list_events(
1930    State(relay): State<Relay>,
1931    Path(slot_id): Path<String>,
1932    Query(q): Query<ListEventsQuery>,
1933    headers: HeaderMap,
1934) -> impl IntoResponse {
1935    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1936        return resp;
1937    }
1938    let limit = q.limit.unwrap_or(100).min(1000);
1939    let mut inner = relay.inner.lock().await;
1940    // R4: record this pull as proof that the slot owner is still polling.
1941    // Anyone holding the slot_token (i.e., a paired peer) can later read
1942    // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
1943    let now_unix = std::time::SystemTime::now()
1944        .duration_since(std::time::UNIX_EPOCH)
1945        .map(|d| d.as_secs())
1946        .unwrap_or(0);
1947    inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1948    let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1949    let start = match q.since {
1950        Some(eid) => events
1951            .iter()
1952            .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1953            .map(|i| i + 1)
1954            .unwrap_or(0),
1955        None => 0,
1956    };
1957    let end = (start + limit).min(events.len());
1958    let slice = events[start..end].to_vec();
1959    (StatusCode::OK, Json(slice)).into_response()
1960}
1961
1962/// R4 — slot-attentiveness probe. Authenticated by slot_token (so only
1963/// paired peers can ask). Returns `last_pull_at_unix` (the slot owner's most
1964/// recent `list_events` call, in unix seconds) and `event_count` (total
1965/// stored). A remote sender uses this before `wire send <peer>` to warn the
1966/// operator if the peer hasn't polled recently.
1967async fn slot_state(
1968    State(relay): State<Relay>,
1969    Path(slot_id): Path<String>,
1970    headers: HeaderMap,
1971) -> impl IntoResponse {
1972    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1973        return resp;
1974    }
1975    let inner = relay.inner.lock().await;
1976    let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1977    let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1978    let responder_health = inner.responder_health.get(&slot_id).cloned();
1979    (
1980        StatusCode::OK,
1981        Json(json!({
1982            "slot_id": slot_id,
1983            "event_count": event_count,
1984            "last_pull_at_unix": last_pull_at_unix,
1985            "responder_health": responder_health,
1986        })),
1987    )
1988        .into_response()
1989}
1990
1991async fn responder_health_set(
1992    State(relay): State<Relay>,
1993    Path(slot_id): Path<String>,
1994    headers: HeaderMap,
1995    Json(record): Json<ResponderHealthRecord>,
1996) -> impl IntoResponse {
1997    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1998        return resp;
1999    }
2000    let path = relay
2001        .state_dir
2002        .join("responder-health")
2003        .join(format!("{slot_id}.json"));
2004    let body = match serde_json::to_vec_pretty(&record) {
2005        Ok(b) => b,
2006        Err(e) => {
2007            return (
2008                StatusCode::INTERNAL_SERVER_ERROR,
2009                Json(json!({"error": format!("serialize failed: {e}")})),
2010            )
2011                .into_response();
2012        }
2013    };
2014    if let Err(e) = tokio::fs::write(&path, body).await {
2015        return (
2016            StatusCode::INTERNAL_SERVER_ERROR,
2017            Json(json!({"error": format!("persist failed: {e}")})),
2018        )
2019            .into_response();
2020    }
2021    {
2022        let mut inner = relay.inner.lock().await;
2023        inner
2024            .responder_health
2025            .insert(slot_id.clone(), record.clone());
2026    }
2027    (StatusCode::OK, Json(record)).into_response()
2028}
2029
2030async fn check_token(
2031    relay: &Relay,
2032    headers: &HeaderMap,
2033    slot_id: &str,
2034) -> std::result::Result<(), axum::response::Response> {
2035    let auth = headers
2036        .get(AUTHORIZATION)
2037        .and_then(|h| h.to_str().ok())
2038        .and_then(|s| s.strip_prefix("Bearer "))
2039        .map(str::to_string);
2040    let presented = match auth {
2041        Some(t) => t,
2042        None => {
2043            return Err((
2044                StatusCode::UNAUTHORIZED,
2045                Json(json!({"error": "missing Bearer token"})),
2046            )
2047                .into_response());
2048        }
2049    };
2050    let inner = relay.inner.lock().await;
2051    let expected = match inner.tokens.get(slot_id) {
2052        Some(t) => t.clone(),
2053        None => {
2054            return Err((
2055                StatusCode::NOT_FOUND,
2056                Json(json!({"error": "unknown slot"})),
2057            )
2058                .into_response());
2059        }
2060    };
2061    drop(inner);
2062    if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2063        return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2064    }
2065    Ok(())
2066}
2067
/// Compare two byte strings without short-circuiting on the first
/// differing byte.
///
/// A length mismatch returns `false` immediately. For equal lengths every
/// byte pair is XOR-ed into an accumulator, and the slices are equal exactly
/// when the accumulator ends up zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |diff, (&lhs, &rhs)| diff | (lhs ^ rhs))
            == 0
}
2078
/// Returns `true` only for strings of exactly 32 bytes made up solely of
/// the characters `0-9` and `a-f` (lowercase hex).
///
/// Anything else is rejected: wrong length, uppercase hex digits, path
/// separators, dots, percent signs, NUL bytes, non-ASCII text.
fn is_valid_slot_id(s: &str) -> bool {
    const SLOT_ID_LEN: usize = 32;
    s.len() == SLOT_ID_LEN && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2084
2085fn random_hex(n_bytes: usize) -> String {
2086    let mut buf = vec![0u8; n_bytes];
2087    rand::thread_rng().fill_bytes(&mut buf);
2088    hex::encode(buf)
2089}
2090
2091/// Run the relay until SIGINT/SIGTERM.
2092pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2093    serve_with_mode(bind, state_dir, ServerMode::default()).await
2094}
2095
2096/// v0.5.17: server-mode-aware entry point. Same as `serve` but with the
2097/// `--local-only` toggle exposed so the binary can refuse to publish
2098/// phonebook + well-known surfaces on within-machine relays.
2099pub async fn serve_with_mode(
2100    bind: &str,
2101    state_dir: PathBuf,
2102    mode: ServerMode,
2103) -> Result<()> {
2104    let relay = Relay::new(state_dir).await?;
2105    relay.spawn_pair_sweeper();
2106    relay.spawn_counter_persister();
2107    let app = relay.clone().router_with_mode(mode);
2108    let listener = tokio::net::TcpListener::bind(bind)
2109        .await
2110        .with_context(|| format!("binding {bind}"))?;
2111    if mode.local_only {
2112        eprintln!("wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled");
2113    } else {
2114        eprintln!("wire relay-server listening on {bind}");
2115    }
2116    let shutdown_relay = relay.clone();
2117    axum::serve(listener, app)
2118        .with_graceful_shutdown(async move {
2119            let _ = tokio::signal::ctrl_c().await;
2120            eprintln!("\nshutting down — final counter snapshot");
2121            if let Err(e) = shutdown_relay.persist_counters().await {
2122                eprintln!("final counter persist failed: {e}");
2123            }
2124        })
2125        .await?;
2126    Ok(())
2127}
2128
/// v0.5.17: relay-server mode toggles, passed to `serve_with_mode`.
///
/// The derived `Default` leaves every toggle off, i.e. full federation
/// (the behavior of plain `serve`). `local_only` strips phonebook +
/// well-known surfaces so the relay is invisible from off-box and from any
/// directory-scraping agent on the same box.
#[derive(Debug, Clone, Copy, Default)]
pub struct ServerMode {
    /// When true, skip phonebook listing + `.well-known/wire/agent` +
    /// `.well-known/agent-card.json` + landing/stats pages. Pair this
    /// with a loopback bind (`--local-only` enforces this at the CLI
    /// layer) for genuinely within-machine traffic. Defaults to `false`.
    pub local_only: bool,
}
2141
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal slices match; a differing byte or a differing length does not.
    #[test]
    fn constant_time_eq_basic() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        // Length mismatch is rejected before any byte comparison.
        assert!(!constant_time_eq(b"abc", b"abcd"));
    }

    /// `random_hex(n)` yields `2 * n` hex characters.
    #[test]
    fn random_hex_length() {
        let token = random_hex(16);
        assert_eq!(token.len(), 32); // 16 bytes -> 32 hex chars
        assert!(token.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// A pair-slot idle for longer than the TTL is evicted together with
    /// its lookup entry, while a freshly created one survives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        let state_dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&state_dir);
        let relay = Relay::new(state_dir.clone()).await.unwrap();

        // Backdate one slot to a minute past the TTL.
        let idle_for = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60);
        let stale_since = std::time::Instant::now() - idle_for;

        {
            let mut inner = relay.inner.lock().await;

            // Seed a pair-slot manually with a past last_touched.
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            let stale_slot = PairSlot {
                last_touched: stale_since,
                ..PairSlot::default()
            };
            inner.pair_slots.insert("id-A".to_string(), stale_slot);

            // And a fresh one — should survive.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let inner = relay.inner.lock().await;
        assert_eq!(
            inner.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(inner.pair_slots.contains_key("id-B"));
        assert_eq!(inner.pair_lookup.len(), 1);
        assert!(inner.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&state_dir);
    }

    /// Only 32-character lowercase-hex strings are accepted as slot ids.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));

        let rejected = [
            // wrong length
            "abc",
            "0123456789abcdef0123456789abcde",   // 31
            "0123456789abcdef0123456789abcdef0", // 33
            // uppercase
            "0123456789ABCDEF0123456789abcdef",
            // path traversal attempts
            "../etc/passwd0123456789abcdef0000",
            "..%2Fetc%2Fpasswd00000000000000000",
            "/absolute/path/that/looks/like/key",
            // null bytes
            "0123456789abcdef\0\x31\x32\x33456789abcdef",
        ];
        for candidate in rejected {
            assert!(!is_valid_slot_id(candidate));
        }
    }
}
2222        assert!(!is_valid_slot_id(
2223            "0123456789abcdef\0\x31\x32\x33456789abcdef"
2224        ));
2225    }
2226}