1use anyhow::{Context, Result};
33use axum::{
34 Json, Router,
35 extract::{Path, Query, State},
36 http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37 response::IntoResponse,
38 routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50 GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
53const MAX_EVENT_BYTES: usize = 256 * 1024;
54const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
60#[derive(Clone)]
61pub struct Relay {
62 inner: Arc<Mutex<Inner>>,
63 state_dir: PathBuf,
64 counters: Arc<RelayCounters>,
65}
66
67struct RelayCounters {
72 boot_unix: u64,
73 handle_claims_total: AtomicU64,
74 handle_first_claims_total: AtomicU64,
75 slot_allocations_total: AtomicU64,
76 pair_opens_total: AtomicU64,
77 events_posted_total: AtomicU64,
78}
79
80#[derive(serde::Serialize, serde::Deserialize, Default)]
81struct CountersSnapshot {
82 handle_claims_total: u64,
83 handle_first_claims_total: u64,
84 slot_allocations_total: u64,
85 pair_opens_total: u64,
86 events_posted_total: u64,
87}
88
89#[derive(serde::Serialize, serde::Deserialize)]
94struct HistoryEntry {
95 ts: u64,
96 handles_active: usize,
97 slots_active: usize,
98 pair_slots_open: usize,
99 streams_active: usize,
100 handle_claims_total: u64,
101 handle_first_claims_total: u64,
102 slot_allocations_total: u64,
103 pair_opens_total: u64,
104 events_posted_total: u64,
105}
106
107#[derive(Deserialize)]
108pub struct StatsHistoryQuery {
109 pub hours: Option<u64>,
111}
112
113struct Inner {
114 slots: HashMap<String, Vec<Value>>,
116 tokens: HashMap<String, String>,
118 slot_bytes: HashMap<String, usize>,
120 last_pull_at_unix: HashMap<String, u64>,
125 streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
131 pair_lookup: HashMap<String, String>,
133 pair_slots: HashMap<String, PairSlot>,
135 handles: HashMap<String, HandleRecord>,
137 responder_health: HashMap<String, ResponderHealthRecord>,
139 invites: HashMap<String, InviteRecord>,
143}
144
145#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
148struct InviteRecord {
149 token: String,
150 invite_url: String,
151 expires_unix: u64,
152 uses_remaining: Option<u32>,
154 created_unix: u64,
155}
156
157#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
161struct HandleRecord {
162 pub nick: String,
163 pub did: String,
164 pub card: Value,
165 pub slot_id: String,
166 pub relay_url: Option<String>,
167 pub claimed_at: String,
168}
169
170#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
171pub struct ResponderHealthRecord {
172 pub status: String,
173 #[serde(default, skip_serializing_if = "Option::is_none")]
174 pub reason: Option<String>,
175 #[serde(default, skip_serializing_if = "Option::is_none")]
176 pub last_success_at: Option<String>,
177 pub set_at: String,
178}
179
180#[derive(Clone, Debug)]
181struct PairSlot {
182 host_msg: Option<String>,
184 guest_msg: Option<String>,
186 host_bootstrap: Option<String>,
188 guest_bootstrap: Option<String>,
190 last_touched: std::time::Instant,
192}
193
194impl Default for PairSlot {
195 fn default() -> Self {
196 Self {
197 host_msg: None,
198 guest_msg: None,
199 host_bootstrap: None,
200 guest_bootstrap: None,
201 last_touched: std::time::Instant::now(),
202 }
203 }
204}
205
206const PAIR_SLOT_TTL_SECS: u64 = 300;
209
210#[derive(Deserialize)]
211pub struct AllocateRequest {
212 #[serde(default)]
214 pub handle: Option<String>,
215}
216
217#[derive(Deserialize)]
218pub struct PostEventRequest {
219 pub event: Value,
220}
221
222#[derive(Deserialize)]
223pub struct ListEventsQuery {
224 pub since: Option<String>,
226 pub limit: Option<usize>,
228}
229
230impl Relay {
231 pub async fn new(state_dir: PathBuf) -> Result<Self> {
232 tokio::fs::create_dir_all(state_dir.join("slots")).await?;
233 tokio::fs::create_dir_all(state_dir.join("handles")).await?;
234 tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
235 let mut inner = Inner {
236 slots: HashMap::new(),
237 tokens: HashMap::new(),
238 slot_bytes: HashMap::new(),
239 last_pull_at_unix: HashMap::new(),
240 streams: HashMap::new(),
241 pair_lookup: HashMap::new(),
242 pair_slots: HashMap::new(),
243 handles: HashMap::new(),
244 responder_health: HashMap::new(),
245 invites: HashMap::new(),
246 };
247 let token_path = state_dir.join("tokens.json");
249 if token_path.exists() {
250 let body = tokio::fs::read_to_string(&token_path).await?;
251 inner.tokens = serde_json::from_str(&body).unwrap_or_default();
252 }
253 let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
255 while let Some(entry) = slots_dir.next_entry().await? {
256 let path = entry.path();
257 if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
258 continue;
259 }
260 let stem = match path.file_stem().and_then(|s| s.to_str()) {
261 Some(s) => s.to_string(),
262 None => continue,
263 };
264 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
265 let mut events = Vec::new();
266 for line in body.lines() {
267 if let Ok(v) = serde_json::from_str::<Value>(line) {
268 events.push(v);
269 }
270 }
271 let bytes: usize = events
273 .iter()
274 .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
275 .sum();
276 inner.slot_bytes.insert(stem.clone(), bytes);
277 inner.slots.insert(stem, events);
278 }
279 let handles_dir = state_dir.join("handles");
281 if handles_dir.exists() {
282 let mut rd = tokio::fs::read_dir(&handles_dir).await?;
283 while let Some(entry) = rd.next_entry().await? {
284 let path = entry.path();
285 if path.extension().and_then(|x| x.to_str()) != Some("json") {
286 continue;
287 }
288 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
289 if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
290 inner.handles.insert(rec.nick.clone(), rec);
291 }
292 }
293 }
294 let responder_health_dir = state_dir.join("responder-health");
296 if responder_health_dir.exists() {
297 let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
298 while let Some(entry) = rd.next_entry().await? {
299 let path = entry.path();
300 if path.extension().and_then(|x| x.to_str()) != Some("json") {
301 continue;
302 }
303 let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
304 continue;
305 };
306 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
307 if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
308 inner.responder_health.insert(slot_id.to_string(), rec);
309 }
310 }
311 }
312 let invites_path = state_dir.join("invites.jsonl");
316 if invites_path.exists() {
317 let now_unix = SystemTime::now()
318 .duration_since(UNIX_EPOCH)
319 .map(|d| d.as_secs())
320 .unwrap_or(0);
321 let body = tokio::fs::read_to_string(&invites_path)
322 .await
323 .unwrap_or_default();
324 for line in body.lines() {
325 if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
326 && rec.expires_unix > now_unix
327 {
328 inner.invites.insert(rec.token.clone(), rec);
329 }
330 }
331 }
332 let boot_unix = SystemTime::now()
333 .duration_since(UNIX_EPOCH)
334 .map(|d| d.as_secs())
335 .unwrap_or(0);
336 let snap: CountersSnapshot =
338 match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
339 Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
340 Err(_) => CountersSnapshot::default(),
341 };
342 Ok(Self {
343 inner: Arc::new(Mutex::new(inner)),
344 state_dir,
345 counters: Arc::new(RelayCounters {
346 boot_unix,
347 handle_claims_total: AtomicU64::new(snap.handle_claims_total),
348 handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
349 slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
350 pair_opens_total: AtomicU64::new(snap.pair_opens_total),
351 events_posted_total: AtomicU64::new(snap.events_posted_total),
352 }),
353 })
354 }
355
356 pub fn router(self) -> Router {
357 let governor_conf = std::sync::Arc::new(
364 GovernorConfigBuilder::default()
365 .per_second(10)
366 .burst_size(50)
367 .key_extractor(GlobalKeyExtractor)
368 .finish()
369 .expect("valid governor config"),
370 );
371 let governor_layer = GovernorLayer {
372 config: governor_conf,
373 };
374
375 let hot_writes = Router::new()
377 .route("/v1/slot/allocate", post(allocate_slot))
378 .route("/v1/pair", post(pair_open))
379 .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
380 .route("/v1/pair/abandon", post(pair_abandon))
381 .layer(governor_layer);
382
383 Router::new()
384 .route("/", get(landing_index))
385 .route("/favicon.svg", get(landing_favicon))
386 .route("/og.png", get(landing_og))
387 .route("/demo.cast", get(landing_demo_cast))
388 .route("/install", get(landing_install_sh))
389 .route("/install.sh", get(landing_install_sh))
390 .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
391 .route("/healthz", get(healthz))
392 .route("/stats", get(stats_root))
393 .route("/stats.json", get(stats_json))
394 .route("/stats.html", get(landing_stats_html))
395 .route("/stats.history", get(stats_history))
396 .route("/phonebook", get(landing_phonebook_html))
397 .route("/phonebook.html", get(landing_phonebook_html))
398 .route("/v1/events/:slot_id", post(post_event).get(list_events))
399 .route("/v1/slot/:slot_id/state", get(slot_state))
400 .route(
401 "/v1/slot/:slot_id/responder-health",
402 post(responder_health_set),
403 )
404 .route("/v1/events/:slot_id/stream", get(stream_events))
405 .route("/v1/pair/:pair_id", get(pair_get))
406 .route("/v1/handle/claim", post(handle_claim))
407 .route("/v1/handle/intro/:nick", post(handle_intro))
408 .route("/v1/handles", get(handles_directory))
409 .route("/v1/invite/register", post(invite_register))
410 .route("/i/:token", get(invite_script))
411 .route("/.well-known/wire/agent", get(well_known_agent))
412 .route(
413 "/.well-known/agent-card.json",
414 get(well_known_agent_card_a2a),
415 )
416 .merge(hot_writes)
417 .with_state(self)
418 }
419
420 async fn evict_expired_pair_slots(&self) {
424 let now = std::time::Instant::now();
425 let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
426 let mut inner = self.inner.lock().await;
427 let mut to_remove = Vec::new();
428 for (id, slot) in inner.pair_slots.iter() {
429 if now.duration_since(slot.last_touched) > ttl {
430 to_remove.push(id.clone());
431 }
432 }
433 for id in to_remove {
434 inner.pair_slots.remove(&id);
435 inner.pair_lookup.retain(|_, v| v != &id);
436 }
437 }
438
439 pub fn spawn_pair_sweeper(&self) {
444 let me = self.clone();
445 tokio::spawn(async move {
446 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
447 loop {
448 tick.tick().await;
449 me.evict_expired_pair_slots().await;
450 }
451 });
452 }
453
454 pub async fn persist_counters(&self) -> Result<()> {
458 let snap = CountersSnapshot {
459 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
460 handle_first_claims_total: self
461 .counters
462 .handle_first_claims_total
463 .load(Ordering::Relaxed),
464 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
465 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
466 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
467 };
468 let body = serde_json::to_vec_pretty(&snap)?;
469 let path = self.state_dir.join("counters.json");
470 tokio::fs::write(path, body).await?;
471 Ok(())
472 }
473
474 pub async fn append_history(&self) -> Result<()> {
479 use tokio::io::AsyncWriteExt;
480 let now = SystemTime::now()
481 .duration_since(UNIX_EPOCH)
482 .map(|d| d.as_secs())
483 .unwrap_or(0);
484 let (handles_active, slots_active, pair_slots_open, streams_active) = {
485 let inner = self.inner.lock().await;
486 (
487 inner.handles.len(),
488 inner.slots.len(),
489 inner.pair_slots.len(),
490 inner.streams.values().map(Vec::len).sum::<usize>(),
491 )
492 };
493 let entry = HistoryEntry {
494 ts: now,
495 handles_active,
496 slots_active,
497 pair_slots_open,
498 streams_active,
499 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
500 handle_first_claims_total: self
501 .counters
502 .handle_first_claims_total
503 .load(Ordering::Relaxed),
504 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
505 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
506 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
507 };
508 let line = serde_json::to_vec(&entry)?;
509 let path = self.state_dir.join("stats-history.jsonl");
510 let mut f = tokio::fs::OpenOptions::new()
511 .create(true)
512 .append(true)
513 .open(&path)
514 .await?;
515 f.write_all(&line).await?;
516 f.write_all(b"\n").await?;
517 f.flush().await?;
518 Ok(())
519 }
520
521 pub fn spawn_counter_persister(&self) {
525 let me = self.clone();
526 tokio::spawn(async move {
527 let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
528 tick.tick().await;
531 loop {
532 tick.tick().await;
533 if let Err(e) = me.persist_counters().await {
534 eprintln!("counter persist failed: {e}");
535 }
536 if let Err(e) = me.append_history().await {
537 eprintln!("history append failed: {e}");
538 }
539 }
540 });
541 }
542
543 async fn persist_tokens(&self) -> Result<()> {
544 let body = {
545 let inner = self.inner.lock().await;
546 serde_json::to_string_pretty(&inner.tokens)?
547 };
548 let path = self.state_dir.join("tokens.json");
549 tokio::fs::write(path, body).await?;
550 Ok(())
551 }
552
553 async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
554 if !is_valid_slot_id(slot_id) {
560 return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
561 }
562 let path = self
563 .state_dir
564 .join("slots")
565 .join(format!("{slot_id}.jsonl"));
566 let mut line = serde_json::to_vec(event)?;
567 line.push(b'\n');
568 use tokio::io::AsyncWriteExt;
569 let mut f = tokio::fs::OpenOptions::new()
570 .create(true)
571 .append(true)
572 .open(&path)
573 .await
574 .with_context(|| format!("opening {path:?}"))?;
575 f.write_all(&line).await?;
576 f.flush().await?;
577 Ok(())
578 }
579}
580
581async fn healthz() -> impl IntoResponse {
582 (StatusCode::OK, "ok\n")
583}
584
585async fn stats_history(
589 State(relay): State<Relay>,
590 Query(q): Query<StatsHistoryQuery>,
591) -> impl IntoResponse {
592 let hours = q.hours.unwrap_or(24).min(168);
593 let now = SystemTime::now()
594 .duration_since(UNIX_EPOCH)
595 .map(|d| d.as_secs())
596 .unwrap_or(0);
597 let cutoff = now.saturating_sub(hours * 3600);
598 let path = relay.state_dir.join("stats-history.jsonl");
599 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
600 let entries: Vec<Value> = body
601 .lines()
602 .filter_map(|l| serde_json::from_str::<Value>(l).ok())
603 .filter(|v| {
604 v.get("ts")
605 .and_then(Value::as_u64)
606 .map(|t| t >= cutoff)
607 .unwrap_or(false)
608 })
609 .collect();
610 (
611 StatusCode::OK,
612 Json(json!({
613 "hours": hours,
614 "now_unix": now,
615 "count": entries.len(),
616 "entries": entries,
617 })),
618 )
619}
620
621async fn landing_stats_html() -> impl IntoResponse {
622 static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
623 (
624 StatusCode::OK,
625 [
626 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
627 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
628 ],
629 STATS_HTML,
630 )
631}
632
633async fn landing_phonebook_html() -> impl IntoResponse {
634 static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
635 (
636 StatusCode::OK,
637 [
638 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
639 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
640 ],
641 PHONEBOOK_HTML,
642 )
643}
644
645async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
650 let wants_html = headers
651 .get(axum::http::header::ACCEPT)
652 .and_then(|v| v.to_str().ok())
653 .unwrap_or("")
654 .contains("text/html");
655 if wants_html {
656 landing_stats_html().await.into_response()
657 } else {
658 stats_json(State(relay)).await.into_response()
659 }
660}
661
662async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
663 let now = SystemTime::now()
664 .duration_since(UNIX_EPOCH)
665 .map(|d| d.as_secs())
666 .unwrap_or(0);
667 let inner = relay.inner.lock().await;
668 let streams_active: usize = inner.streams.values().map(Vec::len).sum();
669 let body = json!({
670 "version": env!("CARGO_PKG_VERSION"),
671 "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
672 "handles_active": inner.handles.len(),
673 "slots_active": inner.slots.len(),
674 "pair_slots_open": inner.pair_slots.len(),
675 "streams_active": streams_active,
676 "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
677 "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
678 "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
679 "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
680 "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
681 });
682 (StatusCode::OK, Json(body))
683}
684
685async fn landing_index() -> impl IntoResponse {
689 static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
690 (
691 StatusCode::OK,
692 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
693 INDEX_HTML,
694 )
695}
696
697async fn landing_favicon() -> impl IntoResponse {
698 static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
699 (
700 StatusCode::OK,
701 [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
702 FAVICON_SVG,
703 )
704}
705
706async fn landing_og() -> impl IntoResponse {
707 static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
708 (
709 StatusCode::OK,
710 [
711 (axum::http::header::CONTENT_TYPE, "image/png"),
712 (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
713 ],
714 OG_PNG,
715 )
716}
717
718async fn landing_demo_cast() -> impl IntoResponse {
719 static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
720 (
721 StatusCode::OK,
722 [
723 (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
724 (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
725 ],
726 DEMO_CAST,
727 )
728}
729
730async fn landing_install_sh() -> impl IntoResponse {
731 static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
732 (
733 StatusCode::OK,
734 [
735 (
736 axum::http::header::CONTENT_TYPE,
737 "text/x-shellscript; charset=utf-8",
738 ),
739 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
740 ],
741 INSTALL_SH,
742 )
743}
744
745async fn landing_openshell_policy_sh() -> impl IntoResponse {
746 static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
747 (
748 StatusCode::OK,
749 [
750 (
751 axum::http::header::CONTENT_TYPE,
752 "text/x-shellscript; charset=utf-8",
753 ),
754 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
755 ],
756 POLICY_SH,
757 )
758}
759
760async fn allocate_slot(
761 State(relay): State<Relay>,
762 Json(_req): Json<AllocateRequest>,
763) -> impl IntoResponse {
764 let slot_id = random_hex(16);
765 let slot_token = random_hex(32);
766 {
767 let mut inner = relay.inner.lock().await;
768 inner.slots.insert(slot_id.clone(), Vec::new());
769 inner.tokens.insert(slot_id.clone(), slot_token.clone());
770 }
771 if let Err(e) = relay.persist_tokens().await {
772 return (
773 StatusCode::INTERNAL_SERVER_ERROR,
774 Json(json!({"error": format!("persist failed: {e}")})),
775 )
776 .into_response();
777 }
778 relay
779 .counters
780 .slot_allocations_total
781 .fetch_add(1, Ordering::Relaxed);
782 (
783 StatusCode::CREATED,
784 Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
785 )
786 .into_response()
787}
788
789async fn post_event(
790 State(relay): State<Relay>,
791 Path(slot_id): Path<String>,
792 headers: HeaderMap,
793 Json(req): Json<PostEventRequest>,
794) -> impl IntoResponse {
795 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
796 return resp;
797 }
798 let body_bytes = match serde_json::to_vec(&req.event) {
800 Ok(b) => b,
801 Err(e) => {
802 return (
803 StatusCode::BAD_REQUEST,
804 Json(json!({"error": format!("event not serializable: {e}")})),
805 )
806 .into_response();
807 }
808 };
809 if body_bytes.len() > MAX_EVENT_BYTES {
810 return (
811 StatusCode::PAYLOAD_TOO_LARGE,
812 Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
813 )
814 .into_response();
815 }
816 {
818 let inner = relay.inner.lock().await;
819 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
820 if used + body_bytes.len() > MAX_SLOT_BYTES {
821 return (
822 StatusCode::PAYLOAD_TOO_LARGE,
823 Json(json!({
824 "error": "slot quota exceeded",
825 "slot_bytes_used": used,
826 "slot_bytes_max": MAX_SLOT_BYTES,
827 "remediation": "operator should `wire rotate-slot` to drain old slot",
828 })),
829 )
830 .into_response();
831 }
832 }
833 let event_id = req
834 .event
835 .get("event_id")
836 .and_then(Value::as_str)
837 .map(str::to_string);
838
839 let dup = {
841 let inner = relay.inner.lock().await;
842 let slot = inner.slots.get(&slot_id);
843 if let (Some(eid), Some(slot)) = (&event_id, slot) {
844 slot.iter()
845 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
846 } else {
847 false
848 }
849 };
850 if dup {
851 return (
852 StatusCode::OK,
853 Json(json!({"event_id": event_id, "status": "duplicate"})),
854 )
855 .into_response();
856 }
857
858 {
859 let mut inner = relay.inner.lock().await;
860 let event_size = body_bytes.len();
861 let slot = inner.slots.entry(slot_id.clone()).or_default();
862 slot.push(req.event.clone());
863 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
864 }
865 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
866 return (
867 StatusCode::INTERNAL_SERVER_ERROR,
868 Json(json!({"error": format!("persist failed: {e}")})),
869 )
870 .into_response();
871 }
872 relay
873 .counters
874 .events_posted_total
875 .fetch_add(1, Ordering::Relaxed);
876 {
881 let mut inner = relay.inner.lock().await;
882 if let Some(subs) = inner.streams.get_mut(&slot_id) {
883 subs.retain(|tx| tx.send(req.event.clone()).is_ok());
884 }
885 }
886 (
887 StatusCode::CREATED,
888 Json(json!({"event_id": event_id, "status": "stored"})),
889 )
890 .into_response()
891}
892
893async fn stream_events(
906 State(relay): State<Relay>,
907 Path(slot_id): Path<String>,
908 headers: HeaderMap,
909) -> axum::response::Response {
910 use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
911 use futures::stream::StreamExt;
912
913 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
914 return resp;
915 }
916
917 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
918 {
919 let mut inner = relay.inner.lock().await;
920 inner.streams.entry(slot_id.clone()).or_default().push(tx);
921 }
922
923 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
924 SseEvent::default()
925 .json_data(&ev)
926 .map_err(|e| std::io::Error::other(e.to_string()))
927 });
928
929 Sse::new(stream)
930 .keep_alive(
931 KeepAlive::new()
932 .interval(std::time::Duration::from_secs(30))
933 .text("phyllis: still on the line"),
934 )
935 .into_response()
936}
937
938#[derive(Deserialize)]
941pub struct PairOpenRequest {
942 pub code_hash: String,
943 pub msg: String,
945 pub role: String, }
947
948#[derive(Deserialize)]
949pub struct PairBootstrapRequest {
950 pub role: String,
951 pub sealed: String,
952}
953
954#[derive(Deserialize)]
955pub struct PairAbandonRequest {
956 pub code_hash: String,
959}
960
961async fn pair_abandon(
967 State(relay): State<Relay>,
968 Json(req): Json<PairAbandonRequest>,
969) -> impl IntoResponse {
970 let mut inner = relay.inner.lock().await;
971 if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
972 inner.pair_slots.remove(&pair_id);
973 }
974 StatusCode::NO_CONTENT.into_response()
975}
976
977async fn pair_open(
978 State(relay): State<Relay>,
979 Json(req): Json<PairOpenRequest>,
980) -> impl IntoResponse {
981 if req.role != "host" && req.role != "guest" {
982 return (
983 StatusCode::BAD_REQUEST,
984 Json(json!({"error": "role must be 'host' or 'guest'"})),
985 )
986 .into_response();
987 }
988 relay.evict_expired_pair_slots().await;
989 let mut inner = relay.inner.lock().await;
990 let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
991 Some(id) => id,
992 None => {
993 let new_id = random_hex(16);
994 inner
995 .pair_lookup
996 .insert(req.code_hash.clone(), new_id.clone());
997 inner.pair_slots.insert(new_id.clone(), PairSlot::default());
998 relay
999 .counters
1000 .pair_opens_total
1001 .fetch_add(1, Ordering::Relaxed);
1002 new_id
1003 }
1004 };
1005 let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1006 slot.last_touched = std::time::Instant::now();
1007 if req.role == "host" {
1008 if slot.host_msg.is_some() {
1009 return (
1010 StatusCode::CONFLICT,
1011 Json(json!({"error": "host already registered for this code"})),
1012 )
1013 .into_response();
1014 }
1015 slot.host_msg = Some(req.msg);
1016 } else {
1017 if slot.guest_msg.is_some() {
1018 return (
1019 StatusCode::CONFLICT,
1020 Json(json!({"error": "guest already registered for this code"})),
1021 )
1022 .into_response();
1023 }
1024 slot.guest_msg = Some(req.msg);
1025 }
1026 (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1027}
1028
1029#[derive(Deserialize)]
1030pub struct PairGetQuery {
1031 pub as_role: String,
1033}
1034
1035async fn pair_get(
1036 State(relay): State<Relay>,
1037 Path(pair_id): Path<String>,
1038 Query(q): Query<PairGetQuery>,
1039) -> impl IntoResponse {
1040 relay.evict_expired_pair_slots().await;
1041 let mut inner = relay.inner.lock().await;
1042 let slot = match inner.pair_slots.get_mut(&pair_id) {
1043 Some(s) => {
1044 s.last_touched = std::time::Instant::now();
1045 s.clone()
1046 }
1047 None => {
1048 return (
1049 StatusCode::NOT_FOUND,
1050 Json(json!({"error": "unknown pair_id"})),
1051 )
1052 .into_response();
1053 }
1054 };
1055 let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1056 "host" => (slot.guest_msg, slot.guest_bootstrap),
1057 "guest" => (slot.host_msg, slot.host_bootstrap),
1058 _ => {
1059 return (
1060 StatusCode::BAD_REQUEST,
1061 Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1062 )
1063 .into_response();
1064 }
1065 };
1066 (
1067 StatusCode::OK,
1068 Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1069 )
1070 .into_response()
1071}
1072
1073async fn pair_bootstrap(
1074 State(relay): State<Relay>,
1075 Path(pair_id): Path<String>,
1076 Json(req): Json<PairBootstrapRequest>,
1077) -> impl IntoResponse {
1078 relay.evict_expired_pair_slots().await;
1079 let mut inner = relay.inner.lock().await;
1080 let slot = match inner.pair_slots.get_mut(&pair_id) {
1081 Some(s) => s,
1082 None => {
1083 return (
1084 StatusCode::NOT_FOUND,
1085 Json(json!({"error": "unknown pair_id"})),
1086 )
1087 .into_response();
1088 }
1089 };
1090 slot.last_touched = std::time::Instant::now();
1091 match req.role.as_str() {
1092 "host" => slot.host_bootstrap = Some(req.sealed),
1093 "guest" => slot.guest_bootstrap = Some(req.sealed),
1094 _ => {
1095 return (
1096 StatusCode::BAD_REQUEST,
1097 Json(json!({"error": "role must be 'host' or 'guest'"})),
1098 )
1099 .into_response();
1100 }
1101 }
1102 (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1103}
1104
1105#[derive(Deserialize)]
1108pub struct HandleClaimRequest {
1109 pub nick: String,
1112 pub slot_id: String,
1114 pub relay_url: Option<String>,
1118 pub card: Value,
1121}
1122
1123async fn handle_claim(
1130 State(relay): State<Relay>,
1131 headers: HeaderMap,
1132 Json(req): Json<HandleClaimRequest>,
1133) -> impl IntoResponse {
1134 if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1137 return resp;
1138 }
1139 if !crate::pair_profile::is_valid_nick(&req.nick) {
1141 return (
1142 StatusCode::BAD_REQUEST,
1143 Json(json!({
1144 "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1145 "nick": req.nick,
1146 })),
1147 )
1148 .into_response();
1149 }
1150 if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1152 return (
1153 StatusCode::BAD_REQUEST,
1154 Json(json!({"error": format!("card signature invalid: {e}")})),
1155 )
1156 .into_response();
1157 }
1158 let did = match req.card.get("did").and_then(Value::as_str) {
1159 Some(d) => d.to_string(),
1160 None => {
1161 return (
1162 StatusCode::BAD_REQUEST,
1163 Json(json!({"error": "card missing 'did' field"})),
1164 )
1165 .into_response();
1166 }
1167 };
1168
1169 let first_claim = {
1171 let inner = relay.inner.lock().await;
1172 match inner.handles.get(&req.nick) {
1173 Some(existing) if existing.did != did => {
1174 return (
1175 StatusCode::CONFLICT,
1176 Json(json!({
1177 "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1178 "nick": req.nick,
1179 "claimed_by": existing.did,
1180 })),
1181 )
1182 .into_response();
1183 }
1184 Some(_) => false,
1185 None => true,
1186 }
1187 };
1188
1189 let now = time::OffsetDateTime::now_utc()
1190 .format(&time::format_description::well_known::Rfc3339)
1191 .unwrap_or_default();
1192 let record = HandleRecord {
1193 nick: req.nick.clone(),
1194 did: did.clone(),
1195 card: req.card.clone(),
1196 slot_id: req.slot_id.clone(),
1197 relay_url: req.relay_url.clone(),
1198 claimed_at: now,
1199 };
1200
1201 let path = relay
1203 .state_dir
1204 .join("handles")
1205 .join(format!("{}.json", req.nick));
1206 let body = match serde_json::to_vec_pretty(&record) {
1207 Ok(b) => b,
1208 Err(e) => {
1209 return (
1210 StatusCode::INTERNAL_SERVER_ERROR,
1211 Json(json!({"error": format!("serialize failed: {e}")})),
1212 )
1213 .into_response();
1214 }
1215 };
1216 if let Err(e) = tokio::fs::write(&path, &body).await {
1217 return (
1218 StatusCode::INTERNAL_SERVER_ERROR,
1219 Json(json!({"error": format!("persist failed: {e}")})),
1220 )
1221 .into_response();
1222 }
1223 {
1224 let mut inner = relay.inner.lock().await;
1225 inner.handles.insert(req.nick.clone(), record);
1226 }
1227 relay
1228 .counters
1229 .handle_claims_total
1230 .fetch_add(1, Ordering::Relaxed);
1231 if first_claim {
1232 relay
1233 .counters
1234 .handle_first_claims_total
1235 .fetch_add(1, Ordering::Relaxed);
1236 }
1237 (
1238 StatusCode::CREATED,
1239 Json(json!({
1240 "nick": req.nick,
1241 "did": did,
1242 "status": if first_claim { "claimed" } else { "re-claimed" },
1243 })),
1244 )
1245 .into_response()
1246}
1247
1248#[derive(Deserialize)]
1249pub struct WellKnownAgentQuery {
1250 pub handle: String,
1251}
1252
1253#[derive(Deserialize)]
1254pub struct HandlesDirectoryQuery {
1255 pub cursor: Option<String>,
1256 pub limit: Option<usize>,
1257 pub vibe: Option<String>,
1258}
1259
1260#[derive(Deserialize)]
1270pub struct InviteRegisterRequest {
1271 pub invite_url: String,
1273 #[serde(default)]
1275 pub ttl_seconds: Option<u64>,
1276 #[serde(default)]
1279 pub uses: Option<u32>,
1280}
1281
1282impl Relay {
1283 async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1285 use tokio::io::AsyncWriteExt;
1286 let mut line = serde_json::to_vec(rec)?;
1287 line.push(b'\n');
1288 let path = self.state_dir.join("invites.jsonl");
1289 let mut f = tokio::fs::OpenOptions::new()
1290 .create(true)
1291 .append(true)
1292 .open(&path)
1293 .await?;
1294 f.write_all(&line).await?;
1295 f.flush().await?;
1296 Ok(())
1297 }
1298}
1299
1300async fn invite_register(
1301 State(relay): State<Relay>,
1302 Json(req): Json<InviteRegisterRequest>,
1303) -> impl IntoResponse {
1304 if req.invite_url.is_empty() {
1305 return (
1306 StatusCode::BAD_REQUEST,
1307 Json(json!({"error": "invite_url required"})),
1308 )
1309 .into_response();
1310 }
1311 if req.invite_url.len() > 8_192 {
1313 return (
1314 StatusCode::PAYLOAD_TOO_LARGE,
1315 Json(json!({"error": "invite_url > 8 KiB"})),
1316 )
1317 .into_response();
1318 }
1319 let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1320 let now = SystemTime::now()
1321 .duration_since(UNIX_EPOCH)
1322 .map(|d| d.as_secs())
1323 .unwrap_or(0);
1324 let token = random_hex(3);
1327 let rec = InviteRecord {
1328 token: token.clone(),
1329 invite_url: req.invite_url,
1330 expires_unix: now + ttl,
1331 uses_remaining: req.uses,
1332 created_unix: now,
1333 };
1334 {
1335 let mut inner = relay.inner.lock().await;
1336 if inner.invites.contains_key(&token) {
1337 return (
1338 StatusCode::CONFLICT,
1339 Json(json!({"error": "token collision, retry"})),
1340 )
1341 .into_response();
1342 }
1343 inner.invites.insert(token.clone(), rec.clone());
1344 }
1345 if let Err(e) = relay.persist_invite(&rec).await {
1346 return (
1347 StatusCode::INTERNAL_SERVER_ERROR,
1348 Json(json!({"error": format!("persist failed: {e}")})),
1349 )
1350 .into_response();
1351 }
1352 (
1353 StatusCode::CREATED,
1354 Json(json!({
1355 "token": token,
1356 "path": format!("/i/{token}"),
1357 "expires_unix": rec.expires_unix,
1358 "uses_remaining": rec.uses_remaining,
1359 })),
1360 )
1361 .into_response()
1362}
1363
1364#[derive(Deserialize)]
1365pub struct InviteScriptQuery {
1366 pub format: Option<String>,
1373}
1374
1375async fn invite_script(
1376 State(relay): State<Relay>,
1377 Path(token): Path<String>,
1378 Query(q): Query<InviteScriptQuery>,
1379) -> impl IntoResponse {
1380 if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1383 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1384 }
1385 let want_raw_url = q.format.as_deref() == Some("url");
1386 let now = SystemTime::now()
1387 .duration_since(UNIX_EPOCH)
1388 .map(|d| d.as_secs())
1389 .unwrap_or(0);
1390 let invite_url = {
1391 let mut inner = relay.inner.lock().await;
1392 let Some(rec) = inner.invites.get_mut(&token) else {
1393 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1394 };
1395 if rec.expires_unix <= now {
1396 return (StatusCode::GONE, "this invite has expired\n").into_response();
1397 }
1398 if let Some(n) = rec.uses_remaining {
1399 if n == 0 {
1400 return (StatusCode::GONE, "this invite has been used up\n").into_response();
1401 }
1402 if !want_raw_url {
1406 rec.uses_remaining = Some(n - 1);
1407 }
1408 }
1409 rec.invite_url.clone()
1410 };
1411 if want_raw_url {
1412 return (
1413 StatusCode::OK,
1414 [
1415 (
1416 axum::http::header::CONTENT_TYPE,
1417 "text/plain; charset=utf-8",
1418 ),
1419 (
1420 axum::http::header::CACHE_CONTROL,
1421 "private, no-store, max-age=0",
1422 ),
1423 ],
1424 invite_url,
1425 )
1426 .into_response();
1427 }
1428 let escaped = invite_url.replace('\'', "'\\''");
1429 let script = format!(
1430 "#!/bin/sh\n\
1431 # wire — one-curl onboarding (install + pair in one shot)\n\
1432 # source: https://github.com/SlanchaAi/wire\n\
1433 set -eu\n\
1434 INVITE='{escaped}'\n\
1435 echo \"\u{2192} checking for wire CLI...\"\n\
1436 if ! command -v wire >/dev/null 2>&1; then\n \
1437 echo \"\u{2192} wire not installed; installing first...\"\n \
1438 curl -fsSL https://wireup.net/install.sh | sh\n \
1439 case \":$PATH:\" in\n \
1440 *:\"$HOME/.local/bin\":*) ;;\n \
1441 *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n \
1442 esac\n \
1443 if ! command -v wire >/dev/null 2>&1; then\n \
1444 echo \"\"\n \
1445 echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n \
1446 echo \"Open a new shell, then run:\"\n \
1447 echo \" wire accept '$INVITE'\"\n \
1448 exit 0\n \
1449 fi\n\
1450 fi\n\
1451 echo \"\u{2192} accepting invite...\"\n\
1452 wire accept \"$INVITE\"\n"
1453 );
1454 (
1455 StatusCode::OK,
1456 [
1457 (
1458 axum::http::header::CONTENT_TYPE,
1459 "text/x-shellscript; charset=utf-8",
1460 ),
1461 (
1462 axum::http::header::CACHE_CONTROL,
1463 "private, no-store, max-age=0",
1464 ),
1465 ],
1466 script,
1467 )
1468 .into_response()
1469}
1470
1471async fn handles_directory(
1472 State(relay): State<Relay>,
1473 Query(q): Query<HandlesDirectoryQuery>,
1474) -> impl IntoResponse {
1475 let limit = q.limit.unwrap_or(100).clamp(1, 500);
1476 let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1477 let inner = relay.inner.lock().await;
1478 let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1479 drop(inner);
1480 records.sort_by(|a, b| a.nick.cmp(&b.nick));
1481
1482 let cursor = q.cursor.as_deref();
1483 let mut eligible = Vec::new();
1484 for rec in records {
1485 if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1486 continue;
1487 }
1488 if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1493 continue;
1494 }
1495 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1496 if profile
1497 .get("listed")
1498 .and_then(Value::as_bool)
1499 .is_some_and(|listed| !listed)
1500 {
1501 continue;
1502 }
1503 if let Some(want) = &vibe_filter {
1504 let matched = profile
1505 .get("vibe")
1506 .and_then(Value::as_array)
1507 .map(|arr| {
1508 arr.iter().any(|v| {
1509 v.as_str()
1510 .map(|s| s.eq_ignore_ascii_case(want))
1511 .unwrap_or(false)
1512 })
1513 })
1514 .unwrap_or(false);
1515 if !matched {
1516 continue;
1517 }
1518 }
1519 eligible.push((rec, profile));
1520 }
1521
1522 let has_more = eligible.len() > limit;
1523 let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1524 let next_cursor = if has_more {
1525 page.last().map(|(rec, _)| rec.nick.clone())
1526 } else {
1527 None
1528 };
1529 let handles: Vec<Value> = page
1530 .into_iter()
1531 .map(|(rec, profile)| {
1532 json!({
1533 "nick": rec.nick,
1534 "did": rec.did,
1535 "profile": {
1536 "emoji": profile.get("emoji").cloned().unwrap_or(Value::Null),
1537 "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1538 "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1539 "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1540 "now": profile.get("now").cloned().unwrap_or(Value::Null),
1541 },
1542 "claimed_at": rec.claimed_at,
1543 })
1544 })
1545 .collect();
1546 (
1547 StatusCode::OK,
1548 Json(json!({
1549 "handles": handles,
1550 "next_cursor": next_cursor,
1551 })),
1552 )
1553 .into_response()
1554}
1555
1556async fn handle_intro(
1570 State(relay): State<Relay>,
1571 Path(nick): Path<String>,
1572 Json(req): Json<PostEventRequest>,
1573) -> impl IntoResponse {
1574 let slot_id = {
1576 let inner = relay.inner.lock().await;
1577 match inner.handles.get(&nick) {
1578 Some(rec) => rec.slot_id.clone(),
1579 None => {
1580 return (
1581 StatusCode::NOT_FOUND,
1582 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1583 )
1584 .into_response();
1585 }
1586 }
1587 };
1588
1589 let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1592 let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1593 if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1594 return (
1595 StatusCode::BAD_REQUEST,
1596 Json(json!({
1597 "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1598 "got_kind": kind,
1599 "got_type": type_str,
1600 })),
1601 )
1602 .into_response();
1603 }
1604
1605 let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1607 Some(c) => c.clone(),
1608 None => {
1609 return (
1610 StatusCode::BAD_REQUEST,
1611 Json(json!({"error": "intro event body must embed 'card' field"})),
1612 )
1613 .into_response();
1614 }
1615 };
1616 if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1617 return (
1618 StatusCode::BAD_REQUEST,
1619 Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1620 )
1621 .into_response();
1622 }
1623
1624 let body_bytes = match serde_json::to_vec(&req.event) {
1626 Ok(b) => b,
1627 Err(e) => {
1628 return (
1629 StatusCode::BAD_REQUEST,
1630 Json(json!({"error": format!("event not serializable: {e}")})),
1631 )
1632 .into_response();
1633 }
1634 };
1635 if body_bytes.len() > MAX_EVENT_BYTES {
1636 return (
1637 StatusCode::PAYLOAD_TOO_LARGE,
1638 Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1639 )
1640 .into_response();
1641 }
1642 {
1643 let inner = relay.inner.lock().await;
1644 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1645 if used + body_bytes.len() > MAX_SLOT_BYTES {
1646 return (
1647 StatusCode::PAYLOAD_TOO_LARGE,
1648 Json(json!({
1649 "error": "target slot quota exceeded",
1650 "slot_bytes_used": used,
1651 "slot_bytes_max": MAX_SLOT_BYTES,
1652 })),
1653 )
1654 .into_response();
1655 }
1656 }
1657
1658 let event_id = req
1659 .event
1660 .get("event_id")
1661 .and_then(Value::as_str)
1662 .map(str::to_string);
1663
1664 let dup = {
1666 let inner = relay.inner.lock().await;
1667 let slot = inner.slots.get(&slot_id);
1668 if let (Some(eid), Some(slot)) = (&event_id, slot) {
1669 slot.iter()
1670 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1671 } else {
1672 false
1673 }
1674 };
1675 if dup {
1676 return (
1677 StatusCode::OK,
1678 Json(json!({"event_id": event_id, "status": "duplicate"})),
1679 )
1680 .into_response();
1681 }
1682
1683 {
1684 let mut inner = relay.inner.lock().await;
1685 let event_size = body_bytes.len();
1686 let slot = inner.slots.entry(slot_id.clone()).or_default();
1687 slot.push(req.event.clone());
1688 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1689 }
1690 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1691 return (
1692 StatusCode::INTERNAL_SERVER_ERROR,
1693 Json(json!({"error": format!("persist failed: {e}")})),
1694 )
1695 .into_response();
1696 }
1697 (
1698 StatusCode::CREATED,
1699 Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1700 )
1701 .into_response()
1702}
1703
1704async fn well_known_agent_card_a2a(
1720 State(relay): State<Relay>,
1721 Query(q): Query<WellKnownAgentQuery>,
1722) -> impl IntoResponse {
1723 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1724 if nick.is_empty() {
1725 return (
1726 StatusCode::BAD_REQUEST,
1727 Json(json!({"error": "handle missing nick"})),
1728 )
1729 .into_response();
1730 }
1731 let inner = relay.inner.lock().await;
1732 let rec = match inner.handles.get(&nick) {
1733 Some(r) => r.clone(),
1734 None => {
1735 return (
1736 StatusCode::NOT_FOUND,
1737 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1738 )
1739 .into_response();
1740 }
1741 };
1742 drop(inner);
1743
1744 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1745 let description = profile
1746 .get("motto")
1747 .and_then(Value::as_str)
1748 .unwrap_or("")
1749 .to_string();
1750 let display_name = profile
1751 .get("display_name")
1752 .and_then(Value::as_str)
1753 .unwrap_or(&rec.nick)
1754 .to_string();
1755 let relay_url = rec.relay_url.clone().unwrap_or_default();
1756 let endpoint = if !relay_url.is_empty() {
1758 format!(
1759 "{}/v1/handle/intro/{}",
1760 relay_url.trim_end_matches('/'),
1761 rec.nick
1762 )
1763 } else {
1764 format!("/v1/handle/intro/{}", rec.nick)
1765 };
1766 let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1767
1768 let a2a_card = json!({
1772 "id": rec.did,
1773 "name": display_name,
1774 "description": description,
1775 "version": "wire/0.5",
1776 "endpoint": endpoint,
1777 "provider": {
1778 "name": "wire",
1779 "url": "https://github.com/SlanchaAi/wire"
1780 },
1781 "capabilities": {
1782 "streaming": false,
1783 "pushNotifications": false,
1784 "extendedAgentCard": true
1785 },
1786 "securitySchemes": {
1787 "ed25519-event-sig": {
1788 "type": "signature",
1789 "alg": "EdDSA",
1790 "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1791 }
1792 },
1793 "security": [{"ed25519-event-sig": []}],
1794 "skills": [],
1795 "extensions": [{
1796 "uri": "https://slancha.ai/wire/ext/v0.5",
1800 "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1801 "required": false,
1802 "params": {
1803 "did": rec.did,
1804 "handle": rec.nick,
1805 "slot_id": rec.slot_id,
1806 "relay_url": rec.relay_url,
1807 "card": rec.card,
1808 "profile": profile,
1809 "claimed_at": rec.claimed_at,
1810 }
1811 }],
1812 "signature": card_sig,
1813 });
1814 (StatusCode::OK, Json(a2a_card)).into_response()
1815}
1816
1817async fn well_known_agent(
1818 State(relay): State<Relay>,
1819 Query(q): Query<WellKnownAgentQuery>,
1820) -> impl IntoResponse {
1821 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1822 if nick.is_empty() {
1823 return (
1824 StatusCode::BAD_REQUEST,
1825 Json(json!({"error": "handle missing nick"})),
1826 )
1827 .into_response();
1828 }
1829 let inner = relay.inner.lock().await;
1830 match inner.handles.get(&nick) {
1831 Some(rec) => (
1832 StatusCode::OK,
1833 Json(json!({
1834 "nick": rec.nick,
1835 "did": rec.did,
1836 "card": rec.card,
1837 "slot_id": rec.slot_id,
1838 "relay_url": rec.relay_url,
1839 "claimed_at": rec.claimed_at,
1840 })),
1841 )
1842 .into_response(),
1843 None => (
1844 StatusCode::NOT_FOUND,
1845 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1846 )
1847 .into_response(),
1848 }
1849}
1850
1851async fn list_events(
1852 State(relay): State<Relay>,
1853 Path(slot_id): Path<String>,
1854 Query(q): Query<ListEventsQuery>,
1855 headers: HeaderMap,
1856) -> impl IntoResponse {
1857 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1858 return resp;
1859 }
1860 let limit = q.limit.unwrap_or(100).min(1000);
1861 let mut inner = relay.inner.lock().await;
1862 let now_unix = std::time::SystemTime::now()
1866 .duration_since(std::time::UNIX_EPOCH)
1867 .map(|d| d.as_secs())
1868 .unwrap_or(0);
1869 inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1870 let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1871 let start = match q.since {
1872 Some(eid) => events
1873 .iter()
1874 .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1875 .map(|i| i + 1)
1876 .unwrap_or(0),
1877 None => 0,
1878 };
1879 let end = (start + limit).min(events.len());
1880 let slice = events[start..end].to_vec();
1881 (StatusCode::OK, Json(slice)).into_response()
1882}
1883
1884async fn slot_state(
1890 State(relay): State<Relay>,
1891 Path(slot_id): Path<String>,
1892 headers: HeaderMap,
1893) -> impl IntoResponse {
1894 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1895 return resp;
1896 }
1897 let inner = relay.inner.lock().await;
1898 let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1899 let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1900 let responder_health = inner.responder_health.get(&slot_id).cloned();
1901 (
1902 StatusCode::OK,
1903 Json(json!({
1904 "slot_id": slot_id,
1905 "event_count": event_count,
1906 "last_pull_at_unix": last_pull_at_unix,
1907 "responder_health": responder_health,
1908 })),
1909 )
1910 .into_response()
1911}
1912
1913async fn responder_health_set(
1914 State(relay): State<Relay>,
1915 Path(slot_id): Path<String>,
1916 headers: HeaderMap,
1917 Json(record): Json<ResponderHealthRecord>,
1918) -> impl IntoResponse {
1919 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1920 return resp;
1921 }
1922 let path = relay
1923 .state_dir
1924 .join("responder-health")
1925 .join(format!("{slot_id}.json"));
1926 let body = match serde_json::to_vec_pretty(&record) {
1927 Ok(b) => b,
1928 Err(e) => {
1929 return (
1930 StatusCode::INTERNAL_SERVER_ERROR,
1931 Json(json!({"error": format!("serialize failed: {e}")})),
1932 )
1933 .into_response();
1934 }
1935 };
1936 if let Err(e) = tokio::fs::write(&path, body).await {
1937 return (
1938 StatusCode::INTERNAL_SERVER_ERROR,
1939 Json(json!({"error": format!("persist failed: {e}")})),
1940 )
1941 .into_response();
1942 }
1943 {
1944 let mut inner = relay.inner.lock().await;
1945 inner
1946 .responder_health
1947 .insert(slot_id.clone(), record.clone());
1948 }
1949 (StatusCode::OK, Json(record)).into_response()
1950}
1951
1952async fn check_token(
1953 relay: &Relay,
1954 headers: &HeaderMap,
1955 slot_id: &str,
1956) -> std::result::Result<(), axum::response::Response> {
1957 let auth = headers
1958 .get(AUTHORIZATION)
1959 .and_then(|h| h.to_str().ok())
1960 .and_then(|s| s.strip_prefix("Bearer "))
1961 .map(str::to_string);
1962 let presented = match auth {
1963 Some(t) => t,
1964 None => {
1965 return Err((
1966 StatusCode::UNAUTHORIZED,
1967 Json(json!({"error": "missing Bearer token"})),
1968 )
1969 .into_response());
1970 }
1971 };
1972 let inner = relay.inner.lock().await;
1973 let expected = match inner.tokens.get(slot_id) {
1974 Some(t) => t.clone(),
1975 None => {
1976 return Err((
1977 StatusCode::NOT_FOUND,
1978 Json(json!({"error": "unknown slot"})),
1979 )
1980 .into_response());
1981 }
1982 };
1983 drop(inner);
1984 if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
1985 return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
1986 }
1987 Ok(())
1988}
1989
1990fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1991 if a.len() != b.len() {
1992 return false;
1993 }
1994 let mut acc = 0u8;
1995 for (x, y) in a.iter().zip(b.iter()) {
1996 acc |= x ^ y;
1997 }
1998 acc == 0
1999}
2000
2001fn is_valid_slot_id(s: &str) -> bool {
2002 s.len() == 32
2003 && s.bytes()
2004 .all(|b| b.is_ascii_hexdigit() && !b.is_ascii_uppercase())
2005}
2006
2007fn random_hex(n_bytes: usize) -> String {
2008 let mut buf = vec![0u8; n_bytes];
2009 rand::thread_rng().fill_bytes(&mut buf);
2010 hex::encode(buf)
2011}
2012
2013pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2015 let relay = Relay::new(state_dir).await?;
2016 relay.spawn_pair_sweeper();
2017 relay.spawn_counter_persister();
2018 let app = relay.clone().router();
2019 let listener = tokio::net::TcpListener::bind(bind)
2020 .await
2021 .with_context(|| format!("binding {bind}"))?;
2022 eprintln!("wire relay-server listening on {bind}");
2023 let shutdown_relay = relay.clone();
2024 axum::serve(listener, app)
2025 .with_graceful_shutdown(async move {
2026 let _ = tokio::signal::ctrl_c().await;
2027 eprintln!("\nshutting down — final counter snapshot");
2028 if let Err(e) = shutdown_relay.persist_counters().await {
2029 eprintln!("final counter persist failed: {e}");
2030 }
2031 })
2032 .await?;
2033 Ok(())
2034}
2035
2036#[cfg(test)]
2037mod tests {
2038 use super::*;
2039
2040 #[test]
2041 fn constant_time_eq_basic() {
2042 assert!(constant_time_eq(b"abc", b"abc"));
2043 assert!(!constant_time_eq(b"abc", b"abd"));
2044 assert!(!constant_time_eq(b"abc", b"abcd")); }
2046
2047 #[test]
2048 fn random_hex_length() {
2049 let s = random_hex(16);
2050 assert_eq!(s.len(), 32); assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
2052 }
2053
2054 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2055 async fn pair_slot_evicts_when_idle_past_ttl() {
2056 let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
2057 let _ = std::fs::remove_dir_all(&dir);
2058 let relay = Relay::new(dir.clone()).await.unwrap();
2059
2060 {
2062 let mut inner = relay.inner.lock().await;
2063 inner
2064 .pair_lookup
2065 .insert("hash-A".to_string(), "id-A".to_string());
2066 inner.pair_slots.insert(
2067 "id-A".to_string(),
2068 PairSlot {
2069 last_touched: std::time::Instant::now()
2070 - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
2071 ..PairSlot::default()
2072 },
2073 );
2074
2075 inner
2077 .pair_lookup
2078 .insert("hash-B".to_string(), "id-B".to_string());
2079 inner
2080 .pair_slots
2081 .insert("id-B".to_string(), PairSlot::default());
2082
2083 assert_eq!(inner.pair_slots.len(), 2);
2084 assert_eq!(inner.pair_lookup.len(), 2);
2085 }
2086
2087 relay.evict_expired_pair_slots().await;
2088
2089 let inner = relay.inner.lock().await;
2090 assert_eq!(
2091 inner.pair_slots.len(),
2092 1,
2093 "expired slot should have been evicted"
2094 );
2095 assert!(inner.pair_slots.contains_key("id-B"));
2096 assert_eq!(inner.pair_lookup.len(), 1);
2097 assert!(inner.pair_lookup.contains_key("hash-B"));
2098 let _ = std::fs::remove_dir_all(&dir);
2099 }
2100
2101 #[test]
2102 fn slot_id_validator_accepts_only_lowercase_32hex() {
2103 assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
2104 assert!(is_valid_slot_id(&random_hex(16)));
2105 assert!(!is_valid_slot_id("abc"));
2107 assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde")); assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0")); assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
2111 assert!(!is_valid_slot_id("../etc/passwd0123456789abcdef0000"));
2113 assert!(!is_valid_slot_id("..%2Fetc%2Fpasswd00000000000000000"));
2114 assert!(!is_valid_slot_id("/absolute/path/that/looks/like/key"));
2115 assert!(!is_valid_slot_id(
2117 "0123456789abcdef\0\x31\x32\x33456789abcdef"
2118 ));
2119 }
2120}