1use anyhow::{Context, Result};
33use axum::{
34 Json, Router,
35 extract::{Path, Query, State},
36 http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37 response::IntoResponse,
38 routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50 GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Largest serialized size, in bytes, of a single event accepted by `post_event` (256 KiB).
const MAX_EVENT_BYTES: usize = 256 << 10;
/// Largest running total, in bytes, of serialized events kept for one slot (64 MiB).
const MAX_SLOT_BYTES: usize = 64 << 20;
59
60#[derive(Clone)]
61pub struct Relay {
62 inner: Arc<Mutex<Inner>>,
63 state_dir: PathBuf,
64 counters: Arc<RelayCounters>,
65}
66
/// Process-wide counters reported by the stats endpoints.
///
/// The `*_total` values are seeded from `counters.json` at startup and
/// written back by `Relay::persist_counters`.
struct RelayCounters {
    /// Unix time (seconds) captured when the relay was constructed.
    boot_unix: u64,
    /// Successful handle claims, including re-claims.
    handle_claims_total: std::sync::atomic::AtomicU64,
    /// Successful claims of a nick that had no prior record.
    handle_first_claims_total: std::sync::atomic::AtomicU64,
    /// Slots handed out by `allocate_slot`.
    slot_allocations_total: std::sync::atomic::AtomicU64,
    /// Pair rendezvous slots created by `pair_open`.
    pair_opens_total: std::sync::atomic::AtomicU64,
    /// Events stored by `post_event`.
    events_posted_total: std::sync::atomic::AtomicU64,
}
79
80#[derive(serde::Serialize, serde::Deserialize, Default)]
81struct CountersSnapshot {
82 handle_claims_total: u64,
83 handle_first_claims_total: u64,
84 slot_allocations_total: u64,
85 pair_opens_total: u64,
86 events_posted_total: u64,
87}
88
89#[derive(serde::Serialize, serde::Deserialize)]
94struct HistoryEntry {
95 ts: u64,
96 handles_active: usize,
97 slots_active: usize,
98 pair_slots_open: usize,
99 streams_active: usize,
100 handle_claims_total: u64,
101 handle_first_claims_total: u64,
102 slot_allocations_total: u64,
103 pair_opens_total: u64,
104 events_posted_total: u64,
105}
106
107#[derive(Deserialize)]
108pub struct StatsHistoryQuery {
109 pub hours: Option<u64>,
111}
112
113struct Inner {
114 slots: HashMap<String, Vec<Value>>,
116 tokens: HashMap<String, String>,
118 slot_bytes: HashMap<String, usize>,
120 last_pull_at_unix: HashMap<String, u64>,
125 streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
131 pair_lookup: HashMap<String, String>,
133 pair_slots: HashMap<String, PairSlot>,
135 handles: HashMap<String, HandleRecord>,
137 responder_health: HashMap<String, ResponderHealthRecord>,
139 invites: HashMap<String, InviteRecord>,
143}
144
145#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
148struct InviteRecord {
149 token: String,
150 invite_url: String,
151 expires_unix: u64,
152 uses_remaining: Option<u32>,
154 created_unix: u64,
155}
156
157#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
161struct HandleRecord {
162 pub nick: String,
163 pub did: String,
164 pub card: Value,
165 pub slot_id: String,
166 pub relay_url: Option<String>,
167 pub claimed_at: String,
168 #[serde(default, skip_serializing_if = "Option::is_none")]
174 pub discoverable: Option<bool>,
175}
176
177impl HandleRecord {
178 fn is_discoverable(&self) -> bool {
179 self.discoverable.unwrap_or(true)
180 }
181}
182
183#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
184pub struct ResponderHealthRecord {
185 pub status: String,
186 #[serde(default, skip_serializing_if = "Option::is_none")]
187 pub reason: Option<String>,
188 #[serde(default, skip_serializing_if = "Option::is_none")]
189 pub last_success_at: Option<String>,
190 pub set_at: String,
191}
192
/// In-memory rendezvous state for one pairing code. Never persisted.
#[derive(Debug, Clone)]
struct PairSlot {
    /// Message posted by the host via `pair_open`.
    host_msg: Option<String>,
    /// Message posted by the guest via `pair_open`.
    guest_msg: Option<String>,
    /// Sealed bootstrap payload posted by the host via `pair_bootstrap`.
    host_bootstrap: Option<String>,
    /// Sealed bootstrap payload posted by the guest via `pair_bootstrap`.
    guest_bootstrap: Option<String>,
    /// Last time a pair handler touched this slot; drives TTL eviction.
    last_touched: std::time::Instant,
}
206
207impl Default for PairSlot {
208 fn default() -> Self {
209 Self {
210 host_msg: None,
211 guest_msg: None,
212 host_bootstrap: None,
213 guest_bootstrap: None,
214 last_touched: std::time::Instant::now(),
215 }
216 }
217}
218
/// Idle time, in seconds, after which a pair slot is evicted (five minutes).
const PAIR_SLOT_TTL_SECS: u64 = 5 * 60;
222
223#[derive(Deserialize)]
224pub struct AllocateRequest {
225 #[serde(default)]
227 pub handle: Option<String>,
228}
229
230#[derive(Deserialize)]
231pub struct PostEventRequest {
232 pub event: Value,
233}
234
235#[derive(Deserialize)]
236pub struct ListEventsQuery {
237 pub since: Option<String>,
239 pub limit: Option<usize>,
241}
242
243impl Relay {
244 pub async fn new(state_dir: PathBuf) -> Result<Self> {
245 tokio::fs::create_dir_all(state_dir.join("slots")).await?;
246 tokio::fs::create_dir_all(state_dir.join("handles")).await?;
247 tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
248 let mut inner = Inner {
249 slots: HashMap::new(),
250 tokens: HashMap::new(),
251 slot_bytes: HashMap::new(),
252 last_pull_at_unix: HashMap::new(),
253 streams: HashMap::new(),
254 pair_lookup: HashMap::new(),
255 pair_slots: HashMap::new(),
256 handles: HashMap::new(),
257 responder_health: HashMap::new(),
258 invites: HashMap::new(),
259 };
260 let token_path = state_dir.join("tokens.json");
262 if token_path.exists() {
263 let body = tokio::fs::read_to_string(&token_path).await?;
264 inner.tokens = serde_json::from_str(&body).unwrap_or_default();
265 }
266 let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
268 while let Some(entry) = slots_dir.next_entry().await? {
269 let path = entry.path();
270 if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
271 continue;
272 }
273 let stem = match path.file_stem().and_then(|s| s.to_str()) {
274 Some(s) => s.to_string(),
275 None => continue,
276 };
277 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
278 let mut events = Vec::new();
279 for line in body.lines() {
280 if let Ok(v) = serde_json::from_str::<Value>(line) {
281 events.push(v);
282 }
283 }
284 let bytes: usize = events
286 .iter()
287 .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
288 .sum();
289 inner.slot_bytes.insert(stem.clone(), bytes);
290 inner.slots.insert(stem, events);
291 }
292 let handles_dir = state_dir.join("handles");
294 if handles_dir.exists() {
295 let mut rd = tokio::fs::read_dir(&handles_dir).await?;
296 while let Some(entry) = rd.next_entry().await? {
297 let path = entry.path();
298 if path.extension().and_then(|x| x.to_str()) != Some("json") {
299 continue;
300 }
301 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
302 if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
303 inner.handles.insert(rec.nick.clone(), rec);
304 }
305 }
306 }
307 let responder_health_dir = state_dir.join("responder-health");
309 if responder_health_dir.exists() {
310 let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
311 while let Some(entry) = rd.next_entry().await? {
312 let path = entry.path();
313 if path.extension().and_then(|x| x.to_str()) != Some("json") {
314 continue;
315 }
316 let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
317 continue;
318 };
319 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
320 if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
321 inner.responder_health.insert(slot_id.to_string(), rec);
322 }
323 }
324 }
325 let invites_path = state_dir.join("invites.jsonl");
329 if invites_path.exists() {
330 let now_unix = SystemTime::now()
331 .duration_since(UNIX_EPOCH)
332 .map(|d| d.as_secs())
333 .unwrap_or(0);
334 let body = tokio::fs::read_to_string(&invites_path)
335 .await
336 .unwrap_or_default();
337 for line in body.lines() {
338 if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
339 && rec.expires_unix > now_unix
340 {
341 inner.invites.insert(rec.token.clone(), rec);
342 }
343 }
344 }
345 let boot_unix = SystemTime::now()
346 .duration_since(UNIX_EPOCH)
347 .map(|d| d.as_secs())
348 .unwrap_or(0);
349 let snap: CountersSnapshot =
351 match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
352 Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
353 Err(_) => CountersSnapshot::default(),
354 };
355 Ok(Self {
356 inner: Arc::new(Mutex::new(inner)),
357 state_dir,
358 counters: Arc::new(RelayCounters {
359 boot_unix,
360 handle_claims_total: AtomicU64::new(snap.handle_claims_total),
361 handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
362 slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
363 pair_opens_total: AtomicU64::new(snap.pair_opens_total),
364 events_posted_total: AtomicU64::new(snap.events_posted_total),
365 }),
366 })
367 }
368
369 pub fn router(self) -> Router {
370 self.router_with_mode(ServerMode::default())
371 }
372
373 pub fn router_with_mode(self, mode: ServerMode) -> Router {
374 let governor_conf = std::sync::Arc::new(
381 GovernorConfigBuilder::default()
382 .per_second(10)
383 .burst_size(50)
384 .key_extractor(GlobalKeyExtractor)
385 .finish()
386 .expect("valid governor config"),
387 );
388 let governor_layer = GovernorLayer {
389 config: governor_conf,
390 };
391
392 let hot_writes = Router::new()
394 .route("/v1/slot/allocate", post(allocate_slot))
395 .route("/v1/pair", post(pair_open))
396 .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
397 .route("/v1/pair/abandon", post(pair_abandon))
398 .layer(governor_layer);
399
400 let mut router = Router::new()
403 .route("/healthz", get(healthz))
404 .route("/v1/events/:slot_id", post(post_event).get(list_events))
405 .route("/v1/slot/:slot_id/state", get(slot_state))
406 .route(
407 "/v1/slot/:slot_id/responder-health",
408 post(responder_health_set),
409 )
410 .route("/v1/events/:slot_id/stream", get(stream_events))
411 .route("/v1/pair/:pair_id", get(pair_get))
412 .route("/v1/handle/intro/:nick", post(handle_intro));
413
414 if !mode.local_only {
420 router = router
421 .route("/", get(landing_index))
422 .route("/favicon.svg", get(landing_favicon))
423 .route("/og.png", get(landing_og))
424 .route("/install", get(landing_install_sh))
425 .route("/install.sh", get(landing_install_sh))
426 .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
427 .route("/stats", get(stats_root))
428 .route("/stats.json", get(stats_json))
429 .route("/stats.html", get(landing_stats_html))
430 .route("/stats.history", get(stats_history))
431 .route("/phonebook", get(landing_phonebook_html))
432 .route("/phonebook.html", get(landing_phonebook_html))
433 .route("/v1/handle/claim", post(handle_claim))
434 .route("/v1/handles", get(handles_directory))
435 .route("/v1/invite/register", post(invite_register))
436 .route("/i/:token", get(invite_script))
437 .route("/.well-known/wire/agent", get(well_known_agent))
438 .route(
439 "/.well-known/agent-card.json",
440 get(well_known_agent_card_a2a),
441 );
442 } else {
443 router = router.route("/v1/handle/claim", post(handle_claim));
449 }
450
451 router.merge(hot_writes).with_state(self)
452 }
453
454 async fn evict_expired_pair_slots(&self) {
458 let now = std::time::Instant::now();
459 let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
460 let mut inner = self.inner.lock().await;
461 let mut to_remove = Vec::new();
462 for (id, slot) in inner.pair_slots.iter() {
463 if now.duration_since(slot.last_touched) > ttl {
464 to_remove.push(id.clone());
465 }
466 }
467 for id in to_remove {
468 inner.pair_slots.remove(&id);
469 inner.pair_lookup.retain(|_, v| v != &id);
470 }
471 }
472
473 pub fn spawn_pair_sweeper(&self) {
478 let me = self.clone();
479 tokio::spawn(async move {
480 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
481 loop {
482 tick.tick().await;
483 me.evict_expired_pair_slots().await;
484 }
485 });
486 }
487
488 pub async fn persist_counters(&self) -> Result<()> {
492 let snap = CountersSnapshot {
493 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
494 handle_first_claims_total: self
495 .counters
496 .handle_first_claims_total
497 .load(Ordering::Relaxed),
498 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
499 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
500 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
501 };
502 let body = serde_json::to_vec_pretty(&snap)?;
503 let path = self.state_dir.join("counters.json");
504 tokio::fs::write(path, body).await?;
505 Ok(())
506 }
507
508 pub async fn append_history(&self) -> Result<()> {
513 use tokio::io::AsyncWriteExt;
514 let now = SystemTime::now()
515 .duration_since(UNIX_EPOCH)
516 .map(|d| d.as_secs())
517 .unwrap_or(0);
518 let (handles_active, slots_active, pair_slots_open, streams_active) = {
519 let inner = self.inner.lock().await;
520 (
521 inner.handles.len(),
522 inner.slots.len(),
523 inner.pair_slots.len(),
524 inner.streams.values().map(Vec::len).sum::<usize>(),
525 )
526 };
527 let entry = HistoryEntry {
528 ts: now,
529 handles_active,
530 slots_active,
531 pair_slots_open,
532 streams_active,
533 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
534 handle_first_claims_total: self
535 .counters
536 .handle_first_claims_total
537 .load(Ordering::Relaxed),
538 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
539 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
540 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
541 };
542 let line = serde_json::to_vec(&entry)?;
543 let path = self.state_dir.join("stats-history.jsonl");
544 let mut f = tokio::fs::OpenOptions::new()
545 .create(true)
546 .append(true)
547 .open(&path)
548 .await?;
549 f.write_all(&line).await?;
550 f.write_all(b"\n").await?;
551 f.flush().await?;
552 Ok(())
553 }
554
555 pub fn spawn_counter_persister(&self) {
559 let me = self.clone();
560 tokio::spawn(async move {
561 let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
562 tick.tick().await;
565 loop {
566 tick.tick().await;
567 if let Err(e) = me.persist_counters().await {
568 eprintln!("counter persist failed: {e}");
569 }
570 if let Err(e) = me.append_history().await {
571 eprintln!("history append failed: {e}");
572 }
573 }
574 });
575 }
576
577 async fn persist_tokens(&self) -> Result<()> {
578 let body = {
579 let inner = self.inner.lock().await;
580 serde_json::to_string_pretty(&inner.tokens)?
581 };
582 let path = self.state_dir.join("tokens.json");
583 tokio::fs::write(path, body).await?;
584 Ok(())
585 }
586
587 async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
588 if !is_valid_slot_id(slot_id) {
594 return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
595 }
596 let path = self
597 .state_dir
598 .join("slots")
599 .join(format!("{slot_id}.jsonl"));
600 let mut line = serde_json::to_vec(event)?;
601 line.push(b'\n');
602 use tokio::io::AsyncWriteExt;
603 let mut f = tokio::fs::OpenOptions::new()
604 .create(true)
605 .append(true)
606 .open(&path)
607 .await
608 .with_context(|| format!("opening {path:?}"))?;
609 f.write_all(&line).await?;
610 f.flush().await?;
611 Ok(())
612 }
613}
614
615async fn healthz() -> impl IntoResponse {
616 (StatusCode::OK, "ok\n")
617}
618
619async fn stats_history(
623 State(relay): State<Relay>,
624 Query(q): Query<StatsHistoryQuery>,
625) -> impl IntoResponse {
626 let hours = q.hours.unwrap_or(24).min(168);
627 let now = SystemTime::now()
628 .duration_since(UNIX_EPOCH)
629 .map(|d| d.as_secs())
630 .unwrap_or(0);
631 let cutoff = now.saturating_sub(hours * 3600);
632 let path = relay.state_dir.join("stats-history.jsonl");
633 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
634 let entries: Vec<Value> = body
635 .lines()
636 .filter_map(|l| serde_json::from_str::<Value>(l).ok())
637 .filter(|v| {
638 v.get("ts")
639 .and_then(Value::as_u64)
640 .map(|t| t >= cutoff)
641 .unwrap_or(false)
642 })
643 .collect();
644 (
645 StatusCode::OK,
646 Json(json!({
647 "hours": hours,
648 "now_unix": now,
649 "count": entries.len(),
650 "entries": entries,
651 })),
652 )
653}
654
655async fn landing_stats_html() -> impl IntoResponse {
656 static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
657 (
658 StatusCode::OK,
659 [
660 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
661 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
662 ],
663 STATS_HTML,
664 )
665}
666
667async fn landing_phonebook_html() -> impl IntoResponse {
668 static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
669 (
670 StatusCode::OK,
671 [
672 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
673 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
674 ],
675 PHONEBOOK_HTML,
676 )
677}
678
679async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
684 let wants_html = headers
685 .get(axum::http::header::ACCEPT)
686 .and_then(|v| v.to_str().ok())
687 .unwrap_or("")
688 .contains("text/html");
689 if wants_html {
690 landing_stats_html().await.into_response()
691 } else {
692 stats_json(State(relay)).await.into_response()
693 }
694}
695
696async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
697 let now = SystemTime::now()
698 .duration_since(UNIX_EPOCH)
699 .map(|d| d.as_secs())
700 .unwrap_or(0);
701 let inner = relay.inner.lock().await;
702 let streams_active: usize = inner.streams.values().map(Vec::len).sum();
703 let body = json!({
704 "version": env!("CARGO_PKG_VERSION"),
705 "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
706 "handles_active": inner.handles.len(),
707 "slots_active": inner.slots.len(),
708 "pair_slots_open": inner.pair_slots.len(),
709 "streams_active": streams_active,
710 "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
711 "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
712 "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
713 "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
714 "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
715 });
716 (StatusCode::OK, Json(body))
717}
718
719async fn landing_index() -> impl IntoResponse {
723 static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
724 (
725 StatusCode::OK,
726 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
727 INDEX_HTML,
728 )
729}
730
731async fn landing_favicon() -> impl IntoResponse {
732 static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
733 (
734 StatusCode::OK,
735 [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
736 FAVICON_SVG,
737 )
738}
739
740async fn landing_og() -> impl IntoResponse {
741 static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
742 (
743 StatusCode::OK,
744 [
745 (axum::http::header::CONTENT_TYPE, "image/png"),
746 (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
747 ],
748 OG_PNG,
749 )
750}
751
752async fn landing_install_sh() -> impl IntoResponse {
753 static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
754 (
755 StatusCode::OK,
756 [
757 (
758 axum::http::header::CONTENT_TYPE,
759 "text/x-shellscript; charset=utf-8",
760 ),
761 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
762 ],
763 INSTALL_SH,
764 )
765}
766
767async fn landing_openshell_policy_sh() -> impl IntoResponse {
768 static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
769 (
770 StatusCode::OK,
771 [
772 (
773 axum::http::header::CONTENT_TYPE,
774 "text/x-shellscript; charset=utf-8",
775 ),
776 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
777 ],
778 POLICY_SH,
779 )
780}
781
782async fn allocate_slot(
783 State(relay): State<Relay>,
784 Json(_req): Json<AllocateRequest>,
785) -> impl IntoResponse {
786 let slot_id = random_hex(16);
787 let slot_token = random_hex(32);
788 {
789 let mut inner = relay.inner.lock().await;
790 inner.slots.insert(slot_id.clone(), Vec::new());
791 inner.tokens.insert(slot_id.clone(), slot_token.clone());
792 }
793 if let Err(e) = relay.persist_tokens().await {
794 return (
795 StatusCode::INTERNAL_SERVER_ERROR,
796 Json(json!({"error": format!("persist failed: {e}")})),
797 )
798 .into_response();
799 }
800 relay
801 .counters
802 .slot_allocations_total
803 .fetch_add(1, Ordering::Relaxed);
804 (
805 StatusCode::CREATED,
806 Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
807 )
808 .into_response()
809}
810
811async fn post_event(
812 State(relay): State<Relay>,
813 Path(slot_id): Path<String>,
814 headers: HeaderMap,
815 Json(req): Json<PostEventRequest>,
816) -> impl IntoResponse {
817 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
818 return resp;
819 }
820 let body_bytes = match serde_json::to_vec(&req.event) {
822 Ok(b) => b,
823 Err(e) => {
824 return (
825 StatusCode::BAD_REQUEST,
826 Json(json!({"error": format!("event not serializable: {e}")})),
827 )
828 .into_response();
829 }
830 };
831 if body_bytes.len() > MAX_EVENT_BYTES {
832 return (
833 StatusCode::PAYLOAD_TOO_LARGE,
834 Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
835 )
836 .into_response();
837 }
838 {
840 let inner = relay.inner.lock().await;
841 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
842 if used + body_bytes.len() > MAX_SLOT_BYTES {
843 return (
844 StatusCode::PAYLOAD_TOO_LARGE,
845 Json(json!({
846 "error": "slot quota exceeded",
847 "slot_bytes_used": used,
848 "slot_bytes_max": MAX_SLOT_BYTES,
849 "remediation": "operator should `wire rotate-slot` to drain old slot",
850 })),
851 )
852 .into_response();
853 }
854 }
855 let event_id = req
856 .event
857 .get("event_id")
858 .and_then(Value::as_str)
859 .map(str::to_string);
860
861 let dup = {
863 let inner = relay.inner.lock().await;
864 let slot = inner.slots.get(&slot_id);
865 if let (Some(eid), Some(slot)) = (&event_id, slot) {
866 slot.iter()
867 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
868 } else {
869 false
870 }
871 };
872 if dup {
873 return (
874 StatusCode::OK,
875 Json(json!({"event_id": event_id, "status": "duplicate"})),
876 )
877 .into_response();
878 }
879
880 {
881 let mut inner = relay.inner.lock().await;
882 let event_size = body_bytes.len();
883 let slot = inner.slots.entry(slot_id.clone()).or_default();
884 slot.push(req.event.clone());
885 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
886 }
887 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
888 return (
889 StatusCode::INTERNAL_SERVER_ERROR,
890 Json(json!({"error": format!("persist failed: {e}")})),
891 )
892 .into_response();
893 }
894 relay
895 .counters
896 .events_posted_total
897 .fetch_add(1, Ordering::Relaxed);
898 {
903 let mut inner = relay.inner.lock().await;
904 if let Some(subs) = inner.streams.get_mut(&slot_id) {
905 subs.retain(|tx| tx.send(req.event.clone()).is_ok());
906 }
907 }
908 (
909 StatusCode::CREATED,
910 Json(json!({"event_id": event_id, "status": "stored"})),
911 )
912 .into_response()
913}
914
915async fn stream_events(
928 State(relay): State<Relay>,
929 Path(slot_id): Path<String>,
930 headers: HeaderMap,
931) -> axum::response::Response {
932 use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
933 use futures::stream::StreamExt;
934
935 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
936 return resp;
937 }
938
939 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
940 {
941 let mut inner = relay.inner.lock().await;
942 inner.streams.entry(slot_id.clone()).or_default().push(tx);
943 }
944
945 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
946 SseEvent::default()
947 .json_data(&ev)
948 .map_err(|e| std::io::Error::other(e.to_string()))
949 });
950
951 Sse::new(stream)
952 .keep_alive(
953 KeepAlive::new()
954 .interval(std::time::Duration::from_secs(30))
955 .text("phyllis: still on the line"),
956 )
957 .into_response()
958}
959
960#[derive(Deserialize)]
963pub struct PairOpenRequest {
964 pub code_hash: String,
965 pub msg: String,
967 pub role: String, }
969
970#[derive(Deserialize)]
971pub struct PairBootstrapRequest {
972 pub role: String,
973 pub sealed: String,
974}
975
976#[derive(Deserialize)]
977pub struct PairAbandonRequest {
978 pub code_hash: String,
981}
982
983async fn pair_abandon(
989 State(relay): State<Relay>,
990 Json(req): Json<PairAbandonRequest>,
991) -> impl IntoResponse {
992 let mut inner = relay.inner.lock().await;
993 if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
994 inner.pair_slots.remove(&pair_id);
995 }
996 StatusCode::NO_CONTENT.into_response()
997}
998
999async fn pair_open(
1000 State(relay): State<Relay>,
1001 Json(req): Json<PairOpenRequest>,
1002) -> impl IntoResponse {
1003 if req.role != "host" && req.role != "guest" {
1004 return (
1005 StatusCode::BAD_REQUEST,
1006 Json(json!({"error": "role must be 'host' or 'guest'"})),
1007 )
1008 .into_response();
1009 }
1010 relay.evict_expired_pair_slots().await;
1011 let mut inner = relay.inner.lock().await;
1012 let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1013 Some(id) => id,
1014 None => {
1015 let new_id = random_hex(16);
1016 inner
1017 .pair_lookup
1018 .insert(req.code_hash.clone(), new_id.clone());
1019 inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1020 relay
1021 .counters
1022 .pair_opens_total
1023 .fetch_add(1, Ordering::Relaxed);
1024 new_id
1025 }
1026 };
1027 let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1028 slot.last_touched = std::time::Instant::now();
1029 if req.role == "host" {
1030 if slot.host_msg.is_some() {
1031 return (
1032 StatusCode::CONFLICT,
1033 Json(json!({"error": "host already registered for this code"})),
1034 )
1035 .into_response();
1036 }
1037 slot.host_msg = Some(req.msg);
1038 } else {
1039 if slot.guest_msg.is_some() {
1040 return (
1041 StatusCode::CONFLICT,
1042 Json(json!({"error": "guest already registered for this code"})),
1043 )
1044 .into_response();
1045 }
1046 slot.guest_msg = Some(req.msg);
1047 }
1048 (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1049}
1050
1051#[derive(Deserialize)]
1052pub struct PairGetQuery {
1053 pub as_role: String,
1055}
1056
1057async fn pair_get(
1058 State(relay): State<Relay>,
1059 Path(pair_id): Path<String>,
1060 Query(q): Query<PairGetQuery>,
1061) -> impl IntoResponse {
1062 relay.evict_expired_pair_slots().await;
1063 let mut inner = relay.inner.lock().await;
1064 let slot = match inner.pair_slots.get_mut(&pair_id) {
1065 Some(s) => {
1066 s.last_touched = std::time::Instant::now();
1067 s.clone()
1068 }
1069 None => {
1070 return (
1071 StatusCode::NOT_FOUND,
1072 Json(json!({"error": "unknown pair_id"})),
1073 )
1074 .into_response();
1075 }
1076 };
1077 let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1078 "host" => (slot.guest_msg, slot.guest_bootstrap),
1079 "guest" => (slot.host_msg, slot.host_bootstrap),
1080 _ => {
1081 return (
1082 StatusCode::BAD_REQUEST,
1083 Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1084 )
1085 .into_response();
1086 }
1087 };
1088 (
1089 StatusCode::OK,
1090 Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1091 )
1092 .into_response()
1093}
1094
1095async fn pair_bootstrap(
1096 State(relay): State<Relay>,
1097 Path(pair_id): Path<String>,
1098 Json(req): Json<PairBootstrapRequest>,
1099) -> impl IntoResponse {
1100 relay.evict_expired_pair_slots().await;
1101 let mut inner = relay.inner.lock().await;
1102 let slot = match inner.pair_slots.get_mut(&pair_id) {
1103 Some(s) => s,
1104 None => {
1105 return (
1106 StatusCode::NOT_FOUND,
1107 Json(json!({"error": "unknown pair_id"})),
1108 )
1109 .into_response();
1110 }
1111 };
1112 slot.last_touched = std::time::Instant::now();
1113 match req.role.as_str() {
1114 "host" => slot.host_bootstrap = Some(req.sealed),
1115 "guest" => slot.guest_bootstrap = Some(req.sealed),
1116 _ => {
1117 return (
1118 StatusCode::BAD_REQUEST,
1119 Json(json!({"error": "role must be 'host' or 'guest'"})),
1120 )
1121 .into_response();
1122 }
1123 }
1124 (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1125}
1126
1127#[derive(Deserialize)]
1130pub struct HandleClaimRequest {
1131 pub nick: String,
1134 pub slot_id: String,
1136 pub relay_url: Option<String>,
1140 pub card: Value,
1143 #[serde(default, skip_serializing_if = "Option::is_none")]
1147 pub discoverable: Option<bool>,
1148}
1149
1150async fn handle_claim(
1157 State(relay): State<Relay>,
1158 headers: HeaderMap,
1159 Json(req): Json<HandleClaimRequest>,
1160) -> impl IntoResponse {
1161 if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1164 return resp;
1165 }
1166 if !crate::pair_profile::is_valid_nick(&req.nick) {
1168 return (
1169 StatusCode::BAD_REQUEST,
1170 Json(json!({
1171 "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1172 "nick": req.nick,
1173 })),
1174 )
1175 .into_response();
1176 }
1177 if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1179 return (
1180 StatusCode::BAD_REQUEST,
1181 Json(json!({"error": format!("card signature invalid: {e}")})),
1182 )
1183 .into_response();
1184 }
1185 let did = match req.card.get("did").and_then(Value::as_str) {
1186 Some(d) => d.to_string(),
1187 None => {
1188 return (
1189 StatusCode::BAD_REQUEST,
1190 Json(json!({"error": "card missing 'did' field"})),
1191 )
1192 .into_response();
1193 }
1194 };
1195
1196 let canonical_nick = crate::agent_card::display_handle_from_did(&did);
1204 if req.nick != canonical_nick {
1205 return (
1206 StatusCode::BAD_REQUEST,
1207 Json(json!({
1208 "error": "phyllis: that nick doesn't match your DID — wire publishes one name, and it's the one your key spells out",
1209 "nick": req.nick,
1210 "expected": canonical_nick,
1211 })),
1212 )
1213 .into_response();
1214 }
1215
1216 let prior_record: Option<HandleRecord>;
1222 let first_claim = {
1223 let inner = relay.inner.lock().await;
1224 match inner.handles.get(&req.nick) {
1225 Some(existing) if existing.did != did => {
1226 return (
1227 StatusCode::CONFLICT,
1228 Json(json!({
1229 "error": "phyllis: this line's already taken by a different identity (persona collision). Your handle is fixed to your key, so claim on another relay (`wire up @<other-relay>`) or mint a fresh identity (`wire nuke` then `wire up`).",
1230 "nick": req.nick,
1231 "claimed_by": existing.did,
1232 })),
1233 )
1234 .into_response();
1235 }
1236 Some(prev) => {
1237 prior_record = Some(prev.clone());
1238 false
1239 }
1240 None => {
1241 prior_record = None;
1242 true
1243 }
1244 }
1245 };
1246
1247 let now = time::OffsetDateTime::now_utc()
1252 .replace_nanosecond(0)
1253 .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1254 .format(&time::format_description::well_known::Rfc3339)
1255 .unwrap_or_default();
1256 let discoverable = match (req.discoverable, &prior_record) {
1262 (Some(d), _) => Some(d),
1263 (None, Some(prev)) => prev.discoverable,
1264 (None, None) => Some(true),
1265 };
1266 let record = HandleRecord {
1267 nick: req.nick.clone(),
1268 did: did.clone(),
1269 card: req.card.clone(),
1270 slot_id: req.slot_id.clone(),
1271 relay_url: req.relay_url.clone(),
1272 claimed_at: now,
1273 discoverable,
1274 };
1275
1276 let path = relay
1278 .state_dir
1279 .join("handles")
1280 .join(format!("{}.json", req.nick));
1281 let body = match serde_json::to_vec_pretty(&record) {
1282 Ok(b) => b,
1283 Err(e) => {
1284 return (
1285 StatusCode::INTERNAL_SERVER_ERROR,
1286 Json(json!({"error": format!("serialize failed: {e}")})),
1287 )
1288 .into_response();
1289 }
1290 };
1291 if let Err(e) = tokio::fs::write(&path, &body).await {
1292 return (
1293 StatusCode::INTERNAL_SERVER_ERROR,
1294 Json(json!({"error": format!("persist failed: {e}")})),
1295 )
1296 .into_response();
1297 }
1298 {
1299 let mut inner = relay.inner.lock().await;
1300 inner.handles.insert(req.nick.clone(), record);
1301 }
1302 relay
1303 .counters
1304 .handle_claims_total
1305 .fetch_add(1, Ordering::Relaxed);
1306 if first_claim {
1307 relay
1308 .counters
1309 .handle_first_claims_total
1310 .fetch_add(1, Ordering::Relaxed);
1311 }
1312 (
1313 StatusCode::CREATED,
1314 Json(json!({
1315 "nick": req.nick,
1316 "did": did,
1317 "status": if first_claim { "claimed" } else { "re-claimed" },
1318 })),
1319 )
1320 .into_response()
1321}
1322
/// Query string shared by the well-known agent lookup handlers
/// (`well_known_agent` and `well_known_agent_card_a2a`).
#[derive(Deserialize)]
pub struct WellKnownAgentQuery {
    /// Handle to resolve. The handlers keep only the text before the first
    /// `@` and use it as the nick to look up; an empty nick is rejected.
    pub handle: String,
}
1327
/// Query string for the handle directory listing (`handles_directory`).
#[derive(Deserialize)]
pub struct HandlesDirectoryQuery {
    /// Pagination cursor: records whose nick sorts at or before this value
    /// are skipped by the handler.
    pub cursor: Option<String>,
    /// Requested page size; the handler defaults to 100 and clamps to 1..=500.
    pub limit: Option<usize>,
    /// Optional filter: keep only records whose `profile.vibe` array contains
    /// this string (compared ASCII case-insensitively).
    pub vibe: Option<String>,
}
1334
/// JSON body accepted by `invite_register`.
#[derive(Deserialize)]
pub struct InviteRegisterRequest {
    /// Invite URL to store. The handler rejects an empty value and anything
    /// longer than 8192 bytes.
    pub invite_url: String,
    /// Requested lifetime in seconds. The handler defaults to 86_400 and
    /// clamps the value to 60..=7 * 86_400.
    #[serde(default)]
    pub ttl_seconds: Option<u64>,
    /// Use budget copied into the record's `uses_remaining`. `None` means
    /// `invite_script` performs no use counting for this invite.
    #[serde(default)]
    pub uses: Option<u32>,
}
1356
1357impl Relay {
1358 async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1360 use tokio::io::AsyncWriteExt;
1361 let mut line = serde_json::to_vec(rec)?;
1362 line.push(b'\n');
1363 let path = self.state_dir.join("invites.jsonl");
1364 let mut f = tokio::fs::OpenOptions::new()
1365 .create(true)
1366 .append(true)
1367 .open(&path)
1368 .await?;
1369 f.write_all(&line).await?;
1370 f.flush().await?;
1371 Ok(())
1372 }
1373}
1374
1375async fn invite_register(
1376 State(relay): State<Relay>,
1377 Json(req): Json<InviteRegisterRequest>,
1378) -> impl IntoResponse {
1379 if req.invite_url.is_empty() {
1380 return (
1381 StatusCode::BAD_REQUEST,
1382 Json(json!({"error": "invite_url required"})),
1383 )
1384 .into_response();
1385 }
1386 if req.invite_url.len() > 8_192 {
1388 return (
1389 StatusCode::PAYLOAD_TOO_LARGE,
1390 Json(json!({"error": "invite_url > 8 KiB"})),
1391 )
1392 .into_response();
1393 }
1394 let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1395 let now = SystemTime::now()
1396 .duration_since(UNIX_EPOCH)
1397 .map(|d| d.as_secs())
1398 .unwrap_or(0);
1399 let token = random_hex(3);
1402 let rec = InviteRecord {
1403 token: token.clone(),
1404 invite_url: req.invite_url,
1405 expires_unix: now + ttl,
1406 uses_remaining: req.uses,
1407 created_unix: now,
1408 };
1409 {
1410 let mut inner = relay.inner.lock().await;
1411 if inner.invites.contains_key(&token) {
1412 return (
1413 StatusCode::CONFLICT,
1414 Json(json!({"error": "token collision, retry"})),
1415 )
1416 .into_response();
1417 }
1418 inner.invites.insert(token.clone(), rec.clone());
1419 }
1420 if let Err(e) = relay.persist_invite(&rec).await {
1421 return (
1422 StatusCode::INTERNAL_SERVER_ERROR,
1423 Json(json!({"error": format!("persist failed: {e}")})),
1424 )
1425 .into_response();
1426 }
1427 (
1428 StatusCode::CREATED,
1429 Json(json!({
1430 "token": token,
1431 "path": format!("/i/{token}"),
1432 "expires_unix": rec.expires_unix,
1433 "uses_remaining": rec.uses_remaining,
1434 })),
1435 )
1436 .into_response()
1437}
1438
/// Query string for `invite_script`.
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// `Some("url")` makes the handler answer with the raw invite URL as
    /// plain text and leaves `uses_remaining` untouched; any other value
    /// (or none) yields the shell script and consumes one use.
    pub format: Option<String>,
}
1449
/// Serve a registered invite, either as the raw invite URL (`?format=url`)
/// or as a POSIX shell script that installs the `wire` CLI when missing and
/// then runs `wire accept` on the invite.
///
/// Responses:
/// - 404 when the token is not exactly six hex characters or is unknown.
/// - 410 when the invite has expired or its use budget is exhausted.
/// - 200 with `text/plain` (raw URL) or `text/x-shellscript` (script); both
///   are sent with `Cache-Control: private, no-store, max-age=0`.
async fn invite_script(
    State(relay): State<Relay>,
    Path(token): Path<String>,
    Query(q): Query<InviteScriptQuery>,
) -> impl IntoResponse {
    // Malformed tokens get the same answer as unknown ones.
    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
        return (StatusCode::NOT_FOUND, "not found\n").into_response();
    }
    let want_raw_url = q.format.as_deref() == Some("url");
    // A clock before the Unix epoch degrades to 0.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    let invite_url = {
        let mut inner = relay.inner.lock().await;
        let Some(rec) = inner.invites.get_mut(&token) else {
            return (StatusCode::NOT_FOUND, "not found\n").into_response();
        };
        if rec.expires_unix <= now {
            return (StatusCode::GONE, "this invite has expired\n").into_response();
        }
        // `None` means no use counting at all for this invite.
        if let Some(n) = rec.uses_remaining {
            if n == 0 {
                return (StatusCode::GONE, "this invite has been used up\n").into_response();
            }
            // Only script fetches consume a use; raw-URL fetches do not.
            if !want_raw_url {
                rec.uses_remaining = Some(n - 1);
            }
        }
        rec.invite_url.clone()
    };
    if want_raw_url {
        return (
            StatusCode::OK,
            [
                (
                    axum::http::header::CONTENT_TYPE,
                    "text/plain; charset=utf-8",
                ),
                (
                    axum::http::header::CACHE_CONTROL,
                    "private, no-store, max-age=0",
                ),
            ],
            invite_url,
        )
            .into_response();
    }
    // The URL is embedded inside a single-quoted shell string, so each `'`
    // is rewritten as `'\''` (close quote, escaped quote, reopen quote).
    let escaped = invite_url.replace('\'', "'\\''");
    // Script outline: install `wire` if it is not on PATH, add ~/.local/bin
    // to PATH for this run, and if the CLI still cannot be found print manual
    // instructions and exit 0; otherwise accept the invite.
    let script = format!(
        "#!/bin/sh\n\
         # wire — one-curl onboarding (install + pair in one shot)\n\
         # source: https://github.com/SlanchaAi/wire\n\
         set -eu\n\
         INVITE='{escaped}'\n\
         echo \"\u{2192} checking for wire CLI...\"\n\
         if ! command -v wire >/dev/null 2>&1; then\n \
         echo \"\u{2192} wire not installed; installing first...\"\n \
         curl -fsSL https://wireup.net/install.sh | sh\n \
         case \":$PATH:\" in\n \
         *:\"$HOME/.local/bin\":*) ;;\n \
         *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n \
         esac\n \
         if ! command -v wire >/dev/null 2>&1; then\n \
         echo \"\"\n \
         echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n \
         echo \"Open a new shell, then run:\"\n \
         echo \" wire accept '$INVITE'\"\n \
         exit 0\n \
         fi\n\
         fi\n\
         echo \"\u{2192} accepting invite...\"\n\
         wire accept \"$INVITE\"\n"
    );
    (
        StatusCode::OK,
        [
            (
                axum::http::header::CONTENT_TYPE,
                "text/x-shellscript; charset=utf-8",
            ),
            (
                axum::http::header::CACHE_CONTROL,
                "private, no-store, max-age=0",
            ),
        ],
        script,
    )
        .into_response()
}
1545
1546async fn handles_directory(
1547 State(relay): State<Relay>,
1548 Query(q): Query<HandlesDirectoryQuery>,
1549) -> impl IntoResponse {
1550 let limit = q.limit.unwrap_or(100).clamp(1, 500);
1551 let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1552 let inner = relay.inner.lock().await;
1553 let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1554 drop(inner);
1555 records.sort_by(|a, b| a.nick.cmp(&b.nick));
1556
1557 let cursor = q.cursor.as_deref();
1558 let mut eligible = Vec::new();
1559 for rec in records {
1560 if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1561 continue;
1562 }
1563 if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1568 continue;
1569 }
1570 if !rec.is_discoverable() {
1574 continue;
1575 }
1576 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1577 if profile
1578 .get("listed")
1579 .and_then(Value::as_bool)
1580 .is_some_and(|listed| !listed)
1581 {
1582 continue;
1583 }
1584 if let Some(want) = &vibe_filter {
1585 let matched = profile
1586 .get("vibe")
1587 .and_then(Value::as_array)
1588 .map(|arr| {
1589 arr.iter().any(|v| {
1590 v.as_str()
1591 .map(|s| s.eq_ignore_ascii_case(want))
1592 .unwrap_or(false)
1593 })
1594 })
1595 .unwrap_or(false);
1596 if !matched {
1597 continue;
1598 }
1599 }
1600 eligible.push((rec, profile));
1601 }
1602
1603 let has_more = eligible.len() > limit;
1604 let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1605 let next_cursor = if has_more {
1606 page.last().map(|(rec, _)| rec.nick.clone())
1607 } else {
1608 None
1609 };
1610 let handles: Vec<Value> = page
1611 .into_iter()
1612 .map(|(rec, profile)| {
1613 let emoji = profile
1619 .get("emoji")
1620 .and_then(Value::as_str)
1621 .filter(|s| !s.is_empty())
1622 .map(str::to_string)
1623 .unwrap_or_else(|| crate::character::Character::from_did(&rec.did).emoji);
1624 json!({
1625 "nick": rec.nick,
1626 "did": rec.did,
1627 "profile": {
1628 "emoji": emoji,
1629 "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1630 "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1631 "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1632 "now": profile.get("now").cloned().unwrap_or(Value::Null),
1633 },
1634 "claimed_at": rec.claimed_at,
1635 })
1636 })
1637 .collect();
1638 (
1639 StatusCode::OK,
1640 Json(json!({
1641 "handles": handles,
1642 "next_cursor": next_cursor,
1643 })),
1644 )
1645 .into_response()
1646}
1647
1648async fn handle_intro(
1662 State(relay): State<Relay>,
1663 Path(nick): Path<String>,
1664 Json(req): Json<PostEventRequest>,
1665) -> impl IntoResponse {
1666 let slot_id = {
1668 let inner = relay.inner.lock().await;
1669 match inner.handles.get(&nick) {
1670 Some(rec) => rec.slot_id.clone(),
1671 None => {
1672 return (
1673 StatusCode::NOT_FOUND,
1674 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1675 )
1676 .into_response();
1677 }
1678 }
1679 };
1680
1681 let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1684 let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1685 if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1686 return (
1687 StatusCode::BAD_REQUEST,
1688 Json(json!({
1689 "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1690 "got_kind": kind,
1691 "got_type": type_str,
1692 })),
1693 )
1694 .into_response();
1695 }
1696
1697 let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1699 Some(c) => c.clone(),
1700 None => {
1701 return (
1702 StatusCode::BAD_REQUEST,
1703 Json(json!({"error": "intro event body must embed 'card' field"})),
1704 )
1705 .into_response();
1706 }
1707 };
1708 if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1709 return (
1710 StatusCode::BAD_REQUEST,
1711 Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1712 )
1713 .into_response();
1714 }
1715
1716 let body_bytes = match serde_json::to_vec(&req.event) {
1718 Ok(b) => b,
1719 Err(e) => {
1720 return (
1721 StatusCode::BAD_REQUEST,
1722 Json(json!({"error": format!("event not serializable: {e}")})),
1723 )
1724 .into_response();
1725 }
1726 };
1727 if body_bytes.len() > MAX_EVENT_BYTES {
1728 return (
1729 StatusCode::PAYLOAD_TOO_LARGE,
1730 Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1731 )
1732 .into_response();
1733 }
1734 {
1735 let inner = relay.inner.lock().await;
1736 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1737 if used + body_bytes.len() > MAX_SLOT_BYTES {
1738 return (
1739 StatusCode::PAYLOAD_TOO_LARGE,
1740 Json(json!({
1741 "error": "target slot quota exceeded",
1742 "slot_bytes_used": used,
1743 "slot_bytes_max": MAX_SLOT_BYTES,
1744 })),
1745 )
1746 .into_response();
1747 }
1748 }
1749
1750 let event_id = req
1751 .event
1752 .get("event_id")
1753 .and_then(Value::as_str)
1754 .map(str::to_string);
1755
1756 let dup = {
1758 let inner = relay.inner.lock().await;
1759 let slot = inner.slots.get(&slot_id);
1760 if let (Some(eid), Some(slot)) = (&event_id, slot) {
1761 slot.iter()
1762 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1763 } else {
1764 false
1765 }
1766 };
1767 if dup {
1768 return (
1769 StatusCode::OK,
1770 Json(json!({"event_id": event_id, "status": "duplicate"})),
1771 )
1772 .into_response();
1773 }
1774
1775 {
1776 let mut inner = relay.inner.lock().await;
1777 let event_size = body_bytes.len();
1778 let slot = inner.slots.entry(slot_id.clone()).or_default();
1779 slot.push(req.event.clone());
1780 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1781 }
1782 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1783 return (
1784 StatusCode::INTERNAL_SERVER_ERROR,
1785 Json(json!({"error": format!("persist failed: {e}")})),
1786 )
1787 .into_response();
1788 }
1789 (
1790 StatusCode::CREATED,
1791 Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1792 )
1793 .into_response()
1794}
1795
1796async fn well_known_agent_card_a2a(
1812 State(relay): State<Relay>,
1813 Query(q): Query<WellKnownAgentQuery>,
1814) -> impl IntoResponse {
1815 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1816 if nick.is_empty() {
1817 return (
1818 StatusCode::BAD_REQUEST,
1819 Json(json!({"error": "handle missing nick"})),
1820 )
1821 .into_response();
1822 }
1823 let inner = relay.inner.lock().await;
1824 let rec = match inner.handles.get(&nick) {
1825 Some(r) => r.clone(),
1826 None => {
1827 return (
1828 StatusCode::NOT_FOUND,
1829 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1830 )
1831 .into_response();
1832 }
1833 };
1834 drop(inner);
1835
1836 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1837 let description = profile
1838 .get("motto")
1839 .and_then(Value::as_str)
1840 .unwrap_or("")
1841 .to_string();
1842 let display_name = profile
1843 .get("display_name")
1844 .and_then(Value::as_str)
1845 .unwrap_or(&rec.nick)
1846 .to_string();
1847 let relay_url = rec.relay_url.clone().unwrap_or_default();
1848 let endpoint = if !relay_url.is_empty() {
1850 format!(
1851 "{}/v1/handle/intro/{}",
1852 relay_url.trim_end_matches('/'),
1853 rec.nick
1854 )
1855 } else {
1856 format!("/v1/handle/intro/{}", rec.nick)
1857 };
1858 let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1859
1860 let a2a_card = json!({
1864 "id": rec.did,
1865 "name": display_name,
1866 "description": description,
1867 "version": "wire/0.5",
1868 "endpoint": endpoint,
1869 "provider": {
1870 "name": "wire",
1871 "url": "https://github.com/SlanchaAi/wire"
1872 },
1873 "capabilities": {
1874 "streaming": false,
1875 "pushNotifications": false,
1876 "extendedAgentCard": true
1877 },
1878 "securitySchemes": {
1879 "ed25519-event-sig": {
1880 "type": "signature",
1881 "alg": "EdDSA",
1882 "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1883 }
1884 },
1885 "security": [{"ed25519-event-sig": []}],
1886 "skills": [],
1887 "extensions": [{
1888 "uri": "https://slancha.ai/wire/ext/v0.5",
1892 "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1893 "required": false,
1894 "params": {
1895 "did": rec.did,
1896 "handle": rec.nick,
1897 "slot_id": rec.slot_id,
1898 "relay_url": rec.relay_url,
1899 "card": rec.card,
1900 "profile": profile,
1901 "claimed_at": rec.claimed_at,
1902 }
1903 }],
1904 "signature": card_sig,
1905 });
1906 (StatusCode::OK, Json(a2a_card)).into_response()
1907}
1908
1909async fn well_known_agent(
1910 State(relay): State<Relay>,
1911 Query(q): Query<WellKnownAgentQuery>,
1912) -> impl IntoResponse {
1913 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1914 if nick.is_empty() {
1915 return (
1916 StatusCode::BAD_REQUEST,
1917 Json(json!({"error": "handle missing nick"})),
1918 )
1919 .into_response();
1920 }
1921 let inner = relay.inner.lock().await;
1922 match inner.handles.get(&nick) {
1923 Some(rec) => (
1924 StatusCode::OK,
1925 Json(json!({
1926 "nick": rec.nick,
1927 "did": rec.did,
1928 "card": rec.card,
1929 "slot_id": rec.slot_id,
1930 "relay_url": rec.relay_url,
1931 "claimed_at": rec.claimed_at,
1932 })),
1933 )
1934 .into_response(),
1935 None => (
1936 StatusCode::NOT_FOUND,
1937 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1938 )
1939 .into_response(),
1940 }
1941}
1942
1943async fn list_events(
1944 State(relay): State<Relay>,
1945 Path(slot_id): Path<String>,
1946 Query(q): Query<ListEventsQuery>,
1947 headers: HeaderMap,
1948) -> impl IntoResponse {
1949 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1950 return resp;
1951 }
1952 let limit = q.limit.unwrap_or(100).min(1000);
1953 let mut inner = relay.inner.lock().await;
1954 let now_unix = std::time::SystemTime::now()
1958 .duration_since(std::time::UNIX_EPOCH)
1959 .map(|d| d.as_secs())
1960 .unwrap_or(0);
1961 inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1962 let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1963 let start = match q.since {
1964 Some(eid) => events
1965 .iter()
1966 .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1967 .map(|i| i + 1)
1968 .unwrap_or(0),
1969 None => 0,
1970 };
1971 let end = (start + limit).min(events.len());
1972 let slice = events[start..end].to_vec();
1973 (StatusCode::OK, Json(slice)).into_response()
1974}
1975
1976async fn slot_state(
1982 State(relay): State<Relay>,
1983 Path(slot_id): Path<String>,
1984 headers: HeaderMap,
1985) -> impl IntoResponse {
1986 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1987 return resp;
1988 }
1989 let inner = relay.inner.lock().await;
1990 let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1991 let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1992 let responder_health = inner.responder_health.get(&slot_id).cloned();
1993 (
1994 StatusCode::OK,
1995 Json(json!({
1996 "slot_id": slot_id,
1997 "event_count": event_count,
1998 "last_pull_at_unix": last_pull_at_unix,
1999 "responder_health": responder_health,
2000 })),
2001 )
2002 .into_response()
2003}
2004
2005async fn responder_health_set(
2006 State(relay): State<Relay>,
2007 Path(slot_id): Path<String>,
2008 headers: HeaderMap,
2009 Json(record): Json<ResponderHealthRecord>,
2010) -> impl IntoResponse {
2011 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
2012 return resp;
2013 }
2014 let path = relay
2015 .state_dir
2016 .join("responder-health")
2017 .join(format!("{slot_id}.json"));
2018 let body = match serde_json::to_vec_pretty(&record) {
2019 Ok(b) => b,
2020 Err(e) => {
2021 return (
2022 StatusCode::INTERNAL_SERVER_ERROR,
2023 Json(json!({"error": format!("serialize failed: {e}")})),
2024 )
2025 .into_response();
2026 }
2027 };
2028 if let Err(e) = tokio::fs::write(&path, body).await {
2029 return (
2030 StatusCode::INTERNAL_SERVER_ERROR,
2031 Json(json!({"error": format!("persist failed: {e}")})),
2032 )
2033 .into_response();
2034 }
2035 {
2036 let mut inner = relay.inner.lock().await;
2037 inner
2038 .responder_health
2039 .insert(slot_id.clone(), record.clone());
2040 }
2041 (StatusCode::OK, Json(record)).into_response()
2042}
2043
2044async fn check_token(
2045 relay: &Relay,
2046 headers: &HeaderMap,
2047 slot_id: &str,
2048) -> std::result::Result<(), axum::response::Response> {
2049 let auth = headers
2050 .get(AUTHORIZATION)
2051 .and_then(|h| h.to_str().ok())
2052 .and_then(|s| s.strip_prefix("Bearer "))
2053 .map(str::to_string);
2054 let presented = match auth {
2055 Some(t) => t,
2056 None => {
2057 return Err((
2058 StatusCode::UNAUTHORIZED,
2059 Json(json!({"error": "missing Bearer token"})),
2060 )
2061 .into_response());
2062 }
2063 };
2064 let inner = relay.inner.lock().await;
2065 let expected = match inner.tokens.get(slot_id) {
2066 Some(t) => t.clone(),
2067 None => {
2068 return Err((
2069 StatusCode::NOT_FOUND,
2070 Json(json!({"error": "unknown slot"})),
2071 )
2072 .into_response());
2073 }
2074 };
2075 drop(inner);
2076 if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2077 return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2078 }
2079 Ok(())
2080}
2081
/// Compare two byte slices without stopping at the first differing byte.
///
/// Slices of different length are unequal and return immediately. For equal
/// lengths every byte pair is XOR-ed and OR-ed into one accumulator, so the
/// loop always walks the full length.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |diff, (lhs, rhs)| diff | (lhs ^ rhs))
            == 0
}
2092
/// Return true only for strings of exactly 32 bytes made of lowercase hex
/// digits (`0-9`, `a-f`).
fn is_valid_slot_id(s: &str) -> bool {
    if s.len() != 32 {
        return false;
    }
    s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2098
2099fn random_hex(n_bytes: usize) -> String {
2100 let mut buf = vec![0u8; n_bytes];
2101 rand::thread_rng().fill_bytes(&mut buf);
2102 hex::encode(buf)
2103}
2104
2105pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2107 serve_with_mode(bind, state_dir, ServerMode::default()).await
2108}
2109
2110pub async fn serve_with_mode(bind: &str, state_dir: PathBuf, mode: ServerMode) -> Result<()> {
2114 let relay = Relay::new(state_dir).await?;
2115 relay.spawn_pair_sweeper();
2116 relay.spawn_counter_persister();
2117 let app = relay.clone().router_with_mode(mode);
2118 let listener = tokio::net::TcpListener::bind(bind)
2119 .await
2120 .with_context(|| format!("binding {bind}"))?;
2121 if mode.local_only {
2122 eprintln!(
2123 "wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled"
2124 );
2125 } else {
2126 eprintln!("wire relay-server listening on {bind}");
2127 }
2128 let shutdown_relay = relay.clone();
2129 axum::serve(listener, app)
2130 .with_graceful_shutdown(async move {
2131 let _ = tokio::signal::ctrl_c().await;
2132 eprintln!("\nshutting down — final counter snapshot");
2133 if let Err(e) = shutdown_relay.persist_counters().await {
2134 eprintln!("final counter persist failed: {e}");
2135 }
2136 })
2137 .await?;
2138 Ok(())
2139}
2140
/// Run the relay over a Unix domain socket at `socket_path` until Ctrl-C.
///
/// Steps, in order: remove any existing file at the socket path, create the
/// parent directory, build the relay and start its background tasks, bind
/// the socket, restrict it to mode 0600 (a failure is only logged), then
/// accept connections and serve each one as HTTP/1 on its own task. The
/// router is always built with `local_only: true`.
///
/// On Ctrl-C a final counter snapshot is persisted and the socket file is
/// removed (best effort).
///
/// # Errors
/// Returns an error when the stale socket cannot be removed, the parent
/// directory cannot be created, the relay cannot be built, or the socket
/// cannot be bound.
#[cfg(unix)]
pub async fn serve_uds(socket_path: PathBuf, state_dir: PathBuf) -> Result<()> {
    use hyper::server::conn::http1;
    use hyper_util::rt::TokioIo;
    use tower_service::Service;

    // Binding fails if a file already exists at the path, so clear it first.
    if socket_path.exists() {
        std::fs::remove_file(&socket_path)
            .with_context(|| format!("removing stale socket at {socket_path:?}"))?;
    }
    if let Some(parent) = socket_path.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("creating socket parent {parent:?}"))?;
    }
    let relay = Relay::new(state_dir).await?;
    relay.spawn_pair_sweeper();
    relay.spawn_counter_persister();
    let app: axum::Router = relay
        .clone()
        .router_with_mode(ServerMode { local_only: true });
    let listener = tokio::net::UnixListener::bind(&socket_path)
        .with_context(|| format!("binding UDS at {socket_path:?}"))?;

    use std::os::unix::fs::PermissionsExt;
    // Owner-only permissions; failure is reported but does not stop the server.
    if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
        eprintln!(
            "wire relay-server (UDS): chmod 0600 on {socket_path:?} failed: {e} — \
             socket may be accessible to other uids. Investigate."
        );
    }
    eprintln!(
        "wire relay-server (UDS) listening on unix://{} — same-host, owner-uid only",
        socket_path.display()
    );

    let shutdown_relay = relay.clone();
    let socket_path_for_cleanup = socket_path.clone();
    let mut make_service = app.into_make_service();

    // Accept loop: never completes on its own; it ends only when `select!`
    // below drops it after the shutdown future finishes.
    let serve_loop = async {
        loop {
            let (stream, _peer_addr) = match listener.accept().await {
                Ok(p) => p,
                Err(e) => {
                    // Log and keep accepting.
                    eprintln!("wire relay-server (UDS): accept failed: {e}");
                    continue;
                }
            };
            let tower_service = match make_service.call(&stream).await {
                Ok(s) => s,
                // The error type is uninhabited, so this arm can never run.
                Err(infallible) => match infallible {},
            };
            let io = TokioIo::new(stream);
            // Adapt the tower service to hyper: each request gets its own clone.
            let hyper_service =
                hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
                    let mut svc = tower_service.clone();
                    async move { Service::call(&mut svc, req).await }
                });
            // One task per connection so a slow peer does not block accepts.
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .serve_connection(io, hyper_service)
                    .await
                {
                    // Incomplete-message errors are not logged.
                    if !e.is_incomplete_message() {
                        eprintln!("wire relay-server (UDS): conn error: {e}");
                    }
                }
            });
        }
    };

    let shutdown = async {
        let _ = tokio::signal::ctrl_c().await;
        eprintln!("\nshutting down — final counter snapshot");
        if let Err(e) = shutdown_relay.persist_counters().await {
            eprintln!("final counter persist failed: {e}");
        }
        // Best-effort cleanup; a leftover socket is removed on the next start.
        let _ = std::fs::remove_file(&socket_path_for_cleanup);
    };

    // Whichever future finishes first ends the server.
    tokio::select! {
        _ = serve_loop => {},
        _ = shutdown => {},
    };
    Ok(())
}
2246
/// Non-Unix stand-in for the Unix-domain-socket server.
///
/// # Errors
/// Always returns an error that points the caller at the loopback HTTP
/// alternative.
#[cfg(not(unix))]
pub async fn serve_uds(_socket_path: PathBuf, _state_dir: PathBuf) -> Result<()> {
    anyhow::bail!(
        "UDS transport is Unix-only; Windows falls back to loopback HTTP. \
         Use `wire relay-server --bind 127.0.0.1:8771 --local-only` on Windows."
    )
}
2254
/// Options passed to `router_with_mode` when the relay router is built.
/// The default is `local_only: false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ServerMode {
    /// When true, `serve_with_mode` announces a LOCAL-ONLY relay with the
    /// phonebook and well-known endpoints disabled; `serve_uds` always sets
    /// it. NOTE(review): the actual route gating lives in `router_with_mode`,
    /// which is not visible from here — confirm against that function.
    pub local_only: bool,
}
2267
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal slices match; a differing byte or a differing length does not.
    #[test]
    fn constant_time_eq_basic() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        // Length mismatch.
        assert!(!constant_time_eq(b"abc", b"abcd"));
    }

    /// 16 random bytes encode to 32 hex characters.
    #[test]
    fn random_hex_length() {
        let s = random_hex(16);
        assert_eq!(s.len(), 32);
        assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// A pair slot idle for longer than the TTL is evicted together with its
    /// lookup entry, while a slot built with `PairSlot::default()` survives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        // Unique temp directory per run so parallel tests do not collide.
        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&dir);
        let relay = Relay::new(dir.clone()).await.unwrap();

        {
            let mut inner = relay.inner.lock().await;
            // Slot A: last touched 60 seconds past the TTL.
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            inner.pair_slots.insert(
                "id-A".to_string(),
                PairSlot {
                    last_touched: std::time::Instant::now()
                        - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
                    ..PairSlot::default()
                },
            );

            // Slot B: default slot, expected to be kept by the sweep.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let inner = relay.inner.lock().await;
        assert_eq!(
            inner.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(inner.pair_slots.contains_key("id-B"));
        // The lookup entry pointing at the evicted slot must be gone too.
        assert_eq!(inner.pair_lookup.len(), 1);
        assert!(inner.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Only 32-character lowercase hex strings are valid slot ids.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));
        // Too short.
        assert!(!is_valid_slot_id("abc"));
        // One character short, one character long, uppercase digits.
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde"));
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0"));
        assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
        // Path-like inputs must never be accepted.
        assert!(!is_valid_slot_id("../etc/passwd0123456789abcdef0000"));
        assert!(!is_valid_slot_id("..%2Fetc%2Fpasswd00000000000000000"));
        assert!(!is_valid_slot_id("/absolute/path/that/looks/like/key"));
        // Embedded NUL byte.
        assert!(!is_valid_slot_id(
            "0123456789abcdef\0\x31\x32\x33456789abcdef"
        ));
    }
}