1use anyhow::{Context, Result};
33use axum::{
34 Json, Router,
35 extract::{Path, Query, State},
36 http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37 response::IntoResponse,
38 routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50 GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Largest serialized size accepted for a single posted event (256 KiB);
/// `post_event` rejects anything bigger with `413 Payload Too Large`.
const MAX_EVENT_BYTES: usize = 256 * 1024;
/// Ceiling on the summed serialized size of the events held in one slot
/// (64 MiB); `post_event` refuses a write that would cross it.
const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
/// Cloneable handle to the relay, used as the axum router state.
///
/// Cloning is cheap: the in-memory state and the counters sit behind `Arc`s,
/// so every clone observes the same data.
#[derive(Clone)]
pub struct Relay {
    /// All mutable in-memory state, guarded by a single async mutex.
    inner: Arc<Mutex<Inner>>,
    /// Root directory for persisted state (`slots/`, `handles/`,
    /// `responder-health/`, `tokens.json`, `counters.json`, ...).
    state_dir: PathBuf,
    /// Lifetime counters shared by every clone of this handle.
    counters: Arc<RelayCounters>,
}
66
/// Process-wide counters reported by the stats endpoints.
///
/// The `*_total` fields are seeded from `counters.json` in `Relay::new` and
/// bumped with relaxed atomics by the handlers.
struct RelayCounters {
    /// Unix time (seconds) at which this relay instance was constructed.
    boot_unix: u64,
    /// Successful handle claims, including re-claims.
    handle_claims_total: AtomicU64,
    /// Successful claims of a nick that had no prior record.
    handle_first_claims_total: AtomicU64,
    /// Slots handed out by `allocate_slot`.
    slot_allocations_total: AtomicU64,
    /// Pair slots created by `pair_open` (one per previously unseen code hash).
    pair_opens_total: AtomicU64,
    /// Events stored by `post_event`.
    events_posted_total: AtomicU64,
}
79
/// Plain-value copy of the `*_total` counters, as stored in `counters.json`.
///
/// `Default` (all zeros) is what `Relay::new` falls back to when the file is
/// missing or cannot be parsed.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct CountersSnapshot {
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
88
/// One sample of relay statistics; `append_history` writes each sample as a
/// single JSON line in `stats-history.jsonl`.
#[derive(serde::Serialize, serde::Deserialize)]
struct HistoryEntry {
    /// Unix time (seconds) at which the sample was taken.
    ts: u64,
    /// Number of entries in the handle map at sample time.
    handles_active: usize,
    /// Number of slots held in memory at sample time.
    slots_active: usize,
    /// Number of pair slots currently open.
    pair_slots_open: usize,
    /// Total registered stream subscribers across all slots.
    streams_active: usize,
    // The remaining fields mirror the lifetime counters at sample time.
    handle_claims_total: u64,
    handle_first_claims_total: u64,
    slot_allocations_total: u64,
    pair_opens_total: u64,
    events_posted_total: u64,
}
106
/// Query string accepted by the stats-history endpoint.
#[derive(Deserialize)]
pub struct StatsHistoryQuery {
    /// Look-back window in hours; the handler defaults to 24 and caps at 168.
    pub hours: Option<u64>,
}
112
/// The relay's mutable in-memory state; always accessed through
/// `Relay::inner`'s mutex.
struct Inner {
    /// Slot id -> events stored for that slot, in arrival order.
    slots: HashMap<String, Vec<Value>>,
    /// Slot id -> the token issued when the slot was allocated.
    tokens: HashMap<String, String>,
    /// Slot id -> summed serialized size of the slot's events, used for the
    /// per-slot quota.
    slot_bytes: HashMap<String, usize>,
    /// Presumably slot id -> unix time of the most recent pull; not written or
    /// read in the part of the file reviewed here — TODO confirm.
    last_pull_at_unix: HashMap<String, u64>,
    /// Slot id -> channel senders feeding the live event streams for that slot.
    streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
    /// Pairing code hash -> pair id.
    pair_lookup: HashMap<String, String>,
    /// Pair id -> rendezvous state for that pairing.
    pair_slots: HashMap<String, PairSlot>,
    /// Nick -> claimed handle record.
    handles: HashMap<String, HandleRecord>,
    /// Slot id -> last stored responder-health record.
    responder_health: HashMap<String, ResponderHealthRecord>,
    /// Invite token -> invite record.
    invites: HashMap<String, InviteRecord>,
}
144
/// A registered invite, stored in memory and as one JSON line in
/// `invites.jsonl`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InviteRecord {
    /// Short token that identifies the invite (the key in `Inner::invites`).
    token: String,
    /// The invite URL supplied at registration.
    invite_url: String,
    /// Unix time (seconds) after which the invite is no longer loaded at boot.
    expires_unix: u64,
    /// Use budget supplied at registration; how `None` is interpreted is
    /// decided by code outside this section — presumably "unlimited".
    uses_remaining: Option<u32>,
    /// Unix time (seconds) at which the invite was registered.
    created_unix: u64,
}
156
/// A claimed handle, stored in memory and as `handles/<nick>.json`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct HandleRecord {
    /// The claimed nick (also the key in `Inner::handles`).
    pub nick: String,
    /// The `did` taken from the claimant's card; only the same DID may
    /// re-claim the nick.
    pub did: String,
    /// The agent card submitted with the claim, stored as received.
    pub card: Value,
    /// Slot the claim was made for (the claim is authorized with that slot's token).
    pub slot_id: String,
    /// Relay URL supplied by the claimant, if any.
    pub relay_url: Option<String>,
    /// RFC 3339 UTC timestamp of the most recent claim, truncated to seconds.
    pub claimed_at: String,
    /// Explicit discoverability choice. Absent values deserialize to `None`
    /// and `None` is omitted when serializing; see `is_discoverable`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
176
177impl HandleRecord {
178 fn is_discoverable(&self) -> bool {
179 self.discoverable.unwrap_or(true)
180 }
181}
182
/// Responder-health report for a slot, loaded at boot from
/// `responder-health/<slot_id>.json`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ResponderHealthRecord {
    /// Reported status string; the accepted values are defined outside this
    /// section.
    pub status: String,
    /// Optional explanation accompanying the status; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional timestamp of the last success; omitted when `None`.
    /// NOTE(review): format is not visible here — presumably RFC 3339.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_success_at: Option<String>,
    /// Timestamp at which this record was set (format not visible here).
    pub set_at: String,
}
192
/// In-memory rendezvous state for one pairing, keyed by pair id.
#[derive(Clone, Debug)]
struct PairSlot {
    /// Message registered by the host via `pair_open`; handed to the guest.
    host_msg: Option<String>,
    /// Message registered by the guest via `pair_open`; handed to the host.
    guest_msg: Option<String>,
    /// Sealed payload posted by the host via `pair_bootstrap`.
    host_bootstrap: Option<String>,
    /// Sealed payload posted by the guest via `pair_bootstrap`.
    guest_bootstrap: Option<String>,
    /// Last time a pairing handler touched this slot; drives TTL eviction.
    last_touched: std::time::Instant,
}
206
207impl Default for PairSlot {
208 fn default() -> Self {
209 Self {
210 host_msg: None,
211 guest_msg: None,
212 host_bootstrap: None,
213 guest_bootstrap: None,
214 last_touched: std::time::Instant::now(),
215 }
216 }
217}
218
/// Seconds a pair slot may sit untouched before `evict_expired_pair_slots`
/// removes it.
const PAIR_SLOT_TTL_SECS: u64 = 300;
222
/// JSON body accepted by the slot-allocation endpoint.
#[derive(Deserialize)]
pub struct AllocateRequest {
    /// Optional handle hint; defaults to `None` when absent. `allocate_slot`
    /// currently ignores the request body.
    #[serde(default)]
    pub handle: Option<String>,
}
229
/// JSON body accepted by `post_event`.
#[derive(Deserialize)]
pub struct PostEventRequest {
    /// The event to store. If it carries a string `event_id`, that id is used
    /// for duplicate detection within the slot.
    pub event: Value,
}
234
/// Query string for the event-listing endpoint.
///
/// NOTE(review): the handler is outside this section; the field notes below
/// are inferred from names only and should be confirmed against it.
#[derive(Deserialize)]
pub struct ListEventsQuery {
    /// Presumably a cursor: return only events after this marker.
    pub since: Option<String>,
    /// Presumably the maximum number of events to return.
    pub limit: Option<usize>,
}
242
243impl Relay {
    /// Builds a relay rooted at `state_dir`, creating the state
    /// subdirectories and reloading whatever was persisted there.
    ///
    /// Reloaded: slot tokens (`tokens.json`), per-slot event logs
    /// (`slots/*.jsonl`), handle records (`handles/*.json`), responder-health
    /// records (`responder-health/*.json`), unexpired invites
    /// (`invites.jsonl`) and lifetime counters (`counters.json`). Streams,
    /// pairing state and `last_pull_at_unix` always start empty.
    ///
    /// # Errors
    ///
    /// Fails if a state subdirectory cannot be created or listed, or if an
    /// existing `tokens.json` cannot be read. Individual files that are
    /// unreadable or fail to parse are skipped (or treated as empty) rather
    /// than aborting startup.
    pub async fn new(state_dir: PathBuf) -> Result<Self> {
        tokio::fs::create_dir_all(state_dir.join("slots")).await?;
        tokio::fs::create_dir_all(state_dir.join("handles")).await?;
        tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
        let mut inner = Inner {
            slots: HashMap::new(),
            tokens: HashMap::new(),
            slot_bytes: HashMap::new(),
            last_pull_at_unix: HashMap::new(),
            streams: HashMap::new(),
            pair_lookup: HashMap::new(),
            pair_slots: HashMap::new(),
            handles: HashMap::new(),
            responder_health: HashMap::new(),
            invites: HashMap::new(),
        };
        // Slot tokens. A read failure aborts startup, but a file that does not
        // parse silently yields an empty token map.
        let token_path = state_dir.join("tokens.json");
        if token_path.exists() {
            let body = tokio::fs::read_to_string(&token_path).await?;
            inner.tokens = serde_json::from_str(&body).unwrap_or_default();
        }
        // Event logs: one `<slot_id>.jsonl` per slot, one JSON event per line.
        let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
        while let Some(entry) = slots_dir.next_entry().await? {
            let path = entry.path();
            // Skip anything that is not a `.jsonl` file (including files with
            // no extension at all).
            if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
                continue;
            }
            // The file stem is the slot id; non-UTF-8 names are skipped.
            let stem = match path.file_stem().and_then(|s| s.to_str()) {
                Some(s) => s.to_string(),
                None => continue,
            };
            let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
            let mut events = Vec::new();
            for line in body.lines() {
                // Lines that are not valid JSON are dropped.
                if let Ok(v) = serde_json::from_str::<Value>(line) {
                    events.push(v);
                }
            }
            // Recompute the quota usage from the re-serialized events so it is
            // measured the same way `post_event` measures new events.
            let bytes: usize = events
                .iter()
                .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
                .sum();
            inner.slot_bytes.insert(stem.clone(), bytes);
            inner.slots.insert(stem, events);
        }
        // Handle records: keyed by the `nick` stored inside each file, not by
        // the file name.
        let handles_dir = state_dir.join("handles");
        if handles_dir.exists() {
            let mut rd = tokio::fs::read_dir(&handles_dir).await?;
            while let Some(entry) = rd.next_entry().await? {
                let path = entry.path();
                if path.extension().and_then(|x| x.to_str()) != Some("json") {
                    continue;
                }
                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
                if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
                    inner.handles.insert(rec.nick.clone(), rec);
                }
            }
        }
        // Responder-health records: keyed by the file stem (the slot id).
        let responder_health_dir = state_dir.join("responder-health");
        if responder_health_dir.exists() {
            let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
            while let Some(entry) = rd.next_entry().await? {
                let path = entry.path();
                if path.extension().and_then(|x| x.to_str()) != Some("json") {
                    continue;
                }
                let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
                    continue;
                };
                let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
                if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
                    inner.responder_health.insert(slot_id.to_string(), rec);
                }
            }
        }
        // Invites: an append-only log; only records that have not yet expired
        // are kept. A later line with the same token replaces an earlier one.
        let invites_path = state_dir.join("invites.jsonl");
        if invites_path.exists() {
            // A clock before the Unix epoch is treated as time zero.
            let now_unix = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0);
            let body = tokio::fs::read_to_string(&invites_path)
                .await
                .unwrap_or_default();
            for line in body.lines() {
                if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
                    && rec.expires_unix > now_unix
                {
                    inner.invites.insert(rec.token.clone(), rec);
                }
            }
        }
        let boot_unix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        // Lifetime counters resume from the last snapshot; a missing or
        // unparseable snapshot restarts them at zero.
        let snap: CountersSnapshot =
            match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
                Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
                Err(_) => CountersSnapshot::default(),
            };
        Ok(Self {
            inner: Arc::new(Mutex::new(inner)),
            state_dir,
            counters: Arc::new(RelayCounters {
                boot_unix,
                handle_claims_total: AtomicU64::new(snap.handle_claims_total),
                handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
                slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
                pair_opens_total: AtomicU64::new(snap.pair_opens_total),
                events_posted_total: AtomicU64::new(snap.events_posted_total),
            }),
        })
    }
368
369 pub fn router(self) -> Router {
370 self.router_with_mode(ServerMode::default())
371 }
372
373 pub fn router_with_mode(self, mode: ServerMode) -> Router {
374 let governor_conf = std::sync::Arc::new(
381 GovernorConfigBuilder::default()
382 .per_second(10)
383 .burst_size(50)
384 .key_extractor(GlobalKeyExtractor)
385 .finish()
386 .expect("valid governor config"),
387 );
388 let governor_layer = GovernorLayer {
389 config: governor_conf,
390 };
391
392 let hot_writes = Router::new()
394 .route("/v1/slot/allocate", post(allocate_slot))
395 .route("/v1/pair", post(pair_open))
396 .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
397 .route("/v1/pair/abandon", post(pair_abandon))
398 .layer(governor_layer);
399
400 let mut router = Router::new()
403 .route("/healthz", get(healthz))
404 .route("/v1/events/:slot_id", post(post_event).get(list_events))
405 .route("/v1/slot/:slot_id/state", get(slot_state))
406 .route(
407 "/v1/slot/:slot_id/responder-health",
408 post(responder_health_set),
409 )
410 .route("/v1/events/:slot_id/stream", get(stream_events))
411 .route("/v1/pair/:pair_id", get(pair_get))
412 .route("/v1/handle/intro/:nick", post(handle_intro));
413
414 if !mode.local_only {
420 router = router
421 .route("/", get(landing_index))
422 .route("/favicon.svg", get(landing_favicon))
423 .route("/og.png", get(landing_og))
424 .route("/install", get(landing_install_sh))
425 .route("/install.sh", get(landing_install_sh))
426 .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
427 .route("/stats", get(stats_root))
428 .route("/stats.json", get(stats_json))
429 .route("/stats.html", get(landing_stats_html))
430 .route("/stats.history", get(stats_history))
431 .route("/phonebook", get(landing_phonebook_html))
432 .route("/phonebook.html", get(landing_phonebook_html))
433 .route("/v1/handle/claim", post(handle_claim))
434 .route("/v1/handles", get(handles_directory))
435 .route("/v1/invite/register", post(invite_register))
436 .route("/i/:token", get(invite_script))
437 .route("/.well-known/wire/agent", get(well_known_agent))
438 .route(
439 "/.well-known/agent-card.json",
440 get(well_known_agent_card_a2a),
441 );
442 } else {
443 router = router.route("/v1/handle/claim", post(handle_claim));
449 }
450
451 router.merge(hot_writes).with_state(self)
452 }
453
454 async fn evict_expired_pair_slots(&self) {
458 let now = std::time::Instant::now();
459 let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
460 let mut inner = self.inner.lock().await;
461 let mut to_remove = Vec::new();
462 for (id, slot) in inner.pair_slots.iter() {
463 if now.duration_since(slot.last_touched) > ttl {
464 to_remove.push(id.clone());
465 }
466 }
467 for id in to_remove {
468 inner.pair_slots.remove(&id);
469 inner.pair_lookup.retain(|_, v| v != &id);
470 }
471 }
472
473 pub fn spawn_pair_sweeper(&self) {
478 let me = self.clone();
479 tokio::spawn(async move {
480 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
481 loop {
482 tick.tick().await;
483 me.evict_expired_pair_slots().await;
484 }
485 });
486 }
487
488 pub async fn persist_counters(&self) -> Result<()> {
492 let snap = CountersSnapshot {
493 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
494 handle_first_claims_total: self
495 .counters
496 .handle_first_claims_total
497 .load(Ordering::Relaxed),
498 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
499 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
500 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
501 };
502 let body = serde_json::to_vec_pretty(&snap)?;
503 let path = self.state_dir.join("counters.json");
504 tokio::fs::write(path, body).await?;
505 Ok(())
506 }
507
508 pub async fn append_history(&self) -> Result<()> {
513 use tokio::io::AsyncWriteExt;
514 let now = SystemTime::now()
515 .duration_since(UNIX_EPOCH)
516 .map(|d| d.as_secs())
517 .unwrap_or(0);
518 let (handles_active, slots_active, pair_slots_open, streams_active) = {
519 let inner = self.inner.lock().await;
520 (
521 inner.handles.len(),
522 inner.slots.len(),
523 inner.pair_slots.len(),
524 inner.streams.values().map(Vec::len).sum::<usize>(),
525 )
526 };
527 let entry = HistoryEntry {
528 ts: now,
529 handles_active,
530 slots_active,
531 pair_slots_open,
532 streams_active,
533 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
534 handle_first_claims_total: self
535 .counters
536 .handle_first_claims_total
537 .load(Ordering::Relaxed),
538 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
539 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
540 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
541 };
542 let line = serde_json::to_vec(&entry)?;
543 let path = self.state_dir.join("stats-history.jsonl");
544 let mut f = tokio::fs::OpenOptions::new()
545 .create(true)
546 .append(true)
547 .open(&path)
548 .await?;
549 f.write_all(&line).await?;
550 f.write_all(b"\n").await?;
551 f.flush().await?;
552 Ok(())
553 }
554
555 pub fn spawn_counter_persister(&self) {
559 let me = self.clone();
560 tokio::spawn(async move {
561 let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
562 tick.tick().await;
565 loop {
566 tick.tick().await;
567 if let Err(e) = me.persist_counters().await {
568 eprintln!("counter persist failed: {e}");
569 }
570 if let Err(e) = me.append_history().await {
571 eprintln!("history append failed: {e}");
572 }
573 }
574 });
575 }
576
577 async fn persist_tokens(&self) -> Result<()> {
578 let body = {
579 let inner = self.inner.lock().await;
580 serde_json::to_string_pretty(&inner.tokens)?
581 };
582 let path = self.state_dir.join("tokens.json");
583 tokio::fs::write(path, body).await?;
584 Ok(())
585 }
586
587 async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
588 if !is_valid_slot_id(slot_id) {
594 return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
595 }
596 let path = self
597 .state_dir
598 .join("slots")
599 .join(format!("{slot_id}.jsonl"));
600 let mut line = serde_json::to_vec(event)?;
601 line.push(b'\n');
602 use tokio::io::AsyncWriteExt;
603 let mut f = tokio::fs::OpenOptions::new()
604 .create(true)
605 .append(true)
606 .open(&path)
607 .await
608 .with_context(|| format!("opening {path:?}"))?;
609 f.write_all(&line).await?;
610 f.flush().await?;
611 Ok(())
612 }
613}
614
615async fn healthz() -> impl IntoResponse {
616 (StatusCode::OK, "ok\n")
617}
618
619async fn stats_history(
623 State(relay): State<Relay>,
624 Query(q): Query<StatsHistoryQuery>,
625) -> impl IntoResponse {
626 let hours = q.hours.unwrap_or(24).min(168);
627 let now = SystemTime::now()
628 .duration_since(UNIX_EPOCH)
629 .map(|d| d.as_secs())
630 .unwrap_or(0);
631 let cutoff = now.saturating_sub(hours * 3600);
632 let path = relay.state_dir.join("stats-history.jsonl");
633 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
634 let entries: Vec<Value> = body
635 .lines()
636 .filter_map(|l| serde_json::from_str::<Value>(l).ok())
637 .filter(|v| {
638 v.get("ts")
639 .and_then(Value::as_u64)
640 .map(|t| t >= cutoff)
641 .unwrap_or(false)
642 })
643 .collect();
644 (
645 StatusCode::OK,
646 Json(json!({
647 "hours": hours,
648 "now_unix": now,
649 "count": entries.len(),
650 "entries": entries,
651 })),
652 )
653}
654
655async fn landing_stats_html() -> impl IntoResponse {
656 static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
657 (
658 StatusCode::OK,
659 [
660 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
661 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
662 ],
663 STATS_HTML,
664 )
665}
666
667async fn landing_phonebook_html() -> impl IntoResponse {
668 static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
669 (
670 StatusCode::OK,
671 [
672 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
673 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
674 ],
675 PHONEBOOK_HTML,
676 )
677}
678
679async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
684 let wants_html = headers
685 .get(axum::http::header::ACCEPT)
686 .and_then(|v| v.to_str().ok())
687 .unwrap_or("")
688 .contains("text/html");
689 if wants_html {
690 landing_stats_html().await.into_response()
691 } else {
692 stats_json(State(relay)).await.into_response()
693 }
694}
695
696async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
697 let now = SystemTime::now()
698 .duration_since(UNIX_EPOCH)
699 .map(|d| d.as_secs())
700 .unwrap_or(0);
701 let inner = relay.inner.lock().await;
702 let streams_active: usize = inner.streams.values().map(Vec::len).sum();
703 let body = json!({
704 "version": env!("CARGO_PKG_VERSION"),
705 "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
706 "handles_active": inner.handles.len(),
707 "slots_active": inner.slots.len(),
708 "pair_slots_open": inner.pair_slots.len(),
709 "streams_active": streams_active,
710 "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
711 "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
712 "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
713 "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
714 "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
715 });
716 (StatusCode::OK, Json(body))
717}
718
719async fn landing_index() -> impl IntoResponse {
723 static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
724 (
725 StatusCode::OK,
726 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
727 INDEX_HTML,
728 )
729}
730
731async fn landing_favicon() -> impl IntoResponse {
732 static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
733 (
734 StatusCode::OK,
735 [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
736 FAVICON_SVG,
737 )
738}
739
740async fn landing_og() -> impl IntoResponse {
741 static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
742 (
743 StatusCode::OK,
744 [
745 (axum::http::header::CONTENT_TYPE, "image/png"),
746 (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
747 ],
748 OG_PNG,
749 )
750}
751
752async fn landing_install_sh() -> impl IntoResponse {
753 static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
754 (
755 StatusCode::OK,
756 [
757 (
758 axum::http::header::CONTENT_TYPE,
759 "text/x-shellscript; charset=utf-8",
760 ),
761 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
762 ],
763 INSTALL_SH,
764 )
765}
766
767async fn landing_openshell_policy_sh() -> impl IntoResponse {
768 static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
769 (
770 StatusCode::OK,
771 [
772 (
773 axum::http::header::CONTENT_TYPE,
774 "text/x-shellscript; charset=utf-8",
775 ),
776 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
777 ],
778 POLICY_SH,
779 )
780}
781
782async fn allocate_slot(
783 State(relay): State<Relay>,
784 Json(_req): Json<AllocateRequest>,
785) -> impl IntoResponse {
786 let slot_id = random_hex(16);
787 let slot_token = random_hex(32);
788 {
789 let mut inner = relay.inner.lock().await;
790 inner.slots.insert(slot_id.clone(), Vec::new());
791 inner.tokens.insert(slot_id.clone(), slot_token.clone());
792 }
793 if let Err(e) = relay.persist_tokens().await {
794 return (
795 StatusCode::INTERNAL_SERVER_ERROR,
796 Json(json!({"error": format!("persist failed: {e}")})),
797 )
798 .into_response();
799 }
800 relay
801 .counters
802 .slot_allocations_total
803 .fetch_add(1, Ordering::Relaxed);
804 (
805 StatusCode::CREATED,
806 Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
807 )
808 .into_response()
809}
810
811async fn post_event(
812 State(relay): State<Relay>,
813 Path(slot_id): Path<String>,
814 headers: HeaderMap,
815 Json(req): Json<PostEventRequest>,
816) -> impl IntoResponse {
817 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
818 return resp;
819 }
820 let body_bytes = match serde_json::to_vec(&req.event) {
822 Ok(b) => b,
823 Err(e) => {
824 return (
825 StatusCode::BAD_REQUEST,
826 Json(json!({"error": format!("event not serializable: {e}")})),
827 )
828 .into_response();
829 }
830 };
831 if body_bytes.len() > MAX_EVENT_BYTES {
832 return (
833 StatusCode::PAYLOAD_TOO_LARGE,
834 Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
835 )
836 .into_response();
837 }
838 {
840 let inner = relay.inner.lock().await;
841 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
842 if used + body_bytes.len() > MAX_SLOT_BYTES {
843 return (
844 StatusCode::PAYLOAD_TOO_LARGE,
845 Json(json!({
846 "error": "slot quota exceeded",
847 "slot_bytes_used": used,
848 "slot_bytes_max": MAX_SLOT_BYTES,
849 "remediation": "operator should `wire rotate-slot` to drain old slot",
850 })),
851 )
852 .into_response();
853 }
854 }
855 let event_id = req
856 .event
857 .get("event_id")
858 .and_then(Value::as_str)
859 .map(str::to_string);
860
861 let dup = {
863 let inner = relay.inner.lock().await;
864 let slot = inner.slots.get(&slot_id);
865 if let (Some(eid), Some(slot)) = (&event_id, slot) {
866 slot.iter()
867 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
868 } else {
869 false
870 }
871 };
872 if dup {
873 return (
874 StatusCode::OK,
875 Json(json!({"event_id": event_id, "status": "duplicate"})),
876 )
877 .into_response();
878 }
879
880 {
881 let mut inner = relay.inner.lock().await;
882 let event_size = body_bytes.len();
883 let slot = inner.slots.entry(slot_id.clone()).or_default();
884 slot.push(req.event.clone());
885 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
886 }
887 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
888 return (
889 StatusCode::INTERNAL_SERVER_ERROR,
890 Json(json!({"error": format!("persist failed: {e}")})),
891 )
892 .into_response();
893 }
894 relay
895 .counters
896 .events_posted_total
897 .fetch_add(1, Ordering::Relaxed);
898 {
903 let mut inner = relay.inner.lock().await;
904 if let Some(subs) = inner.streams.get_mut(&slot_id) {
905 subs.retain(|tx| tx.send(req.event.clone()).is_ok());
906 }
907 }
908 (
909 StatusCode::CREATED,
910 Json(json!({"event_id": event_id, "status": "stored"})),
911 )
912 .into_response()
913}
914
915async fn stream_events(
928 State(relay): State<Relay>,
929 Path(slot_id): Path<String>,
930 headers: HeaderMap,
931) -> axum::response::Response {
932 use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
933 use futures::stream::StreamExt;
934
935 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
936 return resp;
937 }
938
939 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
940 {
941 let mut inner = relay.inner.lock().await;
942 inner.streams.entry(slot_id.clone()).or_default().push(tx);
943 }
944
945 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
946 SseEvent::default()
947 .json_data(&ev)
948 .map_err(|e| std::io::Error::other(e.to_string()))
949 });
950
951 Sse::new(stream)
952 .keep_alive(
953 KeepAlive::new()
954 .interval(std::time::Duration::from_secs(30))
955 .text("phyllis: still on the line"),
956 )
957 .into_response()
958}
959
/// JSON body accepted by `pair_open`.
#[derive(Deserialize)]
pub struct PairOpenRequest {
    /// Hash of the pairing code; both sides submit the same value to land in
    /// the same pair slot.
    pub code_hash: String,
    /// This side's message, stored for the peer to fetch.
    pub msg: String,
    /// Which side is registering: must be `"host"` or `"guest"`.
    pub role: String,
}
969
/// JSON body accepted by `pair_bootstrap`.
#[derive(Deserialize)]
pub struct PairBootstrapRequest {
    /// Which side is posting: must be `"host"` or `"guest"`.
    pub role: String,
    /// Opaque payload stored as this side's bootstrap for the peer to fetch.
    pub sealed: String,
}
975
/// JSON body accepted by `pair_abandon`.
#[derive(Deserialize)]
pub struct PairAbandonRequest {
    /// Hash of the pairing code whose pair slot should be discarded.
    pub code_hash: String,
}
982
983async fn pair_abandon(
989 State(relay): State<Relay>,
990 Json(req): Json<PairAbandonRequest>,
991) -> impl IntoResponse {
992 let mut inner = relay.inner.lock().await;
993 if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
994 inner.pair_slots.remove(&pair_id);
995 }
996 StatusCode::NO_CONTENT.into_response()
997}
998
999async fn pair_open(
1000 State(relay): State<Relay>,
1001 Json(req): Json<PairOpenRequest>,
1002) -> impl IntoResponse {
1003 if req.role != "host" && req.role != "guest" {
1004 return (
1005 StatusCode::BAD_REQUEST,
1006 Json(json!({"error": "role must be 'host' or 'guest'"})),
1007 )
1008 .into_response();
1009 }
1010 relay.evict_expired_pair_slots().await;
1011 let mut inner = relay.inner.lock().await;
1012 let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1013 Some(id) => id,
1014 None => {
1015 let new_id = random_hex(16);
1016 inner
1017 .pair_lookup
1018 .insert(req.code_hash.clone(), new_id.clone());
1019 inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1020 relay
1021 .counters
1022 .pair_opens_total
1023 .fetch_add(1, Ordering::Relaxed);
1024 new_id
1025 }
1026 };
1027 let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1028 slot.last_touched = std::time::Instant::now();
1029 if req.role == "host" {
1030 if slot.host_msg.is_some() {
1031 return (
1032 StatusCode::CONFLICT,
1033 Json(json!({"error": "host already registered for this code"})),
1034 )
1035 .into_response();
1036 }
1037 slot.host_msg = Some(req.msg);
1038 } else {
1039 if slot.guest_msg.is_some() {
1040 return (
1041 StatusCode::CONFLICT,
1042 Json(json!({"error": "guest already registered for this code"})),
1043 )
1044 .into_response();
1045 }
1046 slot.guest_msg = Some(req.msg);
1047 }
1048 (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1049}
1050
/// Query string accepted by `pair_get`.
#[derive(Deserialize)]
pub struct PairGetQuery {
    /// The caller's own role (`"host"` or `"guest"`); the response carries the
    /// opposite side's message and bootstrap.
    pub as_role: String,
}
1056
1057async fn pair_get(
1058 State(relay): State<Relay>,
1059 Path(pair_id): Path<String>,
1060 Query(q): Query<PairGetQuery>,
1061) -> impl IntoResponse {
1062 relay.evict_expired_pair_slots().await;
1063 let mut inner = relay.inner.lock().await;
1064 let slot = match inner.pair_slots.get_mut(&pair_id) {
1065 Some(s) => {
1066 s.last_touched = std::time::Instant::now();
1067 s.clone()
1068 }
1069 None => {
1070 return (
1071 StatusCode::NOT_FOUND,
1072 Json(json!({"error": "unknown pair_id"})),
1073 )
1074 .into_response();
1075 }
1076 };
1077 let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1078 "host" => (slot.guest_msg, slot.guest_bootstrap),
1079 "guest" => (slot.host_msg, slot.host_bootstrap),
1080 _ => {
1081 return (
1082 StatusCode::BAD_REQUEST,
1083 Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1084 )
1085 .into_response();
1086 }
1087 };
1088 (
1089 StatusCode::OK,
1090 Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1091 )
1092 .into_response()
1093}
1094
1095async fn pair_bootstrap(
1096 State(relay): State<Relay>,
1097 Path(pair_id): Path<String>,
1098 Json(req): Json<PairBootstrapRequest>,
1099) -> impl IntoResponse {
1100 relay.evict_expired_pair_slots().await;
1101 let mut inner = relay.inner.lock().await;
1102 let slot = match inner.pair_slots.get_mut(&pair_id) {
1103 Some(s) => s,
1104 None => {
1105 return (
1106 StatusCode::NOT_FOUND,
1107 Json(json!({"error": "unknown pair_id"})),
1108 )
1109 .into_response();
1110 }
1111 };
1112 slot.last_touched = std::time::Instant::now();
1113 match req.role.as_str() {
1114 "host" => slot.host_bootstrap = Some(req.sealed),
1115 "guest" => slot.guest_bootstrap = Some(req.sealed),
1116 _ => {
1117 return (
1118 StatusCode::BAD_REQUEST,
1119 Json(json!({"error": "role must be 'host' or 'guest'"})),
1120 )
1121 .into_response();
1122 }
1123 }
1124 (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1125}
1126
/// JSON body accepted by `handle_claim`.
#[derive(Deserialize)]
pub struct HandleClaimRequest {
    /// The nick being claimed; must pass the project's nick validation.
    pub nick: String,
    /// Slot the claim is made for; the request must carry that slot's token.
    pub slot_id: String,
    /// Relay URL to record with the handle, if any.
    pub relay_url: Option<String>,
    /// The claimant's agent card; its signature is verified and its `did`
    /// becomes the owner of the nick.
    pub card: Value,
    /// Explicit discoverability choice. When absent, a re-claim keeps the
    /// previously stored value and a first claim defaults to discoverable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub discoverable: Option<bool>,
}
1149
1150async fn handle_claim(
1157 State(relay): State<Relay>,
1158 headers: HeaderMap,
1159 Json(req): Json<HandleClaimRequest>,
1160) -> impl IntoResponse {
1161 if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1164 return resp;
1165 }
1166 if !crate::pair_profile::is_valid_nick(&req.nick) {
1168 return (
1169 StatusCode::BAD_REQUEST,
1170 Json(json!({
1171 "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1172 "nick": req.nick,
1173 })),
1174 )
1175 .into_response();
1176 }
1177 if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1179 return (
1180 StatusCode::BAD_REQUEST,
1181 Json(json!({"error": format!("card signature invalid: {e}")})),
1182 )
1183 .into_response();
1184 }
1185 let did = match req.card.get("did").and_then(Value::as_str) {
1186 Some(d) => d.to_string(),
1187 None => {
1188 return (
1189 StatusCode::BAD_REQUEST,
1190 Json(json!({"error": "card missing 'did' field"})),
1191 )
1192 .into_response();
1193 }
1194 };
1195
1196 let prior_record: Option<HandleRecord>;
1202 let first_claim = {
1203 let inner = relay.inner.lock().await;
1204 match inner.handles.get(&req.nick) {
1205 Some(existing) if existing.did != did => {
1206 return (
1207 StatusCode::CONFLICT,
1208 Json(json!({
1209 "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1210 "nick": req.nick,
1211 "claimed_by": existing.did,
1212 })),
1213 )
1214 .into_response();
1215 }
1216 Some(prev) => {
1217 prior_record = Some(prev.clone());
1218 false
1219 }
1220 None => {
1221 prior_record = None;
1222 true
1223 }
1224 }
1225 };
1226
1227 let now = time::OffsetDateTime::now_utc()
1232 .replace_nanosecond(0)
1233 .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1234 .format(&time::format_description::well_known::Rfc3339)
1235 .unwrap_or_default();
1236 let discoverable = match (req.discoverable, &prior_record) {
1242 (Some(d), _) => Some(d),
1243 (None, Some(prev)) => prev.discoverable,
1244 (None, None) => Some(true),
1245 };
1246 let record = HandleRecord {
1247 nick: req.nick.clone(),
1248 did: did.clone(),
1249 card: req.card.clone(),
1250 slot_id: req.slot_id.clone(),
1251 relay_url: req.relay_url.clone(),
1252 claimed_at: now,
1253 discoverable,
1254 };
1255
1256 let path = relay
1258 .state_dir
1259 .join("handles")
1260 .join(format!("{}.json", req.nick));
1261 let body = match serde_json::to_vec_pretty(&record) {
1262 Ok(b) => b,
1263 Err(e) => {
1264 return (
1265 StatusCode::INTERNAL_SERVER_ERROR,
1266 Json(json!({"error": format!("serialize failed: {e}")})),
1267 )
1268 .into_response();
1269 }
1270 };
1271 if let Err(e) = tokio::fs::write(&path, &body).await {
1272 return (
1273 StatusCode::INTERNAL_SERVER_ERROR,
1274 Json(json!({"error": format!("persist failed: {e}")})),
1275 )
1276 .into_response();
1277 }
1278 {
1279 let mut inner = relay.inner.lock().await;
1280 inner.handles.insert(req.nick.clone(), record);
1281 }
1282 relay
1283 .counters
1284 .handle_claims_total
1285 .fetch_add(1, Ordering::Relaxed);
1286 if first_claim {
1287 relay
1288 .counters
1289 .handle_first_claims_total
1290 .fetch_add(1, Ordering::Relaxed);
1291 }
1292 (
1293 StatusCode::CREATED,
1294 Json(json!({
1295 "nick": req.nick,
1296 "did": did,
1297 "status": if first_claim { "claimed" } else { "re-claimed" },
1298 })),
1299 )
1300 .into_response()
1301}
1302
/// Query string carrying a required `handle` parameter.
///
/// NOTE(review): presumably consumed by the `/.well-known/wire/agent`
/// handler, which is outside this section — confirm.
#[derive(Deserialize)]
pub struct WellKnownAgentQuery {
    /// The handle to look up.
    pub handle: String,
}
1307
/// Query string with optional paging and filter parameters.
///
/// NOTE(review): presumably consumed by the `/v1/handles` directory handler,
/// which is outside this section; the field notes are inferred from names
/// only — confirm against that handler.
#[derive(Deserialize)]
pub struct HandlesDirectoryQuery {
    /// Presumably an opaque paging cursor from a previous response.
    pub cursor: Option<String>,
    /// Presumably the maximum number of handles to return.
    pub limit: Option<usize>,
    /// Presumably a filter on some card attribute; semantics not visible here.
    pub vibe: Option<String>,
}
1314
/// JSON body for registering a short invite link.
#[derive(Deserialize)]
pub struct InviteRegisterRequest {
    /// Invite URL to store; `invite_register` rejects empty values and values
    /// longer than 8192 bytes.
    pub invite_url: String,
    /// Requested lifetime in seconds; `invite_register` defaults it to 86 400
    /// and clamps it to 60..=604 800 (seven days).
    #[serde(default)]
    pub ttl_seconds: Option<u64>,
    /// Stored as the invite's `uses_remaining`. `None` leaves the invite
    /// without a use limit.
    #[serde(default)]
    pub uses: Option<u32>,
}
1336
1337impl Relay {
1338 async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1340 use tokio::io::AsyncWriteExt;
1341 let mut line = serde_json::to_vec(rec)?;
1342 line.push(b'\n');
1343 let path = self.state_dir.join("invites.jsonl");
1344 let mut f = tokio::fs::OpenOptions::new()
1345 .create(true)
1346 .append(true)
1347 .open(&path)
1348 .await?;
1349 f.write_all(&line).await?;
1350 f.flush().await?;
1351 Ok(())
1352 }
1353}
1354
1355async fn invite_register(
1356 State(relay): State<Relay>,
1357 Json(req): Json<InviteRegisterRequest>,
1358) -> impl IntoResponse {
1359 if req.invite_url.is_empty() {
1360 return (
1361 StatusCode::BAD_REQUEST,
1362 Json(json!({"error": "invite_url required"})),
1363 )
1364 .into_response();
1365 }
1366 if req.invite_url.len() > 8_192 {
1368 return (
1369 StatusCode::PAYLOAD_TOO_LARGE,
1370 Json(json!({"error": "invite_url > 8 KiB"})),
1371 )
1372 .into_response();
1373 }
1374 let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1375 let now = SystemTime::now()
1376 .duration_since(UNIX_EPOCH)
1377 .map(|d| d.as_secs())
1378 .unwrap_or(0);
1379 let token = random_hex(3);
1382 let rec = InviteRecord {
1383 token: token.clone(),
1384 invite_url: req.invite_url,
1385 expires_unix: now + ttl,
1386 uses_remaining: req.uses,
1387 created_unix: now,
1388 };
1389 {
1390 let mut inner = relay.inner.lock().await;
1391 if inner.invites.contains_key(&token) {
1392 return (
1393 StatusCode::CONFLICT,
1394 Json(json!({"error": "token collision, retry"})),
1395 )
1396 .into_response();
1397 }
1398 inner.invites.insert(token.clone(), rec.clone());
1399 }
1400 if let Err(e) = relay.persist_invite(&rec).await {
1401 return (
1402 StatusCode::INTERNAL_SERVER_ERROR,
1403 Json(json!({"error": format!("persist failed: {e}")})),
1404 )
1405 .into_response();
1406 }
1407 (
1408 StatusCode::CREATED,
1409 Json(json!({
1410 "token": token,
1411 "path": format!("/i/{token}"),
1412 "expires_unix": rec.expires_unix,
1413 "uses_remaining": rec.uses_remaining,
1414 })),
1415 )
1416 .into_response()
1417}
1418
/// Query parameters for fetching an invite by token.
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// Response format selector. `invite_script` treats the value `url` as a
    /// request for the bare invite URL; any other value (or none) yields the
    /// shell script.
    pub format: Option<String>,
}
1429
/// Serves a registered invite by its 6-hex-digit token.
///
/// By default the response is a `/bin/sh` script that installs the `wire` CLI
/// when it is missing and then runs `wire accept` on the stored invite URL.
/// With `?format=url` the bare invite URL is returned as plain text instead.
///
/// Responses: `404` for a malformed or unknown token, `410` once the invite
/// has expired or has no uses left, otherwise `200` with caching disabled.
async fn invite_script(
    State(relay): State<Relay>,
    Path(token): Path<String>,
    Query(q): Query<InviteScriptQuery>,
) -> impl IntoResponse {
    // Reject anything that is not exactly six hex digits before touching state.
    if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
        return (StatusCode::NOT_FOUND, "not found\n").into_response();
    }
    let want_raw_url = q.format.as_deref() == Some("url");
    // Seconds since the Unix epoch; falls back to 0 if the clock reads before it.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    let invite_url = {
        let mut inner = relay.inner.lock().await;
        let Some(rec) = inner.invites.get_mut(&token) else {
            return (StatusCode::NOT_FOUND, "not found\n").into_response();
        };
        if rec.expires_unix <= now {
            return (StatusCode::GONE, "this invite has expired\n").into_response();
        }
        // `None` means no use limit; only limited invites are counted down.
        if let Some(n) = rec.uses_remaining {
            if n == 0 {
                return (StatusCode::GONE, "this invite has been used up\n").into_response();
            }
            // Fetching the raw URL does not consume a use; fetching the script does.
            // NOTE(review): the decrement only updates the in-memory record here.
            if !want_raw_url {
                rec.uses_remaining = Some(n - 1);
            }
        }
        rec.invite_url.clone()
    };
    if want_raw_url {
        return (
            StatusCode::OK,
            [
                (
                    axum::http::header::CONTENT_TYPE,
                    "text/plain; charset=utf-8",
                ),
                (
                    axum::http::header::CACHE_CONTROL,
                    "private, no-store, max-age=0",
                ),
            ],
            invite_url,
        )
            .into_response();
    }
    // The URL is embedded in a single-quoted shell string, so each `'` is
    // rewritten as `'\''` (close quote, escaped quote, reopen quote).
    let escaped = invite_url.replace('\'', "'\\''");
    // A trailing `\` in the literal joins source lines; spaces placed before it
    // are kept and become the indentation of the next script line.
    let script = format!(
        "#!/bin/sh\n\
         # wire — one-curl onboarding (install + pair in one shot)\n\
         # source: https://github.com/SlanchaAi/wire\n\
         set -eu\n\
         INVITE='{escaped}'\n\
         echo \"\u{2192} checking for wire CLI...\"\n\
         if ! command -v wire >/dev/null 2>&1; then\n \
         echo \"\u{2192} wire not installed; installing first...\"\n \
         curl -fsSL https://wireup.net/install.sh | sh\n \
         case \":$PATH:\" in\n \
         *:\"$HOME/.local/bin\":*) ;;\n \
         *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n \
         esac\n \
         if ! command -v wire >/dev/null 2>&1; then\n \
         echo \"\"\n \
         echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n \
         echo \"Open a new shell, then run:\"\n \
         echo \" wire accept '$INVITE'\"\n \
         exit 0\n \
         fi\n\
         fi\n\
         echo \"\u{2192} accepting invite...\"\n\
         wire accept \"$INVITE\"\n"
    );
    (
        StatusCode::OK,
        [
            (
                axum::http::header::CONTENT_TYPE,
                "text/x-shellscript; charset=utf-8",
            ),
            (
                axum::http::header::CACHE_CONTROL,
                "private, no-store, max-age=0",
            ),
        ],
        script,
    )
        .into_response()
}
1525
1526async fn handles_directory(
1527 State(relay): State<Relay>,
1528 Query(q): Query<HandlesDirectoryQuery>,
1529) -> impl IntoResponse {
1530 let limit = q.limit.unwrap_or(100).clamp(1, 500);
1531 let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1532 let inner = relay.inner.lock().await;
1533 let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1534 drop(inner);
1535 records.sort_by(|a, b| a.nick.cmp(&b.nick));
1536
1537 let cursor = q.cursor.as_deref();
1538 let mut eligible = Vec::new();
1539 for rec in records {
1540 if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1541 continue;
1542 }
1543 if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1548 continue;
1549 }
1550 if !rec.is_discoverable() {
1554 continue;
1555 }
1556 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1557 if profile
1558 .get("listed")
1559 .and_then(Value::as_bool)
1560 .is_some_and(|listed| !listed)
1561 {
1562 continue;
1563 }
1564 if let Some(want) = &vibe_filter {
1565 let matched = profile
1566 .get("vibe")
1567 .and_then(Value::as_array)
1568 .map(|arr| {
1569 arr.iter().any(|v| {
1570 v.as_str()
1571 .map(|s| s.eq_ignore_ascii_case(want))
1572 .unwrap_or(false)
1573 })
1574 })
1575 .unwrap_or(false);
1576 if !matched {
1577 continue;
1578 }
1579 }
1580 eligible.push((rec, profile));
1581 }
1582
1583 let has_more = eligible.len() > limit;
1584 let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1585 let next_cursor = if has_more {
1586 page.last().map(|(rec, _)| rec.nick.clone())
1587 } else {
1588 None
1589 };
1590 let handles: Vec<Value> = page
1591 .into_iter()
1592 .map(|(rec, profile)| {
1593 let emoji = profile
1599 .get("emoji")
1600 .and_then(Value::as_str)
1601 .filter(|s| !s.is_empty())
1602 .map(str::to_string)
1603 .unwrap_or_else(|| crate::character::Character::from_did(&rec.did).emoji);
1604 json!({
1605 "nick": rec.nick,
1606 "did": rec.did,
1607 "profile": {
1608 "emoji": emoji,
1609 "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1610 "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1611 "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1612 "now": profile.get("now").cloned().unwrap_or(Value::Null),
1613 },
1614 "claimed_at": rec.claimed_at,
1615 })
1616 })
1617 .collect();
1618 (
1619 StatusCode::OK,
1620 Json(json!({
1621 "handles": handles,
1622 "next_cursor": next_cursor,
1623 })),
1624 )
1625 .into_response()
1626}
1627
1628async fn handle_intro(
1642 State(relay): State<Relay>,
1643 Path(nick): Path<String>,
1644 Json(req): Json<PostEventRequest>,
1645) -> impl IntoResponse {
1646 let slot_id = {
1648 let inner = relay.inner.lock().await;
1649 match inner.handles.get(&nick) {
1650 Some(rec) => rec.slot_id.clone(),
1651 None => {
1652 return (
1653 StatusCode::NOT_FOUND,
1654 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1655 )
1656 .into_response();
1657 }
1658 }
1659 };
1660
1661 let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1664 let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1665 if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1666 return (
1667 StatusCode::BAD_REQUEST,
1668 Json(json!({
1669 "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1670 "got_kind": kind,
1671 "got_type": type_str,
1672 })),
1673 )
1674 .into_response();
1675 }
1676
1677 let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1679 Some(c) => c.clone(),
1680 None => {
1681 return (
1682 StatusCode::BAD_REQUEST,
1683 Json(json!({"error": "intro event body must embed 'card' field"})),
1684 )
1685 .into_response();
1686 }
1687 };
1688 if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1689 return (
1690 StatusCode::BAD_REQUEST,
1691 Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1692 )
1693 .into_response();
1694 }
1695
1696 let body_bytes = match serde_json::to_vec(&req.event) {
1698 Ok(b) => b,
1699 Err(e) => {
1700 return (
1701 StatusCode::BAD_REQUEST,
1702 Json(json!({"error": format!("event not serializable: {e}")})),
1703 )
1704 .into_response();
1705 }
1706 };
1707 if body_bytes.len() > MAX_EVENT_BYTES {
1708 return (
1709 StatusCode::PAYLOAD_TOO_LARGE,
1710 Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1711 )
1712 .into_response();
1713 }
1714 {
1715 let inner = relay.inner.lock().await;
1716 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1717 if used + body_bytes.len() > MAX_SLOT_BYTES {
1718 return (
1719 StatusCode::PAYLOAD_TOO_LARGE,
1720 Json(json!({
1721 "error": "target slot quota exceeded",
1722 "slot_bytes_used": used,
1723 "slot_bytes_max": MAX_SLOT_BYTES,
1724 })),
1725 )
1726 .into_response();
1727 }
1728 }
1729
1730 let event_id = req
1731 .event
1732 .get("event_id")
1733 .and_then(Value::as_str)
1734 .map(str::to_string);
1735
1736 let dup = {
1738 let inner = relay.inner.lock().await;
1739 let slot = inner.slots.get(&slot_id);
1740 if let (Some(eid), Some(slot)) = (&event_id, slot) {
1741 slot.iter()
1742 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1743 } else {
1744 false
1745 }
1746 };
1747 if dup {
1748 return (
1749 StatusCode::OK,
1750 Json(json!({"event_id": event_id, "status": "duplicate"})),
1751 )
1752 .into_response();
1753 }
1754
1755 {
1756 let mut inner = relay.inner.lock().await;
1757 let event_size = body_bytes.len();
1758 let slot = inner.slots.entry(slot_id.clone()).or_default();
1759 slot.push(req.event.clone());
1760 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1761 }
1762 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1763 return (
1764 StatusCode::INTERNAL_SERVER_ERROR,
1765 Json(json!({"error": format!("persist failed: {e}")})),
1766 )
1767 .into_response();
1768 }
1769 (
1770 StatusCode::CREATED,
1771 Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1772 )
1773 .into_response()
1774}
1775
1776async fn well_known_agent_card_a2a(
1792 State(relay): State<Relay>,
1793 Query(q): Query<WellKnownAgentQuery>,
1794) -> impl IntoResponse {
1795 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1796 if nick.is_empty() {
1797 return (
1798 StatusCode::BAD_REQUEST,
1799 Json(json!({"error": "handle missing nick"})),
1800 )
1801 .into_response();
1802 }
1803 let inner = relay.inner.lock().await;
1804 let rec = match inner.handles.get(&nick) {
1805 Some(r) => r.clone(),
1806 None => {
1807 return (
1808 StatusCode::NOT_FOUND,
1809 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1810 )
1811 .into_response();
1812 }
1813 };
1814 drop(inner);
1815
1816 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1817 let description = profile
1818 .get("motto")
1819 .and_then(Value::as_str)
1820 .unwrap_or("")
1821 .to_string();
1822 let display_name = profile
1823 .get("display_name")
1824 .and_then(Value::as_str)
1825 .unwrap_or(&rec.nick)
1826 .to_string();
1827 let relay_url = rec.relay_url.clone().unwrap_or_default();
1828 let endpoint = if !relay_url.is_empty() {
1830 format!(
1831 "{}/v1/handle/intro/{}",
1832 relay_url.trim_end_matches('/'),
1833 rec.nick
1834 )
1835 } else {
1836 format!("/v1/handle/intro/{}", rec.nick)
1837 };
1838 let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1839
1840 let a2a_card = json!({
1844 "id": rec.did,
1845 "name": display_name,
1846 "description": description,
1847 "version": "wire/0.5",
1848 "endpoint": endpoint,
1849 "provider": {
1850 "name": "wire",
1851 "url": "https://github.com/SlanchaAi/wire"
1852 },
1853 "capabilities": {
1854 "streaming": false,
1855 "pushNotifications": false,
1856 "extendedAgentCard": true
1857 },
1858 "securitySchemes": {
1859 "ed25519-event-sig": {
1860 "type": "signature",
1861 "alg": "EdDSA",
1862 "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1863 }
1864 },
1865 "security": [{"ed25519-event-sig": []}],
1866 "skills": [],
1867 "extensions": [{
1868 "uri": "https://slancha.ai/wire/ext/v0.5",
1872 "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1873 "required": false,
1874 "params": {
1875 "did": rec.did,
1876 "handle": rec.nick,
1877 "slot_id": rec.slot_id,
1878 "relay_url": rec.relay_url,
1879 "card": rec.card,
1880 "profile": profile,
1881 "claimed_at": rec.claimed_at,
1882 }
1883 }],
1884 "signature": card_sig,
1885 });
1886 (StatusCode::OK, Json(a2a_card)).into_response()
1887}
1888
1889async fn well_known_agent(
1890 State(relay): State<Relay>,
1891 Query(q): Query<WellKnownAgentQuery>,
1892) -> impl IntoResponse {
1893 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1894 if nick.is_empty() {
1895 return (
1896 StatusCode::BAD_REQUEST,
1897 Json(json!({"error": "handle missing nick"})),
1898 )
1899 .into_response();
1900 }
1901 let inner = relay.inner.lock().await;
1902 match inner.handles.get(&nick) {
1903 Some(rec) => (
1904 StatusCode::OK,
1905 Json(json!({
1906 "nick": rec.nick,
1907 "did": rec.did,
1908 "card": rec.card,
1909 "slot_id": rec.slot_id,
1910 "relay_url": rec.relay_url,
1911 "claimed_at": rec.claimed_at,
1912 })),
1913 )
1914 .into_response(),
1915 None => (
1916 StatusCode::NOT_FOUND,
1917 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1918 )
1919 .into_response(),
1920 }
1921}
1922
1923async fn list_events(
1924 State(relay): State<Relay>,
1925 Path(slot_id): Path<String>,
1926 Query(q): Query<ListEventsQuery>,
1927 headers: HeaderMap,
1928) -> impl IntoResponse {
1929 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1930 return resp;
1931 }
1932 let limit = q.limit.unwrap_or(100).min(1000);
1933 let mut inner = relay.inner.lock().await;
1934 let now_unix = std::time::SystemTime::now()
1938 .duration_since(std::time::UNIX_EPOCH)
1939 .map(|d| d.as_secs())
1940 .unwrap_or(0);
1941 inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1942 let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1943 let start = match q.since {
1944 Some(eid) => events
1945 .iter()
1946 .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1947 .map(|i| i + 1)
1948 .unwrap_or(0),
1949 None => 0,
1950 };
1951 let end = (start + limit).min(events.len());
1952 let slice = events[start..end].to_vec();
1953 (StatusCode::OK, Json(slice)).into_response()
1954}
1955
1956async fn slot_state(
1962 State(relay): State<Relay>,
1963 Path(slot_id): Path<String>,
1964 headers: HeaderMap,
1965) -> impl IntoResponse {
1966 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1967 return resp;
1968 }
1969 let inner = relay.inner.lock().await;
1970 let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1971 let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1972 let responder_health = inner.responder_health.get(&slot_id).cloned();
1973 (
1974 StatusCode::OK,
1975 Json(json!({
1976 "slot_id": slot_id,
1977 "event_count": event_count,
1978 "last_pull_at_unix": last_pull_at_unix,
1979 "responder_health": responder_health,
1980 })),
1981 )
1982 .into_response()
1983}
1984
1985async fn responder_health_set(
1986 State(relay): State<Relay>,
1987 Path(slot_id): Path<String>,
1988 headers: HeaderMap,
1989 Json(record): Json<ResponderHealthRecord>,
1990) -> impl IntoResponse {
1991 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1992 return resp;
1993 }
1994 let path = relay
1995 .state_dir
1996 .join("responder-health")
1997 .join(format!("{slot_id}.json"));
1998 let body = match serde_json::to_vec_pretty(&record) {
1999 Ok(b) => b,
2000 Err(e) => {
2001 return (
2002 StatusCode::INTERNAL_SERVER_ERROR,
2003 Json(json!({"error": format!("serialize failed: {e}")})),
2004 )
2005 .into_response();
2006 }
2007 };
2008 if let Err(e) = tokio::fs::write(&path, body).await {
2009 return (
2010 StatusCode::INTERNAL_SERVER_ERROR,
2011 Json(json!({"error": format!("persist failed: {e}")})),
2012 )
2013 .into_response();
2014 }
2015 {
2016 let mut inner = relay.inner.lock().await;
2017 inner
2018 .responder_health
2019 .insert(slot_id.clone(), record.clone());
2020 }
2021 (StatusCode::OK, Json(record)).into_response()
2022}
2023
2024async fn check_token(
2025 relay: &Relay,
2026 headers: &HeaderMap,
2027 slot_id: &str,
2028) -> std::result::Result<(), axum::response::Response> {
2029 let auth = headers
2030 .get(AUTHORIZATION)
2031 .and_then(|h| h.to_str().ok())
2032 .and_then(|s| s.strip_prefix("Bearer "))
2033 .map(str::to_string);
2034 let presented = match auth {
2035 Some(t) => t,
2036 None => {
2037 return Err((
2038 StatusCode::UNAUTHORIZED,
2039 Json(json!({"error": "missing Bearer token"})),
2040 )
2041 .into_response());
2042 }
2043 };
2044 let inner = relay.inner.lock().await;
2045 let expected = match inner.tokens.get(slot_id) {
2046 Some(t) => t.clone(),
2047 None => {
2048 return Err((
2049 StatusCode::NOT_FOUND,
2050 Json(json!({"error": "unknown slot"})),
2051 )
2052 .into_response());
2053 }
2054 };
2055 drop(inner);
2056 if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2057 return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2058 }
2059 Ok(())
2060}
2061
/// Compares two byte slices without stopping at the first differing byte.
///
/// Slices of different length are unequal (that check does return early).
/// For equal lengths every byte pair is XOR-ed into an accumulator, so the
/// amount of work does not depend on where the slices differ.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |diff, (lhs, rhs)| diff | (lhs ^ rhs))
            == 0
}
2072
/// Returns `true` only for strings of exactly 32 bytes, each a lowercase
/// hexadecimal digit (`0-9` or `a-f`).
fn is_valid_slot_id(s: &str) -> bool {
    let bytes = s.as_bytes();
    bytes.len() == 32
        && bytes
            .iter()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
2078
2079fn random_hex(n_bytes: usize) -> String {
2080 let mut buf = vec![0u8; n_bytes];
2081 rand::thread_rng().fill_bytes(&mut buf);
2082 hex::encode(buf)
2083}
2084
/// Starts the relay HTTP server on `bind` using the default [`ServerMode`].
///
/// Thin wrapper that forwards `bind` and `state_dir` to [`serve_with_mode`]
/// and returns its result.
pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
    serve_with_mode(bind, state_dir, ServerMode::default()).await
}
2089
2090pub async fn serve_with_mode(bind: &str, state_dir: PathBuf, mode: ServerMode) -> Result<()> {
2094 let relay = Relay::new(state_dir).await?;
2095 relay.spawn_pair_sweeper();
2096 relay.spawn_counter_persister();
2097 let app = relay.clone().router_with_mode(mode);
2098 let listener = tokio::net::TcpListener::bind(bind)
2099 .await
2100 .with_context(|| format!("binding {bind}"))?;
2101 if mode.local_only {
2102 eprintln!(
2103 "wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled"
2104 );
2105 } else {
2106 eprintln!("wire relay-server listening on {bind}");
2107 }
2108 let shutdown_relay = relay.clone();
2109 axum::serve(listener, app)
2110 .with_graceful_shutdown(async move {
2111 let _ = tokio::signal::ctrl_c().await;
2112 eprintln!("\nshutting down — final counter snapshot");
2113 if let Err(e) = shutdown_relay.persist_counters().await {
2114 eprintln!("final counter persist failed: {e}");
2115 }
2116 })
2117 .await?;
2118 Ok(())
2119}
2120
/// Serves the relay over a Unix domain socket at `socket_path` (Unix only).
///
/// The router is always built with `local_only: true`. Any existing file at
/// `socket_path` is removed first and the parent directory is created if
/// needed. After binding, the socket is chmod-ed to `0600`; a failure there is
/// logged but does not abort startup.
///
/// Connections are accepted in a loop and each one is served over HTTP/1 on
/// its own spawned task. On Ctrl-C the counters are persisted one final time
/// and the socket file is removed.
///
/// # Errors
/// Fails when the stale socket cannot be removed, the parent directory cannot
/// be created, the relay cannot be constructed, or the socket cannot be bound.
#[cfg(unix)]
pub async fn serve_uds(socket_path: PathBuf, state_dir: PathBuf) -> Result<()> {
    use hyper::server::conn::http1;
    use hyper_util::rt::TokioIo;
    use tower_service::Service;

    // A leftover socket file would make the bind below fail.
    if socket_path.exists() {
        std::fs::remove_file(&socket_path)
            .with_context(|| format!("removing stale socket at {socket_path:?}"))?;
    }
    if let Some(parent) = socket_path.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("creating socket parent {parent:?}"))?;
    }
    let relay = Relay::new(state_dir).await?;
    relay.spawn_pair_sweeper();
    relay.spawn_counter_persister();
    let app: axum::Router = relay
        .clone()
        .router_with_mode(ServerMode { local_only: true });
    let listener = tokio::net::UnixListener::bind(&socket_path)
        .with_context(|| format!("binding UDS at {socket_path:?}"))?;

    use std::os::unix::fs::PermissionsExt;
    // Restrict the socket to its owner. Best effort: a failure is only logged.
    // NOTE(review): permissions are tightened after `bind`, so the socket
    // exists briefly with whatever mode it was created with — confirm that
    // window is acceptable.
    if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
        eprintln!(
            "wire relay-server (UDS): chmod 0600 on {socket_path:?} failed: {e} — \
             socket may be accessible to other uids. Investigate."
        );
    }
    eprintln!(
        "wire relay-server (UDS) listening on unix://{} — same-host, owner-uid only",
        socket_path.display()
    );

    let shutdown_relay = relay.clone();
    let socket_path_for_cleanup = socket_path.clone();
    let mut make_service = app.into_make_service();

    // Accept loop; it has no exit of its own and only ends when `select!`
    // below drops it.
    let serve_loop = async {
        loop {
            let (stream, _peer_addr) = match listener.accept().await {
                Ok(p) => p,
                Err(e) => {
                    // NOTE(review): retries immediately with no backoff, so a
                    // persistent accept error would spin and flood stderr.
                    eprintln!("wire relay-server (UDS): accept failed: {e}");
                    continue;
                }
            };
            // Building the per-connection service cannot fail: the error type
            // is uninhabited, hence the empty match.
            let tower_service = match make_service.call(&stream).await {
                Ok(s) => s,
                Err(infallible) => match infallible {},
            };
            let io = TokioIo::new(stream);
            // Adapt the tower service to hyper's service trait; the service is
            // cloned for each request.
            let hyper_service =
                hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
                    let mut svc = tower_service.clone();
                    async move { Service::call(&mut svc, req).await }
                });
            // One detached task per connection.
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .serve_connection(io, hyper_service)
                    .await
                {
                    // Incomplete-message errors are not logged.
                    if !e.is_incomplete_message() {
                        eprintln!("wire relay-server (UDS): conn error: {e}");
                    }
                }
            });
        }
    };

    // Resolves on Ctrl-C after the final counter snapshot and socket cleanup.
    let shutdown = async {
        let _ = tokio::signal::ctrl_c().await;
        eprintln!("\nshutting down — final counter snapshot");
        if let Err(e) = shutdown_relay.persist_counters().await {
            eprintln!("final counter persist failed: {e}");
        }
        let _ = std::fs::remove_file(&socket_path_for_cleanup);
    };

    // Whichever future finishes first ends the server.
    // NOTE(review): connection tasks already spawned are not awaited here.
    tokio::select! {
        _ = serve_loop => {},
        _ = shutdown => {},
    };
    Ok(())
}
2226
/// Non-Unix stand-in for the UDS server: always returns an error pointing the
/// operator at the loopback HTTP alternative.
#[cfg(not(unix))]
pub async fn serve_uds(_socket_path: PathBuf, _state_dir: PathBuf) -> Result<()> {
    anyhow::bail!(
        "UDS transport is Unix-only; Windows falls back to loopback HTTP. \
         Use `wire relay-server --bind 127.0.0.1:8771 --local-only` on Windows."
    );
}
2234
/// Options that select which routes the relay exposes.
///
/// The derived `Default` leaves `local_only` set to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ServerMode {
    /// When `true`, the server runs in local-only mode; `serve_with_mode`
    /// announces this as "phonebook + well-known endpoints disabled", and the
    /// Unix-socket server always sets it.
    pub local_only: bool,
}
2247
#[cfg(test)]
mod tests {
    use super::*;

    // Equal slices match; a differing byte or a differing length does not.
    #[test]
    fn constant_time_eq_basic() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        assert!(!constant_time_eq(b"abc", b"abcd"));
    }

    // Sixteen random bytes encode to 32 hex characters.
    #[test]
    fn random_hex_length() {
        let s = random_hex(16);
        assert_eq!(s.len(), 32);
        assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
    }

    // A pair slot last touched more than the TTL ago is evicted together with
    // its lookup entry, while a default-constructed slot survives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&dir);
        let relay = Relay::new(dir.clone()).await.unwrap();

        {
            let mut inner = relay.inner.lock().await;
            // Slot A: backdated to 60 s past the TTL, so it must be evicted.
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            inner.pair_slots.insert(
                "id-A".to_string(),
                PairSlot {
                    last_touched: std::time::Instant::now()
                        - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
                    ..PairSlot::default()
                },
            );

            // Slot B: default-constructed (presumably freshly touched), so it
            // is expected to stay.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        let inner = relay.inner.lock().await;
        assert_eq!(
            inner.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(inner.pair_slots.contains_key("id-B"));
        assert_eq!(inner.pair_lookup.len(), 1);
        assert!(inner.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&dir);
    }

    // Only 32-character lowercase hex strings are valid slot ids.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));
        // Wrong length: far too short, one short, one long.
        assert!(!is_valid_slot_id("abc"));
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde"));
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0"));
        // Uppercase hex digits are rejected.
        assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
        // Path-like inputs are rejected.
        assert!(!is_valid_slot_id("../etc/passwd0123456789abcdef0000"));
        assert!(!is_valid_slot_id("..%2Fetc%2Fpasswd00000000000000000"));
        assert!(!is_valid_slot_id("/absolute/path/that/looks/like/key"));
        // An embedded NUL byte is rejected even among hex digits.
        assert!(!is_valid_slot_id(
            "0123456789abcdef\0\x31\x32\x33456789abcdef"
        ));
    }
}