Skip to main content

wire/
relay_server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3// Copyright (C) 2026  wire contributors.
4//
5// This file (and only this file in the wire repository) is licensed under
6// the GNU Affero General Public License v3.0 or later. The protocol crate
7// (signing, agent_card, trust, canonical, cli, mcp) is Apache-2.0; the CLI
8// binary entry point is MIT. See LICENSE.md for the trio explanation.
9//
10// AGPL on the relay specifically discourages forks that operate `wire-relay`
11// as a closed-source SaaS offering — those forks must publish their changes
12// under AGPL too. Self-hosting your own relay for your own org or running
13// the public-good relay we operate is fully permitted.
14//
15//! HTTP mailbox relay — minimal, persistent, bearer-authenticated.
16//!
17//! Design (v0.1):
18//!   - One process serves N slots; each slot is a per-peer FIFO of signed events.
19//!   - Slot allocation returns `(slot_id, slot_token)`. Holder of the token
20//!     can post + read that slot. Tokens never expire in v0.1 (rotate by
21//!     allocating a new slot).
22//!   - Events are stored verbatim — the relay does NOT verify Ed25519 signatures
23//!     itself. Verification happens client-side (`wire tail` + `verify_message_v31`).
24//!     The relay is dumb on purpose: it is a content-addressed mailbox, not a
25//!     trust authority.
26//!   - 256 KiB max body per event.
27//!   - Persistence: each slot's events are appended to
28//!     `<state_dir>/slots/<slot_id>.jsonl` on every `POST /events`. Tokens
29//!     are persisted to `<state_dir>/tokens.json` on allocation.
30//!   - On startup, slots + tokens are reloaded from disk.
31
32use anyhow::{Context, Result};
33use axum::{
34    Json, Router,
35    extract::{Path, Query, State},
36    http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37    response::IntoResponse,
38    routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50    GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Largest accepted body for a single event, in bytes (256 KiB).
const MAX_EVENT_BYTES: usize = 256 * 1024;
/// Total bytes a single slot can hold before further POSTs are rejected (413).
/// Defends against an abusive bearer-holder filling relay disk (T11). At 64 MB
/// per slot, an attacker pushing the rate-limit ceiling fills their own slot
/// in ~25 seconds, then gets 413 forever — disk impact bounded.
///
/// NOTE(review): the ~25 s figure assumes 10 maximum-size POSTs per second.
/// In the router built by `router_with_mode`, `/v1/events/:slot_id` is not
/// part of the rate-limited `hot_writes` group — confirm which limit is meant.
const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
/// Handle to the relay that is passed to the router as shared state. Clones
/// share the same mutable state and counters through the `Arc`s.
#[derive(Clone)]
pub struct Relay {
    /// All mutable relay state, guarded by a single async mutex.
    inner: Arc<Mutex<Inner>>,
    /// Root directory for persisted state (`slots/`, `handles/`,
    /// `responder-health/`, `tokens.json`, `counters.json`, ...).
    state_dir: PathBuf,
    /// Usage counters reported by the stats endpoints.
    counters: Arc<RelayCounters>,
}
66
/// Lock-free usage counters served by GET /stats. Counter values are
/// loaded from `<state_dir>/counters.json` on startup and snapshotted back
/// to disk every 30s by `spawn_counter_persister`, so deploys + restarts
/// don't reset them. `boot_unix` is per-process — uptime is process-local.
struct RelayCounters {
    /// Unix seconds captured when this process built the relay; the stats
    /// endpoint subtracts it from "now" to report `uptime_seconds`.
    boot_unix: u64,
    /// Cumulative value reported as `handle_claims_total`.
    handle_claims_total: AtomicU64,
    /// Cumulative value reported as `handle_first_claims_total`.
    handle_first_claims_total: AtomicU64,
    /// Cumulative value reported as `slot_allocations_total`.
    slot_allocations_total: AtomicU64,
    /// Cumulative value reported as `pair_opens_total`.
    pair_opens_total: AtomicU64,
    /// Cumulative value reported as `events_posted_total`.
    events_posted_total: AtomicU64,
}
79
/// Plain-integer mirror of the cumulative fields of `RelayCounters`. This is
/// the shape serialized to and parsed from `<state_dir>/counters.json`; the
/// `Default` impl (all zeros) is what a missing or corrupt file falls back to.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct CountersSnapshot {
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
88
/// One row in `<state_dir>/stats-history.jsonl` — written every 30s by
/// `spawn_counter_persister` so /stats.html can draw sparklines. Live-state
/// fields (`*_active`) are point-in-time; *_total fields are the cumulative
/// counters at that timestamp. Field names mirror the /stats endpoint.
#[derive(serde::Serialize, serde::Deserialize)]
struct HistoryEntry {
    /// Unix seconds at which the row was produced.
    ts: u64,
    /// Number of entries in the handle directory at `ts`.
    handles_active: usize,
    /// Number of slots known to the relay at `ts`.
    slots_active: usize,
    /// Number of pair slots held in memory at `ts`.
    pair_slots_open: usize,
    /// Total stream subscribers across all slots at `ts`.
    streams_active: usize,
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
106
/// Query-string parameters accepted by the stats-history endpoint.
#[derive(Deserialize)]
pub struct StatsHistoryQuery {
    /// How many hours of history to return, default 24, max 168 (7 days).
    /// Larger values are clamped to the maximum by the handler.
    pub hours: Option<u64>,
}
112
/// Everything mutable in the relay, kept behind the single mutex in
/// `Relay::inner`. Most maps are keyed by `slot_id`; the pairing, handle and
/// invite maps use their own keys as noted per field.
struct Inner {
    /// slot_id -> ordered list of stored events (parsed JSON Values).
    slots: HashMap<String, Vec<Value>>,
    /// slot_id -> bearer token. Token holder may read + write that slot.
    tokens: HashMap<String, String>,
    /// slot_id -> total bytes stored. Enforced against MAX_SLOT_BYTES.
    slot_bytes: HashMap<String, usize>,
    /// slot_id -> wall-clock unix seconds of the slot owner's last `list_events`
    /// call. Used by `GET /v1/slot/:slot_id/state` so a remote sender can
    /// gauge whether the slot's owner is still polling (i.e., still attentive).
    /// A missing entry means the slot has never been pulled since the relay
    /// restarted (this map is not reloaded from disk).
    last_pull_at_unix: HashMap<String, u64>,
    /// slot_id -> active SSE subscribers (R1 push). Each `UnboundedSender`
    /// belongs to one open `GET /v1/events/:slot_id/stream` connection.
    /// On every successful `post_event` to a slot we walk the slot's list
    /// and broadcast the event; closed channels are pruned lazily on send-
    /// error. Auth: subscribers presented a valid slot_token at stream open.
    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
    /// code_hash -> pair_id (lookup so guests find the host).
    pair_lookup: HashMap<String, String>,
    /// pair_id -> ephemeral pairing state.
    pair_slots: HashMap<String, PairSlot>,
    /// nick -> registered handle directory entry (v0.5).
    handles: HashMap<String, HandleRecord>,
    /// slot_id -> latest operator-published auto-responder health record (R3).
    responder_health: HashMap<String, ResponderHealthRecord>,
    /// token -> short-URL invite record (v0.5.10 — one-curl onboarding).
    /// Token is the path segment in `GET /i/{token}`. Record holds the
    /// underlying `wire://pair?...` URL plus TTL/uses bookkeeping.
    invites: HashMap<String, InviteRecord>,
}
144
/// One entry in the short-URL invite map. Persisted to
/// `<state_dir>/invites.jsonl` so deploys don't drop active invites.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InviteRecord {
    /// Map key; also the path segment in `GET /i/{token}`.
    token: String,
    /// The underlying invite URL this short token stands for.
    invite_url: String,
    /// Expiry as unix seconds. On startup, records whose `expires_unix` is
    /// not strictly in the future are dropped instead of reloaded.
    expires_unix: u64,
    /// `None` = unlimited until TTL hits. `Some(n)` = decrement each fetch.
    uses_remaining: Option<u32>,
    /// Creation time as unix seconds.
    created_unix: u64,
}
156
/// One entry in the relay's handle directory (v0.5 — agentic hotline).
/// FCFS on nick: first claimant binds the nick to their DID. Same-DID re-claims
/// are allowed (used for profile updates + slot rotation).
///
/// Records are reloaded at startup from the `*.json` files under
/// `<state_dir>/handles/` and keyed by `nick`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct HandleRecord {
    /// The claimed nick; also the key in `Inner::handles`.
    pub nick: String,
    /// DID the nick is bound to.
    pub did: String,
    /// Arbitrary JSON stored alongside the handle — presumably the agent
    /// card; its schema is not checked in the code shown here.
    pub card: Value,
    /// Slot associated with this handle.
    pub slot_id: String,
    /// Optional relay URL recorded with the claim.
    pub relay_url: Option<String>,
    /// Claim timestamp as a string (format not visible in this chunk).
    pub claimed_at: String,
    /// v0.5.19 (#9.1): if false, this handle is omitted from the
    /// `/v1/handles` directory listing — operator opted out of bulk
    /// discovery. The `.well-known/wire/agent?handle=X` direct lookup
    /// still resolves so existing peers + out-of-band sharing continue
    /// to work. An absent value (`None`) is treated as discoverable by
    /// `is_discoverable`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
176
177impl HandleRecord {
178    fn is_discoverable(&self) -> bool {
179        self.discoverable.unwrap_or(true)
180    }
181}
182
/// Latest auto-responder health report for one slot. Reloaded at startup
/// from `<state_dir>/responder-health/<slot_id>.json`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ResponderHealthRecord {
    /// Free-form status string; the allowed values are not constrained by
    /// this type.
    pub status: String,
    /// Optional explanation accompanying `status`; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional timestamp string of the last success; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_success_at: Option<String>,
    /// Timestamp string recording when this record was set.
    pub set_at: String,
}
192
/// In-memory state of one pairing session, stored in `Inner::pair_slots`
/// under its pair_id. It has no serde derives and is never reloaded from
/// disk in `Relay::new`, so pairing sessions do not survive a restart.
#[derive(Clone, Debug)]
struct PairSlot {
    /// SPAKE2 message from the host side.
    host_msg: Option<String>,
    /// SPAKE2 message from the guest side.
    guest_msg: Option<String>,
    /// Sealed bootstrap payload from host (after SAS confirm).
    host_bootstrap: Option<String>,
    /// Sealed bootstrap payload from guest.
    guest_bootstrap: Option<String>,
    /// Last activity time (monotonic) — used for TTL eviction.
    last_touched: std::time::Instant,
}
206
207impl Default for PairSlot {
208    fn default() -> Self {
209        Self {
210            host_msg: None,
211            guest_msg: None,
212            host_bootstrap: None,
213            guest_bootstrap: None,
214            last_touched: std::time::Instant::now(),
215        }
216    }
217}
218
/// Pair-slot idle TTL. After this many seconds without activity, the slot
/// is evicted to free memory + bound brute-force surface (PENTEST.md §code-review #3).
/// 300 seconds = 5 minutes; compared against `PairSlot::last_touched`.
const PAIR_SLOT_TTL_SECS: u64 = 300;
222
/// JSON body of the slot-allocation request.
#[derive(Deserialize)]
pub struct AllocateRequest {
    /// Optional handle hint — purely informational, server doesn't enforce.
    /// May be omitted from the JSON entirely (`#[serde(default)]`).
    #[serde(default)]
    pub handle: Option<String>,
}
229
/// JSON body of an event POST: a single `event` field holding arbitrary JSON.
#[derive(Deserialize)]
pub struct PostEventRequest {
    /// The event payload, kept as an untyped JSON value.
    pub event: Value,
}
234
/// Query-string parameters for listing a slot's events. Both are optional.
#[derive(Deserialize)]
pub struct ListEventsQuery {
    /// Resume from after this event_id (exclusive). Omit for full slot read.
    pub since: Option<String>,
    /// Max events to return. Default 100, max 1000.
    pub limit: Option<usize>,
}
242
243impl Relay {
244    pub async fn new(state_dir: PathBuf) -> Result<Self> {
245        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
246        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
247        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
248        let mut inner = Inner {
249            slots: HashMap::new(),
250            tokens: HashMap::new(),
251            slot_bytes: HashMap::new(),
252            last_pull_at_unix: HashMap::new(),
253            streams: HashMap::new(),
254            pair_lookup: HashMap::new(),
255            pair_slots: HashMap::new(),
256            handles: HashMap::new(),
257            responder_health: HashMap::new(),
258            invites: HashMap::new(),
259        };
260        // Reload tokens
261        let token_path = state_dir.join("tokens.json");
262        if token_path.exists() {
263            let body = tokio::fs::read_to_string(&token_path).await?;
264            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
265        }
266        // Reload slots from JSONL
267        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
268        while let Some(entry) = slots_dir.next_entry().await? {
269            let path = entry.path();
270            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
271                continue;
272            }
273            let stem = match path.file_stem().and_then(|s| s.to_str()) {
274                Some(s) => s.to_string(),
275                None => continue,
276            };
277            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
278            let mut events = Vec::new();
279            for line in body.lines() {
280                if let Ok(v) = serde_json::from_str::<Value>(line) {
281                    events.push(v);
282                }
283            }
284            // Recompute byte usage for the slot from its persisted events.
285            let bytes: usize = events
286                .iter()
287                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
288                .sum();
289            inner.slot_bytes.insert(stem.clone(), bytes);
290            inner.slots.insert(stem, events);
291        }
292        // Reload handle directory (v0.5).
293        let handles_dir = state_dir.join("handles");
294        if handles_dir.exists() {
295            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
296            while let Some(entry) = rd.next_entry().await? {
297                let path = entry.path();
298                if path.extension().and_then(|x| x.to_str()) != Some("json") {
299                    continue;
300                }
301                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
302                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
303                    inner.handles.insert(rec.nick.clone(), rec);
304                }
305            }
306        }
307        // Reload responder health records (R3).
308        let responder_health_dir = state_dir.join("responder-health");
309        if responder_health_dir.exists() {
310            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
311            while let Some(entry) = rd.next_entry().await? {
312                let path = entry.path();
313                if path.extension().and_then(|x| x.to_str()) != Some("json") {
314                    continue;
315                }
316                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
317                    continue;
318                };
319                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
320                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
321                    inner.responder_health.insert(slot_id.to_string(), rec);
322                }
323            }
324        }
325        // Reload short-URL invites. JSONL append-only; later entries with
326        // the same token overwrite earlier (won't happen — tokens are
327        // unique by construction — but coded defensively).
328        let invites_path = state_dir.join("invites.jsonl");
329        if invites_path.exists() {
330            let now_unix = SystemTime::now()
331                .duration_since(UNIX_EPOCH)
332                .map(|d| d.as_secs())
333                .unwrap_or(0);
334            let body = tokio::fs::read_to_string(&invites_path)
335                .await
336                .unwrap_or_default();
337            for line in body.lines() {
338                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
339                    && rec.expires_unix > now_unix
340                {
341                    inner.invites.insert(rec.token.clone(), rec);
342                }
343            }
344        }
345        let boot_unix = SystemTime::now()
346            .duration_since(UNIX_EPOCH)
347            .map(|d| d.as_secs())
348            .unwrap_or(0);
349        // Reload counter snapshot. Missing/corrupt file → start at zero.
350        let snap: CountersSnapshot =
351            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
352                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
353                Err(_) => CountersSnapshot::default(),
354            };
355        Ok(Self {
356            inner: Arc::new(Mutex::new(inner)),
357            state_dir,
358            counters: Arc::new(RelayCounters {
359                boot_unix,
360                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
361                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
362                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
363                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
364                events_posted_total: AtomicU64::new(snap.events_posted_total),
365            }),
366        })
367    }
368
369    pub fn router(self) -> Router {
370        self.router_with_mode(ServerMode::default())
371    }
372
373    pub fn router_with_mode(self, mode: ServerMode) -> Router {
374        // Rate limit applied to write endpoints that create new state (slots,
375        // pair-sessions, bootstraps). 10 req/sec sustained, 50 req burst.
376        // v0.1 uses the GLOBAL key extractor (single bucket for all callers) —
377        // per-IP needs ConnectInfo middleware which axum 0.7 wires differently.
378        // Per-IP keying is a v0.2 hardening; for now Cloudflare WAF + this
379        // global cap shoulder DDoS protection in series.
380        let governor_conf = std::sync::Arc::new(
381            GovernorConfigBuilder::default()
382                .per_second(10)
383                .burst_size(50)
384                .key_extractor(GlobalKeyExtractor)
385                .finish()
386                .expect("valid governor config"),
387        );
388        let governor_layer = GovernorLayer {
389            config: governor_conf,
390        };
391
392        // Hot writes group — rate limited.
393        let hot_writes = Router::new()
394            .route("/v1/slot/allocate", post(allocate_slot))
395            .route("/v1/pair", post(pair_open))
396            .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
397            .route("/v1/pair/abandon", post(pair_abandon))
398            .layer(governor_layer);
399
400        // Core data-plane routes: events, pair, slots, healthz. Always
401        // present, in both federation and local-only modes.
402        let mut router = Router::new()
403            .route("/healthz", get(healthz))
404            .route("/v1/events/:slot_id", post(post_event).get(list_events))
405            .route("/v1/slot/:slot_id/state", get(slot_state))
406            .route(
407                "/v1/slot/:slot_id/responder-health",
408                post(responder_health_set),
409            )
410            .route("/v1/events/:slot_id/stream", get(stream_events))
411            .route("/v1/pair/:pair_id", get(pair_get))
412            .route("/v1/handle/intro/:nick", post(handle_intro));
413
414        // Discovery + landing surfaces: phonebook, well-known agent cards,
415        // landing page, stats, invite landing. In `--local-only` mode we
416        // skip all of these — the relay becomes invisible from the outside
417        // (and from other agents on the same box that might enumerate it).
418        // This is the v0.5.17 within-machine privacy guarantee.
419        if !mode.local_only {
420            router = router
421                .route("/", get(landing_index))
422                .route("/favicon.svg", get(landing_favicon))
423                .route("/og.png", get(landing_og))
424                .route("/install", get(landing_install_sh))
425                .route("/install.sh", get(landing_install_sh))
426                .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
427                .route("/stats", get(stats_root))
428                .route("/stats.json", get(stats_json))
429                .route("/stats.html", get(landing_stats_html))
430                .route("/stats.history", get(stats_history))
431                .route("/phonebook", get(landing_phonebook_html))
432                .route("/phonebook.html", get(landing_phonebook_html))
433                .route("/v1/handle/claim", post(handle_claim))
434                .route("/v1/handles", get(handles_directory))
435                .route("/v1/invite/register", post(invite_register))
436                .route("/i/:token", get(invite_script))
437                .route("/.well-known/wire/agent", get(well_known_agent))
438                .route(
439                    "/.well-known/agent-card.json",
440                    get(well_known_agent_card_a2a),
441                );
442        } else {
443            // Local-only mode still needs handle_claim for in-process
444            // session bootstrap (`wire session new` allocates a local
445            // slot AND claims a handle on the local relay so peers can
446            // resolve it). The claim is gated to loopback-only callers
447            // by the bind, not by the route.
448            router = router.route("/v1/handle/claim", post(handle_claim));
449        }
450
451        router.merge(hot_writes).with_state(self)
452    }
453
454    /// Evict pair-slots that have been idle past `PAIR_SLOT_TTL_SECS`.
455    /// Called inline on every pair-slot mutation; a background sweeper task
456    /// (see `Self::start_sweeper`) covers the long-idle case.
457    async fn evict_expired_pair_slots(&self) {
458        let now = std::time::Instant::now();
459        let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
460        let mut inner = self.inner.lock().await;
461        let mut to_remove = Vec::new();
462        for (id, slot) in inner.pair_slots.iter() {
463            if now.duration_since(slot.last_touched) > ttl {
464                to_remove.push(id.clone());
465            }
466        }
467        for id in to_remove {
468            inner.pair_slots.remove(&id);
469            inner.pair_lookup.retain(|_, v| v != &id);
470        }
471    }
472
473    /// Spawn a background tokio task that runs `evict_expired_pair_slots` every
474    /// 60 seconds. Call once after `Relay::new`; the handle is leaked deliberately
475    /// — process exit reaps it. Safe to skip in tests where you'd rather test
476    /// eviction inline.
477    pub fn spawn_pair_sweeper(&self) {
478        let me = self.clone();
479        tokio::spawn(async move {
480            let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
481            loop {
482                tick.tick().await;
483                me.evict_expired_pair_slots().await;
484            }
485        });
486    }
487
488    /// Snapshot the in-process counters to `<state_dir>/counters.json`. Called
489    /// every 30s by `spawn_counter_persister` and once during graceful
490    /// shutdown so a deploy doesn't reset the running totals.
491    pub async fn persist_counters(&self) -> Result<()> {
492        let snap = CountersSnapshot {
493            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
494            handle_first_claims_total: self
495                .counters
496                .handle_first_claims_total
497                .load(Ordering::Relaxed),
498            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
499            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
500            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
501        };
502        let body = serde_json::to_vec_pretty(&snap)?;
503        let path = self.state_dir.join("counters.json");
504        tokio::fs::write(path, body).await?;
505        Ok(())
506    }
507
508    /// Append one row to `<state_dir>/stats-history.jsonl` mirroring the
509    /// /stats endpoint at this instant. Used by /stats.html for sparklines.
510    /// File grows ~250 B per call → ~720 KB/day. A future prune wave can
511    /// roll old entries off once the history exceeds 90 days.
512    pub async fn append_history(&self) -> Result<()> {
513        use tokio::io::AsyncWriteExt;
514        let now = SystemTime::now()
515            .duration_since(UNIX_EPOCH)
516            .map(|d| d.as_secs())
517            .unwrap_or(0);
518        let (handles_active, slots_active, pair_slots_open, streams_active) = {
519            let inner = self.inner.lock().await;
520            (
521                inner.handles.len(),
522                inner.slots.len(),
523                inner.pair_slots.len(),
524                inner.streams.values().map(Vec::len).sum::<usize>(),
525            )
526        };
527        let entry = HistoryEntry {
528            ts: now,
529            handles_active,
530            slots_active,
531            pair_slots_open,
532            streams_active,
533            handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
534            handle_first_claims_total: self
535                .counters
536                .handle_first_claims_total
537                .load(Ordering::Relaxed),
538            slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
539            pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
540            events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
541        };
542        let line = serde_json::to_vec(&entry)?;
543        let path = self.state_dir.join("stats-history.jsonl");
544        let mut f = tokio::fs::OpenOptions::new()
545            .create(true)
546            .append(true)
547            .open(&path)
548            .await?;
549        f.write_all(&line).await?;
550        f.write_all(b"\n").await?;
551        f.flush().await?;
552        Ok(())
553    }
554
555    /// Spawn a background tokio task that calls `persist_counters` every 30s
556    /// + appends a history row on the same tick. Loss bound: counters can
557    ///   drift back up to 30s on crash, history can drop one row.
558    pub fn spawn_counter_persister(&self) {
559        let me = self.clone();
560        tokio::spawn(async move {
561            let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
562            // First tick fires immediately; skip it so we don't write the
563            // freshly-loaded snapshot back unchanged.
564            tick.tick().await;
565            loop {
566                tick.tick().await;
567                if let Err(e) = me.persist_counters().await {
568                    eprintln!("counter persist failed: {e}");
569                }
570                if let Err(e) = me.append_history().await {
571                    eprintln!("history append failed: {e}");
572                }
573            }
574        });
575    }
576
577    async fn persist_tokens(&self) -> Result<()> {
578        let body = {
579            let inner = self.inner.lock().await;
580            serde_json::to_string_pretty(&inner.tokens)?
581        };
582        let path = self.state_dir.join("tokens.json");
583        tokio::fs::write(path, body).await?;
584        Ok(())
585    }
586
587    async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
588        // Defense in depth: only allow lowercase hex slot_ids of the exact length
589        // we ever produce ourselves (16 random bytes -> 32 hex chars). Blocks
590        // any future code path that might let attacker-controlled slot_ids reach
591        // disk operations. allocate_slot() always meets this; this assert is
592        // belt-and-suspenders against future regressions.
593        if !is_valid_slot_id(slot_id) {
594            return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
595        }
596        let path = self
597            .state_dir
598            .join("slots")
599            .join(format!("{slot_id}.jsonl"));
600        let mut line = serde_json::to_vec(event)?;
601        line.push(b'\n');
602        use tokio::io::AsyncWriteExt;
603        let mut f = tokio::fs::OpenOptions::new()
604            .create(true)
605            .append(true)
606            .open(&path)
607            .await
608            .with_context(|| format!("opening {path:?}"))?;
609        f.write_all(&line).await?;
610        f.flush().await?;
611        Ok(())
612    }
613}
614
615async fn healthz() -> impl IntoResponse {
616    (StatusCode::OK, "ok\n")
617}
618
619// Public aggregate-usage snapshot. Counter fields (`*_total`) reset on
620// process restart; state fields (`handles_active`, `slots_active`) survive
621// on the persistent volume. No DIDs / handles / IPs leaked — counts only.
622async fn stats_history(
623    State(relay): State<Relay>,
624    Query(q): Query<StatsHistoryQuery>,
625) -> impl IntoResponse {
626    let hours = q.hours.unwrap_or(24).min(168);
627    let now = SystemTime::now()
628        .duration_since(UNIX_EPOCH)
629        .map(|d| d.as_secs())
630        .unwrap_or(0);
631    let cutoff = now.saturating_sub(hours * 3600);
632    let path = relay.state_dir.join("stats-history.jsonl");
633    let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
634    let entries: Vec<Value> = body
635        .lines()
636        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
637        .filter(|v| {
638            v.get("ts")
639                .and_then(Value::as_u64)
640                .map(|t| t >= cutoff)
641                .unwrap_or(false)
642        })
643        .collect();
644    (
645        StatusCode::OK,
646        Json(json!({
647            "hours": hours,
648            "now_unix": now,
649            "count": entries.len(),
650            "entries": entries,
651        })),
652    )
653}
654
655async fn landing_stats_html() -> impl IntoResponse {
656    static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
657    (
658        StatusCode::OK,
659        [
660            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
661            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
662        ],
663        STATS_HTML,
664    )
665}
666
667async fn landing_phonebook_html() -> impl IntoResponse {
668    static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
669    (
670        StatusCode::OK,
671        [
672            (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
673            (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
674        ],
675        PHONEBOOK_HTML,
676    )
677}
678
679/// `/stats` dispatch: serve the pretty HTML dashboard to browsers (Accept
680/// includes text/html) and JSON to everything else (curl, scripts, scrapers).
681/// Keeps the JSON contract intact while letting humans land on the page at
682/// the short URL.
683async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
684    let wants_html = headers
685        .get(axum::http::header::ACCEPT)
686        .and_then(|v| v.to_str().ok())
687        .unwrap_or("")
688        .contains("text/html");
689    if wants_html {
690        landing_stats_html().await.into_response()
691    } else {
692        stats_json(State(relay)).await.into_response()
693    }
694}
695
696async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
697    let now = SystemTime::now()
698        .duration_since(UNIX_EPOCH)
699        .map(|d| d.as_secs())
700        .unwrap_or(0);
701    let inner = relay.inner.lock().await;
702    let streams_active: usize = inner.streams.values().map(Vec::len).sum();
703    let body = json!({
704        "version": env!("CARGO_PKG_VERSION"),
705        "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
706        "handles_active": inner.handles.len(),
707        "slots_active": inner.slots.len(),
708        "pair_slots_open": inner.pair_slots.len(),
709        "streams_active": streams_active,
710        "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
711        "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
712        "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
713        "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
714        "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
715    });
716    (StatusCode::OK, Json(body))
717}
718
719// Static landing site baked into the binary so apex (wireup.net) can flip
720// straight to Fly without a separate static-host. ~37 KB total — negligible
721// against the release binary size, and keeps the relay self-contained.
722async fn landing_index() -> impl IntoResponse {
723    static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
724    (
725        StatusCode::OK,
726        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
727        INDEX_HTML,
728    )
729}
730
731async fn landing_favicon() -> impl IntoResponse {
732    static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
733    (
734        StatusCode::OK,
735        [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
736        FAVICON_SVG,
737    )
738}
739
740async fn landing_og() -> impl IntoResponse {
741    static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
742    (
743        StatusCode::OK,
744        [
745            (axum::http::header::CONTENT_TYPE, "image/png"),
746            (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
747        ],
748        OG_PNG,
749    )
750}
751
752async fn landing_install_sh() -> impl IntoResponse {
753    static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
754    (
755        StatusCode::OK,
756        [
757            (
758                axum::http::header::CONTENT_TYPE,
759                "text/x-shellscript; charset=utf-8",
760            ),
761            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
762        ],
763        INSTALL_SH,
764    )
765}
766
767async fn landing_openshell_policy_sh() -> impl IntoResponse {
768    static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
769    (
770        StatusCode::OK,
771        [
772            (
773                axum::http::header::CONTENT_TYPE,
774                "text/x-shellscript; charset=utf-8",
775            ),
776            (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
777        ],
778        POLICY_SH,
779    )
780}
781
782async fn allocate_slot(
783    State(relay): State<Relay>,
784    Json(_req): Json<AllocateRequest>,
785) -> impl IntoResponse {
786    let slot_id = random_hex(16);
787    let slot_token = random_hex(32);
788    {
789        let mut inner = relay.inner.lock().await;
790        inner.slots.insert(slot_id.clone(), Vec::new());
791        inner.tokens.insert(slot_id.clone(), slot_token.clone());
792    }
793    if let Err(e) = relay.persist_tokens().await {
794        return (
795            StatusCode::INTERNAL_SERVER_ERROR,
796            Json(json!({"error": format!("persist failed: {e}")})),
797        )
798            .into_response();
799    }
800    relay
801        .counters
802        .slot_allocations_total
803        .fetch_add(1, Ordering::Relaxed);
804    (
805        StatusCode::CREATED,
806        Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
807    )
808        .into_response()
809}
810
811async fn post_event(
812    State(relay): State<Relay>,
813    Path(slot_id): Path<String>,
814    headers: HeaderMap,
815    Json(req): Json<PostEventRequest>,
816) -> impl IntoResponse {
817    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
818        return resp;
819    }
820    // Body size cap (rough; serialize and check).
821    let body_bytes = match serde_json::to_vec(&req.event) {
822        Ok(b) => b,
823        Err(e) => {
824            return (
825                StatusCode::BAD_REQUEST,
826                Json(json!({"error": format!("event not serializable: {e}")})),
827            )
828                .into_response();
829        }
830    };
831    if body_bytes.len() > MAX_EVENT_BYTES {
832        return (
833            StatusCode::PAYLOAD_TOO_LARGE,
834            Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
835        )
836            .into_response();
837    }
838    // Per-slot quota: cap accumulated bytes per slot at MAX_SLOT_BYTES.
839    {
840        let inner = relay.inner.lock().await;
841        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
842        if used + body_bytes.len() > MAX_SLOT_BYTES {
843            return (
844                StatusCode::PAYLOAD_TOO_LARGE,
845                Json(json!({
846                    "error": "slot quota exceeded",
847                    "slot_bytes_used": used,
848                    "slot_bytes_max": MAX_SLOT_BYTES,
849                    "remediation": "operator should `wire rotate-slot` to drain old slot",
850                })),
851            )
852                .into_response();
853        }
854    }
855    let event_id = req
856        .event
857        .get("event_id")
858        .and_then(Value::as_str)
859        .map(str::to_string);
860
861    // Dedupe by event_id if present.
862    let dup = {
863        let inner = relay.inner.lock().await;
864        let slot = inner.slots.get(&slot_id);
865        if let (Some(eid), Some(slot)) = (&event_id, slot) {
866            slot.iter()
867                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
868        } else {
869            false
870        }
871    };
872    if dup {
873        return (
874            StatusCode::OK,
875            Json(json!({"event_id": event_id, "status": "duplicate"})),
876        )
877            .into_response();
878    }
879
880    {
881        let mut inner = relay.inner.lock().await;
882        let event_size = body_bytes.len();
883        let slot = inner.slots.entry(slot_id.clone()).or_default();
884        slot.push(req.event.clone());
885        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
886    }
887    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
888        return (
889            StatusCode::INTERNAL_SERVER_ERROR,
890            Json(json!({"error": format!("persist failed: {e}")})),
891        )
892            .into_response();
893    }
894    relay
895        .counters
896        .events_posted_total
897        .fetch_add(1, Ordering::Relaxed);
898    // R1 push: broadcast the new event to every active SSE subscriber on
899    // this slot. Dead channels are pruned in-place. The broadcast happens
900    // AFTER the disk persist so subscribers and disk readers see the same
901    // events; on persist failure we already returned 500 above.
902    {
903        let mut inner = relay.inner.lock().await;
904        if let Some(subs) = inner.streams.get_mut(&slot_id) {
905            subs.retain(|tx| tx.send(req.event.clone()).is_ok());
906        }
907    }
908    (
909        StatusCode::CREATED,
910        Json(json!({"event_id": event_id, "status": "stored"})),
911    )
912        .into_response()
913}
914
915/// R1 — server-sent-events push stream for a slot. Auth'd by slot_token
916/// (same as `list_events`). The connection registers an `UnboundedSender`
917/// on the slot's subscriber list; every subsequent `post_event` to the slot
918/// fans out to all subscribers as `data: <event-json>\n\n` lines. The
919/// connection stays open until the client disconnects.
920///
921/// A 30-second keepalive ping is emitted automatically so reverse proxies
922/// (Cloudflare tunnel, nginx) don't time out the upstream.
923///
924/// Note: the subscriber sees events posted AFTER it subscribed. To catch
925/// up on history first, the client should call `GET /v1/events/:slot_id`
926/// with `since=` before opening the stream.
927async fn stream_events(
928    State(relay): State<Relay>,
929    Path(slot_id): Path<String>,
930    headers: HeaderMap,
931) -> axum::response::Response {
932    use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
933    use futures::stream::StreamExt;
934
935    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
936        return resp;
937    }
938
939    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
940    {
941        let mut inner = relay.inner.lock().await;
942        inner.streams.entry(slot_id.clone()).or_default().push(tx);
943    }
944
945    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
946        SseEvent::default()
947            .json_data(&ev)
948            .map_err(|e| std::io::Error::other(e.to_string()))
949    });
950
951    Sse::new(stream)
952        .keep_alive(
953            KeepAlive::new()
954                .interval(std::time::Duration::from_secs(30))
955                .text("phyllis: still on the line"),
956        )
957        .into_response()
958}
959
960// ---------- pair-slot handlers ----------
961
/// Request body for `pair_open`: one side's opening message for the pairing
/// handshake identified by `code_hash`.
#[derive(Deserialize)]
pub struct PairOpenRequest {
    /// Key under which the pair-slot is looked up (and created on first use).
    /// Both sides must post the same value to land in the same slot.
    pub code_hash: String,
    /// SPAKE2 message (base64).
    pub msg: String,
    /// Which side is posting. `pair_open` answers 400 for any value other
    /// than `"host"` or `"guest"`.
    pub role: String, // "host" or "guest"
}
969
/// Request body for `pair_bootstrap`: a payload one side leaves in the
/// pair-slot for the other side to fetch via `pair_get`.
#[derive(Deserialize)]
pub struct PairBootstrapRequest {
    /// `"host"` or `"guest"`; selects which side's bootstrap field is
    /// written. Any other value is rejected with 400.
    pub role: String,
    /// Stored verbatim by the relay and returned to the peer as
    /// `peer_bootstrap`.
    /// NOTE(review): presumably a sealed/encrypted blob — confirm with the client.
    pub sealed: String,
}
975
/// Request body for `pair_abandon`: identifies the pair-slot to forget.
#[derive(Deserialize)]
pub struct PairAbandonRequest {
    /// SHA-256 hex digest of the code phrase. Same value the caller posts to
    /// /v1/pair as `code_hash` — knowing the code IS the auth here.
    pub code_hash: String,
}
982
983/// Forget the pair-slot associated with this code_hash. Either side can call;
984/// no auth beyond knowledge of the code (which is the shared secret of this
985/// handshake anyway). Idempotent: returns 204 whether or not the slot exists.
986/// Used by clients to recover after a crash mid-handshake, so the host doesn't
987/// stay locked out until the 5-minute TTL.
988async fn pair_abandon(
989    State(relay): State<Relay>,
990    Json(req): Json<PairAbandonRequest>,
991) -> impl IntoResponse {
992    let mut inner = relay.inner.lock().await;
993    if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
994        inner.pair_slots.remove(&pair_id);
995    }
996    StatusCode::NO_CONTENT.into_response()
997}
998
999async fn pair_open(
1000    State(relay): State<Relay>,
1001    Json(req): Json<PairOpenRequest>,
1002) -> impl IntoResponse {
1003    if req.role != "host" && req.role != "guest" {
1004        return (
1005            StatusCode::BAD_REQUEST,
1006            Json(json!({"error": "role must be 'host' or 'guest'"})),
1007        )
1008            .into_response();
1009    }
1010    relay.evict_expired_pair_slots().await;
1011    let mut inner = relay.inner.lock().await;
1012    let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1013        Some(id) => id,
1014        None => {
1015            let new_id = random_hex(16);
1016            inner
1017                .pair_lookup
1018                .insert(req.code_hash.clone(), new_id.clone());
1019            inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1020            relay
1021                .counters
1022                .pair_opens_total
1023                .fetch_add(1, Ordering::Relaxed);
1024            new_id
1025        }
1026    };
1027    let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1028    slot.last_touched = std::time::Instant::now();
1029    if req.role == "host" {
1030        if slot.host_msg.is_some() {
1031            return (
1032                StatusCode::CONFLICT,
1033                Json(json!({"error": "host already registered for this code"})),
1034            )
1035                .into_response();
1036        }
1037        slot.host_msg = Some(req.msg);
1038    } else {
1039        if slot.guest_msg.is_some() {
1040            return (
1041                StatusCode::CONFLICT,
1042                Json(json!({"error": "guest already registered for this code"})),
1043            )
1044                .into_response();
1045        }
1046        slot.guest_msg = Some(req.msg);
1047    }
1048    (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1049}
1050
/// Query string for `pair_get`.
#[derive(Deserialize)]
pub struct PairGetQuery {
    /// "host" or "guest" — caller's role; we return the OTHER side's data.
    /// Any other value is rejected with 400.
    pub as_role: String,
}
1056
1057async fn pair_get(
1058    State(relay): State<Relay>,
1059    Path(pair_id): Path<String>,
1060    Query(q): Query<PairGetQuery>,
1061) -> impl IntoResponse {
1062    relay.evict_expired_pair_slots().await;
1063    let mut inner = relay.inner.lock().await;
1064    let slot = match inner.pair_slots.get_mut(&pair_id) {
1065        Some(s) => {
1066            s.last_touched = std::time::Instant::now();
1067            s.clone()
1068        }
1069        None => {
1070            return (
1071                StatusCode::NOT_FOUND,
1072                Json(json!({"error": "unknown pair_id"})),
1073            )
1074                .into_response();
1075        }
1076    };
1077    let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1078        "host" => (slot.guest_msg, slot.guest_bootstrap),
1079        "guest" => (slot.host_msg, slot.host_bootstrap),
1080        _ => {
1081            return (
1082                StatusCode::BAD_REQUEST,
1083                Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1084            )
1085                .into_response();
1086        }
1087    };
1088    (
1089        StatusCode::OK,
1090        Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1091    )
1092        .into_response()
1093}
1094
1095async fn pair_bootstrap(
1096    State(relay): State<Relay>,
1097    Path(pair_id): Path<String>,
1098    Json(req): Json<PairBootstrapRequest>,
1099) -> impl IntoResponse {
1100    relay.evict_expired_pair_slots().await;
1101    let mut inner = relay.inner.lock().await;
1102    let slot = match inner.pair_slots.get_mut(&pair_id) {
1103        Some(s) => s,
1104        None => {
1105            return (
1106                StatusCode::NOT_FOUND,
1107                Json(json!({"error": "unknown pair_id"})),
1108            )
1109                .into_response();
1110        }
1111    };
1112    slot.last_touched = std::time::Instant::now();
1113    match req.role.as_str() {
1114        "host" => slot.host_bootstrap = Some(req.sealed),
1115        "guest" => slot.guest_bootstrap = Some(req.sealed),
1116        _ => {
1117            return (
1118                StatusCode::BAD_REQUEST,
1119                Json(json!({"error": "role must be 'host' or 'guest'"})),
1120            )
1121                .into_response();
1122        }
1123    }
1124    (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1125}
1126
1127// ---------- handle directory (v0.5) ----------
1128
/// Request body for `handle_claim` (`POST /v1/handle/claim`).
#[derive(Deserialize)]
pub struct HandleClaimRequest {
    /// Nick the claimant wants (case-folded). Domain part is implicit: the
    /// domain the relay's `.well-known` is served from.
    pub nick: String,
    /// Slot id the claimant owns on this relay (proves they allocated here).
    /// The request's bearer token is checked against this slot.
    pub slot_id: String,
    /// Optional public-facing relay URL the relay should advertise back in
    /// `.well-known/wire/agent` responses. If omitted, callers will need to
    /// know the relay URL out-of-band.
    pub relay_url: Option<String>,
    /// Claimant's full signed agent-card (includes DID + verify_keys +
    /// optional profile).
    pub card: Value,
    /// v0.5.19 (#9.1): set false to opt out of `/v1/handles` bulk listing.
    /// Direct `.well-known/wire/agent` lookup by handle still works.
    /// Omitted on first claim defaults to discoverable=true; omitted on a
    /// re-claim keeps the value already on record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
1149
1150/// `POST /v1/handle/claim` — claim or update a `nick@<relay-domain>` handle.
1151///
1152/// FCFS on nick. Same-DID re-claims allowed (used for profile updates +
1153/// slot rotation). Different-DID claims on a taken nick return 409.
1154/// Caller must (a) own the `slot_id` they reference (verified by token
1155/// being present), and (b) submit a card with a valid self-signature.
1156async fn handle_claim(
1157    State(relay): State<Relay>,
1158    headers: HeaderMap,
1159    Json(req): Json<HandleClaimRequest>,
1160) -> impl IntoResponse {
1161    // Bearer auth: claimant must hold the slot_token for the slot they
1162    // reference. Prevents nick-squatting from an unauthenticated POSTer.
1163    if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1164        return resp;
1165    }
1166    // Validate nick (same rules as the client-side parser).
1167    if !crate::pair_profile::is_valid_nick(&req.nick) {
1168        return (
1169            StatusCode::BAD_REQUEST,
1170            Json(json!({
1171                "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1172                "nick": req.nick,
1173            })),
1174        )
1175            .into_response();
1176    }
1177    // Verify the card signature using the public verify_agent_card helper.
1178    if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1179        return (
1180            StatusCode::BAD_REQUEST,
1181            Json(json!({"error": format!("card signature invalid: {e}")})),
1182        )
1183            .into_response();
1184    }
1185    let did = match req.card.get("did").and_then(Value::as_str) {
1186        Some(d) => d.to_string(),
1187        None => {
1188            return (
1189                StatusCode::BAD_REQUEST,
1190                Json(json!({"error": "card missing 'did' field"})),
1191            )
1192                .into_response();
1193        }
1194    };
1195
1196    // ONE-NAME rule, enforced server-side. The claimed nick MUST equal
1197    // the card's DID-derived persona. The client coerces this before
1198    // POSTing, but that's courtesy — a raw HTTP claim could otherwise
1199    // map an arbitrary nick (e.g. a well-known handle) onto a foreign
1200    // DID, so `wire dial <nick>@relay` would resolve to the impostor.
1201    // `verify_agent_card` above already proved the DID commits to the
1202    // card's key, so this binds nick → key transitively.
1203    let canonical_nick = crate::agent_card::display_handle_from_did(&did);
1204    if req.nick != canonical_nick {
1205        return (
1206            StatusCode::BAD_REQUEST,
1207            Json(json!({
1208                "error": "phyllis: that nick doesn't match your DID — wire publishes one name, and it's the one your key spells out",
1209                "nick": req.nick,
1210                "expected": canonical_nick,
1211            })),
1212        )
1213            .into_response();
1214    }
1215
1216    // FCFS check. Also snapshot the existing record (clone) so the
1217    // re-claim path below can preserve fields the client didn't include
1218    // in the request (notably `discoverable` from v0.5.19, so an old
1219    // client doing a profile-update re-claim doesn't accidentally
1220    // re-publish a hidden handle).
1221    let prior_record: Option<HandleRecord>;
1222    let first_claim = {
1223        let inner = relay.inner.lock().await;
1224        match inner.handles.get(&req.nick) {
1225            Some(existing) if existing.did != did => {
1226                return (
1227                    StatusCode::CONFLICT,
1228                    Json(json!({
1229                        "error": "phyllis: this line's already taken by a different identity (persona collision). Your handle is fixed to your key, so claim on another relay (`wire up @<other-relay>`) or mint a fresh identity (`wire nuke` then `wire up`).",
1230                        "nick": req.nick,
1231                        "claimed_by": existing.did,
1232                    })),
1233                )
1234                    .into_response();
1235            }
1236            Some(prev) => {
1237                prior_record = Some(prev.clone());
1238                false
1239            }
1240            None => {
1241                prior_record = None;
1242                true
1243            }
1244        }
1245    };
1246
1247    // v0.5.19 (#9.5): round claimed_at to whole seconds. Nanosecond
1248    // precision served no client purpose (display only) and acted as a
1249    // cross-tab fingerprint correlating one operator's multiple handles
1250    // claimed in the same session. Truncate before formatting.
1251    let now = time::OffsetDateTime::now_utc()
1252        .replace_nanosecond(0)
1253        .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1254        .format(&time::format_description::well_known::Rfc3339)
1255        .unwrap_or_default();
1256    // v0.5.19 (#9.1): preserve `discoverable` across re-claims. If the
1257    // request doesn't set it explicitly, keep whatever the existing
1258    // record had so a profile-update re-claim doesn't accidentally
1259    // re-publish a hidden handle. New first-time claims default to
1260    // discoverable=true explicitly.
1261    let discoverable = match (req.discoverable, &prior_record) {
1262        (Some(d), _) => Some(d),
1263        (None, Some(prev)) => prev.discoverable,
1264        (None, None) => Some(true),
1265    };
1266    let record = HandleRecord {
1267        nick: req.nick.clone(),
1268        did: did.clone(),
1269        card: req.card.clone(),
1270        slot_id: req.slot_id.clone(),
1271        relay_url: req.relay_url.clone(),
1272        claimed_at: now,
1273        discoverable,
1274    };
1275
1276    // Persist to disk first (durable), then update in-memory.
1277    let path = relay
1278        .state_dir
1279        .join("handles")
1280        .join(format!("{}.json", req.nick));
1281    let body = match serde_json::to_vec_pretty(&record) {
1282        Ok(b) => b,
1283        Err(e) => {
1284            return (
1285                StatusCode::INTERNAL_SERVER_ERROR,
1286                Json(json!({"error": format!("serialize failed: {e}")})),
1287            )
1288                .into_response();
1289        }
1290    };
1291    if let Err(e) = tokio::fs::write(&path, &body).await {
1292        return (
1293            StatusCode::INTERNAL_SERVER_ERROR,
1294            Json(json!({"error": format!("persist failed: {e}")})),
1295        )
1296            .into_response();
1297    }
1298    {
1299        let mut inner = relay.inner.lock().await;
1300        inner.handles.insert(req.nick.clone(), record);
1301    }
1302    relay
1303        .counters
1304        .handle_claims_total
1305        .fetch_add(1, Ordering::Relaxed);
1306    if first_claim {
1307        relay
1308            .counters
1309            .handle_first_claims_total
1310            .fetch_add(1, Ordering::Relaxed);
1311    }
1312    (
1313        StatusCode::CREATED,
1314        Json(json!({
1315            "nick": req.nick,
1316            "did": did,
1317            "status": if first_claim { "claimed" } else { "re-claimed" },
1318        })),
1319    )
1320        .into_response()
1321}
1322
/// Query string carrying a single required `handle` parameter.
/// NOTE(review): presumably consumed by the `.well-known/wire/agent?handle=X`
/// lookup — the handler is not in this part of the file; confirm there.
#[derive(Deserialize)]
pub struct WellKnownAgentQuery {
    /// The handle to look up.
    pub handle: String,
}
1327
/// Query string for `handles_directory` (the bulk handle listing). All
/// parameters are optional.
#[derive(Deserialize)]
pub struct HandlesDirectoryQuery {
    /// Pagination cursor: only nicks sorting strictly after this value are
    /// returned.
    pub cursor: Option<String>,
    /// Page size. Defaults to 100 and is clamped to the range 1..=500.
    pub limit: Option<usize>,
    /// Keep only handles whose profile `vibe` array contains this string
    /// (ASCII case-insensitive).
    pub vibe: Option<String>,
}
1334
1335// ─── short-URL invites (v0.5.10) ──────────────────────────────────────────
1336// One-curl onboarding: the invitor registers their `wire://pair?...` URL
1337// here, gets back a 6-hex token. Anyone who does
1338//   curl -fsSL https://wireup.net/i/<token> | sh
1339// gets wire installed (if needed) + the invite accepted, in one shot.
1340//
1341// Possession of the short URL = pair authorization (same shape as the
1342// underlying wire:// invite — it's just a redirector).
1343
/// Request body for `invite_register`: the invite to shorten plus optional
/// lifetime and use limits.
#[derive(Deserialize)]
pub struct InviteRegisterRequest {
    /// The wire://pair?... URL produced by `wire invite`. Required; rejected
    /// when empty or longer than 8 KiB.
    pub invite_url: String,
    /// Lifetime in seconds. Default 86400 (24h). Capped at 7 days; values
    /// below 60 are raised to 60.
    #[serde(default)]
    pub ttl_seconds: Option<u64>,
    /// If `Some(n)`, the short URL can be fetched N times before 410s.
    /// `None` = unlimited until TTL hits.
    #[serde(default)]
    pub uses: Option<u32>,
}
1356
1357impl Relay {
1358    /// Append one InviteRecord to `<state_dir>/invites.jsonl`.
1359    async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1360        use tokio::io::AsyncWriteExt;
1361        let mut line = serde_json::to_vec(rec)?;
1362        line.push(b'\n');
1363        let path = self.state_dir.join("invites.jsonl");
1364        let mut f = tokio::fs::OpenOptions::new()
1365            .create(true)
1366            .append(true)
1367            .open(&path)
1368            .await?;
1369        f.write_all(&line).await?;
1370        f.flush().await?;
1371        Ok(())
1372    }
1373}
1374
1375async fn invite_register(
1376    State(relay): State<Relay>,
1377    Json(req): Json<InviteRegisterRequest>,
1378) -> impl IntoResponse {
1379    if req.invite_url.is_empty() {
1380        return (
1381            StatusCode::BAD_REQUEST,
1382            Json(json!({"error": "invite_url required"})),
1383        )
1384            .into_response();
1385    }
1386    // Length cap on the embedded URL to keep persisted records bounded.
1387    if req.invite_url.len() > 8_192 {
1388        return (
1389            StatusCode::PAYLOAD_TOO_LARGE,
1390            Json(json!({"error": "invite_url > 8 KiB"})),
1391        )
1392            .into_response();
1393    }
1394    let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1395    let now = SystemTime::now()
1396        .duration_since(UNIX_EPOCH)
1397        .map(|d| d.as_secs())
1398        .unwrap_or(0);
1399    // 6-hex token → 16.7M space. Collision probability negligible at v0.5
1400    // scale; if a collision happens (1 in 16M) we 409 and the caller retries.
1401    let token = random_hex(3);
1402    let rec = InviteRecord {
1403        token: token.clone(),
1404        invite_url: req.invite_url,
1405        expires_unix: now + ttl,
1406        uses_remaining: req.uses,
1407        created_unix: now,
1408    };
1409    {
1410        let mut inner = relay.inner.lock().await;
1411        if inner.invites.contains_key(&token) {
1412            return (
1413                StatusCode::CONFLICT,
1414                Json(json!({"error": "token collision, retry"})),
1415            )
1416                .into_response();
1417        }
1418        inner.invites.insert(token.clone(), rec.clone());
1419    }
1420    if let Err(e) = relay.persist_invite(&rec).await {
1421        return (
1422            StatusCode::INTERNAL_SERVER_ERROR,
1423            Json(json!({"error": format!("persist failed: {e}")})),
1424        )
1425            .into_response();
1426    }
1427    (
1428        StatusCode::CREATED,
1429        Json(json!({
1430            "token": token,
1431            "path": format!("/i/{token}"),
1432            "expires_unix": rec.expires_unix,
1433            "uses_remaining": rec.uses_remaining,
1434        })),
1435    )
1436        .into_response()
1437}
1438
/// Query string for `invite_script` (the `/i/<token>` short-URL handler).
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// `format=url` returns the raw `wire://pair?...` URL as text/plain
    /// (used by `wire accept https://wireup.net/i/<token>` to resolve a
    /// short URL programmatically). Default: shell-script template.
    /// Note: ?format=url does NOT decrement `uses_remaining` — it's a
    /// resolution lookup, not an acceptance. The actual accept happens
    /// when the wire:// URL is consumed by `pair_invite::accept_invite`.
    /// Any value other than `url` behaves like the default.
    pub format: Option<String>,
}
1449
1450async fn invite_script(
1451    State(relay): State<Relay>,
1452    Path(token): Path<String>,
1453    Query(q): Query<InviteScriptQuery>,
1454) -> impl IntoResponse {
1455    // Token shape: 6 lowercase hex. Reject anything else immediately so a
1456    // path-traversal try never reaches the map lookup.
1457    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1458        return (StatusCode::NOT_FOUND, "not found\n").into_response();
1459    }
1460    let want_raw_url = q.format.as_deref() == Some("url");
1461    let now = SystemTime::now()
1462        .duration_since(UNIX_EPOCH)
1463        .map(|d| d.as_secs())
1464        .unwrap_or(0);
1465    let invite_url = {
1466        let mut inner = relay.inner.lock().await;
1467        let Some(rec) = inner.invites.get_mut(&token) else {
1468            return (StatusCode::NOT_FOUND, "not found\n").into_response();
1469        };
1470        if rec.expires_unix <= now {
1471            return (StatusCode::GONE, "this invite has expired\n").into_response();
1472        }
1473        if let Some(n) = rec.uses_remaining {
1474            if n == 0 {
1475                return (StatusCode::GONE, "this invite has been used up\n").into_response();
1476            }
1477            // Only decrement on script-template fetch (the one that's
1478            // actually doing the pair). The raw-URL resolution path is a
1479            // lookup, not an accept.
1480            if !want_raw_url {
1481                rec.uses_remaining = Some(n - 1);
1482            }
1483        }
1484        rec.invite_url.clone()
1485    };
1486    if want_raw_url {
1487        return (
1488            StatusCode::OK,
1489            [
1490                (
1491                    axum::http::header::CONTENT_TYPE,
1492                    "text/plain; charset=utf-8",
1493                ),
1494                (
1495                    axum::http::header::CACHE_CONTROL,
1496                    "private, no-store, max-age=0",
1497                ),
1498            ],
1499            invite_url,
1500        )
1501            .into_response();
1502    }
1503    let escaped = invite_url.replace('\'', "'\\''");
1504    let script = format!(
1505        "#!/bin/sh\n\
1506         # wire — one-curl onboarding (install + pair in one shot)\n\
1507         # source: https://github.com/SlanchaAi/wire\n\
1508         set -eu\n\
1509         INVITE='{escaped}'\n\
1510         echo \"\u{2192} checking for wire CLI...\"\n\
1511         if ! command -v wire >/dev/null 2>&1; then\n  \
1512           echo \"\u{2192} wire not installed; installing first...\"\n  \
1513           curl -fsSL https://wireup.net/install.sh | sh\n  \
1514           case \":$PATH:\" in\n    \
1515             *:\"$HOME/.local/bin\":*) ;;\n    \
1516             *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n  \
1517           esac\n  \
1518           if ! command -v wire >/dev/null 2>&1; then\n    \
1519             echo \"\"\n    \
1520             echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n    \
1521             echo \"Open a new shell, then run:\"\n    \
1522             echo \"  wire accept '$INVITE'\"\n    \
1523             exit 0\n  \
1524           fi\n\
1525         fi\n\
1526         echo \"\u{2192} accepting invite...\"\n\
1527         wire accept \"$INVITE\"\n"
1528    );
1529    (
1530        StatusCode::OK,
1531        [
1532            (
1533                axum::http::header::CONTENT_TYPE,
1534                "text/x-shellscript; charset=utf-8",
1535            ),
1536            (
1537                axum::http::header::CACHE_CONTROL,
1538                "private, no-store, max-age=0",
1539            ),
1540        ],
1541        script,
1542    )
1543        .into_response()
1544}
1545
1546async fn handles_directory(
1547    State(relay): State<Relay>,
1548    Query(q): Query<HandlesDirectoryQuery>,
1549) -> impl IntoResponse {
1550    let limit = q.limit.unwrap_or(100).clamp(1, 500);
1551    let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1552    let inner = relay.inner.lock().await;
1553    let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1554    drop(inner);
1555    records.sort_by(|a, b| a.nick.cmp(&b.nick));
1556
1557    let cursor = q.cursor.as_deref();
1558    let mut eligible = Vec::new();
1559    for rec in records {
1560        if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1561            continue;
1562        }
1563        // Hygiene: hide test-shaped nicks from the public directory. Records
1564        // remain claimed (FCFS protection persists), they just don't surface
1565        // in the phone book. `demo-` is reserved for asciinema-cast handles,
1566        // `test-` for integration runs.
1567        if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1568            continue;
1569        }
1570        // v0.5.19 (#9.1): operator opt-out — hidden handles skip the
1571        // bulk directory but still resolve via `.well-known/wire/agent?
1572        // handle=X` for out-of-band sharing.
1573        if !rec.is_discoverable() {
1574            continue;
1575        }
1576        let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1577        if profile
1578            .get("listed")
1579            .and_then(Value::as_bool)
1580            .is_some_and(|listed| !listed)
1581        {
1582            continue;
1583        }
1584        if let Some(want) = &vibe_filter {
1585            let matched = profile
1586                .get("vibe")
1587                .and_then(Value::as_array)
1588                .map(|arr| {
1589                    arr.iter().any(|v| {
1590                        v.as_str()
1591                            .map(|s| s.eq_ignore_ascii_case(want))
1592                            .unwrap_or(false)
1593                    })
1594                })
1595                .unwrap_or(false);
1596            if !matched {
1597                continue;
1598            }
1599        }
1600        eligible.push((rec, profile));
1601    }
1602
1603    let has_more = eligible.len() > limit;
1604    let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1605    let next_cursor = if has_more {
1606        page.last().map(|(rec, _)| rec.nick.clone())
1607    } else {
1608        None
1609    };
1610    let handles: Vec<Value> = page
1611        .into_iter()
1612        .map(|(rec, profile)| {
1613            // v0.12.1: fall back to the DID-derived persona emoji when the
1614            // card carries no explicit profile emoji, so every phonebook
1615            // line shows a face next to the name. The persona is a
1616            // deterministic function of the DID, so the relay can compute it
1617            // without the claimant having set anything.
1618            let emoji = profile
1619                .get("emoji")
1620                .and_then(Value::as_str)
1621                .filter(|s| !s.is_empty())
1622                .map(str::to_string)
1623                .unwrap_or_else(|| crate::character::Character::from_did(&rec.did).emoji);
1624            json!({
1625                "nick": rec.nick,
1626                "did": rec.did,
1627                "profile": {
1628                    "emoji": emoji,
1629                    "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1630                    "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1631                    "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1632                    "now": profile.get("now").cloned().unwrap_or(Value::Null),
1633                },
1634                "claimed_at": rec.claimed_at,
1635            })
1636        })
1637        .collect();
1638    (
1639        StatusCode::OK,
1640        Json(json!({
1641            "handles": handles,
1642            "next_cursor": next_cursor,
1643        })),
1644    )
1645        .into_response()
1646}
1647
1648/// `POST /v1/handle/intro/:nick` — drop a signed pair-introduction event
1649/// into a known nick's slot WITHOUT needing that slot's bearer token.
1650///
1651/// Why this exists: `.well-known/wire/agent` returns a nick's `slot_id` for
1652/// reachability, but NEVER its `slot_token` (that would leak read+write
1653/// authority to any handle-resolver). To zero-paste-pair, we need a way for
1654/// a stranger to deliver their signed agent-card to the nick's owner. This
1655/// endpoint provides exactly that, and ONLY that: the event must be `kind=1100`
1656/// (pair_drop / agent_card), self-signed, and the carrying agent-card embedded
1657/// in the body must verify-OK on its own.
1658///
1659/// Rate-limiting is the same governor that gates the other write endpoints.
1660/// Slot quota still applies — a flood of intros hits the standard 64MB cap.
1661async fn handle_intro(
1662    State(relay): State<Relay>,
1663    Path(nick): Path<String>,
1664    Json(req): Json<PostEventRequest>,
1665) -> impl IntoResponse {
1666    // Look up the nick. Must already be claimed.
1667    let slot_id = {
1668        let inner = relay.inner.lock().await;
1669        match inner.handles.get(&nick) {
1670            Some(rec) => rec.slot_id.clone(),
1671            None => {
1672                return (
1673                    StatusCode::NOT_FOUND,
1674                    Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1675                )
1676                    .into_response();
1677            }
1678        }
1679    };
1680
1681    // Only allow kind=1100 pair_drop / agent_card here. Anything else routes
1682    // to the standard /v1/events/:slot_id with bearer auth.
1683    let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1684    let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1685    if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1686        return (
1687            StatusCode::BAD_REQUEST,
1688            Json(json!({
1689                "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1690                "got_kind": kind,
1691                "got_type": type_str,
1692            })),
1693        )
1694            .into_response();
1695    }
1696
1697    // Body must embed a signed agent-card (so the receiver can pin from it).
1698    let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1699        Some(c) => c.clone(),
1700        None => {
1701            return (
1702                StatusCode::BAD_REQUEST,
1703                Json(json!({"error": "intro event body must embed 'card' field"})),
1704            )
1705                .into_response();
1706        }
1707    };
1708    if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1709        return (
1710            StatusCode::BAD_REQUEST,
1711            Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1712        )
1713            .into_response();
1714    }
1715
1716    // Size + quota checks (same as post_event).
1717    let body_bytes = match serde_json::to_vec(&req.event) {
1718        Ok(b) => b,
1719        Err(e) => {
1720            return (
1721                StatusCode::BAD_REQUEST,
1722                Json(json!({"error": format!("event not serializable: {e}")})),
1723            )
1724                .into_response();
1725        }
1726    };
1727    if body_bytes.len() > MAX_EVENT_BYTES {
1728        return (
1729            StatusCode::PAYLOAD_TOO_LARGE,
1730            Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1731        )
1732            .into_response();
1733    }
1734    {
1735        let inner = relay.inner.lock().await;
1736        let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1737        if used + body_bytes.len() > MAX_SLOT_BYTES {
1738            return (
1739                StatusCode::PAYLOAD_TOO_LARGE,
1740                Json(json!({
1741                    "error": "target slot quota exceeded",
1742                    "slot_bytes_used": used,
1743                    "slot_bytes_max": MAX_SLOT_BYTES,
1744                })),
1745            )
1746                .into_response();
1747        }
1748    }
1749
1750    let event_id = req
1751        .event
1752        .get("event_id")
1753        .and_then(Value::as_str)
1754        .map(str::to_string);
1755
1756    // Dedupe by event_id if present.
1757    let dup = {
1758        let inner = relay.inner.lock().await;
1759        let slot = inner.slots.get(&slot_id);
1760        if let (Some(eid), Some(slot)) = (&event_id, slot) {
1761            slot.iter()
1762                .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1763        } else {
1764            false
1765        }
1766    };
1767    if dup {
1768        return (
1769            StatusCode::OK,
1770            Json(json!({"event_id": event_id, "status": "duplicate"})),
1771        )
1772            .into_response();
1773    }
1774
1775    {
1776        let mut inner = relay.inner.lock().await;
1777        let event_size = body_bytes.len();
1778        let slot = inner.slots.entry(slot_id.clone()).or_default();
1779        slot.push(req.event.clone());
1780        *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1781    }
1782    if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1783        return (
1784            StatusCode::INTERNAL_SERVER_ERROR,
1785            Json(json!({"error": format!("persist failed: {e}")})),
1786        )
1787            .into_response();
1788    }
1789    (
1790        StatusCode::CREATED,
1791        Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1792    )
1793        .into_response()
1794}
1795
1796/// `GET /.well-known/wire/agent?handle=<nick>` — WebFinger-style resolver
1797/// for `nick@<this-relay-domain>` handles. Returns the signed agent-card +
1798/// slot coords if claimed; 404 if not.
1799///
1800/// The `handle` query parameter may be just `<nick>` or `<nick>@<domain>`.
1801/// Domain part is ignored (the relay only serves nicks it has on file).
1802/// `GET /.well-known/agent-card.json?handle=<nick>` — A2A v1.0-compatible
1803/// AgentCard serving wire's handle directory. Same data as `well_known_agent`
1804/// but in the schema A2A clients (MSFT/AWS/Salesforce/SAP/ServiceNow tooling,
1805/// agent-card-go, agent-card-python, A2A .NET SDK) already speak.
1806///
1807/// Wire-specific fields (DID, slot_id, profile blob, raw signed card) live
1808/// under the standard A2A `extensions` array using the wire extension URI.
1809/// A2A-only clients can pair to wire agents knowing only A2A vocabulary;
1810/// wire-native clients get the full richer card by following the extension.
1811async fn well_known_agent_card_a2a(
1812    State(relay): State<Relay>,
1813    Query(q): Query<WellKnownAgentQuery>,
1814) -> impl IntoResponse {
1815    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1816    if nick.is_empty() {
1817        return (
1818            StatusCode::BAD_REQUEST,
1819            Json(json!({"error": "handle missing nick"})),
1820        )
1821            .into_response();
1822    }
1823    let inner = relay.inner.lock().await;
1824    let rec = match inner.handles.get(&nick) {
1825        Some(r) => r.clone(),
1826        None => {
1827            return (
1828                StatusCode::NOT_FOUND,
1829                Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1830            )
1831                .into_response();
1832        }
1833    };
1834    drop(inner);
1835
1836    let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1837    let description = profile
1838        .get("motto")
1839        .and_then(Value::as_str)
1840        .unwrap_or("")
1841        .to_string();
1842    let display_name = profile
1843        .get("display_name")
1844        .and_then(Value::as_str)
1845        .unwrap_or(&rec.nick)
1846        .to_string();
1847    let relay_url = rec.relay_url.clone().unwrap_or_default();
1848    // Intro endpoint = where any A2A or wire client posts a signed pair-drop.
1849    let endpoint = if !relay_url.is_empty() {
1850        format!(
1851            "{}/v1/handle/intro/{}",
1852            relay_url.trim_end_matches('/'),
1853            rec.nick
1854        )
1855    } else {
1856        format!("/v1/handle/intro/{}", rec.nick)
1857    };
1858    let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1859
1860    // Build A2A v1.0 AgentCard shape with wire extension. Fields named to
1861    // match the A2A spec exactly so downstream tooling (agent-card-go etc.)
1862    // parses without custom code.
1863    let a2a_card = json!({
1864        "id": rec.did,
1865        "name": display_name,
1866        "description": description,
1867        "version": "wire/0.5",
1868        "endpoint": endpoint,
1869        "provider": {
1870            "name": "wire",
1871            "url": "https://github.com/SlanchaAi/wire"
1872        },
1873        "capabilities": {
1874            "streaming": false,
1875            "pushNotifications": false,
1876            "extendedAgentCard": true
1877        },
1878        "securitySchemes": {
1879            "ed25519-event-sig": {
1880                "type": "signature",
1881                "alg": "EdDSA",
1882                "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1883            }
1884        },
1885        "security": [{"ed25519-event-sig": []}],
1886        "skills": [],
1887        "extensions": [{
1888            // A2A extension URIs are opaque namespace identifiers, not
1889            // forwardable URLs. Changing this string is a coordinated
1890            // federation-spec bump because peers match it exactly.
1891            "uri": "https://slancha.ai/wire/ext/v0.5",
1892            "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1893            "required": false,
1894            "params": {
1895                "did": rec.did,
1896                "handle": rec.nick,
1897                "slot_id": rec.slot_id,
1898                "relay_url": rec.relay_url,
1899                "card": rec.card,
1900                "profile": profile,
1901                "claimed_at": rec.claimed_at,
1902            }
1903        }],
1904        "signature": card_sig,
1905    });
1906    (StatusCode::OK, Json(a2a_card)).into_response()
1907}
1908
1909async fn well_known_agent(
1910    State(relay): State<Relay>,
1911    Query(q): Query<WellKnownAgentQuery>,
1912) -> impl IntoResponse {
1913    let nick = q.handle.split('@').next().unwrap_or("").to_string();
1914    if nick.is_empty() {
1915        return (
1916            StatusCode::BAD_REQUEST,
1917            Json(json!({"error": "handle missing nick"})),
1918        )
1919            .into_response();
1920    }
1921    let inner = relay.inner.lock().await;
1922    match inner.handles.get(&nick) {
1923        Some(rec) => (
1924            StatusCode::OK,
1925            Json(json!({
1926                "nick": rec.nick,
1927                "did": rec.did,
1928                "card": rec.card,
1929                "slot_id": rec.slot_id,
1930                "relay_url": rec.relay_url,
1931                "claimed_at": rec.claimed_at,
1932            })),
1933        )
1934            .into_response(),
1935        None => (
1936            StatusCode::NOT_FOUND,
1937            Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1938        )
1939            .into_response(),
1940    }
1941}
1942
1943async fn list_events(
1944    State(relay): State<Relay>,
1945    Path(slot_id): Path<String>,
1946    Query(q): Query<ListEventsQuery>,
1947    headers: HeaderMap,
1948) -> impl IntoResponse {
1949    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1950        return resp;
1951    }
1952    let limit = q.limit.unwrap_or(100).min(1000);
1953    let mut inner = relay.inner.lock().await;
1954    // R4: record this pull as proof that the slot owner is still polling.
1955    // Anyone holding the slot_token (i.e., a paired peer) can later read
1956    // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
1957    let now_unix = std::time::SystemTime::now()
1958        .duration_since(std::time::UNIX_EPOCH)
1959        .map(|d| d.as_secs())
1960        .unwrap_or(0);
1961    inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1962    let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1963    let start = match q.since {
1964        Some(eid) => events
1965            .iter()
1966            .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1967            .map(|i| i + 1)
1968            .unwrap_or(0),
1969        None => 0,
1970    };
1971    let end = (start + limit).min(events.len());
1972    let slice = events[start..end].to_vec();
1973    (StatusCode::OK, Json(slice)).into_response()
1974}
1975
1976/// R4 — slot-attentiveness probe. Authenticated by slot_token (so only
1977/// paired peers can ask). Returns `last_pull_at_unix` (the slot owner's most
1978/// recent `list_events` call, in unix seconds) and `event_count` (total
1979/// stored). A remote sender uses this before `wire send <peer>` to warn the
1980/// operator if the peer hasn't polled recently.
1981async fn slot_state(
1982    State(relay): State<Relay>,
1983    Path(slot_id): Path<String>,
1984    headers: HeaderMap,
1985) -> impl IntoResponse {
1986    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1987        return resp;
1988    }
1989    let inner = relay.inner.lock().await;
1990    let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1991    let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1992    let responder_health = inner.responder_health.get(&slot_id).cloned();
1993    (
1994        StatusCode::OK,
1995        Json(json!({
1996            "slot_id": slot_id,
1997            "event_count": event_count,
1998            "last_pull_at_unix": last_pull_at_unix,
1999            "responder_health": responder_health,
2000        })),
2001    )
2002        .into_response()
2003}
2004
2005async fn responder_health_set(
2006    State(relay): State<Relay>,
2007    Path(slot_id): Path<String>,
2008    headers: HeaderMap,
2009    Json(record): Json<ResponderHealthRecord>,
2010) -> impl IntoResponse {
2011    if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
2012        return resp;
2013    }
2014    let path = relay
2015        .state_dir
2016        .join("responder-health")
2017        .join(format!("{slot_id}.json"));
2018    let body = match serde_json::to_vec_pretty(&record) {
2019        Ok(b) => b,
2020        Err(e) => {
2021            return (
2022                StatusCode::INTERNAL_SERVER_ERROR,
2023                Json(json!({"error": format!("serialize failed: {e}")})),
2024            )
2025                .into_response();
2026        }
2027    };
2028    if let Err(e) = tokio::fs::write(&path, body).await {
2029        return (
2030            StatusCode::INTERNAL_SERVER_ERROR,
2031            Json(json!({"error": format!("persist failed: {e}")})),
2032        )
2033            .into_response();
2034    }
2035    {
2036        let mut inner = relay.inner.lock().await;
2037        inner
2038            .responder_health
2039            .insert(slot_id.clone(), record.clone());
2040    }
2041    (StatusCode::OK, Json(record)).into_response()
2042}
2043
2044async fn check_token(
2045    relay: &Relay,
2046    headers: &HeaderMap,
2047    slot_id: &str,
2048) -> std::result::Result<(), axum::response::Response> {
2049    let auth = headers
2050        .get(AUTHORIZATION)
2051        .and_then(|h| h.to_str().ok())
2052        .and_then(|s| s.strip_prefix("Bearer "))
2053        .map(str::to_string);
2054    let presented = match auth {
2055        Some(t) => t,
2056        None => {
2057            return Err((
2058                StatusCode::UNAUTHORIZED,
2059                Json(json!({"error": "missing Bearer token"})),
2060            )
2061                .into_response());
2062        }
2063    };
2064    let inner = relay.inner.lock().await;
2065    let expected = match inner.tokens.get(slot_id) {
2066        Some(t) => t.clone(),
2067        None => {
2068            return Err((
2069                StatusCode::NOT_FOUND,
2070                Json(json!({"error": "unknown slot"})),
2071            )
2072                .into_response());
2073        }
2074    };
2075    drop(inner);
2076    if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2077        return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2078    }
2079    Ok(())
2080}
2081
/// Compares two byte slices without exiting early on the first differing
/// byte: every byte pair is XOR-ed and the differences are OR-accumulated,
/// so equal-length inputs are always scanned in full.
///
/// Slices of different lengths compare unequal immediately (the length
/// itself is not treated as secret).
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |diff, (left, right)| diff | (left ^ right))
            == 0
}
2092
/// Returns `true` only for strings of exactly 32 lowercase hexadecimal
/// characters (`0-9`, `a-f`). Everything else — wrong length, uppercase hex,
/// path separators, NUL bytes, non-ASCII — is rejected.
fn is_valid_slot_id(s: &str) -> bool {
    const SLOT_ID_LEN: usize = 32;
    s.len() == SLOT_ID_LEN && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2098
2099fn random_hex(n_bytes: usize) -> String {
2100    let mut buf = vec![0u8; n_bytes];
2101    rand::thread_rng().fill_bytes(&mut buf);
2102    hex::encode(buf)
2103}
2104
2105/// Run the relay until SIGINT/SIGTERM.
2106pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2107    serve_with_mode(bind, state_dir, ServerMode::default()).await
2108}
2109
2110/// v0.5.17: server-mode-aware entry point. Same as `serve` but with the
2111/// `--local-only` toggle exposed so the binary can refuse to publish
2112/// phonebook + well-known surfaces on within-machine relays.
2113pub async fn serve_with_mode(bind: &str, state_dir: PathBuf, mode: ServerMode) -> Result<()> {
2114    let relay = Relay::new(state_dir).await?;
2115    relay.spawn_pair_sweeper();
2116    relay.spawn_counter_persister();
2117    let app = relay.clone().router_with_mode(mode);
2118    let listener = tokio::net::TcpListener::bind(bind)
2119        .await
2120        .with_context(|| format!("binding {bind}"))?;
2121    if mode.local_only {
2122        eprintln!(
2123            "wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled"
2124        );
2125    } else {
2126        eprintln!("wire relay-server listening on {bind}");
2127    }
2128    let shutdown_relay = relay.clone();
2129    axum::serve(listener, app)
2130        .with_graceful_shutdown(async move {
2131            let _ = tokio::signal::ctrl_c().await;
2132            eprintln!("\nshutting down — final counter snapshot");
2133            if let Err(e) = shutdown_relay.persist_counters().await {
2134                eprintln!("final counter persist failed: {e}");
2135            }
2136        })
2137        .await?;
2138    Ok(())
2139}
2140
2141/// v0.7.0-alpha.16: Unix Domain Socket entry point. Mirrors `serve_with_mode`
2142/// but binds to a UDS path instead of a TCP host:port. Implicitly forces
2143/// `mode.local_only = true` because UDS has no concept of "publish to a
2144/// public phonebook." Chmods the socket 0600 so only the owner uid can
2145/// connect — the SO_PEERCRED-equivalent trust anchor for sister sessions.
2146///
2147/// Removes any stale socket file at `path` first (otherwise bind fails
2148/// with EADDRINUSE). Removes the socket on graceful shutdown.
2149///
2150/// Implementation note: axum 0.7's `serve` is TcpListener-only, so we
2151/// run the manual hyper accept loop (per-connection
2152/// `hyper::server::conn::http1::Builder`). When axum 0.8 ships with
2153/// generic `Listener` support, this can collapse to one line.
2154#[cfg(unix)]
2155pub async fn serve_uds(socket_path: PathBuf, state_dir: PathBuf) -> Result<()> {
2156    use hyper::server::conn::http1;
2157    use hyper_util::rt::TokioIo;
2158    use tower_service::Service;
2159
2160    // Best-effort cleanup of stale socket file.
2161    if socket_path.exists() {
2162        std::fs::remove_file(&socket_path)
2163            .with_context(|| format!("removing stale socket at {socket_path:?}"))?;
2164    }
2165    if let Some(parent) = socket_path.parent() {
2166        std::fs::create_dir_all(parent)
2167            .with_context(|| format!("creating socket parent {parent:?}"))?;
2168    }
2169    let relay = Relay::new(state_dir).await?;
2170    relay.spawn_pair_sweeper();
2171    relay.spawn_counter_persister();
2172    let app: axum::Router = relay
2173        .clone()
2174        .router_with_mode(ServerMode { local_only: true });
2175    let listener = tokio::net::UnixListener::bind(&socket_path)
2176        .with_context(|| format!("binding UDS at {socket_path:?}"))?;
2177
2178    // 0600: owner-rw only. Trust anchor is the kernel-attested peer uid
2179    // (SO_PEERCRED equivalent); chmod is defense-in-depth.
2180    use std::os::unix::fs::PermissionsExt;
2181    if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
2182        eprintln!(
2183            "wire relay-server (UDS): chmod 0600 on {socket_path:?} failed: {e} — \
2184             socket may be accessible to other uids. Investigate."
2185        );
2186    }
2187    eprintln!(
2188        "wire relay-server (UDS) listening on unix://{} — same-host, owner-uid only",
2189        socket_path.display()
2190    );
2191
2192    // Manual accept loop + per-conn hyper serve (axum 0.7's serve is
2193    // TcpListener-only). On SIGINT, persist counters + remove socket.
2194    let shutdown_relay = relay.clone();
2195    let socket_path_for_cleanup = socket_path.clone();
2196    let mut make_service = app.into_make_service();
2197
2198    let serve_loop = async {
2199        loop {
2200            let (stream, _peer_addr) = match listener.accept().await {
2201                Ok(p) => p,
2202                Err(e) => {
2203                    eprintln!("wire relay-server (UDS): accept failed: {e}");
2204                    continue;
2205                }
2206            };
2207            let tower_service = match make_service.call(&stream).await {
2208                Ok(s) => s,
2209                Err(infallible) => match infallible {},
2210            };
2211            let io = TokioIo::new(stream);
2212            let hyper_service =
2213                hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
2214                    let mut svc = tower_service.clone();
2215                    async move { Service::call(&mut svc, req).await }
2216                });
2217            tokio::task::spawn(async move {
2218                if let Err(e) = http1::Builder::new()
2219                    .serve_connection(io, hyper_service)
2220                    .await
2221                {
2222                    // Connection-level error; not fatal to the listener.
2223                    if !e.is_incomplete_message() {
2224                        eprintln!("wire relay-server (UDS): conn error: {e}");
2225                    }
2226                }
2227            });
2228        }
2229    };
2230
2231    let shutdown = async {
2232        let _ = tokio::signal::ctrl_c().await;
2233        eprintln!("\nshutting down — final counter snapshot");
2234        if let Err(e) = shutdown_relay.persist_counters().await {
2235            eprintln!("final counter persist failed: {e}");
2236        }
2237        let _ = std::fs::remove_file(&socket_path_for_cleanup);
2238    };
2239
2240    tokio::select! {
2241        _ = serve_loop => {},
2242        _ = shutdown => {},
2243    };
2244    Ok(())
2245}
2246
/// Non-Unix stand-in for the Unix Domain Socket entry point: always returns
/// an error telling the operator to use a loopback HTTP bind instead. Both
/// arguments are ignored.
#[cfg(not(unix))]
pub async fn serve_uds(_socket_path: PathBuf, _state_dir: PathBuf) -> Result<()> {
    anyhow::bail!(
        "UDS transport is Unix-only; Windows falls back to loopback HTTP. \
         Use `wire relay-server --bind 127.0.0.1:8771 --local-only` on Windows."
    )
}
2254
/// v0.5.17: toggles that select which surfaces the relay server exposes.
///
/// The `Default` value (`local_only == false`) is full federation, i.e. the
/// pre-existing behavior. Setting `local_only` strips the phonebook and
/// well-known surfaces so the relay is invisible from off-box and from any
/// directory-scraping agent on the same box.
#[derive(Clone, Copy, Debug, Default)]
pub struct ServerMode {
    /// When true, skip phonebook listing + `.well-known/wire/agent` +
    /// `.well-known/agent-card.json` + landing/stats pages. Pair this
    /// with a loopback bind (`--local-only` enforces this at the CLI
    /// layer) for genuinely within-machine traffic.
    pub local_only: bool,
}
2267
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal slices match; differing content or differing length does not.
    #[test]
    fn constant_time_eq_basic() {
        let cases = [
            (b"abc".as_slice(), b"abc".as_slice(), true),
            (b"abc".as_slice(), b"abd".as_slice(), false),
            (b"abc".as_slice(), b"abcd".as_slice(), false), // length mismatch
        ];
        for (left, right, expected) in cases {
            assert_eq!(constant_time_eq(left, right), expected);
        }
    }

    /// 16 random bytes encode to 32 hex characters.
    #[test]
    fn random_hex_length() {
        let encoded = random_hex(16);
        assert_eq!(encoded.len(), 32); // 16 bytes -> 32 hex chars
        assert!(encoded.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// A pair-slot idle for longer than the TTL is evicted together with its
    /// lookup entry, while a freshly created one survives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        let state_dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&state_dir);
        let relay = Relay::new(state_dir.clone()).await.unwrap();

        // Timestamp 60 s beyond the TTL, i.e. already expired.
        let stale_touch = std::time::Instant::now()
            - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60);

        // Seed one expired and one fresh pair-slot by hand.
        {
            let mut guard = relay.inner.lock().await;

            guard
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            guard.pair_slots.insert(
                "id-A".to_string(),
                PairSlot {
                    last_touched: stale_touch,
                    ..PairSlot::default()
                },
            );

            // The fresh one — should survive.
            guard
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            guard
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(guard.pair_slots.len(), 2);
            assert_eq!(guard.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let guard = relay.inner.lock().await;
        assert_eq!(
            guard.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(guard.pair_slots.contains_key("id-B"));
        assert_eq!(guard.pair_lookup.len(), 1);
        assert!(guard.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&state_dir);
    }

    /// Only 32-character lowercase hex strings are valid slot ids.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        let accepted = [
            "0123456789abcdef0123456789abcdef".to_string(),
            random_hex(16),
        ];
        for id in &accepted {
            assert!(is_valid_slot_id(id));
        }

        let rejected = [
            // wrong length
            "abc",
            "0123456789abcdef0123456789abcde",   // 31
            "0123456789abcdef0123456789abcdef0", // 33
            // uppercase
            "0123456789ABCDEF0123456789abcdef",
            // path traversal attempts
            "../etc/passwd0123456789abcdef0000",
            "..%2Fetc%2Fpasswd00000000000000000",
            "/absolute/path/that/looks/like/key",
            // null bytes
            "0123456789abcdef\0\x31\x32\x33456789abcdef",
        ];
        for id in rejected {
            assert!(!is_valid_slot_id(id));
        }
    }
}