Skip to main content

wire/
relay_server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3// Copyright (C) 2026  wire contributors.
4//
5// This file (and only this file in the wire repository) is licensed under
6// the GNU Affero General Public License v3.0 or later. The protocol crate
7// (signing, agent_card, trust, canonical, cli, mcp) is Apache-2.0; the CLI
8// binary entry point is MIT. See LICENSE.md for the trio explanation.
9//
10// AGPL on the relay specifically discourages forks that operate `wire-relay`
11// as a closed-source SaaS offering — those forks must publish their changes
12// under AGPL too. Self-hosting your own relay for your own org or running
13// the public-good relay we operate is fully permitted.
14//
15//! HTTP mailbox relay — minimal, persistent, bearer-authenticated.
16//!
17//! Design (v0.1):
18//!   - One process serves N slots; each slot is a per-peer FIFO of signed events.
19//!   - Slot allocation returns `(slot_id, slot_token)`. Holder of the token
20//!     can post + read that slot. Tokens never expire in v0.1 (rotate by
21//!     allocating a new slot).
22//!   - Events are stored verbatim — the relay does NOT verify Ed25519 signatures
23//!     itself. Verification happens client-side (`wire tail` + `verify_message_v31`).
24//!     The relay is dumb on purpose: it is a content-addressed mailbox, not a
25//!     trust authority.
26//!   - 256 KiB max body per event.
27//!   - Persistence: each slot's events are appended to
28//!     `<state_dir>/slots/<slot_id>.jsonl` on every `POST /events`. Tokens
29//!     are persisted to `<state_dir>/tokens.json` on allocation.
30//!   - On startup, slots + tokens are reloaded from disk.
31
32use anyhow::{Context, Result};
33use axum::{
34    Json, Router,
35    extract::{Path, Query, State},
36    http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37    response::IntoResponse,
38    routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50    GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
53const MAX_EVENT_BYTES: usize = 256 * 1024;
54/// Total bytes a single slot can hold before further POSTs are rejected (413).
55/// Defends against an abusive bearer-holder filling relay disk (T11). At 64 MB
56/// per slot, an attacker pushing the rate-limit ceiling fills their own slot
57/// in ~25 seconds, then gets 413 forever — disk impact bounded.
58const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
60#[derive(Clone)]
61pub struct Relay {
62    inner: Arc<Mutex<Inner>>,
63    state_dir: PathBuf,
64    counters: Arc<RelayCounters>,
65}
66
67/// Lock-free usage counters served by GET /stats. Counter values are
68/// loaded from `<state_dir>/counters.json` on startup and snapshotted back
69/// to disk every 30s by `spawn_counter_persister`, so deploys + restarts
70/// don't reset them. `boot_unix` is per-process — uptime is process-local.
71struct RelayCounters {
72    boot_unix: u64,
73    handle_claims_total: AtomicU64,
74    handle_first_claims_total: AtomicU64,
75    slot_allocations_total: AtomicU64,
76    pair_opens_total: AtomicU64,
77    events_posted_total: AtomicU64,
78}
79
80#[derive(serde::Serialize, serde::Deserialize, Default)]
81struct CountersSnapshot {
82    handle_claims_total: u64,
83    handle_first_claims_total: u64,
84    slot_allocations_total: u64,
85    pair_opens_total: u64,
86    events_posted_total: u64,
87}
88
89/// One row in `<state_dir>/stats-history.jsonl` — written every 30s by
90/// `spawn_counter_persister` so /stats.html can draw sparklines. Live-state
91/// fields (`*_active`) are point-in-time; *_total fields are the cumulative
92/// counters at that timestamp. Field names mirror the /stats endpoint.
93#[derive(serde::Serialize, serde::Deserialize)]
94struct HistoryEntry {
95    ts: u64,
96    handles_active: usize,
97    slots_active: usize,
98    pair_slots_open: usize,
99    streams_active: usize,
100    handle_claims_total: u64,
101    handle_first_claims_total: u64,
102    slot_allocations_total: u64,
103    pair_opens_total: u64,
104    events_posted_total: u64,
105}
106
107#[derive(Deserialize)]
108pub struct StatsHistoryQuery {
109    /// How many hours of history to return, default 24, max 168 (7 days).
110    pub hours: Option<u64>,
111}
112
113struct Inner {
114    /// slot_id -> ordered list of stored events (parsed JSON Values).
115    slots: HashMap<String, Vec<Value>>,
116    /// slot_id -> bearer token. Token holder may read + write that slot.
117    tokens: HashMap<String, String>,
118    /// slot_id -> total bytes stored. Enforced against MAX_SLOT_BYTES.
119    slot_bytes: HashMap<String, usize>,
120    /// slot_id -> wall-clock unix seconds of the slot owner's last `list_events`
121    /// call. Used by `GET /v1/slot/:slot_id/state` so a remote sender can
122    /// gauge whether the slot's owner is still polling (i.e., still attentive).
123    /// `None` means the slot has never been pulled since the relay restarted.
124    last_pull_at_unix: HashMap<String, u64>,
125    /// slot_id -> active SSE subscribers (R1 push). Each `UnboundedSender`
126    /// belongs to one open `GET /v1/events/:slot_id/stream` connection.
127    /// On every successful `post_event` to a slot we walk the slot's list
128    /// and broadcast the event; closed channels are pruned lazily on send-
129    /// error. Auth: subscribers presented a valid slot_token at stream open.
130    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
131    /// code_hash -> pair_id (lookup so guests find the host).
132    pair_lookup: HashMap<String, String>,
133    /// pair_id -> ephemeral pairing state.
134    pair_slots: HashMap<String, PairSlot>,
135    /// nick -> registered handle directory entry (v0.5).
136    handles: HashMap<String, HandleRecord>,
137    /// slot_id -> latest operator-published auto-responder health record (R3).
138    responder_health: HashMap<String, ResponderHealthRecord>,
139    /// token -> short-URL invite record (v0.5.10 — one-curl onboarding).
140    /// Token is the path segment in `GET /i/{token}`. Record holds the
141    /// underlying `wire://pair?...` URL plus TTL/uses bookkeeping.
142    invites: HashMap<String, InviteRecord>,
143}
144
145/// One entry in the short-URL invite map. Persisted to
146/// `<state_dir>/invites.jsonl` so deploys don't drop active invites.
147#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
148struct InviteRecord {
149    token: String,
150    invite_url: String,
151    expires_unix: u64,
152    /// `None` = unlimited until TTL hits. `Some(n)` = decrement each fetch.
153    uses_remaining: Option<u32>,
154    created_unix: u64,
155}
156
157/// One entry in the relay's handle directory (v0.5 — agentic hotline).
158/// FCFS on nick: first claimant binds the nick to their DID. Same-DID re-claims
159/// are allowed (used for profile updates + slot rotation).
160#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
161struct HandleRecord {
162    pub nick: String,
163    pub did: String,
164    pub card: Value,
165    pub slot_id: String,
166    pub relay_url: Option<String>,
167    pub claimed_at: String,
168}
169
170#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
171pub struct ResponderHealthRecord {
172    pub status: String,
173    #[serde(default, skip_serializing_if = "Option::is_none")]
174    pub reason: Option<String>,
175    #[serde(default, skip_serializing_if = "Option::is_none")]
176    pub last_success_at: Option<String>,
177    pub set_at: String,
178}
179
180#[derive(Clone, Debug)]
181struct PairSlot {
182    /// SPAKE2 message from the host side.
183    host_msg: Option<String>,
184    /// SPAKE2 message from the guest side.
185    guest_msg: Option<String>,
186    /// Sealed bootstrap payload from host (after SAS confirm).
187    host_bootstrap: Option<String>,
188    /// Sealed bootstrap payload from guest.
189    guest_bootstrap: Option<String>,
190    /// Last activity time (monotonic) — used for TTL eviction.
191    last_touched: std::time::Instant,
192}
193
194impl Default for PairSlot {
195    fn default() -> Self {
196        Self {
197            host_msg: None,
198            guest_msg: None,
199            host_bootstrap: None,
200            guest_bootstrap: None,
201            last_touched: std::time::Instant::now(),
202        }
203    }
204}
205
206/// Pair-slot idle TTL. After this many seconds without activity, the slot
207/// is evicted to free memory + bound brute-force surface (PENTEST.md §code-review #3).
208const PAIR_SLOT_TTL_SECS: u64 = 300;
209
210#[derive(Deserialize)]
211pub struct AllocateRequest {
212    /// Optional handle hint — purely informational, server doesn't enforce.
213    #[serde(default)]
214    pub handle: Option<String>,
215}
216
217#[derive(Deserialize)]
218pub struct PostEventRequest {
219    pub event: Value,
220}
221
222#[derive(Deserialize)]
223pub struct ListEventsQuery {
224    /// Resume from after this event_id (exclusive). Omit for full slot read.
225    pub since: Option<String>,
226    /// Max events to return. Default 100, max 1000.
227    pub limit: Option<usize>,
228}
229
230impl Relay {
231    pub async fn new(state_dir: PathBuf) -> Result<Self> {
232        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
233        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
234        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
235        let mut inner = Inner {
236            slots: HashMap::new(),
237            tokens: HashMap::new(),
238            slot_bytes: HashMap::new(),
239            last_pull_at_unix: HashMap::new(),
240            streams: HashMap::new(),
241            pair_lookup: HashMap::new(),
242            pair_slots: HashMap::new(),
243            handles: HashMap::new(),
244            responder_health: HashMap::new(),
245            invites: HashMap::new(),
246        };
247        // Reload tokens
248        let token_path = state_dir.join("tokens.json");
249        if token_path.exists() {
250            let body = tokio::fs::read_to_string(&token_path).await?;
251            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
252        }
253        // Reload slots from JSONL
254        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
255        while let Some(entry) = slots_dir.next_entry().await? {
256            let path = entry.path();
257            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
258                continue;
259            }
260            let stem = match path.file_stem().and_then(|s| s.to_str()) {
261                Some(s) => s.to_string(),
262                None => continue,
263            };
264            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
265            let mut events = Vec::new();
266            for line in body.lines() {
267                if let Ok(v) = serde_json::from_str::<Value>(line) {
268                    events.push(v);
269                }
270            }
271            // Recompute byte usage for the slot from its persisted events.
272            let bytes: usize = events
273                .iter()
274                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
275                .sum();
276            inner.slot_bytes.insert(stem.clone(), bytes);
277            inner.slots.insert(stem, events);
278        }
279        // Reload handle directory (v0.5).
280        let handles_dir = state_dir.join("handles");
281        if handles_dir.exists() {
282            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
283            while let Some(entry) = rd.next_entry().await? {
284                let path = entry.path();
285                if path.extension().and_then(|x| x.to_str()) != Some("json") {
286                    continue;
287                }
288                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
289                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
290                    inner.handles.insert(rec.nick.clone(), rec);
291                }
292            }
293        }
294        // Reload responder health records (R3).
295        let responder_health_dir = state_dir.join("responder-health");
296        if responder_health_dir.exists() {
297            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
298            while let Some(entry) = rd.next_entry().await? {
299                let path = entry.path();
300                if path.extension().and_then(|x| x.to_str()) != Some("json") {
301                    continue;
302                }
303                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
304                    continue;
305                };
306                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
307                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
308                    inner.responder_health.insert(slot_id.to_string(), rec);
309                }
310            }
311        }
312        // Reload short-URL invites. JSONL append-only; later entries with
313        // the same token overwrite earlier (won't happen — tokens are
314        // unique by construction — but coded defensively).
315        let invites_path = state_dir.join("invites.jsonl");
316        if invites_path.exists() {
317            let now_unix = SystemTime::now()
318                .duration_since(UNIX_EPOCH)
319                .map(|d| d.as_secs())
320                .unwrap_or(0);
321            let body = tokio::fs::read_to_string(&invites_path)
322                .await
323                .unwrap_or_default();
324            for line in body.lines() {
325                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
326                    && rec.expires_unix > now_unix
327                {
328                    inner.invites.insert(rec.token.clone(), rec);
329                }
330            }
331        }
332        let boot_unix = SystemTime::now()
333            .duration_since(UNIX_EPOCH)
334            .map(|d| d.as_secs())
335            .unwrap_or(0);
336        // Reload counter snapshot. Missing/corrupt file → start at zero.
337        let snap: CountersSnapshot =
338            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
339                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
340                Err(_) => CountersSnapshot::default(),
341            };
342        Ok(Self {
343            inner: Arc::new(Mutex::new(inner)),
344            state_dir,
345            counters: Arc::new(RelayCounters {
346                boot_unix,
347                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
348                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
349                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
350                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
351                events_posted_total: AtomicU64::new(snap.events_posted_total),
352            }),
353        })
354    }
355
356    pub fn router(self) -> Router {
357        // Rate limit applied to write endpoints that create new state (slots,
358        // pair-sessions, bootstraps). 10 req/sec sustained, 50 req burst.
359        // v0.1 uses the GLOBAL key extractor (single bucket for all callers) —
360        // per-IP needs ConnectInfo middleware which axum 0.7 wires differently.
361        // Per-IP keying is a v0.2 hardening; for now Cloudflare WAF + this
362        // global cap shoulder DDoS protection in series.
363        let governor_conf = std::sync::Arc::new(
364            GovernorConfigBuilder::default()
365                .per_second(10)
366                .burst_size(50)
367                .key_extractor(GlobalKeyExtractor)
368                .finish()
369                .expect("valid governor config"),
370        );
371        let governor_layer = GovernorLayer {
372            config: governor_conf,
373        };
374
375        // Hot writes group — rate limited.
376        let hot_writes = Router::new()
377            .route("/v1/slot/allocate", post(allocate_slot))
378            .route("/v1/pair", post(pair_open))
379            .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
380            .route("/v1/pair/abandon", post(pair_abandon))
381            .layer(governor_layer);
382
383        Router::new()
384            .route("/", get(landing_index))
385            .route("/favicon.svg", get(landing_favicon))
386            .route("/og.png", get(landing_og))
387            .route("/demo.cast", get(landing_demo_cast))
388            .route("/install", get(landing_install_sh))
389            .route("/install.sh", get(landing_install_sh))
390            .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
391            .route("/healthz", get(healthz))
392            .route("/stats", get(stats_root))
393            .route("/stats.json", get(stats_json))
394            .route("/stats.html", get(landing_stats_html))
395            .route("/stats.history", get(stats_history))
396            .route("/phonebook", get(landing_phonebook_html))
397            .route("/phonebook.html", get(landing_phonebook_html))
398            .route("/v1/events/:slot_id", post(post_event).get(list_events))
399            .route("/v1/slot/:slot_id/state", get(slot_state))
400            .route(
401                "/v1/slot/:slot_id/responder-health",
402                post(responder_health_set),
403            )
404            .route("/v1/events/:slot_id/stream", get(stream_events))
405            .route("/v1/pair/:pair_id", get(pair_get))
406            .route("/v1/handle/claim", post(handle_claim))
407            .route("/v1/handle/intro/:nick", post(handle_intro))
408            .route("/v1/handles", get(handles_directory))
409            .route("/v1/invite/register", post(invite_register))
410            .route("/i/:token", get(invite_script))
411            .route("/.well-known/wire/agent", get(well_known_agent))
412            .route(
413                "/.well-known/agent-card.json",
414                get(well_known_agent_card_a2a),
415            )
416            .merge(hot_writes)
417            .with_state(self)
418    }
419
420    /// Evict pair-slots that have been idle past `PAIR_SLOT_TTL_SECS`.
421    /// Called inline on every pair-slot mutation; a background sweeper task
422    /// (see `Self::start_sweeper`) covers the long-idle case.
423    async fn evict_expired_pair_slots(&self) {
424        let now = std::time::Instant::now();
425        let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
426        let mut inner = self.inner.lock().await;
427        let mut to_remove = Vec::new();
428        for (id, slot) in inner.pair_slots.iter() {
429            if now.duration_since(slot.last_touched) > ttl {
430                to_remove.push(id.clone());
431            }
432        }
433        for id in to_remove {
434            inner.pair_slots.remove(&id);
435            inner.pair_lookup.retain(|_, v| v != &id);
436        }
437    }
438
439    /// Spawn a background tokio task that runs `evict_expired_pair_slots` every
440    /// 60 seconds. Call once after `Relay::new`; the handle is leaked deliberately
441    /// — process exit reaps it. Safe to skip in tests where you'd rather test
442    /// eviction inline.
443    pub fn spawn_pair_sweeper(&self) {
444        let me = self.clone();
445        tokio::spawn(async move {
446            let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
447            loop {
448                tick.tick().await;
449                me.evict_expired_pair_slots().await;
450            }
451        });
452    }
453
454    /// Snapshot the in-process counters to `<state_dir>/counters.json`. Called
455    /// every 30s by `spawn_counter_persister` and once during graceful
456    /// shutdown so a deploy doesn't reset the running totals.
457    pub async fn persist_counters(&self) -> Result<()> {
458        let snap = CountersSnapshot {
459            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
460            handle_first_claims_total: self
461                .counters
462                .handle_first_claims_total
463                .load(Ordering::Relaxed),
464            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
465            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
466            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
467        };
468        let body = serde_json::to_vec_pretty(&snap)?;
469        let path = self.state_dir.join("counters.json");
470        tokio::fs::write(path, body).await?;
471        Ok(())
472    }
473
474    /// Append one row to `<state_dir>/stats-history.jsonl` mirroring the
475    /// /stats endpoint at this instant. Used by /stats.html for sparklines.
476    /// File grows ~250 B per call → ~720 KB/day. A future prune wave can
477    /// roll old entries off once the history exceeds 90 days.
478    pub async fn append_history(&self) -> Result<()> {
479        use tokio::io::AsyncWriteExt;
480        let now = SystemTime::now()
481            .duration_since(UNIX_EPOCH)
482            .map(|d| d.as_secs())
483            .unwrap_or(0);
484        let (handles_active, slots_active, pair_slots_open, streams_active) = {
485            let inner = self.inner.lock().await;
486            (
487                inner.handles.len(),
488                inner.slots.len(),
489                inner.pair_slots.len(),
490                inner.streams.values().map(Vec::len).sum::<usize>(),
491            )
492        };
493        let entry = HistoryEntry {
494            ts: now,
495            handles_active,
496            slots_active,
497            pair_slots_open,
498            streams_active,
499            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
500            handle_first_claims_total: self
501                .counters
502                .handle_first_claims_total
503                .load(Ordering::Relaxed),
504            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
505            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
506            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
507        };
508        let line = serde_json::to_vec(&entry)?;
509        let path = self.state_dir.join("stats-history.jsonl");
510        let mut f = tokio::fs::OpenOptions::new()
511            .create(true)
512            .append(true)
513            .open(&path)
514            .await?;
515        f.write_all(&line).await?;
516        f.write_all(b"\n").await?;
517        f.flush().await?;
518        Ok(())
519    }
520
521    /// Spawn a background tokio task that calls `persist_counters` every 30s
522    /// + appends a history row on the same tick. Loss bound: counters can
523    ///   drift back up to 30s on crash, history can drop one row.
524    pub fn spawn_counter_persister(&self) {
525        let me = self.clone();
526        tokio::spawn(async move {
527            let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
528            // First tick fires immediately; skip it so we don't write the
529            // freshly-loaded snapshot back unchanged.
530            tick.tick().await;
531            loop {
532                tick.tick().await;
533                if let Err(e) = me.persist_counters().await {
534                    eprintln!("counter persist failed: {e}");
535                }
536                if let Err(e) = me.append_history().await {
537                    eprintln!("history append failed: {e}");
538                }
539            }
540        });
541    }
542
543    async fn persist_tokens(&self) -> Result<()> {
544        let body = {
545            let inner = self.inner.lock().await;
546            serde_json::to_string_pretty(&inner.tokens)?
547        };
548        let path = self.state_dir.join("tokens.json");
549        tokio::fs::write(path, body).await?;
550        Ok(())
551    }
552
553    async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
554        // Defense in depth: only allow lowercase hex slot_ids of the exact length
555        // we ever produce ourselves (16 random bytes -> 32 hex chars). Blocks
556        // any future code path that might let attacker-controlled slot_ids reach
557        // disk operations. allocate_slot() always meets this; this assert is
558        // belt-and-suspenders against future regressions.
559        if !is_valid_slot_id(slot_id) {
560            return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
561        }
562        let path = self
563            .state_dir
564            .join("slots")
565            .join(format!("{slot_id}.jsonl"));
566        let mut line = serde_json::to_vec(event)?;
567        line.push(b'\n');
568        use tokio::io::AsyncWriteExt;
569        let mut f = tokio::fs::OpenOptions::new()
570            .create(true)
571            .append(true)
572            .open(&path)
573            .await
574            .with_context(|| format!("opening {path:?}"))?;
575        f.write_all(&line).await?;
576        f.flush().await?;
577        Ok(())
578    }
579}
580
581async fn healthz() -> impl IntoResponse {
582    (StatusCode::OK, "ok\n")
583}
584
585// Public aggregate-usage snapshot. Counter fields (`*_total`) reset on
586// process restart; state fields (`handles_active`, `slots_active`) survive
587// on the persistent volume. No DIDs / handles / IPs leaked — counts only.
588async fn stats_history(
589    State(relay): State<Relay>,
590    Query(q): Query<StatsHistoryQuery>,
591) -> impl IntoResponse {
592    let hours = q.hours.unwrap_or(24).min(168);
593    let now = SystemTime::now()
594        .duration_since(UNIX_EPOCH)
595        .map(|d| d.as_secs())
596        .unwrap_or(0);
597    let cutoff = now.saturating_sub(hours * 3600);
598    let path = relay.state_dir.join("stats-history.jsonl");
599    let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
600    let entries: Vec<Value> = body
601        .lines()
602        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
603        .filter(|v| {
604            v.get("ts")
605                .and_then(Value::as_u64)
606                .map(|t| t >= cutoff)
607                .unwrap_or(false)
608        })
609        .collect();
610    (
611        StatusCode::OK,
612        Json(json!({
613            "hours": hours,
614            "now_unix": now,
615            "count": entries.len(),
616            "entries": entries,
617        })),
618    )
619}
620
621async fn landing_stats_html() -> impl IntoResponse {
622    static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
623    (
624        StatusCode::OK,
625        [
626            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
627            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
628        ],
629        STATS_HTML,
630    )
631}
632
633async fn landing_phonebook_html() -> impl IntoResponse {
634    static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
635    (
636        StatusCode::OK,
637        [
638            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
639            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
640        ],
641        PHONEBOOK_HTML,
642    )
643}
644
645/// `/stats` dispatch: serve the pretty HTML dashboard to browsers (Accept
646/// includes text/html) and JSON to everything else (curl, scripts, scrapers).
647/// Keeps the JSON contract intact while letting humans land on the page at
648/// the short URL.
649async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
650    let wants_html = headers
651        .get(axum::http::header::ACCEPT)
652        .and_then(|v| v.to_str().ok())
653        .unwrap_or("")
654        .contains("text/html");
655    if wants_html {
656        landing_stats_html().await.into_response()
657    } else {
658        stats_json(State(relay)).await.into_response()
659    }
660}
661
662async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
663    let now = SystemTime::now()
664        .duration_since(UNIX_EPOCH)
665        .map(|d| d.as_secs())
666        .unwrap_or(0);
667    let inner = relay.inner.lock().await;
668    let streams_active: usize = inner.streams.values().map(Vec::len).sum();
669    let body = json!({
670        "version": env!("CARGO_PKG_VERSION"),
671        "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
672        "handles_active": inner.handles.len(),
673        "slots_active": inner.slots.len(),
674        "pair_slots_open": inner.pair_slots.len(),
675        "streams_active": streams_active,
676        "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
677        "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
678        "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
679        "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
680        "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
681    });
682    (StatusCode::OK, Json(body))
683}
684
685// Static landing site baked into the binary so apex (wireup.net) can flip
686// straight to Fly without a separate static-host. ~37 KB total — negligible
687// against the release binary size, and keeps the relay self-contained.
688async fn landing_index() -> impl IntoResponse {
689    static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
690    (
691        StatusCode::OK,
692        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
693        INDEX_HTML,
694    )
695}
696
697async fn landing_favicon() -> impl IntoResponse {
698    static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
699    (
700        StatusCode::OK,
701        [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
702        FAVICON_SVG,
703    )
704}
705
706async fn landing_og() -> impl IntoResponse {
707    static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
708    (
709        StatusCode::OK,
710        [
711            (axum::http::header::CONTENT_TYPE, "image/png"),
712            (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
713        ],
714        OG_PNG,
715    )
716}
717
718async fn landing_demo_cast() -> impl IntoResponse {
719    static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
720    (
721        StatusCode::OK,
722        [
723            (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
724            (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
725        ],
726        DEMO_CAST,
727    )
728}
729
730async fn landing_install_sh() -> impl IntoResponse {
731    static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
732    (
733        StatusCode::OK,
734        [
735            (
736                axum::http::header::CONTENT_TYPE,
737                "text/x-shellscript; charset=utf-8",
738            ),
739            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
740        ],
741        INSTALL_SH,
742    )
743}
744
745async fn landing_openshell_policy_sh() -> impl IntoResponse {
746    static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
747    (
748        StatusCode::OK,
749        [
750            (
751                axum::http::header::CONTENT_TYPE,
752                "text/x-shellscript; charset=utf-8",
753            ),
754            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
755        ],
756        POLICY_SH,
757    )
758}
759
760async fn allocate_slot(
761    State(relay): State<Relay>,
762    Json(_req): Json<AllocateRequest>,
763) -> impl IntoResponse {
764    let slot_id = random_hex(16);
765    let slot_token = random_hex(32);
766    {
767        let mut inner = relay.inner.lock().await;
768        inner.slots.insert(slot_id.clone(), Vec::new());
769        inner.tokens.insert(slot_id.clone(), slot_token.clone());
770    }
771    if let Err(e) = relay.persist_tokens().await {
772        return (
773            StatusCode::INTERNAL_SERVER_ERROR,
774            Json(json!({"error": format!("persist failed: {e}")})),
775        )
776            .into_response();
777    }
778    relay
779        .counters
780        .slot_allocations_total
781        .fetch_add(1, Ordering::Relaxed);
782    (
783        StatusCode::CREATED,
784        Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
785    )
786        .into_response()
787}
788
789async fn post_event(
790    State(relay): State<Relay>,
791    Path(slot_id): Path<String>,
792    headers: HeaderMap,
793    Json(req): Json<PostEventRequest>,
794) -> impl IntoResponse {
795    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
796        return resp;
797    }
798    // Body size cap (rough; serialize and check).
799    let body_bytes = match serde_json::to_vec(&req.event) {
800        Ok(b) => b,
801        Err(e) => {
802            return (
803                StatusCode::BAD_REQUEST,
804                Json(json!({"error": format!("event not serializable: {e}")})),
805            )
806                .into_response();
807        }
808    };
809    if body_bytes.len() > MAX_EVENT_BYTES {
810        return (
811            StatusCode::PAYLOAD_TOO_LARGE,
812            Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
813        )
814            .into_response();
815    }
816    // Per-slot quota: cap accumulated bytes per slot at MAX_SLOT_BYTES.
817    {
818        let inner = relay.inner.lock().await;
819        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
820        if used + body_bytes.len() > MAX_SLOT_BYTES {
821            return (
822                StatusCode::PAYLOAD_TOO_LARGE,
823                Json(json!({
824                    "error": "slot quota exceeded",
825                    "slot_bytes_used": used,
826                    "slot_bytes_max": MAX_SLOT_BYTES,
827                    "remediation": "operator should `wire rotate-slot` to drain old slot",
828                })),
829            )
830                .into_response();
831        }
832    }
833    let event_id = req
834        .event
835        .get("event_id")
836        .and_then(Value::as_str)
837        .map(str::to_string);
838
839    // Dedupe by event_id if present.
840    let dup = {
841        let inner = relay.inner.lock().await;
842        let slot = inner.slots.get(&slot_id);
843        if let (Some(eid), Some(slot)) = (&event_id, slot) {
844            slot.iter()
845                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
846        } else {
847            false
848        }
849    };
850    if dup {
851        return (
852            StatusCode::OK,
853            Json(json!({"event_id": event_id, "status": "duplicate"})),
854        )
855            .into_response();
856    }
857
858    {
859        let mut inner = relay.inner.lock().await;
860        let event_size = body_bytes.len();
861        let slot = inner.slots.entry(slot_id.clone()).or_default();
862        slot.push(req.event.clone());
863        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
864    }
865    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
866        return (
867            StatusCode::INTERNAL_SERVER_ERROR,
868            Json(json!({"error": format!("persist failed: {e}")})),
869        )
870            .into_response();
871    }
872    relay
873        .counters
874        .events_posted_total
875        .fetch_add(1, Ordering::Relaxed);
876    // R1 push: broadcast the new event to every active SSE subscriber on
877    // this slot. Dead channels are pruned in-place. The broadcast happens
878    // AFTER the disk persist so subscribers and disk readers see the same
879    // events; on persist failure we already returned 500 above.
880    {
881        let mut inner = relay.inner.lock().await;
882        if let Some(subs) = inner.streams.get_mut(&slot_id) {
883            subs.retain(|tx| tx.send(req.event.clone()).is_ok());
884        }
885    }
886    (
887        StatusCode::CREATED,
888        Json(json!({"event_id": event_id, "status": "stored"})),
889    )
890        .into_response()
891}
892
893/// R1 — server-sent-events push stream for a slot. Auth'd by slot_token
894/// (same as `list_events`). The connection registers an `UnboundedSender`
895/// on the slot's subscriber list; every subsequent `post_event` to the slot
896/// fans out to all subscribers as `data: <event-json>\n\n` lines. The
897/// connection stays open until the client disconnects.
898///
899/// A 30-second keepalive ping is emitted automatically so reverse proxies
900/// (Cloudflare tunnel, nginx) don't time out the upstream.
901///
902/// Note: the subscriber sees events posted AFTER it subscribed. To catch
903/// up on history first, the client should call `GET /v1/events/:slot_id`
904/// with `since=` before opening the stream.
905async fn stream_events(
906    State(relay): State<Relay>,
907    Path(slot_id): Path<String>,
908    headers: HeaderMap,
909) -> axum::response::Response {
910    use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
911    use futures::stream::StreamExt;
912
913    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
914        return resp;
915    }
916
917    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
918    {
919        let mut inner = relay.inner.lock().await;
920        inner.streams.entry(slot_id.clone()).or_default().push(tx);
921    }
922
923    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
924        SseEvent::default()
925            .json_data(&ev)
926            .map_err(|e| std::io::Error::other(e.to_string()))
927    });
928
929    Sse::new(stream)
930        .keep_alive(
931            KeepAlive::new()
932                .interval(std::time::Duration::from_secs(30))
933                .text("phyllis: still on the line"),
934        )
935        .into_response()
936}
937
938// ---------- pair-slot handlers ----------
939
940#[derive(Deserialize)]
941pub struct PairOpenRequest {
942    pub code_hash: String,
943    /// SPAKE2 message (base64).
944    pub msg: String,
945    pub role: String, // "host" or "guest"
946}
947
948#[derive(Deserialize)]
949pub struct PairBootstrapRequest {
950    pub role: String,
951    pub sealed: String,
952}
953
954#[derive(Deserialize)]
955pub struct PairAbandonRequest {
956    /// SHA-256 hex digest of the code phrase. Same value the caller posts to
957    /// /v1/pair as `code_hash` — knowing the code IS the auth here.
958    pub code_hash: String,
959}
960
961/// Forget the pair-slot associated with this code_hash. Either side can call;
962/// no auth beyond knowledge of the code (which is the shared secret of this
963/// handshake anyway). Idempotent: returns 204 whether or not the slot exists.
964/// Used by clients to recover after a crash mid-handshake, so the host doesn't
965/// stay locked out until the 5-minute TTL.
966async fn pair_abandon(
967    State(relay): State<Relay>,
968    Json(req): Json<PairAbandonRequest>,
969) -> impl IntoResponse {
970    let mut inner = relay.inner.lock().await;
971    if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
972        inner.pair_slots.remove(&pair_id);
973    }
974    StatusCode::NO_CONTENT.into_response()
975}
976
977async fn pair_open(
978    State(relay): State<Relay>,
979    Json(req): Json<PairOpenRequest>,
980) -> impl IntoResponse {
981    if req.role != "host" && req.role != "guest" {
982        return (
983            StatusCode::BAD_REQUEST,
984            Json(json!({"error": "role must be 'host' or 'guest'"})),
985        )
986            .into_response();
987    }
988    relay.evict_expired_pair_slots().await;
989    let mut inner = relay.inner.lock().await;
990    let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
991        Some(id) => id,
992        None => {
993            let new_id = random_hex(16);
994            inner
995                .pair_lookup
996                .insert(req.code_hash.clone(), new_id.clone());
997            inner.pair_slots.insert(new_id.clone(), PairSlot::default());
998            relay
999                .counters
1000                .pair_opens_total
1001                .fetch_add(1, Ordering::Relaxed);
1002            new_id
1003        }
1004    };
1005    let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1006    slot.last_touched = std::time::Instant::now();
1007    if req.role == "host" {
1008        if slot.host_msg.is_some() {
1009            return (
1010                StatusCode::CONFLICT,
1011                Json(json!({"error": "host already registered for this code"})),
1012            )
1013                .into_response();
1014        }
1015        slot.host_msg = Some(req.msg);
1016    } else {
1017        if slot.guest_msg.is_some() {
1018            return (
1019                StatusCode::CONFLICT,
1020                Json(json!({"error": "guest already registered for this code"})),
1021            )
1022                .into_response();
1023        }
1024        slot.guest_msg = Some(req.msg);
1025    }
1026    (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1027}
1028
1029#[derive(Deserialize)]
1030pub struct PairGetQuery {
1031    /// "host" or "guest" — caller's role; we return the OTHER side's data.
1032    pub as_role: String,
1033}
1034
1035async fn pair_get(
1036    State(relay): State<Relay>,
1037    Path(pair_id): Path<String>,
1038    Query(q): Query<PairGetQuery>,
1039) -> impl IntoResponse {
1040    relay.evict_expired_pair_slots().await;
1041    let mut inner = relay.inner.lock().await;
1042    let slot = match inner.pair_slots.get_mut(&pair_id) {
1043        Some(s) => {
1044            s.last_touched = std::time::Instant::now();
1045            s.clone()
1046        }
1047        None => {
1048            return (
1049                StatusCode::NOT_FOUND,
1050                Json(json!({"error": "unknown pair_id"})),
1051            )
1052                .into_response();
1053        }
1054    };
1055    let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1056        "host" => (slot.guest_msg, slot.guest_bootstrap),
1057        "guest" => (slot.host_msg, slot.host_bootstrap),
1058        _ => {
1059            return (
1060                StatusCode::BAD_REQUEST,
1061                Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1062            )
1063                .into_response();
1064        }
1065    };
1066    (
1067        StatusCode::OK,
1068        Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1069    )
1070        .into_response()
1071}
1072
1073async fn pair_bootstrap(
1074    State(relay): State<Relay>,
1075    Path(pair_id): Path<String>,
1076    Json(req): Json<PairBootstrapRequest>,
1077) -> impl IntoResponse {
1078    relay.evict_expired_pair_slots().await;
1079    let mut inner = relay.inner.lock().await;
1080    let slot = match inner.pair_slots.get_mut(&pair_id) {
1081        Some(s) => s,
1082        None => {
1083            return (
1084                StatusCode::NOT_FOUND,
1085                Json(json!({"error": "unknown pair_id"})),
1086            )
1087                .into_response();
1088        }
1089    };
1090    slot.last_touched = std::time::Instant::now();
1091    match req.role.as_str() {
1092        "host" => slot.host_bootstrap = Some(req.sealed),
1093        "guest" => slot.guest_bootstrap = Some(req.sealed),
1094        _ => {
1095            return (
1096                StatusCode::BAD_REQUEST,
1097                Json(json!({"error": "role must be 'host' or 'guest'"})),
1098            )
1099                .into_response();
1100        }
1101    }
1102    (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1103}
1104
1105// ---------- handle directory (v0.5) ----------
1106
1107#[derive(Deserialize)]
1108pub struct HandleClaimRequest {
1109    /// Nick the claimant wants (case-folded). Domain part is implicit: the
1110    /// domain the relay's `.well-known` is served from.
1111    pub nick: String,
1112    /// Slot id the claimant owns on this relay (proves they allocated here).
1113    pub slot_id: String,
1114    /// Optional public-facing relay URL the relay should advertise back in
1115    /// `.well-known/wire/agent` responses. If omitted, callers will need to
1116    /// know the relay URL out-of-band.
1117    pub relay_url: Option<String>,
1118    /// Claimant's full signed agent-card (includes DID + verify_keys +
1119    /// optional profile).
1120    pub card: Value,
1121}
1122
1123/// `POST /v1/handle/claim` — claim or update a `nick@<relay-domain>` handle.
1124///
1125/// FCFS on nick. Same-DID re-claims allowed (used for profile updates +
1126/// slot rotation). Different-DID claims on a taken nick return 409.
1127/// Caller must (a) own the `slot_id` they reference (verified by token
1128/// being present), and (b) submit a card with a valid self-signature.
1129async fn handle_claim(
1130    State(relay): State<Relay>,
1131    headers: HeaderMap,
1132    Json(req): Json<HandleClaimRequest>,
1133) -> impl IntoResponse {
1134    // Bearer auth: claimant must hold the slot_token for the slot they
1135    // reference. Prevents nick-squatting from an unauthenticated POSTer.
1136    if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1137        return resp;
1138    }
1139    // Validate nick (same rules as the client-side parser).
1140    if !crate::pair_profile::is_valid_nick(&req.nick) {
1141        return (
1142            StatusCode::BAD_REQUEST,
1143            Json(json!({
1144                "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1145                "nick": req.nick,
1146            })),
1147        )
1148            .into_response();
1149    }
1150    // Verify the card signature using the public verify_agent_card helper.
1151    if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1152        return (
1153            StatusCode::BAD_REQUEST,
1154            Json(json!({"error": format!("card signature invalid: {e}")})),
1155        )
1156            .into_response();
1157    }
1158    let did = match req.card.get("did").and_then(Value::as_str) {
1159        Some(d) => d.to_string(),
1160        None => {
1161            return (
1162                StatusCode::BAD_REQUEST,
1163                Json(json!({"error": "card missing 'did' field"})),
1164            )
1165                .into_response();
1166        }
1167    };
1168
1169    // FCFS check.
1170    let first_claim = {
1171        let inner = relay.inner.lock().await;
1172        match inner.handles.get(&req.nick) {
1173            Some(existing) if existing.did != did => {
1174                return (
1175                    StatusCode::CONFLICT,
1176                    Json(json!({
1177                        "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1178                        "nick": req.nick,
1179                        "claimed_by": existing.did,
1180                    })),
1181                )
1182                    .into_response();
1183            }
1184            Some(_) => false,
1185            None => true,
1186        }
1187    };
1188
1189    let now = time::OffsetDateTime::now_utc()
1190        .format(&time::format_description::well_known::Rfc3339)
1191        .unwrap_or_default();
1192    let record = HandleRecord {
1193        nick: req.nick.clone(),
1194        did: did.clone(),
1195        card: req.card.clone(),
1196        slot_id: req.slot_id.clone(),
1197        relay_url: req.relay_url.clone(),
1198        claimed_at: now,
1199    };
1200
1201    // Persist to disk first (durable), then update in-memory.
1202    let path = relay
1203        .state_dir
1204        .join("handles")
1205        .join(format!("{}.json", req.nick));
1206    let body = match serde_json::to_vec_pretty(&record) {
1207        Ok(b) => b,
1208        Err(e) => {
1209            return (
1210                StatusCode::INTERNAL_SERVER_ERROR,
1211                Json(json!({"error": format!("serialize failed: {e}")})),
1212            )
1213                .into_response();
1214        }
1215    };
1216    if let Err(e) = tokio::fs::write(&path, &body).await {
1217        return (
1218            StatusCode::INTERNAL_SERVER_ERROR,
1219            Json(json!({"error": format!("persist failed: {e}")})),
1220        )
1221            .into_response();
1222    }
1223    {
1224        let mut inner = relay.inner.lock().await;
1225        inner.handles.insert(req.nick.clone(), record);
1226    }
1227    relay
1228        .counters
1229        .handle_claims_total
1230        .fetch_add(1, Ordering::Relaxed);
1231    if first_claim {
1232        relay
1233            .counters
1234            .handle_first_claims_total
1235            .fetch_add(1, Ordering::Relaxed);
1236    }
1237    (
1238        StatusCode::CREATED,
1239        Json(json!({
1240            "nick": req.nick,
1241            "did": did,
1242            "status": if first_claim { "claimed" } else { "re-claimed" },
1243        })),
1244    )
1245        .into_response()
1246}
1247
1248#[derive(Deserialize)]
1249pub struct WellKnownAgentQuery {
1250    pub handle: String,
1251}
1252
1253#[derive(Deserialize)]
1254pub struct HandlesDirectoryQuery {
1255    pub cursor: Option<String>,
1256    pub limit: Option<usize>,
1257    pub vibe: Option<String>,
1258}
1259
1260// ─── short-URL invites (v0.5.10) ──────────────────────────────────────────
1261// One-curl onboarding: the invitor registers their `wire://pair?...` URL
1262// here, gets back a 6-hex token. Anyone who does
1263//   curl -fsSL https://wireup.net/i/<token> | sh
1264// gets wire installed (if needed) + the invite accepted, in one shot.
1265//
1266// Possession of the short URL = pair authorization (same shape as the
1267// underlying wire:// invite — it's just a redirector).
1268
1269#[derive(Deserialize)]
1270pub struct InviteRegisterRequest {
1271    /// The wire://pair?... URL produced by `wire invite`. Required.
1272    pub invite_url: String,
1273    /// Lifetime in seconds. Default 86400 (24h). Capped at 7 days.
1274    #[serde(default)]
1275    pub ttl_seconds: Option<u64>,
1276    /// If `Some(n)`, the short URL can be fetched N times before 410s.
1277    /// `None` = unlimited until TTL hits.
1278    #[serde(default)]
1279    pub uses: Option<u32>,
1280}
1281
1282impl Relay {
1283    /// Append one InviteRecord to `<state_dir>/invites.jsonl`.
1284    async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1285        use tokio::io::AsyncWriteExt;
1286        let mut line = serde_json::to_vec(rec)?;
1287        line.push(b'\n');
1288        let path = self.state_dir.join("invites.jsonl");
1289        let mut f = tokio::fs::OpenOptions::new()
1290            .create(true)
1291            .append(true)
1292            .open(&path)
1293            .await?;
1294        f.write_all(&line).await?;
1295        f.flush().await?;
1296        Ok(())
1297    }
1298}
1299
1300async fn invite_register(
1301    State(relay): State<Relay>,
1302    Json(req): Json<InviteRegisterRequest>,
1303) -> impl IntoResponse {
1304    if req.invite_url.is_empty() {
1305        return (
1306            StatusCode::BAD_REQUEST,
1307            Json(json!({"error": "invite_url required"})),
1308        )
1309            .into_response();
1310    }
1311    // Length cap on the embedded URL to keep persisted records bounded.
1312    if req.invite_url.len() > 8_192 {
1313        return (
1314            StatusCode::PAYLOAD_TOO_LARGE,
1315            Json(json!({"error": "invite_url > 8 KiB"})),
1316        )
1317            .into_response();
1318    }
1319    let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1320    let now = SystemTime::now()
1321        .duration_since(UNIX_EPOCH)
1322        .map(|d| d.as_secs())
1323        .unwrap_or(0);
1324    // 6-hex token → 16.7M space. Collision probability negligible at v0.5
1325    // scale; if a collision happens (1 in 16M) we 409 and the caller retries.
1326    let token = random_hex(3);
1327    let rec = InviteRecord {
1328        token: token.clone(),
1329        invite_url: req.invite_url,
1330        expires_unix: now + ttl,
1331        uses_remaining: req.uses,
1332        created_unix: now,
1333    };
1334    {
1335        let mut inner = relay.inner.lock().await;
1336        if inner.invites.contains_key(&token) {
1337            return (
1338                StatusCode::CONFLICT,
1339                Json(json!({"error": "token collision, retry"})),
1340            )
1341                .into_response();
1342        }
1343        inner.invites.insert(token.clone(), rec.clone());
1344    }
1345    if let Err(e) = relay.persist_invite(&rec).await {
1346        return (
1347            StatusCode::INTERNAL_SERVER_ERROR,
1348            Json(json!({"error": format!("persist failed: {e}")})),
1349        )
1350            .into_response();
1351    }
1352    (
1353        StatusCode::CREATED,
1354        Json(json!({
1355            "token": token,
1356            "path": format!("/i/{token}"),
1357            "expires_unix": rec.expires_unix,
1358            "uses_remaining": rec.uses_remaining,
1359        })),
1360    )
1361        .into_response()
1362}
1363
1364#[derive(Deserialize)]
1365pub struct InviteScriptQuery {
1366    /// `format=url` returns the raw `wire://pair?...` URL as text/plain
1367    /// (used by `wire accept https://wireup.net/i/<token>` to resolve a
1368    /// short URL programmatically). Default: shell-script template.
1369    /// Note: ?format=url does NOT decrement `uses_remaining` — it's a
1370    /// resolution lookup, not an acceptance. The actual accept happens
1371    /// when the wire:// URL is consumed by `pair_invite::accept_invite`.
1372    pub format: Option<String>,
1373}
1374
1375async fn invite_script(
1376    State(relay): State<Relay>,
1377    Path(token): Path<String>,
1378    Query(q): Query<InviteScriptQuery>,
1379) -> impl IntoResponse {
1380    // Token shape: 6 lowercase hex. Reject anything else immediately so a
1381    // path-traversal try never reaches the map lookup.
1382    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1383        return (StatusCode::NOT_FOUND, "not found\n").into_response();
1384    }
1385    let want_raw_url = q.format.as_deref() == Some("url");
1386    let now = SystemTime::now()
1387        .duration_since(UNIX_EPOCH)
1388        .map(|d| d.as_secs())
1389        .unwrap_or(0);
1390    let invite_url = {
1391        let mut inner = relay.inner.lock().await;
1392        let Some(rec) = inner.invites.get_mut(&token) else {
1393            return (StatusCode::NOT_FOUND, "not found\n").into_response();
1394        };
1395        if rec.expires_unix <= now {
1396            return (StatusCode::GONE, "this invite has expired\n").into_response();
1397        }
1398        if let Some(n) = rec.uses_remaining {
1399            if n == 0 {
1400                return (StatusCode::GONE, "this invite has been used up\n").into_response();
1401            }
1402            // Only decrement on script-template fetch (the one that's
1403            // actually doing the pair). The raw-URL resolution path is a
1404            // lookup, not an accept.
1405            if !want_raw_url {
1406                rec.uses_remaining = Some(n - 1);
1407            }
1408        }
1409        rec.invite_url.clone()
1410    };
1411    if want_raw_url {
1412        return (
1413            StatusCode::OK,
1414            [
1415                (
1416                    axum::http::header::CONTENT_TYPE,
1417                    "text/plain; charset=utf-8",
1418                ),
1419                (
1420                    axum::http::header::CACHE_CONTROL,
1421                    "private, no-store, max-age=0",
1422                ),
1423            ],
1424            invite_url,
1425        )
1426            .into_response();
1427    }
1428    let escaped = invite_url.replace('\'', "'\\''");
1429    let script = format!(
1430        "#!/bin/sh\n\
1431         # wire — one-curl onboarding (install + pair in one shot)\n\
1432         # source: https://github.com/SlanchaAi/wire\n\
1433         set -eu\n\
1434         INVITE='{escaped}'\n\
1435         echo \"\u{2192} checking for wire CLI...\"\n\
1436         if ! command -v wire >/dev/null 2>&1; then\n  \
1437           echo \"\u{2192} wire not installed; installing first...\"\n  \
1438           curl -fsSL https://wireup.net/install.sh | sh\n  \
1439           case \":$PATH:\" in\n    \
1440             *:\"$HOME/.local/bin\":*) ;;\n    \
1441             *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n  \
1442           esac\n  \
1443           if ! command -v wire >/dev/null 2>&1; then\n    \
1444             echo \"\"\n    \
1445             echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n    \
1446             echo \"Open a new shell, then run:\"\n    \
1447             echo \"  wire accept '$INVITE'\"\n    \
1448             exit 0\n  \
1449           fi\n\
1450         fi\n\
1451         echo \"\u{2192} accepting invite...\"\n\
1452         wire accept \"$INVITE\"\n"
1453    );
1454    (
1455        StatusCode::OK,
1456        [
1457            (
1458                axum::http::header::CONTENT_TYPE,
1459                "text/x-shellscript; charset=utf-8",
1460            ),
1461            (
1462                axum::http::header::CACHE_CONTROL,
1463                "private, no-store, max-age=0",
1464            ),
1465        ],
1466        script,
1467    )
1468        .into_response()
1469}
1470
1471async fn handles_directory(
1472    State(relay): State<Relay>,
1473    Query(q): Query<HandlesDirectoryQuery>,
1474) -> impl IntoResponse {
1475    let limit = q.limit.unwrap_or(100).clamp(1, 500);
1476    let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1477    let inner = relay.inner.lock().await;
1478    let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1479    drop(inner);
1480    records.sort_by(|a, b| a.nick.cmp(&b.nick));
1481
1482    let cursor = q.cursor.as_deref();
1483    let mut eligible = Vec::new();
1484    for rec in records {
1485        if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1486            continue;
1487        }
1488        // Hygiene: hide test-shaped nicks from the public directory. Records
1489        // remain claimed (FCFS protection persists), they just don't surface
1490        // in the phone book. `demo-` is reserved for asciinema-cast handles,
1491        // `test-` for integration runs.
1492        if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1493            continue;
1494        }
1495        let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1496        if profile
1497            .get("listed")
1498            .and_then(Value::as_bool)
1499            .is_some_and(|listed| !listed)
1500        {
1501            continue;
1502        }
1503        if let Some(want) = &vibe_filter {
1504            let matched = profile
1505                .get("vibe")
1506                .and_then(Value::as_array)
1507                .map(|arr| {
1508                    arr.iter().any(|v| {
1509                        v.as_str()
1510                            .map(|s| s.eq_ignore_ascii_case(want))
1511                            .unwrap_or(false)
1512                    })
1513                })
1514                .unwrap_or(false);
1515            if !matched {
1516                continue;
1517            }
1518        }
1519        eligible.push((rec, profile));
1520    }
1521
1522    let has_more = eligible.len() > limit;
1523    let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1524    let next_cursor = if has_more {
1525        page.last().map(|(rec, _)| rec.nick.clone())
1526    } else {
1527        None
1528    };
1529    let handles: Vec<Value> = page
1530        .into_iter()
1531        .map(|(rec, profile)| {
1532            json!({
1533                "nick": rec.nick,
1534                "did": rec.did,
1535                "profile": {
1536                    "emoji": profile.get("emoji").cloned().unwrap_or(Value::Null),
1537                    "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1538                    "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1539                    "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1540                    "now": profile.get("now").cloned().unwrap_or(Value::Null),
1541                },
1542                "claimed_at": rec.claimed_at,
1543            })
1544        })
1545        .collect();
1546    (
1547        StatusCode::OK,
1548        Json(json!({
1549            "handles": handles,
1550            "next_cursor": next_cursor,
1551        })),
1552    )
1553        .into_response()
1554}
1555
1556/// `POST /v1/handle/intro/:nick` — drop a signed pair-introduction event
1557/// into a known nick's slot WITHOUT needing that slot's bearer token.
1558///
1559/// Why this exists: `.well-known/wire/agent` returns a nick's `slot_id` for
1560/// reachability, but NEVER its `slot_token` (that would leak read+write
1561/// authority to any handle-resolver). To zero-paste-pair, we need a way for
1562/// a stranger to deliver their signed agent-card to the nick's owner. This
1563/// endpoint provides exactly that, and ONLY that: the event must be `kind=1100`
1564/// (pair_drop / agent_card), self-signed, and the carrying agent-card embedded
1565/// in the body must verify-OK on its own.
1566///
1567/// Rate-limiting is the same governor that gates the other write endpoints.
1568/// Slot quota still applies — a flood of intros hits the standard 64MB cap.
1569async fn handle_intro(
1570    State(relay): State<Relay>,
1571    Path(nick): Path<String>,
1572    Json(req): Json<PostEventRequest>,
1573) -> impl IntoResponse {
1574    // Look up the nick. Must already be claimed.
1575    let slot_id = {
1576        let inner = relay.inner.lock().await;
1577        match inner.handles.get(&nick) {
1578            Some(rec) => rec.slot_id.clone(),
1579            None => {
1580                return (
1581                    StatusCode::NOT_FOUND,
1582                    Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1583                )
1584                    .into_response();
1585            }
1586        }
1587    };
1588
1589    // Only allow kind=1100 pair_drop / agent_card here. Anything else routes
1590    // to the standard /v1/events/:slot_id with bearer auth.
1591    let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1592    let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1593    if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1594        return (
1595            StatusCode::BAD_REQUEST,
1596            Json(json!({
1597                "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1598                "got_kind": kind,
1599                "got_type": type_str,
1600            })),
1601        )
1602            .into_response();
1603    }
1604
1605    // Body must embed a signed agent-card (so the receiver can pin from it).
1606    let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1607        Some(c) => c.clone(),
1608        None => {
1609            return (
1610                StatusCode::BAD_REQUEST,
1611                Json(json!({"error": "intro event body must embed 'card' field"})),
1612            )
1613                .into_response();
1614        }
1615    };
1616    if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1617        return (
1618            StatusCode::BAD_REQUEST,
1619            Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1620        )
1621            .into_response();
1622    }
1623
1624    // Size + quota checks (same as post_event).
1625    let body_bytes = match serde_json::to_vec(&req.event) {
1626        Ok(b) => b,
1627        Err(e) => {
1628            return (
1629                StatusCode::BAD_REQUEST,
1630                Json(json!({"error": format!("event not serializable: {e}")})),
1631            )
1632                .into_response();
1633        }
1634    };
1635    if body_bytes.len() > MAX_EVENT_BYTES {
1636        return (
1637            StatusCode::PAYLOAD_TOO_LARGE,
1638            Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1639        )
1640            .into_response();
1641    }
1642    {
1643        let inner = relay.inner.lock().await;
1644        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1645        if used + body_bytes.len() > MAX_SLOT_BYTES {
1646            return (
1647                StatusCode::PAYLOAD_TOO_LARGE,
1648                Json(json!({
1649                    "error": "target slot quota exceeded",
1650                    "slot_bytes_used": used,
1651                    "slot_bytes_max": MAX_SLOT_BYTES,
1652                })),
1653            )
1654                .into_response();
1655        }
1656    }
1657
1658    let event_id = req
1659        .event
1660        .get("event_id")
1661        .and_then(Value::as_str)
1662        .map(str::to_string);
1663
1664    // Dedupe by event_id if present.
1665    let dup = {
1666        let inner = relay.inner.lock().await;
1667        let slot = inner.slots.get(&slot_id);
1668        if let (Some(eid), Some(slot)) = (&event_id, slot) {
1669            slot.iter()
1670                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1671        } else {
1672            false
1673        }
1674    };
1675    if dup {
1676        return (
1677            StatusCode::OK,
1678            Json(json!({"event_id": event_id, "status": "duplicate"})),
1679        )
1680            .into_response();
1681    }
1682
1683    {
1684        let mut inner = relay.inner.lock().await;
1685        let event_size = body_bytes.len();
1686        let slot = inner.slots.entry(slot_id.clone()).or_default();
1687        slot.push(req.event.clone());
1688        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1689    }
1690    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1691        return (
1692            StatusCode::INTERNAL_SERVER_ERROR,
1693            Json(json!({"error": format!("persist failed: {e}")})),
1694        )
1695            .into_response();
1696    }
1697    (
1698        StatusCode::CREATED,
1699        Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1700    )
1701        .into_response()
1702}
1703
1704/// `GET /.well-known/wire/agent?handle=<nick>` — WebFinger-style resolver
1705/// for `nick@<this-relay-domain>` handles. Returns the signed agent-card +
1706/// slot coords if claimed; 404 if not.
1707///
1708/// The `handle` query parameter may be just `<nick>` or `<nick>@<domain>`.
1709/// Domain part is ignored (the relay only serves nicks it has on file).
1710/// `GET /.well-known/agent-card.json?handle=<nick>` — A2A v1.0-compatible
1711/// AgentCard serving wire's handle directory. Same data as `well_known_agent`
1712/// but in the schema A2A clients (MSFT/AWS/Salesforce/SAP/ServiceNow tooling,
1713/// agent-card-go, agent-card-python, A2A .NET SDK) already speak.
1714///
1715/// Wire-specific fields (DID, slot_id, profile blob, raw signed card) live
1716/// under the standard A2A `extensions` array using the wire extension URI.
1717/// A2A-only clients can pair to wire agents knowing only A2A vocabulary;
1718/// wire-native clients get the full richer card by following the extension.
1719async fn well_known_agent_card_a2a(
1720    State(relay): State<Relay>,
1721    Query(q): Query<WellKnownAgentQuery>,
1722) -> impl IntoResponse {
1723    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1724    if nick.is_empty() {
1725        return (
1726            StatusCode::BAD_REQUEST,
1727            Json(json!({"error": "handle missing nick"})),
1728        )
1729            .into_response();
1730    }
1731    let inner = relay.inner.lock().await;
1732    let rec = match inner.handles.get(&nick) {
1733        Some(r) => r.clone(),
1734        None => {
1735            return (
1736                StatusCode::NOT_FOUND,
1737                Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1738            )
1739                .into_response();
1740        }
1741    };
1742    drop(inner);
1743
1744    let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1745    let description = profile
1746        .get("motto")
1747        .and_then(Value::as_str)
1748        .unwrap_or("")
1749        .to_string();
1750    let display_name = profile
1751        .get("display_name")
1752        .and_then(Value::as_str)
1753        .unwrap_or(&rec.nick)
1754        .to_string();
1755    let relay_url = rec.relay_url.clone().unwrap_or_default();
1756    // Intro endpoint = where any A2A or wire client posts a signed pair-drop.
1757    let endpoint = if !relay_url.is_empty() {
1758        format!(
1759            "{}/v1/handle/intro/{}",
1760            relay_url.trim_end_matches('/'),
1761            rec.nick
1762        )
1763    } else {
1764        format!("/v1/handle/intro/{}", rec.nick)
1765    };
1766    let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1767
1768    // Build A2A v1.0 AgentCard shape with wire extension. Fields named to
1769    // match the A2A spec exactly so downstream tooling (agent-card-go etc.)
1770    // parses without custom code.
1771    let a2a_card = json!({
1772        "id": rec.did,
1773        "name": display_name,
1774        "description": description,
1775        "version": "wire/0.5",
1776        "endpoint": endpoint,
1777        "provider": {
1778            "name": "wire",
1779            "url": "https://github.com/SlanchaAi/wire"
1780        },
1781        "capabilities": {
1782            "streaming": false,
1783            "pushNotifications": false,
1784            "extendedAgentCard": true
1785        },
1786        "securitySchemes": {
1787            "ed25519-event-sig": {
1788                "type": "signature",
1789                "alg": "EdDSA",
1790                "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1791            }
1792        },
1793        "security": [{"ed25519-event-sig": []}],
1794        "skills": [],
1795        "extensions": [{
1796            // A2A extension URIs are opaque namespace identifiers, not
1797            // forwardable URLs. Changing this string is a coordinated
1798            // federation-spec bump because peers match it exactly.
1799            "uri": "https://slancha.ai/wire/ext/v0.5",
1800            "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1801            "required": false,
1802            "params": {
1803                "did": rec.did,
1804                "handle": rec.nick,
1805                "slot_id": rec.slot_id,
1806                "relay_url": rec.relay_url,
1807                "card": rec.card,
1808                "profile": profile,
1809                "claimed_at": rec.claimed_at,
1810            }
1811        }],
1812        "signature": card_sig,
1813    });
1814    (StatusCode::OK, Json(a2a_card)).into_response()
1815}
1816
1817async fn well_known_agent(
1818    State(relay): State<Relay>,
1819    Query(q): Query<WellKnownAgentQuery>,
1820) -> impl IntoResponse {
1821    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1822    if nick.is_empty() {
1823        return (
1824            StatusCode::BAD_REQUEST,
1825            Json(json!({"error": "handle missing nick"})),
1826        )
1827            .into_response();
1828    }
1829    let inner = relay.inner.lock().await;
1830    match inner.handles.get(&nick) {
1831        Some(rec) => (
1832            StatusCode::OK,
1833            Json(json!({
1834                "nick": rec.nick,
1835                "did": rec.did,
1836                "card": rec.card,
1837                "slot_id": rec.slot_id,
1838                "relay_url": rec.relay_url,
1839                "claimed_at": rec.claimed_at,
1840            })),
1841        )
1842            .into_response(),
1843        None => (
1844            StatusCode::NOT_FOUND,
1845            Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1846        )
1847            .into_response(),
1848    }
1849}
1850
1851async fn list_events(
1852    State(relay): State<Relay>,
1853    Path(slot_id): Path<String>,
1854    Query(q): Query<ListEventsQuery>,
1855    headers: HeaderMap,
1856) -> impl IntoResponse {
1857    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1858        return resp;
1859    }
1860    let limit = q.limit.unwrap_or(100).min(1000);
1861    let mut inner = relay.inner.lock().await;
1862    // R4: record this pull as proof that the slot owner is still polling.
1863    // Anyone holding the slot_token (i.e., a paired peer) can later read
1864    // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
1865    let now_unix = std::time::SystemTime::now()
1866        .duration_since(std::time::UNIX_EPOCH)
1867        .map(|d| d.as_secs())
1868        .unwrap_or(0);
1869    inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1870    let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1871    let start = match q.since {
1872        Some(eid) => events
1873            .iter()
1874            .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1875            .map(|i| i + 1)
1876            .unwrap_or(0),
1877        None => 0,
1878    };
1879    let end = (start + limit).min(events.len());
1880    let slice = events[start..end].to_vec();
1881    (StatusCode::OK, Json(slice)).into_response()
1882}
1883
1884/// R4 — slot-attentiveness probe. Authenticated by slot_token (so only
1885/// paired peers can ask). Returns `last_pull_at_unix` (the slot owner's most
1886/// recent `list_events` call, in unix seconds) and `event_count` (total
1887/// stored). A remote sender uses this before `wire send <peer>` to warn the
1888/// operator if the peer hasn't polled recently.
1889async fn slot_state(
1890    State(relay): State<Relay>,
1891    Path(slot_id): Path<String>,
1892    headers: HeaderMap,
1893) -> impl IntoResponse {
1894    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1895        return resp;
1896    }
1897    let inner = relay.inner.lock().await;
1898    let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1899    let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1900    let responder_health = inner.responder_health.get(&slot_id).cloned();
1901    (
1902        StatusCode::OK,
1903        Json(json!({
1904            "slot_id": slot_id,
1905            "event_count": event_count,
1906            "last_pull_at_unix": last_pull_at_unix,
1907            "responder_health": responder_health,
1908        })),
1909    )
1910        .into_response()
1911}
1912
1913async fn responder_health_set(
1914    State(relay): State<Relay>,
1915    Path(slot_id): Path<String>,
1916    headers: HeaderMap,
1917    Json(record): Json<ResponderHealthRecord>,
1918) -> impl IntoResponse {
1919    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1920        return resp;
1921    }
1922    let path = relay
1923        .state_dir
1924        .join("responder-health")
1925        .join(format!("{slot_id}.json"));
1926    let body = match serde_json::to_vec_pretty(&record) {
1927        Ok(b) => b,
1928        Err(e) => {
1929            return (
1930                StatusCode::INTERNAL_SERVER_ERROR,
1931                Json(json!({"error": format!("serialize failed: {e}")})),
1932            )
1933                .into_response();
1934        }
1935    };
1936    if let Err(e) = tokio::fs::write(&path, body).await {
1937        return (
1938            StatusCode::INTERNAL_SERVER_ERROR,
1939            Json(json!({"error": format!("persist failed: {e}")})),
1940        )
1941            .into_response();
1942    }
1943    {
1944        let mut inner = relay.inner.lock().await;
1945        inner
1946            .responder_health
1947            .insert(slot_id.clone(), record.clone());
1948    }
1949    (StatusCode::OK, Json(record)).into_response()
1950}
1951
1952async fn check_token(
1953    relay: &Relay,
1954    headers: &HeaderMap,
1955    slot_id: &str,
1956) -> std::result::Result<(), axum::response::Response> {
1957    let auth = headers
1958        .get(AUTHORIZATION)
1959        .and_then(|h| h.to_str().ok())
1960        .and_then(|s| s.strip_prefix("Bearer "))
1961        .map(str::to_string);
1962    let presented = match auth {
1963        Some(t) => t,
1964        None => {
1965            return Err((
1966                StatusCode::UNAUTHORIZED,
1967                Json(json!({"error": "missing Bearer token"})),
1968            )
1969                .into_response());
1970        }
1971    };
1972    let inner = relay.inner.lock().await;
1973    let expected = match inner.tokens.get(slot_id) {
1974        Some(t) => t.clone(),
1975        None => {
1976            return Err((
1977                StatusCode::NOT_FOUND,
1978                Json(json!({"error": "unknown slot"})),
1979            )
1980                .into_response());
1981        }
1982    };
1983    drop(inner);
1984    if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
1985        return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
1986    }
1987    Ok(())
1988}
1989
1990fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1991    if a.len() != b.len() {
1992        return false;
1993    }
1994    let mut acc = 0u8;
1995    for (x, y) in a.iter().zip(b.iter()) {
1996        acc |= x ^ y;
1997    }
1998    acc == 0
1999}
2000
2001fn is_valid_slot_id(s: &str) -> bool {
2002    s.len() == 32
2003        && s.bytes()
2004            .all(|b| b.is_ascii_hexdigit() && !b.is_ascii_uppercase())
2005}
2006
2007fn random_hex(n_bytes: usize) -> String {
2008    let mut buf = vec![0u8; n_bytes];
2009    rand::thread_rng().fill_bytes(&mut buf);
2010    hex::encode(buf)
2011}
2012
2013/// Run the relay until SIGINT/SIGTERM.
2014pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2015    let relay = Relay::new(state_dir).await?;
2016    relay.spawn_pair_sweeper();
2017    relay.spawn_counter_persister();
2018    let app = relay.clone().router();
2019    let listener = tokio::net::TcpListener::bind(bind)
2020        .await
2021        .with_context(|| format!("binding {bind}"))?;
2022    eprintln!("wire relay-server listening on {bind}");
2023    let shutdown_relay = relay.clone();
2024    axum::serve(listener, app)
2025        .with_graceful_shutdown(async move {
2026            let _ = tokio::signal::ctrl_c().await;
2027            eprintln!("\nshutting down — final counter snapshot");
2028            if let Err(e) = shutdown_relay.persist_counters().await {
2029                eprintln!("final counter persist failed: {e}");
2030            }
2031        })
2032        .await?;
2033    Ok(())
2034}
2035
2036#[cfg(test)]
2037mod tests {
2038    use super::*;
2039
2040    #[test]
2041    fn constant_time_eq_basic() {
2042        assert!(constant_time_eq(b"abc", b"abc"));
2043        assert!(!constant_time_eq(b"abc", b"abd"));
2044        assert!(!constant_time_eq(b"abc", b"abcd")); // length mismatch
2045    }
2046
2047    #[test]
2048    fn random_hex_length() {
2049        let s = random_hex(16);
2050        assert_eq!(s.len(), 32); // 16 bytes -> 32 hex chars
2051        assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
2052    }
2053
2054    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2055    async fn pair_slot_evicts_when_idle_past_ttl() {
2056        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
2057        let _ = std::fs::remove_dir_all(&dir);
2058        let relay = Relay::new(dir.clone()).await.unwrap();
2059
2060        // Seed a pair-slot manually with a past last_touched.
2061        {
2062            let mut inner = relay.inner.lock().await;
2063            inner
2064                .pair_lookup
2065                .insert("hash-A".to_string(), "id-A".to_string());
2066            inner.pair_slots.insert(
2067                "id-A".to_string(),
2068                PairSlot {
2069                    last_touched: std::time::Instant::now()
2070                        - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
2071                    ..PairSlot::default()
2072                },
2073            );
2074
2075            // And a fresh one — should survive.
2076            inner
2077                .pair_lookup
2078                .insert("hash-B".to_string(), "id-B".to_string());
2079            inner
2080                .pair_slots
2081                .insert("id-B".to_string(), PairSlot::default());
2082
2083            assert_eq!(inner.pair_slots.len(), 2);
2084            assert_eq!(inner.pair_lookup.len(), 2);
2085        }
2086
2087        relay.evict_expired_pair_slots().await;
2088
2089        let inner = relay.inner.lock().await;
2090        assert_eq!(
2091            inner.pair_slots.len(),
2092            1,
2093            "expired slot should have been evicted"
2094        );
2095        assert!(inner.pair_slots.contains_key("id-B"));
2096        assert_eq!(inner.pair_lookup.len(), 1);
2097        assert!(inner.pair_lookup.contains_key("hash-B"));
2098        let _ = std::fs::remove_dir_all(&dir);
2099    }
2100
2101    #[test]
2102    fn slot_id_validator_accepts_only_lowercase_32hex() {
2103        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
2104        assert!(is_valid_slot_id(&random_hex(16)));
2105        // wrong length
2106        assert!(!is_valid_slot_id("abc"));
2107        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde")); // 31
2108        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0")); // 33
2109        // uppercase
2110        assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
2111        // path traversal attempts
2112        assert!(!is_valid_slot_id("../etc/passwd0123456789abcdef0000"));
2113        assert!(!is_valid_slot_id("..%2Fetc%2Fpasswd00000000000000000"));
2114        assert!(!is_valid_slot_id("/absolute/path/that/looks/like/key"));
2115        // null bytes
2116        assert!(!is_valid_slot_id(
2117            "0123456789abcdef\0\x31\x32\x33456789abcdef"
2118        ));
2119    }
2120}