Skip to main content

wire/
relay_server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3// Copyright (C) 2026  wire contributors.
4//
5// This file (and only this file in the wire repository) is licensed under
6// the GNU Affero General Public License v3.0 or later. The protocol crate
7// (signing, agent_card, trust, canonical, cli, mcp) is Apache-2.0; the CLI
8// binary entry point is MIT. See LICENSE.md for the trio explanation.
9//
10// AGPL on the relay specifically discourages forks that operate `wire-relay`
11// as a closed-source SaaS offering — those forks must publish their changes
12// under AGPL too. Self-hosting your own relay for your own org or running
13// the public-good relay we operate is fully permitted.
14//
15//! HTTP mailbox relay — minimal, persistent, bearer-authenticated.
16//!
17//! Design (v0.1):
18//!   - One process serves N slots; each slot is a per-peer FIFO of signed events.
19//!   - Slot allocation returns `(slot_id, slot_token)`. Holder of the token
20//!     can post + read that slot. Tokens never expire in v0.1 (rotate by
21//!     allocating a new slot).
22//!   - Events are stored verbatim — the relay does NOT verify Ed25519 signatures
23//!     itself. Verification happens client-side (`wire tail` + `verify_message_v31`).
24//!     The relay is dumb on purpose: it is a content-addressed mailbox, not a
25//!     trust authority.
26//!   - 256 KiB max body per event.
27//!   - Persistence: each slot's events are appended to
28//!     `<state_dir>/slots/<slot_id>.jsonl` on every `POST /events`. Tokens
29//!     are persisted to `<state_dir>/tokens.json` on allocation.
30//!   - On startup, slots + tokens are reloaded from disk.
31
32use anyhow::{Context, Result};
33use axum::{
34    Json, Router,
35    extract::{Path, Query, State},
36    http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37    response::IntoResponse,
38    routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50    GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Size limit for a single event, in bytes (256 KiB) — the "256 KiB max body
/// per event" rule from the module-level design notes.
/// NOTE(review): the enforcement site is outside this chunk — confirm whether
/// the limit is applied to the raw request body or to the serialized event.
const MAX_EVENT_BYTES: usize = 256 * 1024;
/// Total bytes a single slot can hold before further POSTs are rejected (413).
/// Defends against an abusive bearer-holder filling relay disk (T11). At 64 MiB
/// per slot, an attacker pushing the rate-limit ceiling fills their own slot
/// in ~25 seconds, then gets 413 forever — disk impact bounded.
/// NOTE(review): in `router_with_mode` the governor layer wraps only the
/// allocate/pair routes, not `POST /v1/events/:slot_id` — confirm which rate
/// limit the "~25 seconds" estimate relies on.
const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
/// Handle to the relay's shared state; handlers receive it through axum's
/// `State` extractor. Cloning is cheap: the mutable state and the counters
/// sit behind `Arc`s, so every clone observes the same relay.
#[derive(Clone)]
pub struct Relay {
    /// All mutable in-memory tables, guarded by one async mutex.
    inner: Arc<Mutex<Inner>>,
    /// Root directory for everything the relay persists (slots, tokens,
    /// handles, responder health, invites, counters, stats history).
    state_dir: PathBuf,
    /// Atomic usage counters reported by the stats endpoints.
    counters: Arc<RelayCounters>,
}
66
/// Lock-free usage counters served by GET /stats. Counter values are
/// loaded from `<state_dir>/counters.json` on startup and snapshotted back
/// to disk every 30s by `spawn_counter_persister`, so deploys + restarts
/// don't reset them. `boot_unix` is per-process — uptime is process-local.
struct RelayCounters {
    /// Unix seconds captured once in `Relay::new`; never persisted.
    boot_unix: u64,
    // The `*_total` fields are cumulative and seeded from the on-disk
    // snapshot. NOTE(review): the increment sites are outside this chunk;
    // the per-field meanings below are inferred from the names — confirm.
    /// Presumably every handle claim, including same-DID re-claims.
    handle_claims_total: AtomicU64,
    /// Presumably only claims that bound a previously unclaimed nick.
    handle_first_claims_total: AtomicU64,
    /// Presumably one per slot allocation.
    slot_allocations_total: AtomicU64,
    /// Presumably one per pairing session opened.
    pair_opens_total: AtomicU64,
    /// Presumably one per event accepted into a slot.
    events_posted_total: AtomicU64,
}
79
/// Serializable mirror of the cumulative fields of `RelayCounters`; this is
/// the on-disk shape of `<state_dir>/counters.json`. `Default` (all zeros) is
/// what `Relay::new` falls back to when the file is missing or unparsable.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct CountersSnapshot {
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
88
/// One row in `<state_dir>/stats-history.jsonl` — written every 30s by
/// `spawn_counter_persister` so /stats.html can draw sparklines. Live-state
/// fields (`*_active`) are point-in-time; *_total fields are the cumulative
/// counters at that timestamp. Field names mirror the /stats endpoint.
#[derive(serde::Serialize, serde::Deserialize)]
struct HistoryEntry {
    /// Unix seconds at which the row was sampled; `stats_history` filters
    /// rows on this field.
    ts: u64,
    /// Number of entries in the handle directory at sample time.
    handles_active: usize,
    /// Number of slots held in memory at sample time.
    slots_active: usize,
    /// Number of live pairing sessions at sample time.
    pair_slots_open: usize,
    /// Total SSE subscriber channels across all slots at sample time.
    streams_active: usize,
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
106
/// Query-string parameters accepted by the `/stats.history` handler.
#[derive(Deserialize)]
pub struct StatsHistoryQuery {
    /// How many hours of history to return, default 24, max 168 (7 days).
    /// Larger values are clamped to 168 by the handler rather than rejected.
    pub hours: Option<u64>,
}
112
/// Every mutable in-memory table of the relay. A single instance lives
/// behind the `tokio::sync::Mutex` in `Relay::inner`, so any access to any
/// table goes through that one lock.
struct Inner {
    /// slot_id -> ordered list of stored events (parsed JSON Values).
    slots: HashMap<String, Vec<Value>>,
    /// slot_id -> bearer token. Token holder may read + write that slot.
    tokens: HashMap<String, String>,
    /// slot_id -> total bytes stored. Enforced against MAX_SLOT_BYTES.
    slot_bytes: HashMap<String, usize>,
    /// slot_id -> wall-clock unix seconds of the slot owner's last `list_events`
    /// call. Used by `GET /v1/slot/:slot_id/state` so a remote sender can
    /// gauge whether the slot's owner is still polling (i.e., still attentive).
    /// An absent entry means the slot has never been pulled since the relay
    /// restarted (this map starts empty on boot and is not reloaded from disk).
    last_pull_at_unix: HashMap<String, u64>,
    /// slot_id -> active SSE subscribers (R1 push). Each `UnboundedSender`
    /// belongs to one open `GET /v1/events/:slot_id/stream` connection.
    /// On every successful `post_event` to a slot we walk the slot's list
    /// and broadcast the event; closed channels are pruned lazily on send-
    /// error. Auth: subscribers presented a valid slot_token at stream open.
    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
    /// code_hash -> pair_id (lookup so guests find the host).
    pair_lookup: HashMap<String, String>,
    /// pair_id -> ephemeral pairing state.
    pair_slots: HashMap<String, PairSlot>,
    /// nick -> registered handle directory entry (v0.5).
    handles: HashMap<String, HandleRecord>,
    /// slot_id -> latest operator-published auto-responder health record (R3).
    responder_health: HashMap<String, ResponderHealthRecord>,
    /// token -> short-URL invite record (v0.5.10 — one-curl onboarding).
    /// Token is the path segment in `GET /i/{token}`. Record holds the
    /// underlying `wire://pair?...` URL plus TTL/uses bookkeeping.
    invites: HashMap<String, InviteRecord>,
}
144
/// One entry in the short-URL invite map. Persisted to
/// `<state_dir>/invites.jsonl` so deploys don't drop active invites.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InviteRecord {
    /// Key of this record in the invite map.
    token: String,
    /// The URL the short link stands for.
    invite_url: String,
    /// Unix seconds after which the invite is dead; on startup `Relay::new`
    /// only reloads records whose `expires_unix` is still in the future.
    expires_unix: u64,
    /// `None` = unlimited until TTL hits. `Some(n)` = decrement each fetch.
    uses_remaining: Option<u32>,
    /// Unix seconds at creation — presumably set by the register handler
    /// (not visible in this chunk).
    created_unix: u64,
}
156
/// One entry in the relay's handle directory (v0.5 — agentic hotline).
/// FCFS on nick: first claimant binds the nick to their DID. Same-DID re-claims
/// are allowed (used for profile updates + slot rotation).
///
/// `Relay::new` reloads these records from the `*.json` files under
/// `<state_dir>/handles/`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct HandleRecord {
    /// Directory key: `Relay::new` indexes reloaded records by this field.
    pub nick: String,
    /// DID the nick is bound to.
    pub did: String,
    /// Agent card stored as opaque JSON.
    /// NOTE(review): whether the relay validates it is not visible here.
    pub card: Value,
    /// Slot associated with this handle.
    pub slot_id: String,
    /// Optional relay URL — presumably where `slot_id` lives when it is not
    /// this relay; confirm against the claim handler.
    pub relay_url: Option<String>,
    /// Claim timestamp kept as a string; the format is not visible in this chunk.
    pub claimed_at: String,
}
169
/// Latest auto-responder health report published for a slot (R3).
/// `Relay::new` reloads one record per `*.json` file under
/// `<state_dir>/responder-health/`, keyed by the file stem (the slot id).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ResponderHealthRecord {
    /// Free-form status string; the accepted values are not defined in this
    /// chunk.
    pub status: String,
    /// Optional explanation for the status; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional timestamp of the last success; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_success_at: Option<String>,
    /// When this record was set (string; format not visible in this chunk).
    pub set_at: String,
}
179
/// Ephemeral state of one pairing session, stored in `Inner::pair_slots`
/// under its pair_id. Kept in memory only — nothing here derives `Serialize`
/// — and dropped by `evict_expired_pair_slots` once idle longer than
/// `PAIR_SLOT_TTL_SECS`.
#[derive(Clone, Debug)]
struct PairSlot {
    /// SPAKE2 message from the host side.
    host_msg: Option<String>,
    /// SPAKE2 message from the guest side.
    guest_msg: Option<String>,
    /// Sealed bootstrap payload from host (after SAS confirm).
    host_bootstrap: Option<String>,
    /// Sealed bootstrap payload from guest.
    guest_bootstrap: Option<String>,
    /// Last activity time (monotonic) — used for TTL eviction.
    last_touched: std::time::Instant,
}
193
194impl Default for PairSlot {
195    fn default() -> Self {
196        Self {
197            host_msg: None,
198            guest_msg: None,
199            host_bootstrap: None,
200            guest_bootstrap: None,
201            last_touched: std::time::Instant::now(),
202        }
203    }
204}
205
/// Pair-slot idle TTL. After this many seconds without activity, the slot
/// is evicted to free memory + bound brute-force surface (PENTEST.md §code-review #3).
/// 300 s = 5 minutes. Eviction only happens when `evict_expired_pair_slots`
/// runs, and the background sweeper ticks every 60 s, so an idle slot can
/// outlive the TTL by up to about a minute.
const PAIR_SLOT_TTL_SECS: u64 = 300;
209
/// JSON body of `POST /v1/slot/allocate`. The `allocate_slot` handler binds
/// it as `_req`, i.e. the body is parsed but not otherwise named for use.
#[derive(Deserialize)]
pub struct AllocateRequest {
    /// Optional handle hint — purely informational, server doesn't enforce.
    #[serde(default)]
    pub handle: Option<String>,
}
216
/// JSON request body carrying one event — presumably the body of
/// `POST /v1/events/:slot_id` (the handler is outside this chunk).
#[derive(Deserialize)]
pub struct PostEventRequest {
    /// The event as arbitrary JSON. Per the module docs, events are stored
    /// verbatim and signatures are not verified by the relay.
    pub event: Value,
}
221
/// Query-string parameters for reading a slot — presumably consumed by the
/// `list_events` handler behind `GET /v1/events/:slot_id` (handler not
/// visible in this chunk).
#[derive(Deserialize)]
pub struct ListEventsQuery {
    /// Resume from after this event_id (exclusive). Omit for full slot read.
    pub since: Option<String>,
    /// Max events to return. Default 100, max 1000.
    pub limit: Option<usize>,
}
229
230impl Relay {
231    pub async fn new(state_dir: PathBuf) -> Result<Self> {
232        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
233        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
234        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
235        let mut inner = Inner {
236            slots: HashMap::new(),
237            tokens: HashMap::new(),
238            slot_bytes: HashMap::new(),
239            last_pull_at_unix: HashMap::new(),
240            streams: HashMap::new(),
241            pair_lookup: HashMap::new(),
242            pair_slots: HashMap::new(),
243            handles: HashMap::new(),
244            responder_health: HashMap::new(),
245            invites: HashMap::new(),
246        };
247        // Reload tokens
248        let token_path = state_dir.join("tokens.json");
249        if token_path.exists() {
250            let body = tokio::fs::read_to_string(&token_path).await?;
251            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
252        }
253        // Reload slots from JSONL
254        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
255        while let Some(entry) = slots_dir.next_entry().await? {
256            let path = entry.path();
257            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
258                continue;
259            }
260            let stem = match path.file_stem().and_then(|s| s.to_str()) {
261                Some(s) => s.to_string(),
262                None => continue,
263            };
264            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
265            let mut events = Vec::new();
266            for line in body.lines() {
267                if let Ok(v) = serde_json::from_str::<Value>(line) {
268                    events.push(v);
269                }
270            }
271            // Recompute byte usage for the slot from its persisted events.
272            let bytes: usize = events
273                .iter()
274                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
275                .sum();
276            inner.slot_bytes.insert(stem.clone(), bytes);
277            inner.slots.insert(stem, events);
278        }
279        // Reload handle directory (v0.5).
280        let handles_dir = state_dir.join("handles");
281        if handles_dir.exists() {
282            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
283            while let Some(entry) = rd.next_entry().await? {
284                let path = entry.path();
285                if path.extension().and_then(|x| x.to_str()) != Some("json") {
286                    continue;
287                }
288                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
289                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
290                    inner.handles.insert(rec.nick.clone(), rec);
291                }
292            }
293        }
294        // Reload responder health records (R3).
295        let responder_health_dir = state_dir.join("responder-health");
296        if responder_health_dir.exists() {
297            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
298            while let Some(entry) = rd.next_entry().await? {
299                let path = entry.path();
300                if path.extension().and_then(|x| x.to_str()) != Some("json") {
301                    continue;
302                }
303                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
304                    continue;
305                };
306                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
307                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
308                    inner.responder_health.insert(slot_id.to_string(), rec);
309                }
310            }
311        }
312        // Reload short-URL invites. JSONL append-only; later entries with
313        // the same token overwrite earlier (won't happen — tokens are
314        // unique by construction — but coded defensively).
315        let invites_path = state_dir.join("invites.jsonl");
316        if invites_path.exists() {
317            let now_unix = SystemTime::now()
318                .duration_since(UNIX_EPOCH)
319                .map(|d| d.as_secs())
320                .unwrap_or(0);
321            let body = tokio::fs::read_to_string(&invites_path)
322                .await
323                .unwrap_or_default();
324            for line in body.lines() {
325                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
326                    && rec.expires_unix > now_unix
327                {
328                    inner.invites.insert(rec.token.clone(), rec);
329                }
330            }
331        }
332        let boot_unix = SystemTime::now()
333            .duration_since(UNIX_EPOCH)
334            .map(|d| d.as_secs())
335            .unwrap_or(0);
336        // Reload counter snapshot. Missing/corrupt file → start at zero.
337        let snap: CountersSnapshot =
338            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
339                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
340                Err(_) => CountersSnapshot::default(),
341            };
342        Ok(Self {
343            inner: Arc::new(Mutex::new(inner)),
344            state_dir,
345            counters: Arc::new(RelayCounters {
346                boot_unix,
347                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
348                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
349                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
350                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
351                events_posted_total: AtomicU64::new(snap.events_posted_total),
352            }),
353        })
354    }
355
356    pub fn router(self) -> Router {
357        self.router_with_mode(ServerMode::default())
358    }
359
360    pub fn router_with_mode(self, mode: ServerMode) -> Router {
361        // Rate limit applied to write endpoints that create new state (slots,
362        // pair-sessions, bootstraps). 10 req/sec sustained, 50 req burst.
363        // v0.1 uses the GLOBAL key extractor (single bucket for all callers) —
364        // per-IP needs ConnectInfo middleware which axum 0.7 wires differently.
365        // Per-IP keying is a v0.2 hardening; for now Cloudflare WAF + this
366        // global cap shoulder DDoS protection in series.
367        let governor_conf = std::sync::Arc::new(
368            GovernorConfigBuilder::default()
369                .per_second(10)
370                .burst_size(50)
371                .key_extractor(GlobalKeyExtractor)
372                .finish()
373                .expect("valid governor config"),
374        );
375        let governor_layer = GovernorLayer {
376            config: governor_conf,
377        };
378
379        // Hot writes group — rate limited.
380        let hot_writes = Router::new()
381            .route("/v1/slot/allocate", post(allocate_slot))
382            .route("/v1/pair", post(pair_open))
383            .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
384            .route("/v1/pair/abandon", post(pair_abandon))
385            .layer(governor_layer);
386
387        // Core data-plane routes: events, pair, slots, healthz. Always
388        // present, in both federation and local-only modes.
389        let mut router = Router::new()
390            .route("/healthz", get(healthz))
391            .route("/v1/events/:slot_id", post(post_event).get(list_events))
392            .route("/v1/slot/:slot_id/state", get(slot_state))
393            .route(
394                "/v1/slot/:slot_id/responder-health",
395                post(responder_health_set),
396            )
397            .route("/v1/events/:slot_id/stream", get(stream_events))
398            .route("/v1/pair/:pair_id", get(pair_get))
399            .route("/v1/handle/intro/:nick", post(handle_intro));
400
401        // Discovery + landing surfaces: phonebook, well-known agent cards,
402        // landing page, stats, invite landing. In `--local-only` mode we
403        // skip all of these — the relay becomes invisible from the outside
404        // (and from other agents on the same box that might enumerate it).
405        // This is the v0.5.17 within-machine privacy guarantee.
406        if !mode.local_only {
407            router = router
408                .route("/", get(landing_index))
409                .route("/favicon.svg", get(landing_favicon))
410                .route("/og.png", get(landing_og))
411                .route("/demo.cast", get(landing_demo_cast))
412                .route("/install", get(landing_install_sh))
413                .route("/install.sh", get(landing_install_sh))
414                .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
415                .route("/stats", get(stats_root))
416                .route("/stats.json", get(stats_json))
417                .route("/stats.html", get(landing_stats_html))
418                .route("/stats.history", get(stats_history))
419                .route("/phonebook", get(landing_phonebook_html))
420                .route("/phonebook.html", get(landing_phonebook_html))
421                .route("/v1/handle/claim", post(handle_claim))
422                .route("/v1/handles", get(handles_directory))
423                .route("/v1/invite/register", post(invite_register))
424                .route("/i/:token", get(invite_script))
425                .route("/.well-known/wire/agent", get(well_known_agent))
426                .route(
427                    "/.well-known/agent-card.json",
428                    get(well_known_agent_card_a2a),
429                );
430        } else {
431            // Local-only mode still needs handle_claim for in-process
432            // session bootstrap (`wire session new` allocates a local
433            // slot AND claims a handle on the local relay so peers can
434            // resolve it). The claim is gated to loopback-only callers
435            // by the bind, not by the route.
436            router = router.route("/v1/handle/claim", post(handle_claim));
437        }
438
439        router.merge(hot_writes).with_state(self)
440    }
441
442    /// Evict pair-slots that have been idle past `PAIR_SLOT_TTL_SECS`.
443    /// Called inline on every pair-slot mutation; a background sweeper task
444    /// (see `Self::start_sweeper`) covers the long-idle case.
445    async fn evict_expired_pair_slots(&self) {
446        let now = std::time::Instant::now();
447        let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
448        let mut inner = self.inner.lock().await;
449        let mut to_remove = Vec::new();
450        for (id, slot) in inner.pair_slots.iter() {
451            if now.duration_since(slot.last_touched) > ttl {
452                to_remove.push(id.clone());
453            }
454        }
455        for id in to_remove {
456            inner.pair_slots.remove(&id);
457            inner.pair_lookup.retain(|_, v| v != &id);
458        }
459    }
460
461    /// Spawn a background tokio task that runs `evict_expired_pair_slots` every
462    /// 60 seconds. Call once after `Relay::new`; the handle is leaked deliberately
463    /// — process exit reaps it. Safe to skip in tests where you'd rather test
464    /// eviction inline.
465    pub fn spawn_pair_sweeper(&self) {
466        let me = self.clone();
467        tokio::spawn(async move {
468            let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
469            loop {
470                tick.tick().await;
471                me.evict_expired_pair_slots().await;
472            }
473        });
474    }
475
476    /// Snapshot the in-process counters to `<state_dir>/counters.json`. Called
477    /// every 30s by `spawn_counter_persister` and once during graceful
478    /// shutdown so a deploy doesn't reset the running totals.
479    pub async fn persist_counters(&self) -> Result<()> {
480        let snap = CountersSnapshot {
481            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
482            handle_first_claims_total: self
483                .counters
484                .handle_first_claims_total
485                .load(Ordering::Relaxed),
486            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
487            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
488            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
489        };
490        let body = serde_json::to_vec_pretty(&snap)?;
491        let path = self.state_dir.join("counters.json");
492        tokio::fs::write(path, body).await?;
493        Ok(())
494    }
495
496    /// Append one row to `<state_dir>/stats-history.jsonl` mirroring the
497    /// /stats endpoint at this instant. Used by /stats.html for sparklines.
498    /// File grows ~250 B per call → ~720 KB/day. A future prune wave can
499    /// roll old entries off once the history exceeds 90 days.
500    pub async fn append_history(&self) -> Result<()> {
501        use tokio::io::AsyncWriteExt;
502        let now = SystemTime::now()
503            .duration_since(UNIX_EPOCH)
504            .map(|d| d.as_secs())
505            .unwrap_or(0);
506        let (handles_active, slots_active, pair_slots_open, streams_active) = {
507            let inner = self.inner.lock().await;
508            (
509                inner.handles.len(),
510                inner.slots.len(),
511                inner.pair_slots.len(),
512                inner.streams.values().map(Vec::len).sum::<usize>(),
513            )
514        };
515        let entry = HistoryEntry {
516            ts: now,
517            handles_active,
518            slots_active,
519            pair_slots_open,
520            streams_active,
521            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
522            handle_first_claims_total: self
523                .counters
524                .handle_first_claims_total
525                .load(Ordering::Relaxed),
526            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
527            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
528            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
529        };
530        let line = serde_json::to_vec(&entry)?;
531        let path = self.state_dir.join("stats-history.jsonl");
532        let mut f = tokio::fs::OpenOptions::new()
533            .create(true)
534            .append(true)
535            .open(&path)
536            .await?;
537        f.write_all(&line).await?;
538        f.write_all(b"\n").await?;
539        f.flush().await?;
540        Ok(())
541    }
542
543    /// Spawn a background tokio task that calls `persist_counters` every 30s
544    /// + appends a history row on the same tick. Loss bound: counters can
545    ///   drift back up to 30s on crash, history can drop one row.
546    pub fn spawn_counter_persister(&self) {
547        let me = self.clone();
548        tokio::spawn(async move {
549            let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
550            // First tick fires immediately; skip it so we don't write the
551            // freshly-loaded snapshot back unchanged.
552            tick.tick().await;
553            loop {
554                tick.tick().await;
555                if let Err(e) = me.persist_counters().await {
556                    eprintln!("counter persist failed: {e}");
557                }
558                if let Err(e) = me.append_history().await {
559                    eprintln!("history append failed: {e}");
560                }
561            }
562        });
563    }
564
565    async fn persist_tokens(&self) -> Result<()> {
566        let body = {
567            let inner = self.inner.lock().await;
568            serde_json::to_string_pretty(&inner.tokens)?
569        };
570        let path = self.state_dir.join("tokens.json");
571        tokio::fs::write(path, body).await?;
572        Ok(())
573    }
574
575    async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
576        // Defense in depth: only allow lowercase hex slot_ids of the exact length
577        // we ever produce ourselves (16 random bytes -> 32 hex chars). Blocks
578        // any future code path that might let attacker-controlled slot_ids reach
579        // disk operations. allocate_slot() always meets this; this assert is
580        // belt-and-suspenders against future regressions.
581        if !is_valid_slot_id(slot_id) {
582            return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
583        }
584        let path = self
585            .state_dir
586            .join("slots")
587            .join(format!("{slot_id}.jsonl"));
588        let mut line = serde_json::to_vec(event)?;
589        line.push(b'\n');
590        use tokio::io::AsyncWriteExt;
591        let mut f = tokio::fs::OpenOptions::new()
592            .create(true)
593            .append(true)
594            .open(&path)
595            .await
596            .with_context(|| format!("opening {path:?}"))?;
597        f.write_all(&line).await?;
598        f.flush().await?;
599        Ok(())
600    }
601}
602
603async fn healthz() -> impl IntoResponse {
604    (StatusCode::OK, "ok\n")
605}
606
607// Public aggregate-usage snapshot. Counter fields (`*_total`) reset on
608// process restart; state fields (`handles_active`, `slots_active`) survive
609// on the persistent volume. No DIDs / handles / IPs leaked — counts only.
610async fn stats_history(
611    State(relay): State<Relay>,
612    Query(q): Query<StatsHistoryQuery>,
613) -> impl IntoResponse {
614    let hours = q.hours.unwrap_or(24).min(168);
615    let now = SystemTime::now()
616        .duration_since(UNIX_EPOCH)
617        .map(|d| d.as_secs())
618        .unwrap_or(0);
619    let cutoff = now.saturating_sub(hours * 3600);
620    let path = relay.state_dir.join("stats-history.jsonl");
621    let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
622    let entries: Vec<Value> = body
623        .lines()
624        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
625        .filter(|v| {
626            v.get("ts")
627                .and_then(Value::as_u64)
628                .map(|t| t >= cutoff)
629                .unwrap_or(false)
630        })
631        .collect();
632    (
633        StatusCode::OK,
634        Json(json!({
635            "hours": hours,
636            "now_unix": now,
637            "count": entries.len(),
638            "entries": entries,
639        })),
640    )
641}
642
643async fn landing_stats_html() -> impl IntoResponse {
644    static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
645    (
646        StatusCode::OK,
647        [
648            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
649            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
650        ],
651        STATS_HTML,
652    )
653}
654
655async fn landing_phonebook_html() -> impl IntoResponse {
656    static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
657    (
658        StatusCode::OK,
659        [
660            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
661            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
662        ],
663        PHONEBOOK_HTML,
664    )
665}
666
667/// `/stats` dispatch: serve the pretty HTML dashboard to browsers (Accept
668/// includes text/html) and JSON to everything else (curl, scripts, scrapers).
669/// Keeps the JSON contract intact while letting humans land on the page at
670/// the short URL.
671async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
672    let wants_html = headers
673        .get(axum::http::header::ACCEPT)
674        .and_then(|v| v.to_str().ok())
675        .unwrap_or("")
676        .contains("text/html");
677    if wants_html {
678        landing_stats_html().await.into_response()
679    } else {
680        stats_json(State(relay)).await.into_response()
681    }
682}
683
684async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
685    let now = SystemTime::now()
686        .duration_since(UNIX_EPOCH)
687        .map(|d| d.as_secs())
688        .unwrap_or(0);
689    let inner = relay.inner.lock().await;
690    let streams_active: usize = inner.streams.values().map(Vec::len).sum();
691    let body = json!({
692        "version": env!("CARGO_PKG_VERSION"),
693        "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
694        "handles_active": inner.handles.len(),
695        "slots_active": inner.slots.len(),
696        "pair_slots_open": inner.pair_slots.len(),
697        "streams_active": streams_active,
698        "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
699        "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
700        "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
701        "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
702        "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
703    });
704    (StatusCode::OK, Json(body))
705}
706
707// Static landing site baked into the binary so apex (wireup.net) can flip
708// straight to Fly without a separate static-host. ~37 KB total — negligible
709// against the release binary size, and keeps the relay self-contained.
710async fn landing_index() -> impl IntoResponse {
711    static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
712    (
713        StatusCode::OK,
714        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
715        INDEX_HTML,
716    )
717}
718
719async fn landing_favicon() -> impl IntoResponse {
720    static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
721    (
722        StatusCode::OK,
723        [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
724        FAVICON_SVG,
725    )
726}
727
728async fn landing_og() -> impl IntoResponse {
729    static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
730    (
731        StatusCode::OK,
732        [
733            (axum::http::header::CONTENT_TYPE, "image/png"),
734            (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
735        ],
736        OG_PNG,
737    )
738}
739
740async fn landing_demo_cast() -> impl IntoResponse {
741    static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
742    (
743        StatusCode::OK,
744        [
745            (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
746            (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
747        ],
748        DEMO_CAST,
749    )
750}
751
752async fn landing_install_sh() -> impl IntoResponse {
753    static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
754    (
755        StatusCode::OK,
756        [
757            (
758                axum::http::header::CONTENT_TYPE,
759                "text/x-shellscript; charset=utf-8",
760            ),
761            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
762        ],
763        INSTALL_SH,
764    )
765}
766
767async fn landing_openshell_policy_sh() -> impl IntoResponse {
768    static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
769    (
770        StatusCode::OK,
771        [
772            (
773                axum::http::header::CONTENT_TYPE,
774                "text/x-shellscript; charset=utf-8",
775            ),
776            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
777        ],
778        POLICY_SH,
779    )
780}
781
782async fn allocate_slot(
783    State(relay): State<Relay>,
784    Json(_req): Json<AllocateRequest>,
785) -> impl IntoResponse {
786    let slot_id = random_hex(16);
787    let slot_token = random_hex(32);
788    {
789        let mut inner = relay.inner.lock().await;
790        inner.slots.insert(slot_id.clone(), Vec::new());
791        inner.tokens.insert(slot_id.clone(), slot_token.clone());
792    }
793    if let Err(e) = relay.persist_tokens().await {
794        return (
795            StatusCode::INTERNAL_SERVER_ERROR,
796            Json(json!({"error": format!("persist failed: {e}")})),
797        )
798            .into_response();
799    }
800    relay
801        .counters
802        .slot_allocations_total
803        .fetch_add(1, Ordering::Relaxed);
804    (
805        StatusCode::CREATED,
806        Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
807    )
808        .into_response()
809}
810
811async fn post_event(
812    State(relay): State<Relay>,
813    Path(slot_id): Path<String>,
814    headers: HeaderMap,
815    Json(req): Json<PostEventRequest>,
816) -> impl IntoResponse {
817    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
818        return resp;
819    }
820    // Body size cap (rough; serialize and check).
821    let body_bytes = match serde_json::to_vec(&req.event) {
822        Ok(b) => b,
823        Err(e) => {
824            return (
825                StatusCode::BAD_REQUEST,
826                Json(json!({"error": format!("event not serializable: {e}")})),
827            )
828                .into_response();
829        }
830    };
831    if body_bytes.len() > MAX_EVENT_BYTES {
832        return (
833            StatusCode::PAYLOAD_TOO_LARGE,
834            Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
835        )
836            .into_response();
837    }
838    // Per-slot quota: cap accumulated bytes per slot at MAX_SLOT_BYTES.
839    {
840        let inner = relay.inner.lock().await;
841        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
842        if used + body_bytes.len() > MAX_SLOT_BYTES {
843            return (
844                StatusCode::PAYLOAD_TOO_LARGE,
845                Json(json!({
846                    "error": "slot quota exceeded",
847                    "slot_bytes_used": used,
848                    "slot_bytes_max": MAX_SLOT_BYTES,
849                    "remediation": "operator should `wire rotate-slot` to drain old slot",
850                })),
851            )
852                .into_response();
853        }
854    }
855    let event_id = req
856        .event
857        .get("event_id")
858        .and_then(Value::as_str)
859        .map(str::to_string);
860
861    // Dedupe by event_id if present.
862    let dup = {
863        let inner = relay.inner.lock().await;
864        let slot = inner.slots.get(&slot_id);
865        if let (Some(eid), Some(slot)) = (&event_id, slot) {
866            slot.iter()
867                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
868        } else {
869            false
870        }
871    };
872    if dup {
873        return (
874            StatusCode::OK,
875            Json(json!({"event_id": event_id, "status": "duplicate"})),
876        )
877            .into_response();
878    }
879
880    {
881        let mut inner = relay.inner.lock().await;
882        let event_size = body_bytes.len();
883        let slot = inner.slots.entry(slot_id.clone()).or_default();
884        slot.push(req.event.clone());
885        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
886    }
887    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
888        return (
889            StatusCode::INTERNAL_SERVER_ERROR,
890            Json(json!({"error": format!("persist failed: {e}")})),
891        )
892            .into_response();
893    }
894    relay
895        .counters
896        .events_posted_total
897        .fetch_add(1, Ordering::Relaxed);
898    // R1 push: broadcast the new event to every active SSE subscriber on
899    // this slot. Dead channels are pruned in-place. The broadcast happens
900    // AFTER the disk persist so subscribers and disk readers see the same
901    // events; on persist failure we already returned 500 above.
902    {
903        let mut inner = relay.inner.lock().await;
904        if let Some(subs) = inner.streams.get_mut(&slot_id) {
905            subs.retain(|tx| tx.send(req.event.clone()).is_ok());
906        }
907    }
908    (
909        StatusCode::CREATED,
910        Json(json!({"event_id": event_id, "status": "stored"})),
911    )
912        .into_response()
913}
914
915/// R1 — server-sent-events push stream for a slot. Auth'd by slot_token
916/// (same as `list_events`). The connection registers an `UnboundedSender`
917/// on the slot's subscriber list; every subsequent `post_event` to the slot
918/// fans out to all subscribers as `data: <event-json>\n\n` lines. The
919/// connection stays open until the client disconnects.
920///
921/// A 30-second keepalive ping is emitted automatically so reverse proxies
922/// (Cloudflare tunnel, nginx) don't time out the upstream.
923///
924/// Note: the subscriber sees events posted AFTER it subscribed. To catch
925/// up on history first, the client should call `GET /v1/events/:slot_id`
926/// with `since=` before opening the stream.
927async fn stream_events(
928    State(relay): State<Relay>,
929    Path(slot_id): Path<String>,
930    headers: HeaderMap,
931) -> axum::response::Response {
932    use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
933    use futures::stream::StreamExt;
934
935    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
936        return resp;
937    }
938
939    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
940    {
941        let mut inner = relay.inner.lock().await;
942        inner.streams.entry(slot_id.clone()).or_default().push(tx);
943    }
944
945    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
946        SseEvent::default()
947            .json_data(&ev)
948            .map_err(|e| std::io::Error::other(e.to_string()))
949    });
950
951    Sse::new(stream)
952        .keep_alive(
953            KeepAlive::new()
954                .interval(std::time::Duration::from_secs(30))
955                .text("phyllis: still on the line"),
956        )
957        .into_response()
958}
959
960// ---------- pair-slot handlers ----------
961
/// JSON body accepted by `pair_open`: one side's opening message for the
/// pairing handshake identified by `code_hash`.
#[derive(Deserialize)]
pub struct PairOpenRequest {
    /// SHA-256 hex digest of the code phrase. Requests carrying the same
    /// value are routed to the same pair-slot.
    pub code_hash: String,
    /// SPAKE2 message (base64).
    pub msg: String,
    /// Which side is posting. Anything other than "host" or "guest" is
    /// rejected with 400.
    pub role: String, // "host" or "guest"
}
969
/// JSON body accepted by `pair_bootstrap`: one side's bootstrap payload for
/// an already-open pair-slot.
#[derive(Deserialize)]
pub struct PairBootstrapRequest {
    /// Which side is posting: "host" or "guest". Any other value is rejected
    /// with 400.
    pub role: String,
    /// Payload stored as that role's bootstrap value; `pair_get` hands it to
    /// the opposite role as `peer_bootstrap`. The relay stores it as-is.
    pub sealed: String,
}
975
/// JSON body accepted by `pair_abandon`: identifies the pair-slot to forget.
#[derive(Deserialize)]
pub struct PairAbandonRequest {
    /// SHA-256 hex digest of the code phrase. Same value the caller posts to
    /// /v1/pair as `code_hash` — knowing the code IS the auth here.
    pub code_hash: String,
}
982
983/// Forget the pair-slot associated with this code_hash. Either side can call;
984/// no auth beyond knowledge of the code (which is the shared secret of this
985/// handshake anyway). Idempotent: returns 204 whether or not the slot exists.
986/// Used by clients to recover after a crash mid-handshake, so the host doesn't
987/// stay locked out until the 5-minute TTL.
988async fn pair_abandon(
989    State(relay): State<Relay>,
990    Json(req): Json<PairAbandonRequest>,
991) -> impl IntoResponse {
992    let mut inner = relay.inner.lock().await;
993    if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
994        inner.pair_slots.remove(&pair_id);
995    }
996    StatusCode::NO_CONTENT.into_response()
997}
998
999async fn pair_open(
1000    State(relay): State<Relay>,
1001    Json(req): Json<PairOpenRequest>,
1002) -> impl IntoResponse {
1003    if req.role != "host" && req.role != "guest" {
1004        return (
1005            StatusCode::BAD_REQUEST,
1006            Json(json!({"error": "role must be 'host' or 'guest'"})),
1007        )
1008            .into_response();
1009    }
1010    relay.evict_expired_pair_slots().await;
1011    let mut inner = relay.inner.lock().await;
1012    let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1013        Some(id) => id,
1014        None => {
1015            let new_id = random_hex(16);
1016            inner
1017                .pair_lookup
1018                .insert(req.code_hash.clone(), new_id.clone());
1019            inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1020            relay
1021                .counters
1022                .pair_opens_total
1023                .fetch_add(1, Ordering::Relaxed);
1024            new_id
1025        }
1026    };
1027    let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1028    slot.last_touched = std::time::Instant::now();
1029    if req.role == "host" {
1030        if slot.host_msg.is_some() {
1031            return (
1032                StatusCode::CONFLICT,
1033                Json(json!({"error": "host already registered for this code"})),
1034            )
1035                .into_response();
1036        }
1037        slot.host_msg = Some(req.msg);
1038    } else {
1039        if slot.guest_msg.is_some() {
1040            return (
1041                StatusCode::CONFLICT,
1042                Json(json!({"error": "guest already registered for this code"})),
1043            )
1044                .into_response();
1045        }
1046        slot.guest_msg = Some(req.msg);
1047    }
1048    (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1049}
1050
/// Query-string parameters for `pair_get`.
#[derive(Deserialize)]
pub struct PairGetQuery {
    /// "host" or "guest" — caller's role; we return the OTHER side's data.
    /// Any other value is rejected with 400.
    pub as_role: String,
}
1056
1057async fn pair_get(
1058    State(relay): State<Relay>,
1059    Path(pair_id): Path<String>,
1060    Query(q): Query<PairGetQuery>,
1061) -> impl IntoResponse {
1062    relay.evict_expired_pair_slots().await;
1063    let mut inner = relay.inner.lock().await;
1064    let slot = match inner.pair_slots.get_mut(&pair_id) {
1065        Some(s) => {
1066            s.last_touched = std::time::Instant::now();
1067            s.clone()
1068        }
1069        None => {
1070            return (
1071                StatusCode::NOT_FOUND,
1072                Json(json!({"error": "unknown pair_id"})),
1073            )
1074                .into_response();
1075        }
1076    };
1077    let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1078        "host" => (slot.guest_msg, slot.guest_bootstrap),
1079        "guest" => (slot.host_msg, slot.host_bootstrap),
1080        _ => {
1081            return (
1082                StatusCode::BAD_REQUEST,
1083                Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1084            )
1085                .into_response();
1086        }
1087    };
1088    (
1089        StatusCode::OK,
1090        Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1091    )
1092        .into_response()
1093}
1094
1095async fn pair_bootstrap(
1096    State(relay): State<Relay>,
1097    Path(pair_id): Path<String>,
1098    Json(req): Json<PairBootstrapRequest>,
1099) -> impl IntoResponse {
1100    relay.evict_expired_pair_slots().await;
1101    let mut inner = relay.inner.lock().await;
1102    let slot = match inner.pair_slots.get_mut(&pair_id) {
1103        Some(s) => s,
1104        None => {
1105            return (
1106                StatusCode::NOT_FOUND,
1107                Json(json!({"error": "unknown pair_id"})),
1108            )
1109                .into_response();
1110        }
1111    };
1112    slot.last_touched = std::time::Instant::now();
1113    match req.role.as_str() {
1114        "host" => slot.host_bootstrap = Some(req.sealed),
1115        "guest" => slot.guest_bootstrap = Some(req.sealed),
1116        _ => {
1117            return (
1118                StatusCode::BAD_REQUEST,
1119                Json(json!({"error": "role must be 'host' or 'guest'"})),
1120            )
1121                .into_response();
1122        }
1123    }
1124    (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1125}
1126
1127// ---------- handle directory (v0.5) ----------
1128
/// JSON body accepted by `handle_claim` (`POST /v1/handle/claim`).
#[derive(Deserialize)]
pub struct HandleClaimRequest {
    /// Nick the claimant wants (case-folded). Domain part is implicit: the
    /// domain the relay's `.well-known` is served from.
    pub nick: String,
    /// Slot id the claimant owns on this relay (proves they allocated here).
    /// The request's bearer token is checked against this slot.
    pub slot_id: String,
    /// Optional public-facing relay URL the relay should advertise back in
    /// `.well-known/wire/agent` responses. If omitted, callers will need to
    /// know the relay URL out-of-band.
    pub relay_url: Option<String>,
    /// Claimant's full signed agent-card (includes DID + verify_keys +
    /// optional profile). Must pass `verify_agent_card` and carry a string
    /// `did` field.
    pub card: Value,
}
1144
1145/// `POST /v1/handle/claim` — claim or update a `nick@<relay-domain>` handle.
1146///
1147/// FCFS on nick. Same-DID re-claims allowed (used for profile updates +
1148/// slot rotation). Different-DID claims on a taken nick return 409.
1149/// Caller must (a) own the `slot_id` they reference (verified by token
1150/// being present), and (b) submit a card with a valid self-signature.
1151async fn handle_claim(
1152    State(relay): State<Relay>,
1153    headers: HeaderMap,
1154    Json(req): Json<HandleClaimRequest>,
1155) -> impl IntoResponse {
1156    // Bearer auth: claimant must hold the slot_token for the slot they
1157    // reference. Prevents nick-squatting from an unauthenticated POSTer.
1158    if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1159        return resp;
1160    }
1161    // Validate nick (same rules as the client-side parser).
1162    if !crate::pair_profile::is_valid_nick(&req.nick) {
1163        return (
1164            StatusCode::BAD_REQUEST,
1165            Json(json!({
1166                "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1167                "nick": req.nick,
1168            })),
1169        )
1170            .into_response();
1171    }
1172    // Verify the card signature using the public verify_agent_card helper.
1173    if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1174        return (
1175            StatusCode::BAD_REQUEST,
1176            Json(json!({"error": format!("card signature invalid: {e}")})),
1177        )
1178            .into_response();
1179    }
1180    let did = match req.card.get("did").and_then(Value::as_str) {
1181        Some(d) => d.to_string(),
1182        None => {
1183            return (
1184                StatusCode::BAD_REQUEST,
1185                Json(json!({"error": "card missing 'did' field"})),
1186            )
1187                .into_response();
1188        }
1189    };
1190
1191    // FCFS check.
1192    let first_claim = {
1193        let inner = relay.inner.lock().await;
1194        match inner.handles.get(&req.nick) {
1195            Some(existing) if existing.did != did => {
1196                return (
1197                    StatusCode::CONFLICT,
1198                    Json(json!({
1199                        "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1200                        "nick": req.nick,
1201                        "claimed_by": existing.did,
1202                    })),
1203                )
1204                    .into_response();
1205            }
1206            Some(_) => false,
1207            None => true,
1208        }
1209    };
1210
1211    let now = time::OffsetDateTime::now_utc()
1212        .format(&time::format_description::well_known::Rfc3339)
1213        .unwrap_or_default();
1214    let record = HandleRecord {
1215        nick: req.nick.clone(),
1216        did: did.clone(),
1217        card: req.card.clone(),
1218        slot_id: req.slot_id.clone(),
1219        relay_url: req.relay_url.clone(),
1220        claimed_at: now,
1221    };
1222
1223    // Persist to disk first (durable), then update in-memory.
1224    let path = relay
1225        .state_dir
1226        .join("handles")
1227        .join(format!("{}.json", req.nick));
1228    let body = match serde_json::to_vec_pretty(&record) {
1229        Ok(b) => b,
1230        Err(e) => {
1231            return (
1232                StatusCode::INTERNAL_SERVER_ERROR,
1233                Json(json!({"error": format!("serialize failed: {e}")})),
1234            )
1235                .into_response();
1236        }
1237    };
1238    if let Err(e) = tokio::fs::write(&path, &body).await {
1239        return (
1240            StatusCode::INTERNAL_SERVER_ERROR,
1241            Json(json!({"error": format!("persist failed: {e}")})),
1242        )
1243            .into_response();
1244    }
1245    {
1246        let mut inner = relay.inner.lock().await;
1247        inner.handles.insert(req.nick.clone(), record);
1248    }
1249    relay
1250        .counters
1251        .handle_claims_total
1252        .fetch_add(1, Ordering::Relaxed);
1253    if first_claim {
1254        relay
1255            .counters
1256            .handle_first_claims_total
1257            .fetch_add(1, Ordering::Relaxed);
1258    }
1259    (
1260        StatusCode::CREATED,
1261        Json(json!({
1262            "nick": req.nick,
1263            "did": did,
1264            "status": if first_claim { "claimed" } else { "re-claimed" },
1265        })),
1266    )
1267        .into_response()
1268}
1269
/// Query-string parameters carrying a single `handle` value.
// NOTE(review): the handler that consumes this struct is outside this part
// of the file — presumably the `.well-known/wire/agent` lookup; confirm.
#[derive(Deserialize)]
pub struct WellKnownAgentQuery {
    /// The handle being looked up.
    pub handle: String,
}
1274
/// Query-string parameters for `handles_directory`.
#[derive(Deserialize)]
pub struct HandlesDirectoryQuery {
    /// Pagination cursor: only nicks sorting strictly after this value are
    /// returned. Pass the previous page's `next_cursor`.
    pub cursor: Option<String>,
    /// Page size. Defaults to 100 and is clamped to the range 1..=500.
    pub limit: Option<usize>,
    /// If set, only handles whose profile `vibe` array contains this value
    /// (ASCII case-insensitive) are returned.
    pub vibe: Option<String>,
}
1281
1282// ─── short-URL invites (v0.5.10) ──────────────────────────────────────────
1283// One-curl onboarding: the invitor registers their `wire://pair?...` URL
1284// here, gets back a 6-hex token. Anyone who does
1285//   curl -fsSL https://wireup.net/i/<token> | sh
1286// gets wire installed (if needed) + the invite accepted, in one shot.
1287//
1288// Possession of the short URL = pair authorization (same shape as the
1289// underlying wire:// invite — it's just a redirector).
1290
/// JSON body accepted by `invite_register`.
#[derive(Deserialize)]
pub struct InviteRegisterRequest {
    /// The wire://pair?... URL produced by `wire invite`. Required; must be
    /// non-empty and at most 8192 bytes.
    pub invite_url: String,
    /// Lifetime in seconds. Default 86400 (24h). Clamped to at least 60
    /// seconds and at most 7 days.
    #[serde(default)]
    pub ttl_seconds: Option<u64>,
    /// If `Some(n)`, the short URL can be fetched N times before 410s.
    /// `None` = unlimited until TTL hits.
    #[serde(default)]
    pub uses: Option<u32>,
}
1303
1304impl Relay {
1305    /// Append one InviteRecord to `<state_dir>/invites.jsonl`.
1306    async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1307        use tokio::io::AsyncWriteExt;
1308        let mut line = serde_json::to_vec(rec)?;
1309        line.push(b'\n');
1310        let path = self.state_dir.join("invites.jsonl");
1311        let mut f = tokio::fs::OpenOptions::new()
1312            .create(true)
1313            .append(true)
1314            .open(&path)
1315            .await?;
1316        f.write_all(&line).await?;
1317        f.flush().await?;
1318        Ok(())
1319    }
1320}
1321
1322async fn invite_register(
1323    State(relay): State<Relay>,
1324    Json(req): Json<InviteRegisterRequest>,
1325) -> impl IntoResponse {
1326    if req.invite_url.is_empty() {
1327        return (
1328            StatusCode::BAD_REQUEST,
1329            Json(json!({"error": "invite_url required"})),
1330        )
1331            .into_response();
1332    }
1333    // Length cap on the embedded URL to keep persisted records bounded.
1334    if req.invite_url.len() > 8_192 {
1335        return (
1336            StatusCode::PAYLOAD_TOO_LARGE,
1337            Json(json!({"error": "invite_url > 8 KiB"})),
1338        )
1339            .into_response();
1340    }
1341    let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1342    let now = SystemTime::now()
1343        .duration_since(UNIX_EPOCH)
1344        .map(|d| d.as_secs())
1345        .unwrap_or(0);
1346    // 6-hex token → 16.7M space. Collision probability negligible at v0.5
1347    // scale; if a collision happens (1 in 16M) we 409 and the caller retries.
1348    let token = random_hex(3);
1349    let rec = InviteRecord {
1350        token: token.clone(),
1351        invite_url: req.invite_url,
1352        expires_unix: now + ttl,
1353        uses_remaining: req.uses,
1354        created_unix: now,
1355    };
1356    {
1357        let mut inner = relay.inner.lock().await;
1358        if inner.invites.contains_key(&token) {
1359            return (
1360                StatusCode::CONFLICT,
1361                Json(json!({"error": "token collision, retry"})),
1362            )
1363                .into_response();
1364        }
1365        inner.invites.insert(token.clone(), rec.clone());
1366    }
1367    if let Err(e) = relay.persist_invite(&rec).await {
1368        return (
1369            StatusCode::INTERNAL_SERVER_ERROR,
1370            Json(json!({"error": format!("persist failed: {e}")})),
1371        )
1372            .into_response();
1373    }
1374    (
1375        StatusCode::CREATED,
1376        Json(json!({
1377            "token": token,
1378            "path": format!("/i/{token}"),
1379            "expires_unix": rec.expires_unix,
1380            "uses_remaining": rec.uses_remaining,
1381        })),
1382    )
1383        .into_response()
1384}
1385
/// Query-string parameters for `invite_script`.
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// `format=url` returns the raw `wire://pair?...` URL as text/plain
    /// (used by `wire accept https://wireup.net/i/<token>` to resolve a
    /// short URL programmatically). Default: shell-script template.
    /// Note: ?format=url does NOT decrement `uses_remaining` — it's a
    /// resolution lookup, not an acceptance. The actual accept happens
    /// when the wire:// URL is consumed by `pair_invite::accept_invite`.
    /// Any value other than exactly `url` selects the shell-script template.
    pub format: Option<String>,
}
1396
1397async fn invite_script(
1398    State(relay): State<Relay>,
1399    Path(token): Path<String>,
1400    Query(q): Query<InviteScriptQuery>,
1401) -> impl IntoResponse {
1402    // Token shape: 6 lowercase hex. Reject anything else immediately so a
1403    // path-traversal try never reaches the map lookup.
1404    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1405        return (StatusCode::NOT_FOUND, "not found\n").into_response();
1406    }
1407    let want_raw_url = q.format.as_deref() == Some("url");
1408    let now = SystemTime::now()
1409        .duration_since(UNIX_EPOCH)
1410        .map(|d| d.as_secs())
1411        .unwrap_or(0);
1412    let invite_url = {
1413        let mut inner = relay.inner.lock().await;
1414        let Some(rec) = inner.invites.get_mut(&token) else {
1415            return (StatusCode::NOT_FOUND, "not found\n").into_response();
1416        };
1417        if rec.expires_unix <= now {
1418            return (StatusCode::GONE, "this invite has expired\n").into_response();
1419        }
1420        if let Some(n) = rec.uses_remaining {
1421            if n == 0 {
1422                return (StatusCode::GONE, "this invite has been used up\n").into_response();
1423            }
1424            // Only decrement on script-template fetch (the one that's
1425            // actually doing the pair). The raw-URL resolution path is a
1426            // lookup, not an accept.
1427            if !want_raw_url {
1428                rec.uses_remaining = Some(n - 1);
1429            }
1430        }
1431        rec.invite_url.clone()
1432    };
1433    if want_raw_url {
1434        return (
1435            StatusCode::OK,
1436            [
1437                (
1438                    axum::http::header::CONTENT_TYPE,
1439                    "text/plain; charset=utf-8",
1440                ),
1441                (
1442                    axum::http::header::CACHE_CONTROL,
1443                    "private, no-store, max-age=0",
1444                ),
1445            ],
1446            invite_url,
1447        )
1448            .into_response();
1449    }
1450    let escaped = invite_url.replace('\'', "'\\''");
1451    let script = format!(
1452        "#!/bin/sh\n\
1453         # wire — one-curl onboarding (install + pair in one shot)\n\
1454         # source: https://github.com/SlanchaAi/wire\n\
1455         set -eu\n\
1456         INVITE='{escaped}'\n\
1457         echo \"\u{2192} checking for wire CLI...\"\n\
1458         if ! command -v wire >/dev/null 2>&1; then\n  \
1459           echo \"\u{2192} wire not installed; installing first...\"\n  \
1460           curl -fsSL https://wireup.net/install.sh | sh\n  \
1461           case \":$PATH:\" in\n    \
1462             *:\"$HOME/.local/bin\":*) ;;\n    \
1463             *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n  \
1464           esac\n  \
1465           if ! command -v wire >/dev/null 2>&1; then\n    \
1466             echo \"\"\n    \
1467             echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n    \
1468             echo \"Open a new shell, then run:\"\n    \
1469             echo \"  wire accept '$INVITE'\"\n    \
1470             exit 0\n  \
1471           fi\n\
1472         fi\n\
1473         echo \"\u{2192} accepting invite...\"\n\
1474         wire accept \"$INVITE\"\n"
1475    );
1476    (
1477        StatusCode::OK,
1478        [
1479            (
1480                axum::http::header::CONTENT_TYPE,
1481                "text/x-shellscript; charset=utf-8",
1482            ),
1483            (
1484                axum::http::header::CACHE_CONTROL,
1485                "private, no-store, max-age=0",
1486            ),
1487        ],
1488        script,
1489    )
1490        .into_response()
1491}
1492
1493async fn handles_directory(
1494    State(relay): State<Relay>,
1495    Query(q): Query<HandlesDirectoryQuery>,
1496) -> impl IntoResponse {
1497    let limit = q.limit.unwrap_or(100).clamp(1, 500);
1498    let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1499    let inner = relay.inner.lock().await;
1500    let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1501    drop(inner);
1502    records.sort_by(|a, b| a.nick.cmp(&b.nick));
1503
1504    let cursor = q.cursor.as_deref();
1505    let mut eligible = Vec::new();
1506    for rec in records {
1507        if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1508            continue;
1509        }
1510        // Hygiene: hide test-shaped nicks from the public directory. Records
1511        // remain claimed (FCFS protection persists), they just don't surface
1512        // in the phone book. `demo-` is reserved for asciinema-cast handles,
1513        // `test-` for integration runs.
1514        if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1515            continue;
1516        }
1517        let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1518        if profile
1519            .get("listed")
1520            .and_then(Value::as_bool)
1521            .is_some_and(|listed| !listed)
1522        {
1523            continue;
1524        }
1525        if let Some(want) = &vibe_filter {
1526            let matched = profile
1527                .get("vibe")
1528                .and_then(Value::as_array)
1529                .map(|arr| {
1530                    arr.iter().any(|v| {
1531                        v.as_str()
1532                            .map(|s| s.eq_ignore_ascii_case(want))
1533                            .unwrap_or(false)
1534                    })
1535                })
1536                .unwrap_or(false);
1537            if !matched {
1538                continue;
1539            }
1540        }
1541        eligible.push((rec, profile));
1542    }
1543
1544    let has_more = eligible.len() > limit;
1545    let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1546    let next_cursor = if has_more {
1547        page.last().map(|(rec, _)| rec.nick.clone())
1548    } else {
1549        None
1550    };
1551    let handles: Vec<Value> = page
1552        .into_iter()
1553        .map(|(rec, profile)| {
1554            json!({
1555                "nick": rec.nick,
1556                "did": rec.did,
1557                "profile": {
1558                    "emoji": profile.get("emoji").cloned().unwrap_or(Value::Null),
1559                    "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1560                    "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1561                    "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1562                    "now": profile.get("now").cloned().unwrap_or(Value::Null),
1563                },
1564                "claimed_at": rec.claimed_at,
1565            })
1566        })
1567        .collect();
1568    (
1569        StatusCode::OK,
1570        Json(json!({
1571            "handles": handles,
1572            "next_cursor": next_cursor,
1573        })),
1574    )
1575        .into_response()
1576}
1577
1578/// `POST /v1/handle/intro/:nick` — drop a signed pair-introduction event
1579/// into a known nick's slot WITHOUT needing that slot's bearer token.
1580///
1581/// Why this exists: `.well-known/wire/agent` returns a nick's `slot_id` for
1582/// reachability, but NEVER its `slot_token` (that would leak read+write
1583/// authority to any handle-resolver). To zero-paste-pair, we need a way for
1584/// a stranger to deliver their signed agent-card to the nick's owner. This
1585/// endpoint provides exactly that, and ONLY that: the event must be `kind=1100`
1586/// (pair_drop / agent_card), self-signed, and the carrying agent-card embedded
1587/// in the body must verify-OK on its own.
1588///
1589/// Rate-limiting is the same governor that gates the other write endpoints.
1590/// Slot quota still applies — a flood of intros hits the standard 64MB cap.
1591async fn handle_intro(
1592    State(relay): State<Relay>,
1593    Path(nick): Path<String>,
1594    Json(req): Json<PostEventRequest>,
1595) -> impl IntoResponse {
1596    // Look up the nick. Must already be claimed.
1597    let slot_id = {
1598        let inner = relay.inner.lock().await;
1599        match inner.handles.get(&nick) {
1600            Some(rec) => rec.slot_id.clone(),
1601            None => {
1602                return (
1603                    StatusCode::NOT_FOUND,
1604                    Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1605                )
1606                    .into_response();
1607            }
1608        }
1609    };
1610
1611    // Only allow kind=1100 pair_drop / agent_card here. Anything else routes
1612    // to the standard /v1/events/:slot_id with bearer auth.
1613    let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1614    let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1615    if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1616        return (
1617            StatusCode::BAD_REQUEST,
1618            Json(json!({
1619                "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1620                "got_kind": kind,
1621                "got_type": type_str,
1622            })),
1623        )
1624            .into_response();
1625    }
1626
1627    // Body must embed a signed agent-card (so the receiver can pin from it).
1628    let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1629        Some(c) => c.clone(),
1630        None => {
1631            return (
1632                StatusCode::BAD_REQUEST,
1633                Json(json!({"error": "intro event body must embed 'card' field"})),
1634            )
1635                .into_response();
1636        }
1637    };
1638    if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1639        return (
1640            StatusCode::BAD_REQUEST,
1641            Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1642        )
1643            .into_response();
1644    }
1645
1646    // Size + quota checks (same as post_event).
1647    let body_bytes = match serde_json::to_vec(&req.event) {
1648        Ok(b) => b,
1649        Err(e) => {
1650            return (
1651                StatusCode::BAD_REQUEST,
1652                Json(json!({"error": format!("event not serializable: {e}")})),
1653            )
1654                .into_response();
1655        }
1656    };
1657    if body_bytes.len() > MAX_EVENT_BYTES {
1658        return (
1659            StatusCode::PAYLOAD_TOO_LARGE,
1660            Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1661        )
1662            .into_response();
1663    }
1664    {
1665        let inner = relay.inner.lock().await;
1666        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1667        if used + body_bytes.len() > MAX_SLOT_BYTES {
1668            return (
1669                StatusCode::PAYLOAD_TOO_LARGE,
1670                Json(json!({
1671                    "error": "target slot quota exceeded",
1672                    "slot_bytes_used": used,
1673                    "slot_bytes_max": MAX_SLOT_BYTES,
1674                })),
1675            )
1676                .into_response();
1677        }
1678    }
1679
1680    let event_id = req
1681        .event
1682        .get("event_id")
1683        .and_then(Value::as_str)
1684        .map(str::to_string);
1685
1686    // Dedupe by event_id if present.
1687    let dup = {
1688        let inner = relay.inner.lock().await;
1689        let slot = inner.slots.get(&slot_id);
1690        if let (Some(eid), Some(slot)) = (&event_id, slot) {
1691            slot.iter()
1692                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1693        } else {
1694            false
1695        }
1696    };
1697    if dup {
1698        return (
1699            StatusCode::OK,
1700            Json(json!({"event_id": event_id, "status": "duplicate"})),
1701        )
1702            .into_response();
1703    }
1704
1705    {
1706        let mut inner = relay.inner.lock().await;
1707        let event_size = body_bytes.len();
1708        let slot = inner.slots.entry(slot_id.clone()).or_default();
1709        slot.push(req.event.clone());
1710        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1711    }
1712    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1713        return (
1714            StatusCode::INTERNAL_SERVER_ERROR,
1715            Json(json!({"error": format!("persist failed: {e}")})),
1716        )
1717            .into_response();
1718    }
1719    (
1720        StatusCode::CREATED,
1721        Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1722    )
1723        .into_response()
1724}
1725
1726/// `GET /.well-known/wire/agent?handle=<nick>` — WebFinger-style resolver
1727/// for `nick@<this-relay-domain>` handles. Returns the signed agent-card +
1728/// slot coords if claimed; 404 if not.
1729///
1730/// The `handle` query parameter may be just `<nick>` or `<nick>@<domain>`.
1731/// Domain part is ignored (the relay only serves nicks it has on file).
1732/// `GET /.well-known/agent-card.json?handle=<nick>` — A2A v1.0-compatible
1733/// AgentCard serving wire's handle directory. Same data as `well_known_agent`
1734/// but in the schema A2A clients (MSFT/AWS/Salesforce/SAP/ServiceNow tooling,
1735/// agent-card-go, agent-card-python, A2A .NET SDK) already speak.
1736///
1737/// Wire-specific fields (DID, slot_id, profile blob, raw signed card) live
1738/// under the standard A2A `extensions` array using the wire extension URI.
1739/// A2A-only clients can pair to wire agents knowing only A2A vocabulary;
1740/// wire-native clients get the full richer card by following the extension.
1741async fn well_known_agent_card_a2a(
1742    State(relay): State<Relay>,
1743    Query(q): Query<WellKnownAgentQuery>,
1744) -> impl IntoResponse {
1745    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1746    if nick.is_empty() {
1747        return (
1748            StatusCode::BAD_REQUEST,
1749            Json(json!({"error": "handle missing nick"})),
1750        )
1751            .into_response();
1752    }
1753    let inner = relay.inner.lock().await;
1754    let rec = match inner.handles.get(&nick) {
1755        Some(r) => r.clone(),
1756        None => {
1757            return (
1758                StatusCode::NOT_FOUND,
1759                Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1760            )
1761                .into_response();
1762        }
1763    };
1764    drop(inner);
1765
1766    let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1767    let description = profile
1768        .get("motto")
1769        .and_then(Value::as_str)
1770        .unwrap_or("")
1771        .to_string();
1772    let display_name = profile
1773        .get("display_name")
1774        .and_then(Value::as_str)
1775        .unwrap_or(&rec.nick)
1776        .to_string();
1777    let relay_url = rec.relay_url.clone().unwrap_or_default();
1778    // Intro endpoint = where any A2A or wire client posts a signed pair-drop.
1779    let endpoint = if !relay_url.is_empty() {
1780        format!(
1781            "{}/v1/handle/intro/{}",
1782            relay_url.trim_end_matches('/'),
1783            rec.nick
1784        )
1785    } else {
1786        format!("/v1/handle/intro/{}", rec.nick)
1787    };
1788    let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1789
1790    // Build A2A v1.0 AgentCard shape with wire extension. Fields named to
1791    // match the A2A spec exactly so downstream tooling (agent-card-go etc.)
1792    // parses without custom code.
1793    let a2a_card = json!({
1794        "id": rec.did,
1795        "name": display_name,
1796        "description": description,
1797        "version": "wire/0.5",
1798        "endpoint": endpoint,
1799        "provider": {
1800            "name": "wire",
1801            "url": "https://github.com/SlanchaAi/wire"
1802        },
1803        "capabilities": {
1804            "streaming": false,
1805            "pushNotifications": false,
1806            "extendedAgentCard": true
1807        },
1808        "securitySchemes": {
1809            "ed25519-event-sig": {
1810                "type": "signature",
1811                "alg": "EdDSA",
1812                "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1813            }
1814        },
1815        "security": [{"ed25519-event-sig": []}],
1816        "skills": [],
1817        "extensions": [{
1818            // A2A extension URIs are opaque namespace identifiers, not
1819            // forwardable URLs. Changing this string is a coordinated
1820            // federation-spec bump because peers match it exactly.
1821            "uri": "https://slancha.ai/wire/ext/v0.5",
1822            "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1823            "required": false,
1824            "params": {
1825                "did": rec.did,
1826                "handle": rec.nick,
1827                "slot_id": rec.slot_id,
1828                "relay_url": rec.relay_url,
1829                "card": rec.card,
1830                "profile": profile,
1831                "claimed_at": rec.claimed_at,
1832            }
1833        }],
1834        "signature": card_sig,
1835    });
1836    (StatusCode::OK, Json(a2a_card)).into_response()
1837}
1838
1839async fn well_known_agent(
1840    State(relay): State<Relay>,
1841    Query(q): Query<WellKnownAgentQuery>,
1842) -> impl IntoResponse {
1843    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1844    if nick.is_empty() {
1845        return (
1846            StatusCode::BAD_REQUEST,
1847            Json(json!({"error": "handle missing nick"})),
1848        )
1849            .into_response();
1850    }
1851    let inner = relay.inner.lock().await;
1852    match inner.handles.get(&nick) {
1853        Some(rec) => (
1854            StatusCode::OK,
1855            Json(json!({
1856                "nick": rec.nick,
1857                "did": rec.did,
1858                "card": rec.card,
1859                "slot_id": rec.slot_id,
1860                "relay_url": rec.relay_url,
1861                "claimed_at": rec.claimed_at,
1862            })),
1863        )
1864            .into_response(),
1865        None => (
1866            StatusCode::NOT_FOUND,
1867            Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1868        )
1869            .into_response(),
1870    }
1871}
1872
1873async fn list_events(
1874    State(relay): State<Relay>,
1875    Path(slot_id): Path<String>,
1876    Query(q): Query<ListEventsQuery>,
1877    headers: HeaderMap,
1878) -> impl IntoResponse {
1879    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1880        return resp;
1881    }
1882    let limit = q.limit.unwrap_or(100).min(1000);
1883    let mut inner = relay.inner.lock().await;
1884    // R4: record this pull as proof that the slot owner is still polling.
1885    // Anyone holding the slot_token (i.e., a paired peer) can later read
1886    // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
1887    let now_unix = std::time::SystemTime::now()
1888        .duration_since(std::time::UNIX_EPOCH)
1889        .map(|d| d.as_secs())
1890        .unwrap_or(0);
1891    inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1892    let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1893    let start = match q.since {
1894        Some(eid) => events
1895            .iter()
1896            .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1897            .map(|i| i + 1)
1898            .unwrap_or(0),
1899        None => 0,
1900    };
1901    let end = (start + limit).min(events.len());
1902    let slice = events[start..end].to_vec();
1903    (StatusCode::OK, Json(slice)).into_response()
1904}
1905
1906/// R4 — slot-attentiveness probe. Authenticated by slot_token (so only
1907/// paired peers can ask). Returns `last_pull_at_unix` (the slot owner's most
1908/// recent `list_events` call, in unix seconds) and `event_count` (total
1909/// stored). A remote sender uses this before `wire send <peer>` to warn the
1910/// operator if the peer hasn't polled recently.
1911async fn slot_state(
1912    State(relay): State<Relay>,
1913    Path(slot_id): Path<String>,
1914    headers: HeaderMap,
1915) -> impl IntoResponse {
1916    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1917        return resp;
1918    }
1919    let inner = relay.inner.lock().await;
1920    let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1921    let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1922    let responder_health = inner.responder_health.get(&slot_id).cloned();
1923    (
1924        StatusCode::OK,
1925        Json(json!({
1926            "slot_id": slot_id,
1927            "event_count": event_count,
1928            "last_pull_at_unix": last_pull_at_unix,
1929            "responder_health": responder_health,
1930        })),
1931    )
1932        .into_response()
1933}
1934
1935async fn responder_health_set(
1936    State(relay): State<Relay>,
1937    Path(slot_id): Path<String>,
1938    headers: HeaderMap,
1939    Json(record): Json<ResponderHealthRecord>,
1940) -> impl IntoResponse {
1941    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1942        return resp;
1943    }
1944    let path = relay
1945        .state_dir
1946        .join("responder-health")
1947        .join(format!("{slot_id}.json"));
1948    let body = match serde_json::to_vec_pretty(&record) {
1949        Ok(b) => b,
1950        Err(e) => {
1951            return (
1952                StatusCode::INTERNAL_SERVER_ERROR,
1953                Json(json!({"error": format!("serialize failed: {e}")})),
1954            )
1955                .into_response();
1956        }
1957    };
1958    if let Err(e) = tokio::fs::write(&path, body).await {
1959        return (
1960            StatusCode::INTERNAL_SERVER_ERROR,
1961            Json(json!({"error": format!("persist failed: {e}")})),
1962        )
1963            .into_response();
1964    }
1965    {
1966        let mut inner = relay.inner.lock().await;
1967        inner
1968            .responder_health
1969            .insert(slot_id.clone(), record.clone());
1970    }
1971    (StatusCode::OK, Json(record)).into_response()
1972}
1973
1974async fn check_token(
1975    relay: &Relay,
1976    headers: &HeaderMap,
1977    slot_id: &str,
1978) -> std::result::Result<(), axum::response::Response> {
1979    let auth = headers
1980        .get(AUTHORIZATION)
1981        .and_then(|h| h.to_str().ok())
1982        .and_then(|s| s.strip_prefix("Bearer "))
1983        .map(str::to_string);
1984    let presented = match auth {
1985        Some(t) => t,
1986        None => {
1987            return Err((
1988                StatusCode::UNAUTHORIZED,
1989                Json(json!({"error": "missing Bearer token"})),
1990            )
1991                .into_response());
1992        }
1993    };
1994    let inner = relay.inner.lock().await;
1995    let expected = match inner.tokens.get(slot_id) {
1996        Some(t) => t.clone(),
1997        None => {
1998            return Err((
1999                StatusCode::NOT_FOUND,
2000                Json(json!({"error": "unknown slot"})),
2001            )
2002                .into_response());
2003        }
2004    };
2005    drop(inner);
2006    if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2007        return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2008    }
2009    Ok(())
2010}
2011
/// Compare two byte strings without short-circuiting on the first differing
/// byte.
///
/// Inputs of different length are rejected immediately. For equal lengths,
/// every byte pair is XOR-ed and OR-accumulated, and the result is `true`
/// only when the accumulator stays zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let diff = a
        .iter()
        .zip(b)
        .fold(0u8, |seen, (lhs, rhs)| seen | (lhs ^ rhs));
    diff == 0
}
2022
/// A slot id is valid when it is exactly 32 bytes long and every byte is a
/// lowercase hexadecimal digit (`0-9`, `a-f`).
fn is_valid_slot_id(s: &str) -> bool {
    const SLOT_ID_LEN: usize = 32;
    if s.len() != SLOT_ID_LEN {
        return false;
    }
    s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2028
2029fn random_hex(n_bytes: usize) -> String {
2030    let mut buf = vec![0u8; n_bytes];
2031    rand::thread_rng().fill_bytes(&mut buf);
2032    hex::encode(buf)
2033}
2034
2035/// Run the relay until SIGINT/SIGTERM.
2036pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2037    serve_with_mode(bind, state_dir, ServerMode::default()).await
2038}
2039
2040/// v0.5.17: server-mode-aware entry point. Same as `serve` but with the
2041/// `--local-only` toggle exposed so the binary can refuse to publish
2042/// phonebook + well-known surfaces on within-machine relays.
2043pub async fn serve_with_mode(
2044    bind: &str,
2045    state_dir: PathBuf,
2046    mode: ServerMode,
2047) -> Result<()> {
2048    let relay = Relay::new(state_dir).await?;
2049    relay.spawn_pair_sweeper();
2050    relay.spawn_counter_persister();
2051    let app = relay.clone().router_with_mode(mode);
2052    let listener = tokio::net::TcpListener::bind(bind)
2053        .await
2054        .with_context(|| format!("binding {bind}"))?;
2055    if mode.local_only {
2056        eprintln!("wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled");
2057    } else {
2058        eprintln!("wire relay-server listening on {bind}");
2059    }
2060    let shutdown_relay = relay.clone();
2061    axum::serve(listener, app)
2062        .with_graceful_shutdown(async move {
2063            let _ = tokio::signal::ctrl_c().await;
2064            eprintln!("\nshutting down — final counter snapshot");
2065            if let Err(e) = shutdown_relay.persist_counters().await {
2066                eprintln!("final counter persist failed: {e}");
2067            }
2068        })
2069        .await?;
2070    Ok(())
2071}
2072
/// v0.5.17: relay-server mode toggles.
///
/// The default value (`local_only: false`) is full federation, i.e. the
/// behavior from before this struct existed. Setting `local_only` strips the
/// phonebook + well-known surfaces so the relay is invisible from off-box
/// and from any directory-scraping agent on the same box.
#[derive(Clone, Copy, Debug, Default)]
pub struct ServerMode {
    /// When true, skip phonebook listing + `.well-known/wire/agent` +
    /// `.well-known/agent-card.json` + landing/stats pages. Pair this
    /// with a loopback bind (`--local-only` enforces this at the CLI
    /// layer) for genuinely within-machine traffic.
    pub local_only: bool,
}
2085
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal inputs match; a differing byte or a differing length does not.
    #[test]
    fn constant_time_eq_basic() {
        let cases: [(&[u8], &[u8], bool); 3] = [
            (b"abc", b"abc", true),
            (b"abc", b"abd", false),
            (b"abc", b"abcd", false), // length mismatch
        ];
        for (lhs, rhs, expected) in cases.iter().copied() {
            assert_eq!(constant_time_eq(lhs, rhs), expected);
        }
    }

    /// 16 random bytes encode to 32 hex characters.
    #[test]
    fn random_hex_length() {
        let encoded = random_hex(16);
        assert_eq!(encoded.len(), 32); // 16 bytes -> 32 hex chars
        assert!(encoded.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// A pair-slot idle for longer than the TTL is evicted (together with its
    /// lookup entry) while a freshly created one survives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        use std::time::{Duration, Instant};

        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&dir);
        let relay = Relay::new(dir.clone()).await.unwrap();

        {
            let mut state = relay.inner.lock().await;

            // Seed a pair-slot manually with a past last_touched.
            let stale = PairSlot {
                last_touched: Instant::now() - Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
                ..PairSlot::default()
            };
            state
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            state.pair_slots.insert("id-A".to_string(), stale);

            // And a fresh one — should survive.
            state
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            state
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(state.pair_slots.len(), 2);
            assert_eq!(state.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let state = relay.inner.lock().await;
        assert_eq!(
            state.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(state.pair_slots.contains_key("id-B"));
        assert_eq!(state.pair_lookup.len(), 1);
        assert!(state.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Only 32-character lowercase hex strings pass the slot-id validator.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));

        let rejected = [
            // wrong length
            "abc",
            "0123456789abcdef0123456789abcde",   // 31
            "0123456789abcdef0123456789abcdef0", // 33
            // uppercase
            "0123456789ABCDEF0123456789abcdef",
            // path traversal attempts
            "../etc/passwd0123456789abcdef0000",
            "..%2Fetc%2Fpasswd00000000000000000",
            "/absolute/path/that/looks/like/key",
            // null bytes
            "0123456789abcdef\0\x31\x32\x33456789abcdef",
        ];
        for candidate in rejected.iter().copied() {
            assert!(!is_valid_slot_id(candidate));
        }
    }
}