1use anyhow::{Context, Result};
33use axum::{
34 Json, Router,
35 extract::{Path, Query, State},
36 http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37 response::IntoResponse,
38 routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50 GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
53const MAX_EVENT_BYTES: usize = 256 * 1024;
54const MAX_SLOT_BYTES: usize = 64 * 1024 * 1024;
59
60#[derive(Clone)]
61pub struct Relay {
62 inner: Arc<Mutex<Inner>>,
63 state_dir: PathBuf,
64 counters: Arc<RelayCounters>,
65}
66
67struct RelayCounters {
72 boot_unix: u64,
73 handle_claims_total: AtomicU64,
74 handle_first_claims_total: AtomicU64,
75 slot_allocations_total: AtomicU64,
76 pair_opens_total: AtomicU64,
77 events_posted_total: AtomicU64,
78}
79
80#[derive(serde::Serialize, serde::Deserialize, Default)]
81struct CountersSnapshot {
82 handle_claims_total: u64,
83 handle_first_claims_total: u64,
84 slot_allocations_total: u64,
85 pair_opens_total: u64,
86 events_posted_total: u64,
87}
88
89#[derive(serde::Serialize, serde::Deserialize)]
94struct HistoryEntry {
95 ts: u64,
96 handles_active: usize,
97 slots_active: usize,
98 pair_slots_open: usize,
99 streams_active: usize,
100 handle_claims_total: u64,
101 handle_first_claims_total: u64,
102 slot_allocations_total: u64,
103 pair_opens_total: u64,
104 events_posted_total: u64,
105}
106
107#[derive(Deserialize)]
108pub struct StatsHistoryQuery {
109 pub hours: Option<u64>,
111}
112
113struct Inner {
114 slots: HashMap<String, Vec<Value>>,
116 tokens: HashMap<String, String>,
118 slot_bytes: HashMap<String, usize>,
120 last_pull_at_unix: HashMap<String, u64>,
125 streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
131 pair_lookup: HashMap<String, String>,
133 pair_slots: HashMap<String, PairSlot>,
135 handles: HashMap<String, HandleRecord>,
137 responder_health: HashMap<String, ResponderHealthRecord>,
139 invites: HashMap<String, InviteRecord>,
143}
144
145#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
148struct InviteRecord {
149 token: String,
150 invite_url: String,
151 expires_unix: u64,
152 uses_remaining: Option<u32>,
154 created_unix: u64,
155}
156
157#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
161struct HandleRecord {
162 pub nick: String,
163 pub did: String,
164 pub card: Value,
165 pub slot_id: String,
166 pub relay_url: Option<String>,
167 pub claimed_at: String,
168 #[serde(default, skip_serializing_if = "Option::is_none")]
175 pub discoverable: Option<bool>,
176}
177
178impl HandleRecord {
179 fn is_discoverable(&self) -> bool {
182 self.discoverable.unwrap_or(true)
183 }
184}
185
186#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
187pub struct ResponderHealthRecord {
188 pub status: String,
189 #[serde(default, skip_serializing_if = "Option::is_none")]
190 pub reason: Option<String>,
191 #[serde(default, skip_serializing_if = "Option::is_none")]
192 pub last_success_at: Option<String>,
193 pub set_at: String,
194}
195
196#[derive(Clone, Debug)]
197struct PairSlot {
198 host_msg: Option<String>,
200 guest_msg: Option<String>,
202 host_bootstrap: Option<String>,
204 guest_bootstrap: Option<String>,
206 last_touched: std::time::Instant,
208}
209
210impl Default for PairSlot {
211 fn default() -> Self {
212 Self {
213 host_msg: None,
214 guest_msg: None,
215 host_bootstrap: None,
216 guest_bootstrap: None,
217 last_touched: std::time::Instant::now(),
218 }
219 }
220}
221
222const PAIR_SLOT_TTL_SECS: u64 = 300;
225
226#[derive(Deserialize)]
227pub struct AllocateRequest {
228 #[serde(default)]
230 pub handle: Option<String>,
231}
232
233#[derive(Deserialize)]
234pub struct PostEventRequest {
235 pub event: Value,
236}
237
238#[derive(Deserialize)]
239pub struct ListEventsQuery {
240 pub since: Option<String>,
242 pub limit: Option<usize>,
244}
245
246impl Relay {
247 pub async fn new(state_dir: PathBuf) -> Result<Self> {
248 tokio::fs::create_dir_all(state_dir.join("slots")).await?;
249 tokio::fs::create_dir_all(state_dir.join("handles")).await?;
250 tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
251 let mut inner = Inner {
252 slots: HashMap::new(),
253 tokens: HashMap::new(),
254 slot_bytes: HashMap::new(),
255 last_pull_at_unix: HashMap::new(),
256 streams: HashMap::new(),
257 pair_lookup: HashMap::new(),
258 pair_slots: HashMap::new(),
259 handles: HashMap::new(),
260 responder_health: HashMap::new(),
261 invites: HashMap::new(),
262 };
263 let token_path = state_dir.join("tokens.json");
265 if token_path.exists() {
266 let body = tokio::fs::read_to_string(&token_path).await?;
267 inner.tokens = serde_json::from_str(&body).unwrap_or_default();
268 }
269 let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
271 while let Some(entry) = slots_dir.next_entry().await? {
272 let path = entry.path();
273 if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
274 continue;
275 }
276 let stem = match path.file_stem().and_then(|s| s.to_str()) {
277 Some(s) => s.to_string(),
278 None => continue,
279 };
280 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
281 let mut events = Vec::new();
282 for line in body.lines() {
283 if let Ok(v) = serde_json::from_str::<Value>(line) {
284 events.push(v);
285 }
286 }
287 let bytes: usize = events
289 .iter()
290 .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
291 .sum();
292 inner.slot_bytes.insert(stem.clone(), bytes);
293 inner.slots.insert(stem, events);
294 }
295 let handles_dir = state_dir.join("handles");
297 if handles_dir.exists() {
298 let mut rd = tokio::fs::read_dir(&handles_dir).await?;
299 while let Some(entry) = rd.next_entry().await? {
300 let path = entry.path();
301 if path.extension().and_then(|x| x.to_str()) != Some("json") {
302 continue;
303 }
304 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
305 if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
306 inner.handles.insert(rec.nick.clone(), rec);
307 }
308 }
309 }
310 let responder_health_dir = state_dir.join("responder-health");
312 if responder_health_dir.exists() {
313 let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
314 while let Some(entry) = rd.next_entry().await? {
315 let path = entry.path();
316 if path.extension().and_then(|x| x.to_str()) != Some("json") {
317 continue;
318 }
319 let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
320 continue;
321 };
322 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
323 if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
324 inner.responder_health.insert(slot_id.to_string(), rec);
325 }
326 }
327 }
328 let invites_path = state_dir.join("invites.jsonl");
332 if invites_path.exists() {
333 let now_unix = SystemTime::now()
334 .duration_since(UNIX_EPOCH)
335 .map(|d| d.as_secs())
336 .unwrap_or(0);
337 let body = tokio::fs::read_to_string(&invites_path)
338 .await
339 .unwrap_or_default();
340 for line in body.lines() {
341 if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
342 && rec.expires_unix > now_unix
343 {
344 inner.invites.insert(rec.token.clone(), rec);
345 }
346 }
347 }
348 let boot_unix = SystemTime::now()
349 .duration_since(UNIX_EPOCH)
350 .map(|d| d.as_secs())
351 .unwrap_or(0);
352 let snap: CountersSnapshot =
354 match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
355 Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
356 Err(_) => CountersSnapshot::default(),
357 };
358 Ok(Self {
359 inner: Arc::new(Mutex::new(inner)),
360 state_dir,
361 counters: Arc::new(RelayCounters {
362 boot_unix,
363 handle_claims_total: AtomicU64::new(snap.handle_claims_total),
364 handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
365 slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
366 pair_opens_total: AtomicU64::new(snap.pair_opens_total),
367 events_posted_total: AtomicU64::new(snap.events_posted_total),
368 }),
369 })
370 }
371
372 pub fn router(self) -> Router {
373 self.router_with_mode(ServerMode::default())
374 }
375
376 pub fn router_with_mode(self, mode: ServerMode) -> Router {
377 let governor_conf = std::sync::Arc::new(
384 GovernorConfigBuilder::default()
385 .per_second(10)
386 .burst_size(50)
387 .key_extractor(GlobalKeyExtractor)
388 .finish()
389 .expect("valid governor config"),
390 );
391 let governor_layer = GovernorLayer {
392 config: governor_conf,
393 };
394
395 let hot_writes = Router::new()
397 .route("/v1/slot/allocate", post(allocate_slot))
398 .route("/v1/pair", post(pair_open))
399 .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
400 .route("/v1/pair/abandon", post(pair_abandon))
401 .layer(governor_layer);
402
403 let mut router = Router::new()
406 .route("/healthz", get(healthz))
407 .route("/v1/events/:slot_id", post(post_event).get(list_events))
408 .route("/v1/slot/:slot_id/state", get(slot_state))
409 .route(
410 "/v1/slot/:slot_id/responder-health",
411 post(responder_health_set),
412 )
413 .route("/v1/events/:slot_id/stream", get(stream_events))
414 .route("/v1/pair/:pair_id", get(pair_get))
415 .route("/v1/handle/intro/:nick", post(handle_intro));
416
417 if !mode.local_only {
423 router = router
424 .route("/", get(landing_index))
425 .route("/favicon.svg", get(landing_favicon))
426 .route("/og.png", get(landing_og))
427 .route("/install", get(landing_install_sh))
428 .route("/install.sh", get(landing_install_sh))
429 .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
430 .route("/stats", get(stats_root))
431 .route("/stats.json", get(stats_json))
432 .route("/stats.html", get(landing_stats_html))
433 .route("/stats.history", get(stats_history))
434 .route("/phonebook", get(landing_phonebook_html))
435 .route("/phonebook.html", get(landing_phonebook_html))
436 .route("/v1/handle/claim", post(handle_claim))
437 .route("/v1/handles", get(handles_directory))
438 .route("/v1/invite/register", post(invite_register))
439 .route("/i/:token", get(invite_script))
440 .route("/.well-known/wire/agent", get(well_known_agent))
441 .route(
442 "/.well-known/agent-card.json",
443 get(well_known_agent_card_a2a),
444 );
445 } else {
446 router = router.route("/v1/handle/claim", post(handle_claim));
452 }
453
454 router.merge(hot_writes).with_state(self)
455 }
456
457 async fn evict_expired_pair_slots(&self) {
461 let now = std::time::Instant::now();
462 let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
463 let mut inner = self.inner.lock().await;
464 let mut to_remove = Vec::new();
465 for (id, slot) in inner.pair_slots.iter() {
466 if now.duration_since(slot.last_touched) > ttl {
467 to_remove.push(id.clone());
468 }
469 }
470 for id in to_remove {
471 inner.pair_slots.remove(&id);
472 inner.pair_lookup.retain(|_, v| v != &id);
473 }
474 }
475
476 pub fn spawn_pair_sweeper(&self) {
481 let me = self.clone();
482 tokio::spawn(async move {
483 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
484 loop {
485 tick.tick().await;
486 me.evict_expired_pair_slots().await;
487 }
488 });
489 }
490
491 pub async fn persist_counters(&self) -> Result<()> {
495 let snap = CountersSnapshot {
496 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
497 handle_first_claims_total: self
498 .counters
499 .handle_first_claims_total
500 .load(Ordering::Relaxed),
501 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
502 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
503 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
504 };
505 let body = serde_json::to_vec_pretty(&snap)?;
506 let path = self.state_dir.join("counters.json");
507 tokio::fs::write(path, body).await?;
508 Ok(())
509 }
510
511 pub async fn append_history(&self) -> Result<()> {
516 use tokio::io::AsyncWriteExt;
517 let now = SystemTime::now()
518 .duration_since(UNIX_EPOCH)
519 .map(|d| d.as_secs())
520 .unwrap_or(0);
521 let (handles_active, slots_active, pair_slots_open, streams_active) = {
522 let inner = self.inner.lock().await;
523 (
524 inner.handles.len(),
525 inner.slots.len(),
526 inner.pair_slots.len(),
527 inner.streams.values().map(Vec::len).sum::<usize>(),
528 )
529 };
530 let entry = HistoryEntry {
531 ts: now,
532 handles_active,
533 slots_active,
534 pair_slots_open,
535 streams_active,
536 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
537 handle_first_claims_total: self
538 .counters
539 .handle_first_claims_total
540 .load(Ordering::Relaxed),
541 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
542 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
543 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
544 };
545 let line = serde_json::to_vec(&entry)?;
546 let path = self.state_dir.join("stats-history.jsonl");
547 let mut f = tokio::fs::OpenOptions::new()
548 .create(true)
549 .append(true)
550 .open(&path)
551 .await?;
552 f.write_all(&line).await?;
553 f.write_all(b"\n").await?;
554 f.flush().await?;
555 Ok(())
556 }
557
558 pub fn spawn_counter_persister(&self) {
562 let me = self.clone();
563 tokio::spawn(async move {
564 let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
565 tick.tick().await;
568 loop {
569 tick.tick().await;
570 if let Err(e) = me.persist_counters().await {
571 eprintln!("counter persist failed: {e}");
572 }
573 if let Err(e) = me.append_history().await {
574 eprintln!("history append failed: {e}");
575 }
576 }
577 });
578 }
579
580 async fn persist_tokens(&self) -> Result<()> {
581 let body = {
582 let inner = self.inner.lock().await;
583 serde_json::to_string_pretty(&inner.tokens)?
584 };
585 let path = self.state_dir.join("tokens.json");
586 tokio::fs::write(path, body).await?;
587 Ok(())
588 }
589
590 async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
591 if !is_valid_slot_id(slot_id) {
597 return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
598 }
599 let path = self
600 .state_dir
601 .join("slots")
602 .join(format!("{slot_id}.jsonl"));
603 let mut line = serde_json::to_vec(event)?;
604 line.push(b'\n');
605 use tokio::io::AsyncWriteExt;
606 let mut f = tokio::fs::OpenOptions::new()
607 .create(true)
608 .append(true)
609 .open(&path)
610 .await
611 .with_context(|| format!("opening {path:?}"))?;
612 f.write_all(&line).await?;
613 f.flush().await?;
614 Ok(())
615 }
616}
617
618async fn healthz() -> impl IntoResponse {
619 (StatusCode::OK, "ok\n")
620}
621
622async fn stats_history(
626 State(relay): State<Relay>,
627 Query(q): Query<StatsHistoryQuery>,
628) -> impl IntoResponse {
629 let hours = q.hours.unwrap_or(24).min(168);
630 let now = SystemTime::now()
631 .duration_since(UNIX_EPOCH)
632 .map(|d| d.as_secs())
633 .unwrap_or(0);
634 let cutoff = now.saturating_sub(hours * 3600);
635 let path = relay.state_dir.join("stats-history.jsonl");
636 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
637 let entries: Vec<Value> = body
638 .lines()
639 .filter_map(|l| serde_json::from_str::<Value>(l).ok())
640 .filter(|v| {
641 v.get("ts")
642 .and_then(Value::as_u64)
643 .map(|t| t >= cutoff)
644 .unwrap_or(false)
645 })
646 .collect();
647 (
648 StatusCode::OK,
649 Json(json!({
650 "hours": hours,
651 "now_unix": now,
652 "count": entries.len(),
653 "entries": entries,
654 })),
655 )
656}
657
658async fn landing_stats_html() -> impl IntoResponse {
659 static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
660 (
661 StatusCode::OK,
662 [
663 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
664 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
665 ],
666 STATS_HTML,
667 )
668}
669
670async fn landing_phonebook_html() -> impl IntoResponse {
671 static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
672 (
673 StatusCode::OK,
674 [
675 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
676 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
677 ],
678 PHONEBOOK_HTML,
679 )
680}
681
682async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
687 let wants_html = headers
688 .get(axum::http::header::ACCEPT)
689 .and_then(|v| v.to_str().ok())
690 .unwrap_or("")
691 .contains("text/html");
692 if wants_html {
693 landing_stats_html().await.into_response()
694 } else {
695 stats_json(State(relay)).await.into_response()
696 }
697}
698
699async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
700 let now = SystemTime::now()
701 .duration_since(UNIX_EPOCH)
702 .map(|d| d.as_secs())
703 .unwrap_or(0);
704 let inner = relay.inner.lock().await;
705 let streams_active: usize = inner.streams.values().map(Vec::len).sum();
706 let body = json!({
707 "version": env!("CARGO_PKG_VERSION"),
708 "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
709 "handles_active": inner.handles.len(),
710 "slots_active": inner.slots.len(),
711 "pair_slots_open": inner.pair_slots.len(),
712 "streams_active": streams_active,
713 "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
714 "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
715 "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
716 "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
717 "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
718 });
719 (StatusCode::OK, Json(body))
720}
721
722async fn landing_index() -> impl IntoResponse {
726 static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
727 (
728 StatusCode::OK,
729 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
730 INDEX_HTML,
731 )
732}
733
734async fn landing_favicon() -> impl IntoResponse {
735 static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
736 (
737 StatusCode::OK,
738 [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
739 FAVICON_SVG,
740 )
741}
742
743async fn landing_og() -> impl IntoResponse {
744 static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
745 (
746 StatusCode::OK,
747 [
748 (axum::http::header::CONTENT_TYPE, "image/png"),
749 (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
750 ],
751 OG_PNG,
752 )
753}
754
755async fn landing_install_sh() -> impl IntoResponse {
756 static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
757 (
758 StatusCode::OK,
759 [
760 (
761 axum::http::header::CONTENT_TYPE,
762 "text/x-shellscript; charset=utf-8",
763 ),
764 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
765 ],
766 INSTALL_SH,
767 )
768}
769
770async fn landing_openshell_policy_sh() -> impl IntoResponse {
771 static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
772 (
773 StatusCode::OK,
774 [
775 (
776 axum::http::header::CONTENT_TYPE,
777 "text/x-shellscript; charset=utf-8",
778 ),
779 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
780 ],
781 POLICY_SH,
782 )
783}
784
785async fn allocate_slot(
786 State(relay): State<Relay>,
787 Json(_req): Json<AllocateRequest>,
788) -> impl IntoResponse {
789 let slot_id = random_hex(16);
790 let slot_token = random_hex(32);
791 {
792 let mut inner = relay.inner.lock().await;
793 inner.slots.insert(slot_id.clone(), Vec::new());
794 inner.tokens.insert(slot_id.clone(), slot_token.clone());
795 }
796 if let Err(e) = relay.persist_tokens().await {
797 return (
798 StatusCode::INTERNAL_SERVER_ERROR,
799 Json(json!({"error": format!("persist failed: {e}")})),
800 )
801 .into_response();
802 }
803 relay
804 .counters
805 .slot_allocations_total
806 .fetch_add(1, Ordering::Relaxed);
807 (
808 StatusCode::CREATED,
809 Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
810 )
811 .into_response()
812}
813
814async fn post_event(
815 State(relay): State<Relay>,
816 Path(slot_id): Path<String>,
817 headers: HeaderMap,
818 Json(req): Json<PostEventRequest>,
819) -> impl IntoResponse {
820 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
821 return resp;
822 }
823 let body_bytes = match serde_json::to_vec(&req.event) {
825 Ok(b) => b,
826 Err(e) => {
827 return (
828 StatusCode::BAD_REQUEST,
829 Json(json!({"error": format!("event not serializable: {e}")})),
830 )
831 .into_response();
832 }
833 };
834 if body_bytes.len() > MAX_EVENT_BYTES {
835 return (
836 StatusCode::PAYLOAD_TOO_LARGE,
837 Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
838 )
839 .into_response();
840 }
841 {
843 let inner = relay.inner.lock().await;
844 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
845 if used + body_bytes.len() > MAX_SLOT_BYTES {
846 return (
847 StatusCode::PAYLOAD_TOO_LARGE,
848 Json(json!({
849 "error": "slot quota exceeded",
850 "slot_bytes_used": used,
851 "slot_bytes_max": MAX_SLOT_BYTES,
852 "remediation": "operator should `wire rotate-slot` to drain old slot",
853 })),
854 )
855 .into_response();
856 }
857 }
858 let event_id = req
859 .event
860 .get("event_id")
861 .and_then(Value::as_str)
862 .map(str::to_string);
863
864 let dup = {
866 let inner = relay.inner.lock().await;
867 let slot = inner.slots.get(&slot_id);
868 if let (Some(eid), Some(slot)) = (&event_id, slot) {
869 slot.iter()
870 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
871 } else {
872 false
873 }
874 };
875 if dup {
876 return (
877 StatusCode::OK,
878 Json(json!({"event_id": event_id, "status": "duplicate"})),
879 )
880 .into_response();
881 }
882
883 {
884 let mut inner = relay.inner.lock().await;
885 let event_size = body_bytes.len();
886 let slot = inner.slots.entry(slot_id.clone()).or_default();
887 slot.push(req.event.clone());
888 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
889 }
890 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
891 return (
892 StatusCode::INTERNAL_SERVER_ERROR,
893 Json(json!({"error": format!("persist failed: {e}")})),
894 )
895 .into_response();
896 }
897 relay
898 .counters
899 .events_posted_total
900 .fetch_add(1, Ordering::Relaxed);
901 {
906 let mut inner = relay.inner.lock().await;
907 if let Some(subs) = inner.streams.get_mut(&slot_id) {
908 subs.retain(|tx| tx.send(req.event.clone()).is_ok());
909 }
910 }
911 (
912 StatusCode::CREATED,
913 Json(json!({"event_id": event_id, "status": "stored"})),
914 )
915 .into_response()
916}
917
918async fn stream_events(
931 State(relay): State<Relay>,
932 Path(slot_id): Path<String>,
933 headers: HeaderMap,
934) -> axum::response::Response {
935 use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
936 use futures::stream::StreamExt;
937
938 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
939 return resp;
940 }
941
942 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
943 {
944 let mut inner = relay.inner.lock().await;
945 inner.streams.entry(slot_id.clone()).or_default().push(tx);
946 }
947
948 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
949 SseEvent::default()
950 .json_data(&ev)
951 .map_err(|e| std::io::Error::other(e.to_string()))
952 });
953
954 Sse::new(stream)
955 .keep_alive(
956 KeepAlive::new()
957 .interval(std::time::Duration::from_secs(30))
958 .text("phyllis: still on the line"),
959 )
960 .into_response()
961}
962
963#[derive(Deserialize)]
966pub struct PairOpenRequest {
967 pub code_hash: String,
968 pub msg: String,
970 pub role: String, }
972
973#[derive(Deserialize)]
974pub struct PairBootstrapRequest {
975 pub role: String,
976 pub sealed: String,
977}
978
979#[derive(Deserialize)]
980pub struct PairAbandonRequest {
981 pub code_hash: String,
984}
985
986async fn pair_abandon(
992 State(relay): State<Relay>,
993 Json(req): Json<PairAbandonRequest>,
994) -> impl IntoResponse {
995 let mut inner = relay.inner.lock().await;
996 if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
997 inner.pair_slots.remove(&pair_id);
998 }
999 StatusCode::NO_CONTENT.into_response()
1000}
1001
1002async fn pair_open(
1003 State(relay): State<Relay>,
1004 Json(req): Json<PairOpenRequest>,
1005) -> impl IntoResponse {
1006 if req.role != "host" && req.role != "guest" {
1007 return (
1008 StatusCode::BAD_REQUEST,
1009 Json(json!({"error": "role must be 'host' or 'guest'"})),
1010 )
1011 .into_response();
1012 }
1013 relay.evict_expired_pair_slots().await;
1014 let mut inner = relay.inner.lock().await;
1015 let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1016 Some(id) => id,
1017 None => {
1018 let new_id = random_hex(16);
1019 inner
1020 .pair_lookup
1021 .insert(req.code_hash.clone(), new_id.clone());
1022 inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1023 relay
1024 .counters
1025 .pair_opens_total
1026 .fetch_add(1, Ordering::Relaxed);
1027 new_id
1028 }
1029 };
1030 let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1031 slot.last_touched = std::time::Instant::now();
1032 if req.role == "host" {
1033 if slot.host_msg.is_some() {
1034 return (
1035 StatusCode::CONFLICT,
1036 Json(json!({"error": "host already registered for this code"})),
1037 )
1038 .into_response();
1039 }
1040 slot.host_msg = Some(req.msg);
1041 } else {
1042 if slot.guest_msg.is_some() {
1043 return (
1044 StatusCode::CONFLICT,
1045 Json(json!({"error": "guest already registered for this code"})),
1046 )
1047 .into_response();
1048 }
1049 slot.guest_msg = Some(req.msg);
1050 }
1051 (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1052}
1053
1054#[derive(Deserialize)]
1055pub struct PairGetQuery {
1056 pub as_role: String,
1058}
1059
1060async fn pair_get(
1061 State(relay): State<Relay>,
1062 Path(pair_id): Path<String>,
1063 Query(q): Query<PairGetQuery>,
1064) -> impl IntoResponse {
1065 relay.evict_expired_pair_slots().await;
1066 let mut inner = relay.inner.lock().await;
1067 let slot = match inner.pair_slots.get_mut(&pair_id) {
1068 Some(s) => {
1069 s.last_touched = std::time::Instant::now();
1070 s.clone()
1071 }
1072 None => {
1073 return (
1074 StatusCode::NOT_FOUND,
1075 Json(json!({"error": "unknown pair_id"})),
1076 )
1077 .into_response();
1078 }
1079 };
1080 let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1081 "host" => (slot.guest_msg, slot.guest_bootstrap),
1082 "guest" => (slot.host_msg, slot.host_bootstrap),
1083 _ => {
1084 return (
1085 StatusCode::BAD_REQUEST,
1086 Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1087 )
1088 .into_response();
1089 }
1090 };
1091 (
1092 StatusCode::OK,
1093 Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1094 )
1095 .into_response()
1096}
1097
1098async fn pair_bootstrap(
1099 State(relay): State<Relay>,
1100 Path(pair_id): Path<String>,
1101 Json(req): Json<PairBootstrapRequest>,
1102) -> impl IntoResponse {
1103 relay.evict_expired_pair_slots().await;
1104 let mut inner = relay.inner.lock().await;
1105 let slot = match inner.pair_slots.get_mut(&pair_id) {
1106 Some(s) => s,
1107 None => {
1108 return (
1109 StatusCode::NOT_FOUND,
1110 Json(json!({"error": "unknown pair_id"})),
1111 )
1112 .into_response();
1113 }
1114 };
1115 slot.last_touched = std::time::Instant::now();
1116 match req.role.as_str() {
1117 "host" => slot.host_bootstrap = Some(req.sealed),
1118 "guest" => slot.guest_bootstrap = Some(req.sealed),
1119 _ => {
1120 return (
1121 StatusCode::BAD_REQUEST,
1122 Json(json!({"error": "role must be 'host' or 'guest'"})),
1123 )
1124 .into_response();
1125 }
1126 }
1127 (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1128}
1129
1130#[derive(Deserialize)]
1133pub struct HandleClaimRequest {
1134 pub nick: String,
1137 pub slot_id: String,
1139 pub relay_url: Option<String>,
1143 pub card: Value,
1146 #[serde(default, skip_serializing_if = "Option::is_none")]
1151 pub discoverable: Option<bool>,
1152}
1153
1154async fn handle_claim(
1161 State(relay): State<Relay>,
1162 headers: HeaderMap,
1163 Json(req): Json<HandleClaimRequest>,
1164) -> impl IntoResponse {
1165 if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1168 return resp;
1169 }
1170 if !crate::pair_profile::is_valid_nick(&req.nick) {
1172 return (
1173 StatusCode::BAD_REQUEST,
1174 Json(json!({
1175 "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1176 "nick": req.nick,
1177 })),
1178 )
1179 .into_response();
1180 }
1181 if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1183 return (
1184 StatusCode::BAD_REQUEST,
1185 Json(json!({"error": format!("card signature invalid: {e}")})),
1186 )
1187 .into_response();
1188 }
1189 let did = match req.card.get("did").and_then(Value::as_str) {
1190 Some(d) => d.to_string(),
1191 None => {
1192 return (
1193 StatusCode::BAD_REQUEST,
1194 Json(json!({"error": "card missing 'did' field"})),
1195 )
1196 .into_response();
1197 }
1198 };
1199
1200 let prior_record: Option<HandleRecord>;
1206 let first_claim = {
1207 let inner = relay.inner.lock().await;
1208 match inner.handles.get(&req.nick) {
1209 Some(existing) if existing.did != did => {
1210 return (
1211 StatusCode::CONFLICT,
1212 Json(json!({
1213 "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1214 "nick": req.nick,
1215 "claimed_by": existing.did,
1216 })),
1217 )
1218 .into_response();
1219 }
1220 Some(prev) => {
1221 prior_record = Some(prev.clone());
1222 false
1223 }
1224 None => {
1225 prior_record = None;
1226 true
1227 }
1228 }
1229 };
1230
1231 let now = time::OffsetDateTime::now_utc()
1236 .replace_nanosecond(0)
1237 .unwrap_or_else(|_| time::OffsetDateTime::now_utc())
1238 .format(&time::format_description::well_known::Rfc3339)
1239 .unwrap_or_default();
1240 let discoverable = match (req.discoverable, &prior_record) {
1246 (Some(d), _) => Some(d),
1247 (None, Some(prev)) => prev.discoverable,
1248 (None, None) => None,
1249 };
1250 let record = HandleRecord {
1251 nick: req.nick.clone(),
1252 did: did.clone(),
1253 card: req.card.clone(),
1254 slot_id: req.slot_id.clone(),
1255 relay_url: req.relay_url.clone(),
1256 claimed_at: now,
1257 discoverable,
1258 };
1259
1260 let path = relay
1262 .state_dir
1263 .join("handles")
1264 .join(format!("{}.json", req.nick));
1265 let body = match serde_json::to_vec_pretty(&record) {
1266 Ok(b) => b,
1267 Err(e) => {
1268 return (
1269 StatusCode::INTERNAL_SERVER_ERROR,
1270 Json(json!({"error": format!("serialize failed: {e}")})),
1271 )
1272 .into_response();
1273 }
1274 };
1275 if let Err(e) = tokio::fs::write(&path, &body).await {
1276 return (
1277 StatusCode::INTERNAL_SERVER_ERROR,
1278 Json(json!({"error": format!("persist failed: {e}")})),
1279 )
1280 .into_response();
1281 }
1282 {
1283 let mut inner = relay.inner.lock().await;
1284 inner.handles.insert(req.nick.clone(), record);
1285 }
1286 relay
1287 .counters
1288 .handle_claims_total
1289 .fetch_add(1, Ordering::Relaxed);
1290 if first_claim {
1291 relay
1292 .counters
1293 .handle_first_claims_total
1294 .fetch_add(1, Ordering::Relaxed);
1295 }
1296 (
1297 StatusCode::CREATED,
1298 Json(json!({
1299 "nick": req.nick,
1300 "did": did,
1301 "status": if first_claim { "claimed" } else { "re-claimed" },
1302 })),
1303 )
1304 .into_response()
1305}
1306
1307#[derive(Deserialize)]
1308pub struct WellKnownAgentQuery {
1309 pub handle: String,
1310}
1311
1312#[derive(Deserialize)]
1313pub struct HandlesDirectoryQuery {
1314 pub cursor: Option<String>,
1315 pub limit: Option<usize>,
1316 pub vibe: Option<String>,
1317}
1318
1319#[derive(Deserialize)]
1329pub struct InviteRegisterRequest {
1330 pub invite_url: String,
1332 #[serde(default)]
1334 pub ttl_seconds: Option<u64>,
1335 #[serde(default)]
1338 pub uses: Option<u32>,
1339}
1340
1341impl Relay {
1342 async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1344 use tokio::io::AsyncWriteExt;
1345 let mut line = serde_json::to_vec(rec)?;
1346 line.push(b'\n');
1347 let path = self.state_dir.join("invites.jsonl");
1348 let mut f = tokio::fs::OpenOptions::new()
1349 .create(true)
1350 .append(true)
1351 .open(&path)
1352 .await?;
1353 f.write_all(&line).await?;
1354 f.flush().await?;
1355 Ok(())
1356 }
1357}
1358
1359async fn invite_register(
1360 State(relay): State<Relay>,
1361 Json(req): Json<InviteRegisterRequest>,
1362) -> impl IntoResponse {
1363 if req.invite_url.is_empty() {
1364 return (
1365 StatusCode::BAD_REQUEST,
1366 Json(json!({"error": "invite_url required"})),
1367 )
1368 .into_response();
1369 }
1370 if req.invite_url.len() > 8_192 {
1372 return (
1373 StatusCode::PAYLOAD_TOO_LARGE,
1374 Json(json!({"error": "invite_url > 8 KiB"})),
1375 )
1376 .into_response();
1377 }
1378 let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1379 let now = SystemTime::now()
1380 .duration_since(UNIX_EPOCH)
1381 .map(|d| d.as_secs())
1382 .unwrap_or(0);
1383 let token = random_hex(3);
1386 let rec = InviteRecord {
1387 token: token.clone(),
1388 invite_url: req.invite_url,
1389 expires_unix: now + ttl,
1390 uses_remaining: req.uses,
1391 created_unix: now,
1392 };
1393 {
1394 let mut inner = relay.inner.lock().await;
1395 if inner.invites.contains_key(&token) {
1396 return (
1397 StatusCode::CONFLICT,
1398 Json(json!({"error": "token collision, retry"})),
1399 )
1400 .into_response();
1401 }
1402 inner.invites.insert(token.clone(), rec.clone());
1403 }
1404 if let Err(e) = relay.persist_invite(&rec).await {
1405 return (
1406 StatusCode::INTERNAL_SERVER_ERROR,
1407 Json(json!({"error": format!("persist failed: {e}")})),
1408 )
1409 .into_response();
1410 }
1411 (
1412 StatusCode::CREATED,
1413 Json(json!({
1414 "token": token,
1415 "path": format!("/i/{token}"),
1416 "expires_unix": rec.expires_unix,
1417 "uses_remaining": rec.uses_remaining,
1418 })),
1419 )
1420 .into_response()
1421}
1422
1423#[derive(Deserialize)]
1424pub struct InviteScriptQuery {
1425 pub format: Option<String>,
1432}
1433
1434async fn invite_script(
1435 State(relay): State<Relay>,
1436 Path(token): Path<String>,
1437 Query(q): Query<InviteScriptQuery>,
1438) -> impl IntoResponse {
1439 if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1442 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1443 }
1444 let want_raw_url = q.format.as_deref() == Some("url");
1445 let now = SystemTime::now()
1446 .duration_since(UNIX_EPOCH)
1447 .map(|d| d.as_secs())
1448 .unwrap_or(0);
1449 let invite_url = {
1450 let mut inner = relay.inner.lock().await;
1451 let Some(rec) = inner.invites.get_mut(&token) else {
1452 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1453 };
1454 if rec.expires_unix <= now {
1455 return (StatusCode::GONE, "this invite has expired\n").into_response();
1456 }
1457 if let Some(n) = rec.uses_remaining {
1458 if n == 0 {
1459 return (StatusCode::GONE, "this invite has been used up\n").into_response();
1460 }
1461 if !want_raw_url {
1465 rec.uses_remaining = Some(n - 1);
1466 }
1467 }
1468 rec.invite_url.clone()
1469 };
1470 if want_raw_url {
1471 return (
1472 StatusCode::OK,
1473 [
1474 (
1475 axum::http::header::CONTENT_TYPE,
1476 "text/plain; charset=utf-8",
1477 ),
1478 (
1479 axum::http::header::CACHE_CONTROL,
1480 "private, no-store, max-age=0",
1481 ),
1482 ],
1483 invite_url,
1484 )
1485 .into_response();
1486 }
1487 let escaped = invite_url.replace('\'', "'\\''");
1488 let script = format!(
1489 "#!/bin/sh\n\
1490 # wire — one-curl onboarding (install + pair in one shot)\n\
1491 # source: https://github.com/SlanchaAi/wire\n\
1492 set -eu\n\
1493 INVITE='{escaped}'\n\
1494 echo \"\u{2192} checking for wire CLI...\"\n\
1495 if ! command -v wire >/dev/null 2>&1; then\n \
1496 echo \"\u{2192} wire not installed; installing first...\"\n \
1497 curl -fsSL https://wireup.net/install.sh | sh\n \
1498 case \":$PATH:\" in\n \
1499 *:\"$HOME/.local/bin\":*) ;;\n \
1500 *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n \
1501 esac\n \
1502 if ! command -v wire >/dev/null 2>&1; then\n \
1503 echo \"\"\n \
1504 echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n \
1505 echo \"Open a new shell, then run:\"\n \
1506 echo \" wire accept '$INVITE'\"\n \
1507 exit 0\n \
1508 fi\n\
1509 fi\n\
1510 echo \"\u{2192} accepting invite...\"\n\
1511 wire accept \"$INVITE\"\n"
1512 );
1513 (
1514 StatusCode::OK,
1515 [
1516 (
1517 axum::http::header::CONTENT_TYPE,
1518 "text/x-shellscript; charset=utf-8",
1519 ),
1520 (
1521 axum::http::header::CACHE_CONTROL,
1522 "private, no-store, max-age=0",
1523 ),
1524 ],
1525 script,
1526 )
1527 .into_response()
1528}
1529
1530async fn handles_directory(
1531 State(relay): State<Relay>,
1532 Query(q): Query<HandlesDirectoryQuery>,
1533) -> impl IntoResponse {
1534 let limit = q.limit.unwrap_or(100).clamp(1, 500);
1535 let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1536 let inner = relay.inner.lock().await;
1537 let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1538 drop(inner);
1539 records.sort_by(|a, b| a.nick.cmp(&b.nick));
1540
1541 let cursor = q.cursor.as_deref();
1542 let mut eligible = Vec::new();
1543 for rec in records {
1544 if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1545 continue;
1546 }
1547 if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1552 continue;
1553 }
1554 if !rec.is_discoverable() {
1558 continue;
1559 }
1560 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1561 if profile
1562 .get("listed")
1563 .and_then(Value::as_bool)
1564 .is_some_and(|listed| !listed)
1565 {
1566 continue;
1567 }
1568 if let Some(want) = &vibe_filter {
1569 let matched = profile
1570 .get("vibe")
1571 .and_then(Value::as_array)
1572 .map(|arr| {
1573 arr.iter().any(|v| {
1574 v.as_str()
1575 .map(|s| s.eq_ignore_ascii_case(want))
1576 .unwrap_or(false)
1577 })
1578 })
1579 .unwrap_or(false);
1580 if !matched {
1581 continue;
1582 }
1583 }
1584 eligible.push((rec, profile));
1585 }
1586
1587 let has_more = eligible.len() > limit;
1588 let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1589 let next_cursor = if has_more {
1590 page.last().map(|(rec, _)| rec.nick.clone())
1591 } else {
1592 None
1593 };
1594 let handles: Vec<Value> = page
1595 .into_iter()
1596 .map(|(rec, profile)| {
1597 let emoji = profile
1603 .get("emoji")
1604 .and_then(Value::as_str)
1605 .filter(|s| !s.is_empty())
1606 .map(str::to_string)
1607 .unwrap_or_else(|| crate::character::Character::from_did(&rec.did).emoji);
1608 json!({
1609 "nick": rec.nick,
1610 "did": rec.did,
1611 "profile": {
1612 "emoji": emoji,
1613 "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1614 "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1615 "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1616 "now": profile.get("now").cloned().unwrap_or(Value::Null),
1617 },
1618 "claimed_at": rec.claimed_at,
1619 })
1620 })
1621 .collect();
1622 (
1623 StatusCode::OK,
1624 Json(json!({
1625 "handles": handles,
1626 "next_cursor": next_cursor,
1627 })),
1628 )
1629 .into_response()
1630}
1631
1632async fn handle_intro(
1646 State(relay): State<Relay>,
1647 Path(nick): Path<String>,
1648 Json(req): Json<PostEventRequest>,
1649) -> impl IntoResponse {
1650 let slot_id = {
1652 let inner = relay.inner.lock().await;
1653 match inner.handles.get(&nick) {
1654 Some(rec) => rec.slot_id.clone(),
1655 None => {
1656 return (
1657 StatusCode::NOT_FOUND,
1658 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1659 )
1660 .into_response();
1661 }
1662 }
1663 };
1664
1665 let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1668 let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1669 if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1670 return (
1671 StatusCode::BAD_REQUEST,
1672 Json(json!({
1673 "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1674 "got_kind": kind,
1675 "got_type": type_str,
1676 })),
1677 )
1678 .into_response();
1679 }
1680
1681 let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1683 Some(c) => c.clone(),
1684 None => {
1685 return (
1686 StatusCode::BAD_REQUEST,
1687 Json(json!({"error": "intro event body must embed 'card' field"})),
1688 )
1689 .into_response();
1690 }
1691 };
1692 if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1693 return (
1694 StatusCode::BAD_REQUEST,
1695 Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1696 )
1697 .into_response();
1698 }
1699
1700 let body_bytes = match serde_json::to_vec(&req.event) {
1702 Ok(b) => b,
1703 Err(e) => {
1704 return (
1705 StatusCode::BAD_REQUEST,
1706 Json(json!({"error": format!("event not serializable: {e}")})),
1707 )
1708 .into_response();
1709 }
1710 };
1711 if body_bytes.len() > MAX_EVENT_BYTES {
1712 return (
1713 StatusCode::PAYLOAD_TOO_LARGE,
1714 Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1715 )
1716 .into_response();
1717 }
1718 {
1719 let inner = relay.inner.lock().await;
1720 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1721 if used + body_bytes.len() > MAX_SLOT_BYTES {
1722 return (
1723 StatusCode::PAYLOAD_TOO_LARGE,
1724 Json(json!({
1725 "error": "target slot quota exceeded",
1726 "slot_bytes_used": used,
1727 "slot_bytes_max": MAX_SLOT_BYTES,
1728 })),
1729 )
1730 .into_response();
1731 }
1732 }
1733
1734 let event_id = req
1735 .event
1736 .get("event_id")
1737 .and_then(Value::as_str)
1738 .map(str::to_string);
1739
1740 let dup = {
1742 let inner = relay.inner.lock().await;
1743 let slot = inner.slots.get(&slot_id);
1744 if let (Some(eid), Some(slot)) = (&event_id, slot) {
1745 slot.iter()
1746 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1747 } else {
1748 false
1749 }
1750 };
1751 if dup {
1752 return (
1753 StatusCode::OK,
1754 Json(json!({"event_id": event_id, "status": "duplicate"})),
1755 )
1756 .into_response();
1757 }
1758
1759 {
1760 let mut inner = relay.inner.lock().await;
1761 let event_size = body_bytes.len();
1762 let slot = inner.slots.entry(slot_id.clone()).or_default();
1763 slot.push(req.event.clone());
1764 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1765 }
1766 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1767 return (
1768 StatusCode::INTERNAL_SERVER_ERROR,
1769 Json(json!({"error": format!("persist failed: {e}")})),
1770 )
1771 .into_response();
1772 }
1773 (
1774 StatusCode::CREATED,
1775 Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1776 )
1777 .into_response()
1778}
1779
1780async fn well_known_agent_card_a2a(
1796 State(relay): State<Relay>,
1797 Query(q): Query<WellKnownAgentQuery>,
1798) -> impl IntoResponse {
1799 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1800 if nick.is_empty() {
1801 return (
1802 StatusCode::BAD_REQUEST,
1803 Json(json!({"error": "handle missing nick"})),
1804 )
1805 .into_response();
1806 }
1807 let inner = relay.inner.lock().await;
1808 let rec = match inner.handles.get(&nick) {
1809 Some(r) => r.clone(),
1810 None => {
1811 return (
1812 StatusCode::NOT_FOUND,
1813 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1814 )
1815 .into_response();
1816 }
1817 };
1818 drop(inner);
1819
1820 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1821 let description = profile
1822 .get("motto")
1823 .and_then(Value::as_str)
1824 .unwrap_or("")
1825 .to_string();
1826 let display_name = profile
1827 .get("display_name")
1828 .and_then(Value::as_str)
1829 .unwrap_or(&rec.nick)
1830 .to_string();
1831 let relay_url = rec.relay_url.clone().unwrap_or_default();
1832 let endpoint = if !relay_url.is_empty() {
1834 format!(
1835 "{}/v1/handle/intro/{}",
1836 relay_url.trim_end_matches('/'),
1837 rec.nick
1838 )
1839 } else {
1840 format!("/v1/handle/intro/{}", rec.nick)
1841 };
1842 let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1843
1844 let a2a_card = json!({
1848 "id": rec.did,
1849 "name": display_name,
1850 "description": description,
1851 "version": "wire/0.5",
1852 "endpoint": endpoint,
1853 "provider": {
1854 "name": "wire",
1855 "url": "https://github.com/SlanchaAi/wire"
1856 },
1857 "capabilities": {
1858 "streaming": false,
1859 "pushNotifications": false,
1860 "extendedAgentCard": true
1861 },
1862 "securitySchemes": {
1863 "ed25519-event-sig": {
1864 "type": "signature",
1865 "alg": "EdDSA",
1866 "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1867 }
1868 },
1869 "security": [{"ed25519-event-sig": []}],
1870 "skills": [],
1871 "extensions": [{
1872 "uri": "https://slancha.ai/wire/ext/v0.5",
1876 "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1877 "required": false,
1878 "params": {
1879 "did": rec.did,
1880 "handle": rec.nick,
1881 "slot_id": rec.slot_id,
1882 "relay_url": rec.relay_url,
1883 "card": rec.card,
1884 "profile": profile,
1885 "claimed_at": rec.claimed_at,
1886 }
1887 }],
1888 "signature": card_sig,
1889 });
1890 (StatusCode::OK, Json(a2a_card)).into_response()
1891}
1892
1893async fn well_known_agent(
1894 State(relay): State<Relay>,
1895 Query(q): Query<WellKnownAgentQuery>,
1896) -> impl IntoResponse {
1897 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1898 if nick.is_empty() {
1899 return (
1900 StatusCode::BAD_REQUEST,
1901 Json(json!({"error": "handle missing nick"})),
1902 )
1903 .into_response();
1904 }
1905 let inner = relay.inner.lock().await;
1906 match inner.handles.get(&nick) {
1907 Some(rec) => (
1908 StatusCode::OK,
1909 Json(json!({
1910 "nick": rec.nick,
1911 "did": rec.did,
1912 "card": rec.card,
1913 "slot_id": rec.slot_id,
1914 "relay_url": rec.relay_url,
1915 "claimed_at": rec.claimed_at,
1916 })),
1917 )
1918 .into_response(),
1919 None => (
1920 StatusCode::NOT_FOUND,
1921 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1922 )
1923 .into_response(),
1924 }
1925}
1926
1927async fn list_events(
1928 State(relay): State<Relay>,
1929 Path(slot_id): Path<String>,
1930 Query(q): Query<ListEventsQuery>,
1931 headers: HeaderMap,
1932) -> impl IntoResponse {
1933 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1934 return resp;
1935 }
1936 let limit = q.limit.unwrap_or(100).min(1000);
1937 let mut inner = relay.inner.lock().await;
1938 let now_unix = std::time::SystemTime::now()
1942 .duration_since(std::time::UNIX_EPOCH)
1943 .map(|d| d.as_secs())
1944 .unwrap_or(0);
1945 inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1946 let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1947 let start = match q.since {
1948 Some(eid) => events
1949 .iter()
1950 .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1951 .map(|i| i + 1)
1952 .unwrap_or(0),
1953 None => 0,
1954 };
1955 let end = (start + limit).min(events.len());
1956 let slice = events[start..end].to_vec();
1957 (StatusCode::OK, Json(slice)).into_response()
1958}
1959
1960async fn slot_state(
1966 State(relay): State<Relay>,
1967 Path(slot_id): Path<String>,
1968 headers: HeaderMap,
1969) -> impl IntoResponse {
1970 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1971 return resp;
1972 }
1973 let inner = relay.inner.lock().await;
1974 let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1975 let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1976 let responder_health = inner.responder_health.get(&slot_id).cloned();
1977 (
1978 StatusCode::OK,
1979 Json(json!({
1980 "slot_id": slot_id,
1981 "event_count": event_count,
1982 "last_pull_at_unix": last_pull_at_unix,
1983 "responder_health": responder_health,
1984 })),
1985 )
1986 .into_response()
1987}
1988
1989async fn responder_health_set(
1990 State(relay): State<Relay>,
1991 Path(slot_id): Path<String>,
1992 headers: HeaderMap,
1993 Json(record): Json<ResponderHealthRecord>,
1994) -> impl IntoResponse {
1995 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1996 return resp;
1997 }
1998 let path = relay
1999 .state_dir
2000 .join("responder-health")
2001 .join(format!("{slot_id}.json"));
2002 let body = match serde_json::to_vec_pretty(&record) {
2003 Ok(b) => b,
2004 Err(e) => {
2005 return (
2006 StatusCode::INTERNAL_SERVER_ERROR,
2007 Json(json!({"error": format!("serialize failed: {e}")})),
2008 )
2009 .into_response();
2010 }
2011 };
2012 if let Err(e) = tokio::fs::write(&path, body).await {
2013 return (
2014 StatusCode::INTERNAL_SERVER_ERROR,
2015 Json(json!({"error": format!("persist failed: {e}")})),
2016 )
2017 .into_response();
2018 }
2019 {
2020 let mut inner = relay.inner.lock().await;
2021 inner
2022 .responder_health
2023 .insert(slot_id.clone(), record.clone());
2024 }
2025 (StatusCode::OK, Json(record)).into_response()
2026}
2027
2028async fn check_token(
2029 relay: &Relay,
2030 headers: &HeaderMap,
2031 slot_id: &str,
2032) -> std::result::Result<(), axum::response::Response> {
2033 let auth = headers
2034 .get(AUTHORIZATION)
2035 .and_then(|h| h.to_str().ok())
2036 .and_then(|s| s.strip_prefix("Bearer "))
2037 .map(str::to_string);
2038 let presented = match auth {
2039 Some(t) => t,
2040 None => {
2041 return Err((
2042 StatusCode::UNAUTHORIZED,
2043 Json(json!({"error": "missing Bearer token"})),
2044 )
2045 .into_response());
2046 }
2047 };
2048 let inner = relay.inner.lock().await;
2049 let expected = match inner.tokens.get(slot_id) {
2050 Some(t) => t.clone(),
2051 None => {
2052 return Err((
2053 StatusCode::NOT_FOUND,
2054 Json(json!({"error": "unknown slot"})),
2055 )
2056 .into_response());
2057 }
2058 };
2059 drop(inner);
2060 if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2061 return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2062 }
2063 Ok(())
2064}
2065
2066fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
2067 if a.len() != b.len() {
2068 return false;
2069 }
2070 let mut acc = 0u8;
2071 for (x, y) in a.iter().zip(b.iter()) {
2072 acc |= x ^ y;
2073 }
2074 acc == 0
2075}
2076
2077fn is_valid_slot_id(s: &str) -> bool {
2078 s.len() == 32
2079 && s.bytes()
2080 .all(|b| b.is_ascii_hexdigit() && !b.is_ascii_uppercase())
2081}
2082
2083fn random_hex(n_bytes: usize) -> String {
2084 let mut buf = vec![0u8; n_bytes];
2085 rand::thread_rng().fill_bytes(&mut buf);
2086 hex::encode(buf)
2087}
2088
2089pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2091 serve_with_mode(bind, state_dir, ServerMode::default()).await
2092}
2093
2094pub async fn serve_with_mode(bind: &str, state_dir: PathBuf, mode: ServerMode) -> Result<()> {
2098 let relay = Relay::new(state_dir).await?;
2099 relay.spawn_pair_sweeper();
2100 relay.spawn_counter_persister();
2101 let app = relay.clone().router_with_mode(mode);
2102 let listener = tokio::net::TcpListener::bind(bind)
2103 .await
2104 .with_context(|| format!("binding {bind}"))?;
2105 if mode.local_only {
2106 eprintln!(
2107 "wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled"
2108 );
2109 } else {
2110 eprintln!("wire relay-server listening on {bind}");
2111 }
2112 let shutdown_relay = relay.clone();
2113 axum::serve(listener, app)
2114 .with_graceful_shutdown(async move {
2115 let _ = tokio::signal::ctrl_c().await;
2116 eprintln!("\nshutting down — final counter snapshot");
2117 if let Err(e) = shutdown_relay.persist_counters().await {
2118 eprintln!("final counter persist failed: {e}");
2119 }
2120 })
2121 .await?;
2122 Ok(())
2123}
2124
2125#[cfg(unix)]
2139pub async fn serve_uds(socket_path: PathBuf, state_dir: PathBuf) -> Result<()> {
2140 use hyper::server::conn::http1;
2141 use hyper_util::rt::TokioIo;
2142 use tower_service::Service;
2143
2144 if socket_path.exists() {
2146 std::fs::remove_file(&socket_path)
2147 .with_context(|| format!("removing stale socket at {socket_path:?}"))?;
2148 }
2149 if let Some(parent) = socket_path.parent() {
2150 std::fs::create_dir_all(parent)
2151 .with_context(|| format!("creating socket parent {parent:?}"))?;
2152 }
2153 let relay = Relay::new(state_dir).await?;
2154 relay.spawn_pair_sweeper();
2155 relay.spawn_counter_persister();
2156 let app: axum::Router = relay
2157 .clone()
2158 .router_with_mode(ServerMode { local_only: true });
2159 let listener = tokio::net::UnixListener::bind(&socket_path)
2160 .with_context(|| format!("binding UDS at {socket_path:?}"))?;
2161
2162 use std::os::unix::fs::PermissionsExt;
2165 if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
2166 eprintln!(
2167 "wire relay-server (UDS): chmod 0600 on {socket_path:?} failed: {e} — \
2168 socket may be accessible to other uids. Investigate."
2169 );
2170 }
2171 eprintln!(
2172 "wire relay-server (UDS) listening on unix://{} — same-host, owner-uid only",
2173 socket_path.display()
2174 );
2175
2176 let shutdown_relay = relay.clone();
2179 let socket_path_for_cleanup = socket_path.clone();
2180 let mut make_service = app.into_make_service();
2181
2182 let serve_loop = async {
2183 loop {
2184 let (stream, _peer_addr) = match listener.accept().await {
2185 Ok(p) => p,
2186 Err(e) => {
2187 eprintln!("wire relay-server (UDS): accept failed: {e}");
2188 continue;
2189 }
2190 };
2191 let tower_service = match make_service.call(&stream).await {
2192 Ok(s) => s,
2193 Err(infallible) => match infallible {},
2194 };
2195 let io = TokioIo::new(stream);
2196 let hyper_service =
2197 hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
2198 let mut svc = tower_service.clone();
2199 async move { Service::call(&mut svc, req).await }
2200 });
2201 tokio::task::spawn(async move {
2202 if let Err(e) = http1::Builder::new()
2203 .serve_connection(io, hyper_service)
2204 .await
2205 {
2206 if !e.is_incomplete_message() {
2208 eprintln!("wire relay-server (UDS): conn error: {e}");
2209 }
2210 }
2211 });
2212 }
2213 };
2214
2215 let shutdown = async {
2216 let _ = tokio::signal::ctrl_c().await;
2217 eprintln!("\nshutting down — final counter snapshot");
2218 if let Err(e) = shutdown_relay.persist_counters().await {
2219 eprintln!("final counter persist failed: {e}");
2220 }
2221 let _ = std::fs::remove_file(&socket_path_for_cleanup);
2222 };
2223
2224 tokio::select! {
2225 _ = serve_loop => {},
2226 _ = shutdown => {},
2227 };
2228 Ok(())
2229}
2230
2231#[cfg(not(unix))]
2232pub async fn serve_uds(_socket_path: PathBuf, _state_dir: PathBuf) -> Result<()> {
2233 Err(anyhow::anyhow!(
2234 "UDS transport is Unix-only; Windows falls back to loopback HTTP. \
2235 Use `wire relay-server --bind 127.0.0.1:8771 --local-only` on Windows."
2236 ))
2237}
2238
2239#[derive(Debug, Clone, Copy, Default)]
2244pub struct ServerMode {
2245 pub local_only: bool,
2250}
2251
2252#[cfg(test)]
2253mod tests {
2254 use super::*;
2255
2256 #[test]
2257 fn constant_time_eq_basic() {
2258 assert!(constant_time_eq(b"abc", b"abc"));
2259 assert!(!constant_time_eq(b"abc", b"abd"));
2260 assert!(!constant_time_eq(b"abc", b"abcd")); }
2262
2263 #[test]
2264 fn random_hex_length() {
2265 let s = random_hex(16);
2266 assert_eq!(s.len(), 32); assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
2268 }
2269
2270 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2271 async fn pair_slot_evicts_when_idle_past_ttl() {
2272 let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
2273 let _ = std::fs::remove_dir_all(&dir);
2274 let relay = Relay::new(dir.clone()).await.unwrap();
2275
2276 {
2278 let mut inner = relay.inner.lock().await;
2279 inner
2280 .pair_lookup
2281 .insert("hash-A".to_string(), "id-A".to_string());
2282 inner.pair_slots.insert(
2283 "id-A".to_string(),
2284 PairSlot {
2285 last_touched: std::time::Instant::now()
2286 - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
2287 ..PairSlot::default()
2288 },
2289 );
2290
2291 inner
2293 .pair_lookup
2294 .insert("hash-B".to_string(), "id-B".to_string());
2295 inner
2296 .pair_slots
2297 .insert("id-B".to_string(), PairSlot::default());
2298
2299 assert_eq!(inner.pair_slots.len(), 2);
2300 assert_eq!(inner.pair_lookup.len(), 2);
2301 }
2302
2303 relay.evict_expired_pair_slots().await;
2304
2305 let inner = relay.inner.lock().await;
2306 assert_eq!(
2307 inner.pair_slots.len(),
2308 1,
2309 "expired slot should have been evicted"
2310 );
2311 assert!(inner.pair_slots.contains_key("id-B"));
2312 assert_eq!(inner.pair_lookup.len(), 1);
2313 assert!(inner.pair_lookup.contains_key("hash-B"));
2314 let _ = std::fs::remove_dir_all(&dir);
2315 }
2316
2317 #[test]
2318 fn slot_id_validator_accepts_only_lowercase_32hex() {
2319 assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
2320 assert!(is_valid_slot_id(&random_hex(16)));
2321 assert!(!is_valid_slot_id("abc"));
2323 assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde")); assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0")); assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
2327 assert!(!is_valid_slot_id("../etc/passwd0123456789abcdef0000"));
2329 assert!(!is_valid_slot_id("..%2Fetc%2Fpasswd00000000000000000"));
2330 assert!(!is_valid_slot_id("/absolute/path/that/looks/like/key"));
2331 assert!(!is_valid_slot_id(
2333 "0123456789abcdef\0\x31\x32\x33456789abcdef"
2334 ));
2335 }
2336}