1use anyhow::{Context, Result};
33use axum::{
34 Json, Router,
35 extract::{Path, Query, State},
36 http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37 response::IntoResponse,
38 routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50 GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Largest serialized size, in bytes, of a single event that `post_event` accepts (256 KiB).
const MAX_EVENT_BYTES: usize = 256 * 1024;
/// Cap, in bytes, on the running total of serialized events stored in one slot (64 MiB).
const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
/// Handle to the relay. Cloning is cheap: the mutable state and the counters
/// sit behind `Arc`s, so every clone observes the same data.
#[derive(Clone)]
pub struct Relay {
    /// All mutable in-memory state, guarded by a single async mutex.
    inner: Arc<Mutex<Inner>>,
    /// Root directory for persisted state (`slots/`, `handles/`,
    /// `responder-health/`, `tokens.json`, `counters.json`, ...).
    state_dir: PathBuf,
    /// Cumulative counters shared by all clones.
    counters: Arc<RelayCounters>,
}
66
/// Process-wide cumulative counters. The totals are seeded from
/// `counters.json` in `Relay::new` and only ever incremented afterwards.
struct RelayCounters {
    /// Unix timestamp (seconds) captured when this `Relay` was constructed.
    boot_unix: u64,
    /// Successful handle claims, including re-claims.
    handle_claims_total: AtomicU64,
    /// Successful claims of a nick that had no previous record.
    handle_first_claims_total: AtomicU64,
    /// Slots allocated via `allocate_slot`.
    slot_allocations_total: AtomicU64,
    /// Pair slots newly created by `pair_open`.
    pair_opens_total: AtomicU64,
    /// Events stored via `post_event`.
    events_posted_total: AtomicU64,
}
79
/// Plain-integer copy of the `RelayCounters` totals; this is the JSON shape
/// written to and read from `counters.json`. `Default` yields all zeros.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct CountersSnapshot {
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
88
/// One line of `stats-history.jsonl`: a timestamped sample of the live gauges
/// plus the cumulative counters, as produced by `Relay::append_history`.
#[derive(serde::Serialize, serde::Deserialize)]
struct HistoryEntry {
    /// Unix timestamp (seconds) at which the sample was taken.
    ts: u64,
    /// Number of handle records held in memory.
    handles_active: usize,
    /// Number of slots held in memory.
    slots_active: usize,
    /// Number of pair slots currently held in memory.
    pair_slots_open: usize,
    /// Total number of registered stream subscribers across all slots.
    streams_active: usize,
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
106
/// Query string for the `/stats.history` endpoint.
#[derive(Deserialize)]
pub struct StatsHistoryQuery {
    /// Look-back window in hours; `stats_history` defaults it to 24 and caps it at 168.
    pub hours: Option<u64>,
}
112
/// Mutable relay state kept behind `Relay::inner`'s mutex.
struct Inner {
    /// slot_id -> events stored for that slot, in insertion order.
    slots: HashMap<String, Vec<Value>>,
    /// slot_id -> slot token issued by `allocate_slot` (persisted in `tokens.json`).
    tokens: HashMap<String, String>,
    /// slot_id -> running sum of serialized event sizes, checked against `MAX_SLOT_BYTES`.
    slot_bytes: HashMap<String, usize>,
    /// Keyed by string with a unix-seconds value; presumably slot_id -> time of the
    /// last pull. NOTE(review): no reader or writer is visible in this section — confirm.
    last_pull_at_unix: HashMap<String, u64>,
    /// slot_id -> senders feeding the live subscribers registered by `stream_events`.
    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
    /// code_hash -> pair_id, populated by `pair_open`.
    pair_lookup: HashMap<String, String>,
    /// pair_id -> rendezvous state for that pairing.
    pair_slots: HashMap<String, PairSlot>,
    /// nick -> claimed handle record.
    handles: HashMap<String, HandleRecord>,
    /// slot_id -> last reported responder health.
    responder_health: HashMap<String, ResponderHealthRecord>,
    /// invite token -> invite record.
    invites: HashMap<String, InviteRecord>,
}
144
/// A registered invite, stored one JSON object per line in `invites.jsonl`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InviteRecord {
    /// Token under which the invite is indexed in `Inner::invites`.
    token: String,
    /// The invite URL supplied at registration.
    invite_url: String,
    /// Unix timestamp (seconds) after which the invite is dropped when state is loaded.
    expires_unix: u64,
    /// Optional use budget supplied at registration.
    /// NOTE(review): how it is decremented is not visible in this section.
    uses_remaining: Option<u32>,
    /// Unix timestamp (seconds); presumably the registration time — confirm.
    created_unix: u64,
}
156
/// A claimed handle, persisted as `handles/<nick>.json` and indexed by nick in memory.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct HandleRecord {
    /// The claimed nickname.
    pub nick: String,
    /// The `did` string taken from the claimant's card; later claims of the
    /// same nick must present the same value.
    pub did: String,
    /// The card JSON submitted with the claim.
    pub card: Value,
    /// Slot the claim was authenticated against.
    pub slot_id: String,
    /// Relay URL supplied by the claimant, if any.
    pub relay_url: Option<String>,
    /// RFC 3339 timestamp of the most recent claim.
    pub claimed_at: String,
    /// Explicit directory preference; omitted from the JSON when `None`, and
    /// `None` is treated as discoverable by `is_discoverable`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
177
178impl HandleRecord {
179 fn is_discoverable(&self) -> bool {
182 self.discoverable.unwrap_or(true)
183 }
184}
185
/// Responder health report for one slot, loaded from `responder-health/<slot_id>.json`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ResponderHealthRecord {
    /// Free-form status string supplied by the reporter.
    pub status: String,
    /// Optional explanation; omitted from the JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional timestamp string of the last success; omitted from the JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_success_at: Option<String>,
    /// Timestamp string; presumably when this record was set — confirm against the writer.
    pub set_at: String,
}
195
/// In-memory rendezvous state for one pairing, keyed by pair id in
/// `Inner::pair_slots`. Each side deposits its values and reads the other's.
#[derive(Clone, Debug)]
struct PairSlot {
    /// Message deposited by the host in `pair_open`.
    host_msg: Option<String>,
    /// Message deposited by the guest in `pair_open`.
    guest_msg: Option<String>,
    /// Sealed payload deposited by the host in `pair_bootstrap`.
    host_bootstrap: Option<String>,
    /// Sealed payload deposited by the guest in `pair_bootstrap`.
    guest_bootstrap: Option<String>,
    /// Last time a pair endpoint touched this slot; drives TTL eviction.
    last_touched: std::time::Instant,
}

impl Default for PairSlot {
    /// An empty slot whose TTL clock starts at the moment of construction.
    fn default() -> Self {
        let created = std::time::Instant::now();
        let (host_msg, guest_msg) = (None, None);
        let (host_bootstrap, guest_bootstrap) = (None, None);
        Self {
            host_msg,
            guest_msg,
            host_bootstrap,
            guest_bootstrap,
            last_touched: created,
        }
    }
}
221
/// Seconds a pair slot may go untouched before `evict_expired_pair_slots` removes it.
const PAIR_SLOT_TTL_SECS: u64 = 300;
225
/// JSON body of `POST /v1/slot/allocate`.
#[derive(Deserialize)]
pub struct AllocateRequest {
    /// Optional handle hint; defaults to `None` when absent. `allocate_slot`
    /// as shown in this file does not read it.
    #[serde(default)]
    pub handle: Option<String>,
}
232
/// JSON body of `POST /v1/events/:slot_id`.
#[derive(Deserialize)]
pub struct PostEventRequest {
    /// The event to store. If it carries a string `event_id`, that id is used
    /// for duplicate detection within the slot.
    pub event: Value,
}
237
/// Query string for listing a slot's events.
/// NOTE(review): the consuming handler (`list_events`) is outside this section;
/// the field meanings below are inferred from the names — confirm.
#[derive(Deserialize)]
pub struct ListEventsQuery {
    /// Presumably a cursor: return only events after this marker.
    pub since: Option<String>,
    /// Presumably the maximum number of events to return.
    pub limit: Option<usize>,
}
245
246impl Relay {
    /// Builds a relay rooted at `state_dir`, creating the on-disk layout if
    /// needed and loading whatever state is already there.
    ///
    /// Loading is best-effort: unreadable files and unparsable JSON (whole
    /// files or individual lines) are silently skipped or replaced by defaults.
    ///
    /// # Errors
    /// Fails only if a state directory cannot be created or listed, or if an
    /// existing `tokens.json` cannot be read.
    pub async fn new(state_dir: PathBuf) -> Result<Self> {
        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
        let mut inner = Inner {
            slots: HashMap::new(),
            tokens: HashMap::new(),
            slot_bytes: HashMap::new(),
            last_pull_at_unix: HashMap::new(),
            streams: HashMap::new(),
            pair_lookup: HashMap::new(),
            pair_slots: HashMap::new(),
            handles: HashMap::new(),
            responder_health: HashMap::new(),
            invites: HashMap::new(),
        };
        // Slot tokens: a corrupt file degrades to an empty map rather than failing boot.
        let token_path = state_dir.join("tokens.json");
        if token_path.exists() {
            let body = tokio::fs::read_to_string(&token_path).await?;
            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
        }
        // Slot event logs: one `<slot_id>.jsonl` file per slot, one event per line.
        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
        while let Some(entry) = slots_dir.next_entry().await? {
            let path = entry.path();
            // Skip anything that is not a `.jsonl` file (including extension-less entries).
            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
                continue;
            }
            let stem = match path.file_stem().and_then(|s| s.to_str()) {
                Some(s) => s.to_string(),
                None => continue,
            };
            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
            let mut events = Vec::new();
            for line in body.lines() {
                if let Ok(v) = serde_json::from_str::<Value>(line) {
                    events.push(v);
                }
            }
            // Rebuild the quota accounting by re-serializing each loaded event,
            // the same measure `post_event` uses when it adds to `slot_bytes`.
            let bytes: usize = events
                .iter()
                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
                .sum();
            inner.slot_bytes.insert(stem.clone(), bytes);
            inner.slots.insert(stem, events);
        }
        // Handle records: keyed by the `nick` stored inside each file, not by file name.
        let handles_dir = state_dir.join("handles");
        if handles_dir.exists() {
            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
            while let Some(entry) = rd.next_entry().await? {
                let path = entry.path();
                if path.extension().and_then(|x| x.to_str()) != Some("json") {
                    continue;
                }
                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
                    inner.handles.insert(rec.nick.clone(), rec);
                }
            }
        }
        // Responder health: keyed by the file stem, which is the slot id.
        let responder_health_dir = state_dir.join("responder-health");
        if responder_health_dir.exists() {
            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
            while let Some(entry) = rd.next_entry().await? {
                let path = entry.path();
                if path.extension().and_then(|x| x.to_str()) != Some("json") {
                    continue;
                }
                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
                    continue;
                };
                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
                    inner.responder_health.insert(slot_id.to_string(), rec);
                }
            }
        }
        // Invites: append-only log; entries that have already expired are dropped
        // on load, and a later line for the same token replaces an earlier one.
        let invites_path = state_dir.join("invites.jsonl");
        if invites_path.exists() {
            let now_unix = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0);
            let body = tokio::fs::read_to_string(&invites_path)
                .await
                .unwrap_or_default();
            for line in body.lines() {
                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
                    && rec.expires_unix > now_unix
                {
                    inner.invites.insert(rec.token.clone(), rec);
                }
            }
        }
        let boot_unix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        // Cumulative counters resume from the last persisted snapshot; a missing
        // or corrupt file starts them at zero.
        let snap: CountersSnapshot =
            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
                Err(_) => CountersSnapshot::default(),
            };
        Ok(Self {
            inner: Arc::new(Mutex::new(inner)),
            state_dir,
            counters: Arc::new(RelayCounters {
                boot_unix,
                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
                events_posted_total: AtomicU64::new(snap.events_posted_total),
            }),
        })
    }
371
372 pub fn router(self) -> Router {
373 self.router_with_mode(ServerMode::default())
374 }
375
376 pub fn router_with_mode(self, mode: ServerMode) -> Router {
377 let governor_conf = std::sync::Arc::new(
384 GovernorConfigBuilder::default()
385 .per_second(10)
386 .burst_size(50)
387 .key_extractor(GlobalKeyExtractor)
388 .finish()
389 .expect("valid governor config"),
390 );
391 let governor_layer = GovernorLayer {
392 config: governor_conf,
393 };
394
395 let hot_writes = Router::new()
397 .route("/v1/slot/allocate", post(allocate_slot))
398 .route("/v1/pair", post(pair_open))
399 .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
400 .route("/v1/pair/abandon", post(pair_abandon))
401 .layer(governor_layer);
402
403 let mut router = Router::new()
406 .route("/healthz", get(healthz))
407 .route("/v1/events/:slot_id", post(post_event).get(list_events))
408 .route("/v1/slot/:slot_id/state", get(slot_state))
409 .route(
410 "/v1/slot/:slot_id/responder-health",
411 post(responder_health_set),
412 )
413 .route("/v1/events/:slot_id/stream", get(stream_events))
414 .route("/v1/pair/:pair_id", get(pair_get))
415 .route("/v1/handle/intro/:nick", post(handle_intro));
416
417 if !mode.local_only {
423 router = router
424 .route("/", get(landing_index))
425 .route("/favicon.svg", get(landing_favicon))
426 .route("/og.png", get(landing_og))
427 .route("/demo.cast", get(landing_demo_cast))
428 .route("/install", get(landing_install_sh))
429 .route("/install.sh", get(landing_install_sh))
430 .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
431 .route("/stats", get(stats_root))
432 .route("/stats.json", get(stats_json))
433 .route("/stats.html", get(landing_stats_html))
434 .route("/stats.history", get(stats_history))
435 .route("/phonebook", get(landing_phonebook_html))
436 .route("/phonebook.html", get(landing_phonebook_html))
437 .route("/v1/handle/claim", post(handle_claim))
438 .route("/v1/handles", get(handles_directory))
439 .route("/v1/invite/register", post(invite_register))
440 .route("/i/:token", get(invite_script))
441 .route("/.well-known/wire/agent", get(well_known_agent))
442 .route(
443 "/.well-known/agent-card.json",
444 get(well_known_agent_card_a2a),
445 );
446 } else {
447 router = router.route("/v1/handle/claim", post(handle_claim));
453 }
454
455 router.merge(hot_writes).with_state(self)
456 }
457
458 async fn evict_expired_pair_slots(&self) {
462 let now = std::time::Instant::now();
463 let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
464 let mut inner = self.inner.lock().await;
465 let mut to_remove = Vec::new();
466 for (id, slot) in inner.pair_slots.iter() {
467 if now.duration_since(slot.last_touched) > ttl {
468 to_remove.push(id.clone());
469 }
470 }
471 for id in to_remove {
472 inner.pair_slots.remove(&id);
473 inner.pair_lookup.retain(|_, v| v != &id);
474 }
475 }
476
477 pub fn spawn_pair_sweeper(&self) {
482 let me = self.clone();
483 tokio::spawn(async move {
484 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
485 loop {
486 tick.tick().await;
487 me.evict_expired_pair_slots().await;
488 }
489 });
490 }
491
492 pub async fn persist_counters(&self) -> Result<()> {
496 let snap = CountersSnapshot {
497 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
498 handle_first_claims_total: self
499 .counters
500 .handle_first_claims_total
501 .load(Ordering::Relaxed),
502 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
503 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
504 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
505 };
506 let body = serde_json::to_vec_pretty(&snap)?;
507 let path = self.state_dir.join("counters.json");
508 tokio::fs::write(path, body).await?;
509 Ok(())
510 }
511
512 pub async fn append_history(&self) -> Result<()> {
517 use tokio::io::AsyncWriteExt;
518 let now = SystemTime::now()
519 .duration_since(UNIX_EPOCH)
520 .map(|d| d.as_secs())
521 .unwrap_or(0);
522 let (handles_active, slots_active, pair_slots_open, streams_active) = {
523 let inner = self.inner.lock().await;
524 (
525 inner.handles.len(),
526 inner.slots.len(),
527 inner.pair_slots.len(),
528 inner.streams.values().map(Vec::len).sum::<usize>(),
529 )
530 };
531 let entry = HistoryEntry {
532 ts: now,
533 handles_active,
534 slots_active,
535 pair_slots_open,
536 streams_active,
537 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
538 handle_first_claims_total: self
539 .counters
540 .handle_first_claims_total
541 .load(Ordering::Relaxed),
542 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
543 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
544 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
545 };
546 let line = serde_json::to_vec(&entry)?;
547 let path = self.state_dir.join("stats-history.jsonl");
548 let mut f = tokio::fs::OpenOptions::new()
549 .create(true)
550 .append(true)
551 .open(&path)
552 .await?;
553 f.write_all(&line).await?;
554 f.write_all(b"\n").await?;
555 f.flush().await?;
556 Ok(())
557 }
558
559 pub fn spawn_counter_persister(&self) {
563 let me = self.clone();
564 tokio::spawn(async move {
565 let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
566 tick.tick().await;
569 loop {
570 tick.tick().await;
571 if let Err(e) = me.persist_counters().await {
572 eprintln!("counter persist failed: {e}");
573 }
574 if let Err(e) = me.append_history().await {
575 eprintln!("history append failed: {e}");
576 }
577 }
578 });
579 }
580
581 async fn persist_tokens(&self) -> Result<()> {
582 let body = {
583 let inner = self.inner.lock().await;
584 serde_json::to_string_pretty(&inner.tokens)?
585 };
586 let path = self.state_dir.join("tokens.json");
587 tokio::fs::write(path, body).await?;
588 Ok(())
589 }
590
591 async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
592 if !is_valid_slot_id(slot_id) {
598 return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
599 }
600 let path = self
601 .state_dir
602 .join("slots")
603 .join(format!("{slot_id}.jsonl"));
604 let mut line = serde_json::to_vec(event)?;
605 line.push(b'\n');
606 use tokio::io::AsyncWriteExt;
607 let mut f = tokio::fs::OpenOptions::new()
608 .create(true)
609 .append(true)
610 .open(&path)
611 .await
612 .with_context(|| format!("opening {path:?}"))?;
613 f.write_all(&line).await?;
614 f.flush().await?;
615 Ok(())
616 }
617}
618
619async fn healthz() -> impl IntoResponse {
620 (StatusCode::OK, "ok\n")
621}
622
623async fn stats_history(
627 State(relay): State<Relay>,
628 Query(q): Query<StatsHistoryQuery>,
629) -> impl IntoResponse {
630 let hours = q.hours.unwrap_or(24).min(168);
631 let now = SystemTime::now()
632 .duration_since(UNIX_EPOCH)
633 .map(|d| d.as_secs())
634 .unwrap_or(0);
635 let cutoff = now.saturating_sub(hours * 3600);
636 let path = relay.state_dir.join("stats-history.jsonl");
637 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
638 let entries: Vec<Value> = body
639 .lines()
640 .filter_map(|l| serde_json::from_str::<Value>(l).ok())
641 .filter(|v| {
642 v.get("ts")
643 .and_then(Value::as_u64)
644 .map(|t| t >= cutoff)
645 .unwrap_or(false)
646 })
647 .collect();
648 (
649 StatusCode::OK,
650 Json(json!({
651 "hours": hours,
652 "now_unix": now,
653 "count": entries.len(),
654 "entries": entries,
655 })),
656 )
657}
658
659async fn landing_stats_html() -> impl IntoResponse {
660 static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
661 (
662 StatusCode::OK,
663 [
664 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
665 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
666 ],
667 STATS_HTML,
668 )
669}
670
671async fn landing_phonebook_html() -> impl IntoResponse {
672 static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
673 (
674 StatusCode::OK,
675 [
676 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
677 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
678 ],
679 PHONEBOOK_HTML,
680 )
681}
682
683async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
688 let wants_html = headers
689 .get(axum::http::header::ACCEPT)
690 .and_then(|v| v.to_str().ok())
691 .unwrap_or("")
692 .contains("text/html");
693 if wants_html {
694 landing_stats_html().await.into_response()
695 } else {
696 stats_json(State(relay)).await.into_response()
697 }
698}
699
700async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
701 let now = SystemTime::now()
702 .duration_since(UNIX_EPOCH)
703 .map(|d| d.as_secs())
704 .unwrap_or(0);
705 let inner = relay.inner.lock().await;
706 let streams_active: usize = inner.streams.values().map(Vec::len).sum();
707 let body = json!({
708 "version": env!("CARGO_PKG_VERSION"),
709 "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
710 "handles_active": inner.handles.len(),
711 "slots_active": inner.slots.len(),
712 "pair_slots_open": inner.pair_slots.len(),
713 "streams_active": streams_active,
714 "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
715 "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
716 "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
717 "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
718 "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
719 });
720 (StatusCode::OK, Json(body))
721}
722
723async fn landing_index() -> impl IntoResponse {
727 static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
728 (
729 StatusCode::OK,
730 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
731 INDEX_HTML,
732 )
733}
734
735async fn landing_favicon() -> impl IntoResponse {
736 static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
737 (
738 StatusCode::OK,
739 [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
740 FAVICON_SVG,
741 )
742}
743
744async fn landing_og() -> impl IntoResponse {
745 static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
746 (
747 StatusCode::OK,
748 [
749 (axum::http::header::CONTENT_TYPE, "image/png"),
750 (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
751 ],
752 OG_PNG,
753 )
754}
755
756async fn landing_demo_cast() -> impl IntoResponse {
757 static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
758 (
759 StatusCode::OK,
760 [
761 (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
762 (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
763 ],
764 DEMO_CAST,
765 )
766}
767
768async fn landing_install_sh() -> impl IntoResponse {
769 static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
770 (
771 StatusCode::OK,
772 [
773 (
774 axum::http::header::CONTENT_TYPE,
775 "text/x-shellscript; charset=utf-8",
776 ),
777 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
778 ],
779 INSTALL_SH,
780 )
781}
782
783async fn landing_openshell_policy_sh() -> impl IntoResponse {
784 static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
785 (
786 StatusCode::OK,
787 [
788 (
789 axum::http::header::CONTENT_TYPE,
790 "text/x-shellscript; charset=utf-8",
791 ),
792 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
793 ],
794 POLICY_SH,
795 )
796}
797
798async fn allocate_slot(
799 State(relay): State<Relay>,
800 Json(_req): Json<AllocateRequest>,
801) -> impl IntoResponse {
802 let slot_id = random_hex(16);
803 let slot_token = random_hex(32);
804 {
805 let mut inner = relay.inner.lock().await;
806 inner.slots.insert(slot_id.clone(), Vec::new());
807 inner.tokens.insert(slot_id.clone(), slot_token.clone());
808 }
809 if let Err(e) = relay.persist_tokens().await {
810 return (
811 StatusCode::INTERNAL_SERVER_ERROR,
812 Json(json!({"error": format!("persist failed: {e}")})),
813 )
814 .into_response();
815 }
816 relay
817 .counters
818 .slot_allocations_total
819 .fetch_add(1, Ordering::Relaxed);
820 (
821 StatusCode::CREATED,
822 Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
823 )
824 .into_response()
825}
826
827async fn post_event(
828 State(relay): State<Relay>,
829 Path(slot_id): Path<String>,
830 headers: HeaderMap,
831 Json(req): Json<PostEventRequest>,
832) -> impl IntoResponse {
833 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
834 return resp;
835 }
836 let body_bytes = match serde_json::to_vec(&req.event) {
838 Ok(b) => b,
839 Err(e) => {
840 return (
841 StatusCode::BAD_REQUEST,
842 Json(json!({"error": format!("event not serializable: {e}")})),
843 )
844 .into_response();
845 }
846 };
847 if body_bytes.len() > MAX_EVENT_BYTES {
848 return (
849 StatusCode::PAYLOAD_TOO_LARGE,
850 Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
851 )
852 .into_response();
853 }
854 {
856 let inner = relay.inner.lock().await;
857 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
858 if used + body_bytes.len() > MAX_SLOT_BYTES {
859 return (
860 StatusCode::PAYLOAD_TOO_LARGE,
861 Json(json!({
862 "error": "slot quota exceeded",
863 "slot_bytes_used": used,
864 "slot_bytes_max": MAX_SLOT_BYTES,
865 "remediation": "operator should `wire rotate-slot` to drain old slot",
866 })),
867 )
868 .into_response();
869 }
870 }
871 let event_id = req
872 .event
873 .get("event_id")
874 .and_then(Value::as_str)
875 .map(str::to_string);
876
877 let dup = {
879 let inner = relay.inner.lock().await;
880 let slot = inner.slots.get(&slot_id);
881 if let (Some(eid), Some(slot)) = (&event_id, slot) {
882 slot.iter()
883 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
884 } else {
885 false
886 }
887 };
888 if dup {
889 return (
890 StatusCode::OK,
891 Json(json!({"event_id": event_id, "status": "duplicate"})),
892 )
893 .into_response();
894 }
895
896 {
897 let mut inner = relay.inner.lock().await;
898 let event_size = body_bytes.len();
899 let slot = inner.slots.entry(slot_id.clone()).or_default();
900 slot.push(req.event.clone());
901 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
902 }
903 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
904 return (
905 StatusCode::INTERNAL_SERVER_ERROR,
906 Json(json!({"error": format!("persist failed: {e}")})),
907 )
908 .into_response();
909 }
910 relay
911 .counters
912 .events_posted_total
913 .fetch_add(1, Ordering::Relaxed);
914 {
919 let mut inner = relay.inner.lock().await;
920 if let Some(subs) = inner.streams.get_mut(&slot_id) {
921 subs.retain(|tx| tx.send(req.event.clone()).is_ok());
922 }
923 }
924 (
925 StatusCode::CREATED,
926 Json(json!({"event_id": event_id, "status": "stored"})),
927 )
928 .into_response()
929}
930
931async fn stream_events(
944 State(relay): State<Relay>,
945 Path(slot_id): Path<String>,
946 headers: HeaderMap,
947) -> axum::response::Response {
948 use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
949 use futures::stream::StreamExt;
950
951 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
952 return resp;
953 }
954
955 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
956 {
957 let mut inner = relay.inner.lock().await;
958 inner.streams.entry(slot_id.clone()).or_default().push(tx);
959 }
960
961 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
962 SseEvent::default()
963 .json_data(&ev)
964 .map_err(|e| std::io::Error::other(e.to_string()))
965 });
966
967 Sse::new(stream)
968 .keep_alive(
969 KeepAlive::new()
970 .interval(std::time::Duration::from_secs(30))
971 .text("phyllis: still on the line"),
972 )
973 .into_response()
974}
975
/// JSON body of `POST /v1/pair`.
#[derive(Deserialize)]
pub struct PairOpenRequest {
    /// Key both sides share; maps to a pair id in `Inner::pair_lookup`.
    pub code_hash: String,
    /// This side's message, stored for the peer to fetch via `pair_get`.
    pub msg: String,
    /// Must be `"host"` or `"guest"`.
    pub role: String,
}
985
/// JSON body of `POST /v1/pair/:pair_id/bootstrap`.
#[derive(Deserialize)]
pub struct PairBootstrapRequest {
    /// Must be `"host"` or `"guest"`.
    pub role: String,
    /// Payload stored verbatim as this side's bootstrap for the peer to fetch.
    pub sealed: String,
}
991
/// JSON body of `POST /v1/pair/abandon`.
#[derive(Deserialize)]
pub struct PairAbandonRequest {
    /// Key of the pairing to discard.
    pub code_hash: String,
}
998
999async fn pair_abandon(
1005 State(relay): State<Relay>,
1006 Json(req): Json<PairAbandonRequest>,
1007) -> impl IntoResponse {
1008 let mut inner = relay.inner.lock().await;
1009 if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
1010 inner.pair_slots.remove(&pair_id);
1011 }
1012 StatusCode::NO_CONTENT.into_response()
1013}
1014
1015async fn pair_open(
1016 State(relay): State<Relay>,
1017 Json(req): Json<PairOpenRequest>,
1018) -> impl IntoResponse {
1019 if req.role != "host" && req.role != "guest" {
1020 return (
1021 StatusCode::BAD_REQUEST,
1022 Json(json!({"error": "role must be 'host' or 'guest'"})),
1023 )
1024 .into_response();
1025 }
1026 relay.evict_expired_pair_slots().await;
1027 let mut inner = relay.inner.lock().await;
1028 let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1029 Some(id) => id,
1030 None => {
1031 let new_id = random_hex(16);
1032 inner
1033 .pair_lookup
1034 .insert(req.code_hash.clone(), new_id.clone());
1035 inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1036 relay
1037 .counters
1038 .pair_opens_total
1039 .fetch_add(1, Ordering::Relaxed);
1040 new_id
1041 }
1042 };
1043 let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1044 slot.last_touched = std::time::Instant::now();
1045 if req.role == "host" {
1046 if slot.host_msg.is_some() {
1047 return (
1048 StatusCode::CONFLICT,
1049 Json(json!({"error": "host already registered for this code"})),
1050 )
1051 .into_response();
1052 }
1053 slot.host_msg = Some(req.msg);
1054 } else {
1055 if slot.guest_msg.is_some() {
1056 return (
1057 StatusCode::CONFLICT,
1058 Json(json!({"error": "guest already registered for this code"})),
1059 )
1060 .into_response();
1061 }
1062 slot.guest_msg = Some(req.msg);
1063 }
1064 (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1065}
1066
/// Query string for `GET /v1/pair/:pair_id`.
#[derive(Deserialize)]
pub struct PairGetQuery {
    /// The caller's own role (`"host"` or `"guest"`); the response carries the
    /// other side's values.
    pub as_role: String,
}
1072
1073async fn pair_get(
1074 State(relay): State<Relay>,
1075 Path(pair_id): Path<String>,
1076 Query(q): Query<PairGetQuery>,
1077) -> impl IntoResponse {
1078 relay.evict_expired_pair_slots().await;
1079 let mut inner = relay.inner.lock().await;
1080 let slot = match inner.pair_slots.get_mut(&pair_id) {
1081 Some(s) => {
1082 s.last_touched = std::time::Instant::now();
1083 s.clone()
1084 }
1085 None => {
1086 return (
1087 StatusCode::NOT_FOUND,
1088 Json(json!({"error": "unknown pair_id"})),
1089 )
1090 .into_response();
1091 }
1092 };
1093 let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1094 "host" => (slot.guest_msg, slot.guest_bootstrap),
1095 "guest" => (slot.host_msg, slot.host_bootstrap),
1096 _ => {
1097 return (
1098 StatusCode::BAD_REQUEST,
1099 Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1100 )
1101 .into_response();
1102 }
1103 };
1104 (
1105 StatusCode::OK,
1106 Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1107 )
1108 .into_response()
1109}
1110
1111async fn pair_bootstrap(
1112 State(relay): State<Relay>,
1113 Path(pair_id): Path<String>,
1114 Json(req): Json<PairBootstrapRequest>,
1115) -> impl IntoResponse {
1116 relay.evict_expired_pair_slots().await;
1117 let mut inner = relay.inner.lock().await;
1118 let slot = match inner.pair_slots.get_mut(&pair_id) {
1119 Some(s) => s,
1120 None => {
1121 return (
1122 StatusCode::NOT_FOUND,
1123 Json(json!({"error": "unknown pair_id"})),
1124 )
1125 .into_response();
1126 }
1127 };
1128 slot.last_touched = std::time::Instant::now();
1129 match req.role.as_str() {
1130 "host" => slot.host_bootstrap = Some(req.sealed),
1131 "guest" => slot.guest_bootstrap = Some(req.sealed),
1132 _ => {
1133 return (
1134 StatusCode::BAD_REQUEST,
1135 Json(json!({"error": "role must be 'host' or 'guest'"})),
1136 )
1137 .into_response();
1138 }
1139 }
1140 (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1141}
1142
/// JSON body of `POST /v1/handle/claim`.
#[derive(Deserialize)]
pub struct HandleClaimRequest {
    /// Nickname being claimed; validated with `pair_profile::is_valid_nick`.
    pub nick: String,
    /// Slot whose token authenticates this request.
    pub slot_id: String,
    /// Optional relay URL to record alongside the handle.
    pub relay_url: Option<String>,
    /// Agent card; it must pass `agent_card::verify_agent_card` and carry a string `did`.
    pub card: Value,
    /// Optional directory preference. When omitted on a re-claim, the
    /// previously stored preference is kept.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
1166
1167async fn handle_claim(
1174 State(relay): State<Relay>,
1175 headers: HeaderMap,
1176 Json(req): Json<HandleClaimRequest>,
1177) -> impl IntoResponse {
1178 if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1181 return resp;
1182 }
1183 if !crate::pair_profile::is_valid_nick(&req.nick) {
1185 return (
1186 StatusCode::BAD_REQUEST,
1187 Json(json!({
1188 "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1189 "nick": req.nick,
1190 })),
1191 )
1192 .into_response();
1193 }
1194 if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1196 return (
1197 StatusCode::BAD_REQUEST,
1198 Json(json!({"error": format!("card signature invalid: {e}")})),
1199 )
1200 .into_response();
1201 }
1202 let did = match req.card.get("did").and_then(Value::as_str) {
1203 Some(d) => d.to_string(),
1204 None => {
1205 return (
1206 StatusCode::BAD_REQUEST,
1207 Json(json!({"error": "card missing 'did' field"})),
1208 )
1209 .into_response();
1210 }
1211 };
1212
1213 let prior_record: Option<HandleRecord>;
1219 let first_claim = {
1220 let inner = relay.inner.lock().await;
1221 match inner.handles.get(&req.nick) {
1222 Some(existing) if existing.did != did => {
1223 return (
1224 StatusCode::CONFLICT,
1225 Json(json!({
1226 "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1227 "nick": req.nick,
1228 "claimed_by": existing.did,
1229 })),
1230 )
1231 .into_response();
1232 }
1233 Some(prev) => {
1234 prior_record = Some(prev.clone());
1235 false
1236 }
1237 None => {
1238 prior_record = None;
1239 true
1240 }
1241 }
1242 };
1243
1244 let now = time::OffsetDateTime::now_utc()
1249 .replace_nanosecond(0)
1250 .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1251 .format(&time::format_description::well_known::Rfc3339)
1252 .unwrap_or_default();
1253 let discoverable = match (req.discoverable, &prior_record) {
1259 (Some(d), _) => Some(d),
1260 (None, Some(prev)) => prev.discoverable,
1261 (None, None) => None,
1262 };
1263 let record = HandleRecord {
1264 nick: req.nick.clone(),
1265 did: did.clone(),
1266 card: req.card.clone(),
1267 slot_id: req.slot_id.clone(),
1268 relay_url: req.relay_url.clone(),
1269 claimed_at: now,
1270 discoverable,
1271 };
1272
1273 let path = relay
1275 .state_dir
1276 .join("handles")
1277 .join(format!("{}.json", req.nick));
1278 let body = match serde_json::to_vec_pretty(&record) {
1279 Ok(b) => b,
1280 Err(e) => {
1281 return (
1282 StatusCode::INTERNAL_SERVER_ERROR,
1283 Json(json!({"error": format!("serialize failed: {e}")})),
1284 )
1285 .into_response();
1286 }
1287 };
1288 if let Err(e) = tokio::fs::write(&path, &body).await {
1289 return (
1290 StatusCode::INTERNAL_SERVER_ERROR,
1291 Json(json!({"error": format!("persist failed: {e}")})),
1292 )
1293 .into_response();
1294 }
1295 {
1296 let mut inner = relay.inner.lock().await;
1297 inner.handles.insert(req.nick.clone(), record);
1298 }
1299 relay
1300 .counters
1301 .handle_claims_total
1302 .fetch_add(1, Ordering::Relaxed);
1303 if first_claim {
1304 relay
1305 .counters
1306 .handle_first_claims_total
1307 .fetch_add(1, Ordering::Relaxed);
1308 }
1309 (
1310 StatusCode::CREATED,
1311 Json(json!({
1312 "nick": req.nick,
1313 "did": did,
1314 "status": if first_claim { "claimed" } else { "re-claimed" },
1315 })),
1316 )
1317 .into_response()
1318}
1319
/// Query string accepted by the well-known agent lookup handlers.
#[derive(Deserialize)]
pub struct WellKnownAgentQuery {
    /// Handle to resolve. The handlers in this file use only the part before the
    /// first `@` as the nick to look up.
    pub handle: String,
}
1324
/// Query string for the paginated handle directory listing.
#[derive(Deserialize)]
pub struct HandlesDirectoryQuery {
    /// Pagination cursor: the directory handler skips every nick that sorts at or
    /// before this value.
    pub cursor: Option<String>,
    /// Requested page size; the directory handler defaults it to 100 and clamps it
    /// to `1..=500`.
    pub limit: Option<usize>,
    /// Optional filter: only handles whose `profile.vibe` array contains this string
    /// (ASCII case-insensitive) are listed.
    pub vibe: Option<String>,
}
1331
/// JSON body for registering a short invite link.
#[derive(Deserialize)]
pub struct InviteRegisterRequest {
    /// The full invite URL to store; `invite_register` rejects an empty value or one
    /// longer than 8192 bytes.
    pub invite_url: String,
    /// Requested lifetime in seconds; `invite_register` defaults it to 86 400 and
    /// clamps it to between 60 seconds and 7 days.
    #[serde(default)]
    pub ttl_seconds: Option<u64>,
    /// Number of script fetches allowed. When absent, `invite_script` applies no
    /// use limit.
    #[serde(default)]
    pub uses: Option<u32>,
}
1353
1354impl Relay {
1355 async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1357 use tokio::io::AsyncWriteExt;
1358 let mut line = serde_json::to_vec(rec)?;
1359 line.push(b'\n');
1360 let path = self.state_dir.join("invites.jsonl");
1361 let mut f = tokio::fs::OpenOptions::new()
1362 .create(true)
1363 .append(true)
1364 .open(&path)
1365 .await?;
1366 f.write_all(&line).await?;
1367 f.flush().await?;
1368 Ok(())
1369 }
1370}
1371
1372async fn invite_register(
1373 State(relay): State<Relay>,
1374 Json(req): Json<InviteRegisterRequest>,
1375) -> impl IntoResponse {
1376 if req.invite_url.is_empty() {
1377 return (
1378 StatusCode::BAD_REQUEST,
1379 Json(json!({"error": "invite_url required"})),
1380 )
1381 .into_response();
1382 }
1383 if req.invite_url.len() > 8_192 {
1385 return (
1386 StatusCode::PAYLOAD_TOO_LARGE,
1387 Json(json!({"error": "invite_url > 8 KiB"})),
1388 )
1389 .into_response();
1390 }
1391 let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1392 let now = SystemTime::now()
1393 .duration_since(UNIX_EPOCH)
1394 .map(|d| d.as_secs())
1395 .unwrap_or(0);
1396 let token = random_hex(3);
1399 let rec = InviteRecord {
1400 token: token.clone(),
1401 invite_url: req.invite_url,
1402 expires_unix: now + ttl,
1403 uses_remaining: req.uses,
1404 created_unix: now,
1405 };
1406 {
1407 let mut inner = relay.inner.lock().await;
1408 if inner.invites.contains_key(&token) {
1409 return (
1410 StatusCode::CONFLICT,
1411 Json(json!({"error": "token collision, retry"})),
1412 )
1413 .into_response();
1414 }
1415 inner.invites.insert(token.clone(), rec.clone());
1416 }
1417 if let Err(e) = relay.persist_invite(&rec).await {
1418 return (
1419 StatusCode::INTERNAL_SERVER_ERROR,
1420 Json(json!({"error": format!("persist failed: {e}")})),
1421 )
1422 .into_response();
1423 }
1424 (
1425 StatusCode::CREATED,
1426 Json(json!({
1427 "token": token,
1428 "path": format!("/i/{token}"),
1429 "expires_unix": rec.expires_unix,
1430 "uses_remaining": rec.uses_remaining,
1431 })),
1432 )
1433 .into_response()
1434}
1435
/// Query string for the short-invite endpoint.
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// When set to `"url"`, `invite_script` returns the raw invite URL as plain text
    /// instead of the onboarding shell script.
    pub format: Option<String>,
}
1446
1447async fn invite_script(
1448 State(relay): State<Relay>,
1449 Path(token): Path<String>,
1450 Query(q): Query<InviteScriptQuery>,
1451) -> impl IntoResponse {
1452 if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1455 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1456 }
1457 let want_raw_url = q.format.as_deref() == Some("url");
1458 let now = SystemTime::now()
1459 .duration_since(UNIX_EPOCH)
1460 .map(|d| d.as_secs())
1461 .unwrap_or(0);
1462 let invite_url = {
1463 let mut inner = relay.inner.lock().await;
1464 let Some(rec) = inner.invites.get_mut(&token) else {
1465 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1466 };
1467 if rec.expires_unix <= now {
1468 return (StatusCode::GONE, "this invite has expired\n").into_response();
1469 }
1470 if let Some(n) = rec.uses_remaining {
1471 if n == 0 {
1472 return (StatusCode::GONE, "this invite has been used up\n").into_response();
1473 }
1474 if !want_raw_url {
1478 rec.uses_remaining = Some(n - 1);
1479 }
1480 }
1481 rec.invite_url.clone()
1482 };
1483 if want_raw_url {
1484 return (
1485 StatusCode::OK,
1486 [
1487 (
1488 axum::http::header::CONTENT_TYPE,
1489 "text/plain; charset=utf-8",
1490 ),
1491 (
1492 axum::http::header::CACHE_CONTROL,
1493 "private, no-store, max-age=0",
1494 ),
1495 ],
1496 invite_url,
1497 )
1498 .into_response();
1499 }
1500 let escaped = invite_url.replace('\'', "'\\''");
1501 let script = format!(
1502 "#!/bin/sh\n\
1503 # wire — one-curl onboarding (install + pair in one shot)\n\
1504 # source: https://github.com/SlanchaAi/wire\n\
1505 set -eu\n\
1506 INVITE='{escaped}'\n\
1507 echo \"\u{2192} checking for wire CLI...\"\n\
1508 if ! command -v wire >/dev/null 2>&1; then\n \
1509 echo \"\u{2192} wire not installed; installing first...\"\n \
1510 curl -fsSL https://wireup.net/install.sh | sh\n \
1511 case \":$PATH:\" in\n \
1512 *:\"$HOME/.local/bin\":*) ;;\n \
1513 *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n \
1514 esac\n \
1515 if ! command -v wire >/dev/null 2>&1; then\n \
1516 echo \"\"\n \
1517 echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n \
1518 echo \"Open a new shell, then run:\"\n \
1519 echo \" wire accept '$INVITE'\"\n \
1520 exit 0\n \
1521 fi\n\
1522 fi\n\
1523 echo \"\u{2192} accepting invite...\"\n\
1524 wire accept \"$INVITE\"\n"
1525 );
1526 (
1527 StatusCode::OK,
1528 [
1529 (
1530 axum::http::header::CONTENT_TYPE,
1531 "text/x-shellscript; charset=utf-8",
1532 ),
1533 (
1534 axum::http::header::CACHE_CONTROL,
1535 "private, no-store, max-age=0",
1536 ),
1537 ],
1538 script,
1539 )
1540 .into_response()
1541}
1542
1543async fn handles_directory(
1544 State(relay): State<Relay>,
1545 Query(q): Query<HandlesDirectoryQuery>,
1546) -> impl IntoResponse {
1547 let limit = q.limit.unwrap_or(100).clamp(1, 500);
1548 let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1549 let inner = relay.inner.lock().await;
1550 let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1551 drop(inner);
1552 records.sort_by(|a, b| a.nick.cmp(&b.nick));
1553
1554 let cursor = q.cursor.as_deref();
1555 let mut eligible = Vec::new();
1556 for rec in records {
1557 if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1558 continue;
1559 }
1560 if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1565 continue;
1566 }
1567 if !rec.is_discoverable() {
1571 continue;
1572 }
1573 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1574 if profile
1575 .get("listed")
1576 .and_then(Value::as_bool)
1577 .is_some_and(|listed| !listed)
1578 {
1579 continue;
1580 }
1581 if let Some(want) = &vibe_filter {
1582 let matched = profile
1583 .get("vibe")
1584 .and_then(Value::as_array)
1585 .map(|arr| {
1586 arr.iter().any(|v| {
1587 v.as_str()
1588 .map(|s| s.eq_ignore_ascii_case(want))
1589 .unwrap_or(false)
1590 })
1591 })
1592 .unwrap_or(false);
1593 if !matched {
1594 continue;
1595 }
1596 }
1597 eligible.push((rec, profile));
1598 }
1599
1600 let has_more = eligible.len() > limit;
1601 let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1602 let next_cursor = if has_more {
1603 page.last().map(|(rec, _)| rec.nick.clone())
1604 } else {
1605 None
1606 };
1607 let handles: Vec<Value> = page
1608 .into_iter()
1609 .map(|(rec, profile)| {
1610 json!({
1611 "nick": rec.nick,
1612 "did": rec.did,
1613 "profile": {
1614 "emoji": profile.get("emoji").cloned().unwrap_or(Value::Null),
1615 "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1616 "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1617 "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1618 "now": profile.get("now").cloned().unwrap_or(Value::Null),
1619 },
1620 "claimed_at": rec.claimed_at,
1621 })
1622 })
1623 .collect();
1624 (
1625 StatusCode::OK,
1626 Json(json!({
1627 "handles": handles,
1628 "next_cursor": next_cursor,
1629 })),
1630 )
1631 .into_response()
1632}
1633
1634async fn handle_intro(
1648 State(relay): State<Relay>,
1649 Path(nick): Path<String>,
1650 Json(req): Json<PostEventRequest>,
1651) -> impl IntoResponse {
1652 let slot_id = {
1654 let inner = relay.inner.lock().await;
1655 match inner.handles.get(&nick) {
1656 Some(rec) => rec.slot_id.clone(),
1657 None => {
1658 return (
1659 StatusCode::NOT_FOUND,
1660 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1661 )
1662 .into_response();
1663 }
1664 }
1665 };
1666
1667 let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1670 let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1671 if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1672 return (
1673 StatusCode::BAD_REQUEST,
1674 Json(json!({
1675 "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1676 "got_kind": kind,
1677 "got_type": type_str,
1678 })),
1679 )
1680 .into_response();
1681 }
1682
1683 let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1685 Some(c) => c.clone(),
1686 None => {
1687 return (
1688 StatusCode::BAD_REQUEST,
1689 Json(json!({"error": "intro event body must embed 'card' field"})),
1690 )
1691 .into_response();
1692 }
1693 };
1694 if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1695 return (
1696 StatusCode::BAD_REQUEST,
1697 Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1698 )
1699 .into_response();
1700 }
1701
1702 let body_bytes = match serde_json::to_vec(&req.event) {
1704 Ok(b) => b,
1705 Err(e) => {
1706 return (
1707 StatusCode::BAD_REQUEST,
1708 Json(json!({"error": format!("event not serializable: {e}")})),
1709 )
1710 .into_response();
1711 }
1712 };
1713 if body_bytes.len() > MAX_EVENT_BYTES {
1714 return (
1715 StatusCode::PAYLOAD_TOO_LARGE,
1716 Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1717 )
1718 .into_response();
1719 }
1720 {
1721 let inner = relay.inner.lock().await;
1722 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1723 if used + body_bytes.len() > MAX_SLOT_BYTES {
1724 return (
1725 StatusCode::PAYLOAD_TOO_LARGE,
1726 Json(json!({
1727 "error": "target slot quota exceeded",
1728 "slot_bytes_used": used,
1729 "slot_bytes_max": MAX_SLOT_BYTES,
1730 })),
1731 )
1732 .into_response();
1733 }
1734 }
1735
1736 let event_id = req
1737 .event
1738 .get("event_id")
1739 .and_then(Value::as_str)
1740 .map(str::to_string);
1741
1742 let dup = {
1744 let inner = relay.inner.lock().await;
1745 let slot = inner.slots.get(&slot_id);
1746 if let (Some(eid), Some(slot)) = (&event_id, slot) {
1747 slot.iter()
1748 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1749 } else {
1750 false
1751 }
1752 };
1753 if dup {
1754 return (
1755 StatusCode::OK,
1756 Json(json!({"event_id": event_id, "status": "duplicate"})),
1757 )
1758 .into_response();
1759 }
1760
1761 {
1762 let mut inner = relay.inner.lock().await;
1763 let event_size = body_bytes.len();
1764 let slot = inner.slots.entry(slot_id.clone()).or_default();
1765 slot.push(req.event.clone());
1766 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1767 }
1768 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1769 return (
1770 StatusCode::INTERNAL_SERVER_ERROR,
1771 Json(json!({"error": format!("persist failed: {e}")})),
1772 )
1773 .into_response();
1774 }
1775 (
1776 StatusCode::CREATED,
1777 Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1778 )
1779 .into_response()
1780}
1781
1782async fn well_known_agent_card_a2a(
1798 State(relay): State<Relay>,
1799 Query(q): Query<WellKnownAgentQuery>,
1800) -> impl IntoResponse {
1801 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1802 if nick.is_empty() {
1803 return (
1804 StatusCode::BAD_REQUEST,
1805 Json(json!({"error": "handle missing nick"})),
1806 )
1807 .into_response();
1808 }
1809 let inner = relay.inner.lock().await;
1810 let rec = match inner.handles.get(&nick) {
1811 Some(r) => r.clone(),
1812 None => {
1813 return (
1814 StatusCode::NOT_FOUND,
1815 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1816 )
1817 .into_response();
1818 }
1819 };
1820 drop(inner);
1821
1822 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1823 let description = profile
1824 .get("motto")
1825 .and_then(Value::as_str)
1826 .unwrap_or("")
1827 .to_string();
1828 let display_name = profile
1829 .get("display_name")
1830 .and_then(Value::as_str)
1831 .unwrap_or(&rec.nick)
1832 .to_string();
1833 let relay_url = rec.relay_url.clone().unwrap_or_default();
1834 let endpoint = if !relay_url.is_empty() {
1836 format!(
1837 "{}/v1/handle/intro/{}",
1838 relay_url.trim_end_matches('/'),
1839 rec.nick
1840 )
1841 } else {
1842 format!("/v1/handle/intro/{}", rec.nick)
1843 };
1844 let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1845
1846 let a2a_card = json!({
1850 "id": rec.did,
1851 "name": display_name,
1852 "description": description,
1853 "version": "wire/0.5",
1854 "endpoint": endpoint,
1855 "provider": {
1856 "name": "wire",
1857 "url": "https://github.com/SlanchaAi/wire"
1858 },
1859 "capabilities": {
1860 "streaming": false,
1861 "pushNotifications": false,
1862 "extendedAgentCard": true
1863 },
1864 "securitySchemes": {
1865 "ed25519-event-sig": {
1866 "type": "signature",
1867 "alg": "EdDSA",
1868 "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1869 }
1870 },
1871 "security": [{"ed25519-event-sig": []}],
1872 "skills": [],
1873 "extensions": [{
1874 "uri": "https://slancha.ai/wire/ext/v0.5",
1878 "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1879 "required": false,
1880 "params": {
1881 "did": rec.did,
1882 "handle": rec.nick,
1883 "slot_id": rec.slot_id,
1884 "relay_url": rec.relay_url,
1885 "card": rec.card,
1886 "profile": profile,
1887 "claimed_at": rec.claimed_at,
1888 }
1889 }],
1890 "signature": card_sig,
1891 });
1892 (StatusCode::OK, Json(a2a_card)).into_response()
1893}
1894
1895async fn well_known_agent(
1896 State(relay): State<Relay>,
1897 Query(q): Query<WellKnownAgentQuery>,
1898) -> impl IntoResponse {
1899 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1900 if nick.is_empty() {
1901 return (
1902 StatusCode::BAD_REQUEST,
1903 Json(json!({"error": "handle missing nick"})),
1904 )
1905 .into_response();
1906 }
1907 let inner = relay.inner.lock().await;
1908 match inner.handles.get(&nick) {
1909 Some(rec) => (
1910 StatusCode::OK,
1911 Json(json!({
1912 "nick": rec.nick,
1913 "did": rec.did,
1914 "card": rec.card,
1915 "slot_id": rec.slot_id,
1916 "relay_url": rec.relay_url,
1917 "claimed_at": rec.claimed_at,
1918 })),
1919 )
1920 .into_response(),
1921 None => (
1922 StatusCode::NOT_FOUND,
1923 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1924 )
1925 .into_response(),
1926 }
1927}
1928
1929async fn list_events(
1930 State(relay): State<Relay>,
1931 Path(slot_id): Path<String>,
1932 Query(q): Query<ListEventsQuery>,
1933 headers: HeaderMap,
1934) -> impl IntoResponse {
1935 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1936 return resp;
1937 }
1938 let limit = q.limit.unwrap_or(100).min(1000);
1939 let mut inner = relay.inner.lock().await;
1940 let now_unix = std::time::SystemTime::now()
1944 .duration_since(std::time::UNIX_EPOCH)
1945 .map(|d| d.as_secs())
1946 .unwrap_or(0);
1947 inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1948 let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1949 let start = match q.since {
1950 Some(eid) => events
1951 .iter()
1952 .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1953 .map(|i| i + 1)
1954 .unwrap_or(0),
1955 None => 0,
1956 };
1957 let end = (start + limit).min(events.len());
1958 let slice = events[start..end].to_vec();
1959 (StatusCode::OK, Json(slice)).into_response()
1960}
1961
1962async fn slot_state(
1968 State(relay): State<Relay>,
1969 Path(slot_id): Path<String>,
1970 headers: HeaderMap,
1971) -> impl IntoResponse {
1972 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1973 return resp;
1974 }
1975 let inner = relay.inner.lock().await;
1976 let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1977 let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1978 let responder_health = inner.responder_health.get(&slot_id).cloned();
1979 (
1980 StatusCode::OK,
1981 Json(json!({
1982 "slot_id": slot_id,
1983 "event_count": event_count,
1984 "last_pull_at_unix": last_pull_at_unix,
1985 "responder_health": responder_health,
1986 })),
1987 )
1988 .into_response()
1989}
1990
1991async fn responder_health_set(
1992 State(relay): State<Relay>,
1993 Path(slot_id): Path<String>,
1994 headers: HeaderMap,
1995 Json(record): Json<ResponderHealthRecord>,
1996) -> impl IntoResponse {
1997 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1998 return resp;
1999 }
2000 let path = relay
2001 .state_dir
2002 .join("responder-health")
2003 .join(format!("{slot_id}.json"));
2004 let body = match serde_json::to_vec_pretty(&record) {
2005 Ok(b) => b,
2006 Err(e) => {
2007 return (
2008 StatusCode::INTERNAL_SERVER_ERROR,
2009 Json(json!({"error": format!("serialize failed: {e}")})),
2010 )
2011 .into_response();
2012 }
2013 };
2014 if let Err(e) = tokio::fs::write(&path, body).await {
2015 return (
2016 StatusCode::INTERNAL_SERVER_ERROR,
2017 Json(json!({"error": format!("persist failed: {e}")})),
2018 )
2019 .into_response();
2020 }
2021 {
2022 let mut inner = relay.inner.lock().await;
2023 inner
2024 .responder_health
2025 .insert(slot_id.clone(), record.clone());
2026 }
2027 (StatusCode::OK, Json(record)).into_response()
2028}
2029
2030async fn check_token(
2031 relay: &Relay,
2032 headers: &HeaderMap,
2033 slot_id: &str,
2034) -> std::result::Result<(), axum::response::Response> {
2035 let auth = headers
2036 .get(AUTHORIZATION)
2037 .and_then(|h| h.to_str().ok())
2038 .and_then(|s| s.strip_prefix("Bearer "))
2039 .map(str::to_string);
2040 let presented = match auth {
2041 Some(t) => t,
2042 None => {
2043 return Err((
2044 StatusCode::UNAUTHORIZED,
2045 Json(json!({"error": "missing Bearer token"})),
2046 )
2047 .into_response());
2048 }
2049 };
2050 let inner = relay.inner.lock().await;
2051 let expected = match inner.tokens.get(slot_id) {
2052 Some(t) => t.clone(),
2053 None => {
2054 return Err((
2055 StatusCode::NOT_FOUND,
2056 Json(json!({"error": "unknown slot"})),
2057 )
2058 .into_response());
2059 }
2060 };
2061 drop(inner);
2062 if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2063 return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2064 }
2065 Ok(())
2066}
2067
/// Compare two byte slices without stopping at the first differing byte.
///
/// Slices of different length are unequal and return immediately. For
/// equal-length input every byte pair is XOR-ed into an accumulator, and the
/// slices are equal exactly when the accumulator stays zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |diff, (lhs, rhs)| diff | (lhs ^ rhs))
            == 0
}
2078
/// A slot id is valid when it is exactly 32 bytes long and consists solely of
/// lowercase hexadecimal digits (`0-9`, `a-f`).
fn is_valid_slot_id(s: &str) -> bool {
    if s.len() != 32 {
        return false;
    }
    s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2084
2085fn random_hex(n_bytes: usize) -> String {
2086 let mut buf = vec![0u8; n_bytes];
2087 rand::thread_rng().fill_bytes(&mut buf);
2088 hex::encode(buf)
2089}
2090
2091pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2093 serve_with_mode(bind, state_dir, ServerMode::default()).await
2094}
2095
2096pub async fn serve_with_mode(
2100 bind: &str,
2101 state_dir: PathBuf,
2102 mode: ServerMode,
2103) -> Result<()> {
2104 let relay = Relay::new(state_dir).await?;
2105 relay.spawn_pair_sweeper();
2106 relay.spawn_counter_persister();
2107 let app = relay.clone().router_with_mode(mode);
2108 let listener = tokio::net::TcpListener::bind(bind)
2109 .await
2110 .with_context(|| format!("binding {bind}"))?;
2111 if mode.local_only {
2112 eprintln!("wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled");
2113 } else {
2114 eprintln!("wire relay-server listening on {bind}");
2115 }
2116 let shutdown_relay = relay.clone();
2117 axum::serve(listener, app)
2118 .with_graceful_shutdown(async move {
2119 let _ = tokio::signal::ctrl_c().await;
2120 eprintln!("\nshutting down — final counter snapshot");
2121 if let Err(e) = shutdown_relay.persist_counters().await {
2122 eprintln!("final counter persist failed: {e}");
2123 }
2124 })
2125 .await?;
2126 Ok(())
2127}
2128
/// Startup options for the relay server. `Default` yields `local_only = false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ServerMode {
    /// When true, `serve_with_mode` announces the server as LOCAL-ONLY with the
    /// phonebook and well-known endpoints disabled; the flag is passed on to
    /// `router_with_mode`.
    pub local_only: bool,
}
2141
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal input matches; a changed byte or a length difference does not.
    #[test]
    fn constant_time_eq_basic() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        assert!(!constant_time_eq(b"abc", b"abcd"));
    }

    /// 16 random bytes render as 32 hex characters.
    #[test]
    fn random_hex_length() {
        let rendered = random_hex(16);
        assert_eq!(rendered.len(), 32);
        assert!(rendered.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// A pair slot last touched more than the TTL ago is evicted together with
    /// its lookup entry; a freshly created slot survives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&dir);
        let relay = Relay::new(dir.clone()).await.unwrap();

        {
            let mut inner = relay.inner.lock().await;

            // Slot A: idle for longer than the TTL.
            let stale = PairSlot {
                last_touched: std::time::Instant::now()
                    - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
                ..PairSlot::default()
            };
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            inner.pair_slots.insert("id-A".to_string(), stale);

            // Slot B: default slot, expected to survive the sweep.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let inner = relay.inner.lock().await;
        assert_eq!(
            inner.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(inner.pair_slots.contains_key("id-B"));
        assert_eq!(inner.pair_lookup.len(), 1);
        assert!(inner.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Only 32 lowercase hex characters pass; wrong lengths, uppercase, path-like
    /// strings and embedded control bytes are rejected.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));

        let rejected = [
            "abc",
            "0123456789abcdef0123456789abcde",
            "0123456789abcdef0123456789abcdef0",
            "0123456789ABCDEF0123456789abcdef",
            "../etc/passwd0123456789abcdef0000",
            "..%2Fetc%2Fpasswd00000000000000000",
            "/absolute/path/that/looks/like/key",
            "0123456789abcdef\0\x31\x32\x33456789abcdef",
        ];
        for candidate in rejected {
            assert!(!is_valid_slot_id(candidate));
        }
    }
}