1use anyhow::{Context, Result};
33use axum::{
34 Json, Router,
35 extract::{Path, Query, State},
36 http::{HeaderMap, StatusCode, header::AUTHORIZATION},
37 response::IntoResponse,
38 routing::{get, post},
39};
40use rand::RngCore;
41use serde::Deserialize;
42use serde_json::{Value, json};
43use std::collections::HashMap;
44use std::path::PathBuf;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::time::{SystemTime, UNIX_EPOCH};
48use tokio::sync::Mutex;
49use tower_governor::{
50 GovernorLayer, governor::GovernorConfigBuilder, key_extractor::GlobalKeyExtractor,
51};
52
/// Largest serialized size accepted for a single posted event (256 KiB).
const MAX_EVENT_BYTES: usize = 256 << 10;
/// Largest total of serialized event bytes a single slot may hold (64 MiB).
const MAX_SLOT_BYTES: usize = 64 << 20;
59
60#[derive(Clone)]
61pub struct Relay {
62 inner: Arc<Mutex<Inner>>,
63 state_dir: PathBuf,
64 counters: Arc<RelayCounters>,
65}
66
/// Process-wide usage counters.
///
/// The `*_total` fields are seeded from `counters.json` at startup and bumped
/// with relaxed atomics by the request handlers.
struct RelayCounters {
    /// Unix time (seconds) at which this process built its `Relay`.
    boot_unix: u64,
    /// Successful handle claims, first claims and re-claims alike.
    handle_claims_total: AtomicU64,
    /// Successful claims of a nick that had no record yet.
    handle_first_claims_total: AtomicU64,
    /// Slots handed out by the allocate endpoint.
    slot_allocations_total: AtomicU64,
    /// Pair rendezvous slots created.
    pair_opens_total: AtomicU64,
    /// Events accepted and stored.
    events_posted_total: AtomicU64,
}
79
80#[derive(serde::Serialize, serde::Deserialize, Default)]
81struct CountersSnapshot {
82 handle_claims_total: u64,
83 handle_first_claims_total: u64,
84 slot_allocations_total: u64,
85 pair_opens_total: u64,
86 events_posted_total: u64,
87}
88
89#[derive(serde::Serialize, serde::Deserialize)]
94struct HistoryEntry {
95 ts: u64,
96 handles_active: usize,
97 slots_active: usize,
98 pair_slots_open: usize,
99 streams_active: usize,
100 handle_claims_total: u64,
101 handle_first_claims_total: u64,
102 slot_allocations_total: u64,
103 pair_opens_total: u64,
104 events_posted_total: u64,
105}
106
107#[derive(Deserialize)]
108pub struct StatsHistoryQuery {
109 pub hours: Option<u64>,
111}
112
113struct Inner {
114 slots: HashMap<String, Vec<Value>>,
116 tokens: HashMap<String, String>,
118 slot_bytes: HashMap<String, usize>,
120 last_pull_at_unix: HashMap<String, u64>,
125 streams: HashMap<String, Vec<tokio::sync::mpsc::UnboundedSender<Value>>>,
131 pair_lookup: HashMap<String, String>,
133 pair_slots: HashMap<String, PairSlot>,
135 handles: HashMap<String, HandleRecord>,
137 responder_health: HashMap<String, ResponderHealthRecord>,
139 invites: HashMap<String, InviteRecord>,
143}
144
145#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
148struct InviteRecord {
149 token: String,
150 invite_url: String,
151 expires_unix: u64,
152 uses_remaining: Option<u32>,
154 created_unix: u64,
155}
156
157#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
161struct HandleRecord {
162 pub nick: String,
163 pub did: String,
164 pub card: Value,
165 pub slot_id: String,
166 pub relay_url: Option<String>,
167 pub claimed_at: String,
168}
169
170#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
171pub struct ResponderHealthRecord {
172 pub status: String,
173 #[serde(default, skip_serializing_if = "Option::is_none")]
174 pub reason: Option<String>,
175 #[serde(default, skip_serializing_if = "Option::is_none")]
176 pub last_success_at: Option<String>,
177 pub set_at: String,
178}
179
/// In-memory rendezvous state for one pairing: what each side has deposited
/// for the other to pick up.
#[derive(Debug, Clone)]
struct PairSlot {
    /// Message deposited by the host side.
    host_msg: Option<String>,
    /// Message deposited by the guest side.
    guest_msg: Option<String>,
    /// Sealed bootstrap payload deposited by the host side.
    host_bootstrap: Option<String>,
    /// Sealed bootstrap payload deposited by the guest side.
    guest_bootstrap: Option<String>,
    /// Last time any handler touched this slot; drives TTL eviction.
    last_touched: std::time::Instant,
}
193
194impl Default for PairSlot {
195 fn default() -> Self {
196 Self {
197 host_msg: None,
198 guest_msg: None,
199 host_bootstrap: None,
200 guest_bootstrap: None,
201 last_touched: std::time::Instant::now(),
202 }
203 }
204}
205
/// Idle time, in seconds, after which a pair slot is evicted (5 minutes).
const PAIR_SLOT_TTL_SECS: u64 = 5 * 60;
209
210#[derive(Deserialize)]
211pub struct AllocateRequest {
212 #[serde(default)]
214 pub handle: Option<String>,
215}
216
217#[derive(Deserialize)]
218pub struct PostEventRequest {
219 pub event: Value,
220}
221
222#[derive(Deserialize)]
223pub struct ListEventsQuery {
224 pub since: Option<String>,
226 pub limit: Option<usize>,
228}
229
230impl Relay {
231 pub async fn new(state_dir: PathBuf) -> Result<Self> {
232 tokio::fs::create_dir_all(state_dir.join("slots")).await?;
233 tokio::fs::create_dir_all(state_dir.join("handles")).await?;
234 tokio::fs::create_dir_all(state_dir.join("responder-health")).await?;
235 let mut inner = Inner {
236 slots: HashMap::new(),
237 tokens: HashMap::new(),
238 slot_bytes: HashMap::new(),
239 last_pull_at_unix: HashMap::new(),
240 streams: HashMap::new(),
241 pair_lookup: HashMap::new(),
242 pair_slots: HashMap::new(),
243 handles: HashMap::new(),
244 responder_health: HashMap::new(),
245 invites: HashMap::new(),
246 };
247 let token_path = state_dir.join("tokens.json");
249 if token_path.exists() {
250 let body = tokio::fs::read_to_string(&token_path).await?;
251 inner.tokens = serde_json::from_str(&body).unwrap_or_default();
252 }
253 let mut slots_dir = tokio::fs::read_dir(state_dir.join("slots")).await?;
255 while let Some(entry) = slots_dir.next_entry().await? {
256 let path = entry.path();
257 if path.extension().map(|x| x != "jsonl").unwrap_or(true) {
258 continue;
259 }
260 let stem = match path.file_stem().and_then(|s| s.to_str()) {
261 Some(s) => s.to_string(),
262 None => continue,
263 };
264 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
265 let mut events = Vec::new();
266 for line in body.lines() {
267 if let Ok(v) = serde_json::from_str::<Value>(line) {
268 events.push(v);
269 }
270 }
271 let bytes: usize = events
273 .iter()
274 .map(|e| serde_json::to_vec(e).map(|v| v.len()).unwrap_or(0))
275 .sum();
276 inner.slot_bytes.insert(stem.clone(), bytes);
277 inner.slots.insert(stem, events);
278 }
279 let handles_dir = state_dir.join("handles");
281 if handles_dir.exists() {
282 let mut rd = tokio::fs::read_dir(&handles_dir).await?;
283 while let Some(entry) = rd.next_entry().await? {
284 let path = entry.path();
285 if path.extension().and_then(|x| x.to_str()) != Some("json") {
286 continue;
287 }
288 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
289 if let Ok(rec) = serde_json::from_str::<HandleRecord>(&body) {
290 inner.handles.insert(rec.nick.clone(), rec);
291 }
292 }
293 }
294 let responder_health_dir = state_dir.join("responder-health");
296 if responder_health_dir.exists() {
297 let mut rd = tokio::fs::read_dir(&responder_health_dir).await?;
298 while let Some(entry) = rd.next_entry().await? {
299 let path = entry.path();
300 if path.extension().and_then(|x| x.to_str()) != Some("json") {
301 continue;
302 }
303 let Some(slot_id) = path.file_stem().and_then(|s| s.to_str()) else {
304 continue;
305 };
306 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
307 if let Ok(rec) = serde_json::from_str::<ResponderHealthRecord>(&body) {
308 inner.responder_health.insert(slot_id.to_string(), rec);
309 }
310 }
311 }
312 let invites_path = state_dir.join("invites.jsonl");
316 if invites_path.exists() {
317 let now_unix = SystemTime::now()
318 .duration_since(UNIX_EPOCH)
319 .map(|d| d.as_secs())
320 .unwrap_or(0);
321 let body = tokio::fs::read_to_string(&invites_path)
322 .await
323 .unwrap_or_default();
324 for line in body.lines() {
325 if let Ok(rec) = serde_json::from_str::<InviteRecord>(line)
326 && rec.expires_unix > now_unix
327 {
328 inner.invites.insert(rec.token.clone(), rec);
329 }
330 }
331 }
332 let boot_unix = SystemTime::now()
333 .duration_since(UNIX_EPOCH)
334 .map(|d| d.as_secs())
335 .unwrap_or(0);
336 let snap: CountersSnapshot =
338 match tokio::fs::read_to_string(state_dir.join("counters.json")).await {
339 Ok(body) => serde_json::from_str(&body).unwrap_or_default(),
340 Err(_) => CountersSnapshot::default(),
341 };
342 Ok(Self {
343 inner: Arc::new(Mutex::new(inner)),
344 state_dir,
345 counters: Arc::new(RelayCounters {
346 boot_unix,
347 handle_claims_total: AtomicU64::new(snap.handle_claims_total),
348 handle_first_claims_total: AtomicU64::new(snap.handle_first_claims_total),
349 slot_allocations_total: AtomicU64::new(snap.slot_allocations_total),
350 pair_opens_total: AtomicU64::new(snap.pair_opens_total),
351 events_posted_total: AtomicU64::new(snap.events_posted_total),
352 }),
353 })
354 }
355
356 pub fn router(self) -> Router {
357 self.router_with_mode(ServerMode::default())
358 }
359
360 pub fn router_with_mode(self, mode: ServerMode) -> Router {
361 let governor_conf = std::sync::Arc::new(
368 GovernorConfigBuilder::default()
369 .per_second(10)
370 .burst_size(50)
371 .key_extractor(GlobalKeyExtractor)
372 .finish()
373 .expect("valid governor config"),
374 );
375 let governor_layer = GovernorLayer {
376 config: governor_conf,
377 };
378
379 let hot_writes = Router::new()
381 .route("/v1/slot/allocate", post(allocate_slot))
382 .route("/v1/pair", post(pair_open))
383 .route("/v1/pair/:pair_id/bootstrap", post(pair_bootstrap))
384 .route("/v1/pair/abandon", post(pair_abandon))
385 .layer(governor_layer);
386
387 let mut router = Router::new()
390 .route("/healthz", get(healthz))
391 .route("/v1/events/:slot_id", post(post_event).get(list_events))
392 .route("/v1/slot/:slot_id/state", get(slot_state))
393 .route(
394 "/v1/slot/:slot_id/responder-health",
395 post(responder_health_set),
396 )
397 .route("/v1/events/:slot_id/stream", get(stream_events))
398 .route("/v1/pair/:pair_id", get(pair_get))
399 .route("/v1/handle/intro/:nick", post(handle_intro));
400
401 if !mode.local_only {
407 router = router
408 .route("/", get(landing_index))
409 .route("/favicon.svg", get(landing_favicon))
410 .route("/og.png", get(landing_og))
411 .route("/demo.cast", get(landing_demo_cast))
412 .route("/install", get(landing_install_sh))
413 .route("/install.sh", get(landing_install_sh))
414 .route("/openshell-policy.sh", get(landing_openshell_policy_sh))
415 .route("/stats", get(stats_root))
416 .route("/stats.json", get(stats_json))
417 .route("/stats.html", get(landing_stats_html))
418 .route("/stats.history", get(stats_history))
419 .route("/phonebook", get(landing_phonebook_html))
420 .route("/phonebook.html", get(landing_phonebook_html))
421 .route("/v1/handle/claim", post(handle_claim))
422 .route("/v1/handles", get(handles_directory))
423 .route("/v1/invite/register", post(invite_register))
424 .route("/i/:token", get(invite_script))
425 .route("/.well-known/wire/agent", get(well_known_agent))
426 .route(
427 "/.well-known/agent-card.json",
428 get(well_known_agent_card_a2a),
429 );
430 } else {
431 router = router.route("/v1/handle/claim", post(handle_claim));
437 }
438
439 router.merge(hot_writes).with_state(self)
440 }
441
442 async fn evict_expired_pair_slots(&self) {
446 let now = std::time::Instant::now();
447 let ttl = std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS);
448 let mut inner = self.inner.lock().await;
449 let mut to_remove = Vec::new();
450 for (id, slot) in inner.pair_slots.iter() {
451 if now.duration_since(slot.last_touched) > ttl {
452 to_remove.push(id.clone());
453 }
454 }
455 for id in to_remove {
456 inner.pair_slots.remove(&id);
457 inner.pair_lookup.retain(|_, v| v != &id);
458 }
459 }
460
461 pub fn spawn_pair_sweeper(&self) {
466 let me = self.clone();
467 tokio::spawn(async move {
468 let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
469 loop {
470 tick.tick().await;
471 me.evict_expired_pair_slots().await;
472 }
473 });
474 }
475
476 pub async fn persist_counters(&self) -> Result<()> {
480 let snap = CountersSnapshot {
481 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
482 handle_first_claims_total: self
483 .counters
484 .handle_first_claims_total
485 .load(Ordering::Relaxed),
486 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
487 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
488 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
489 };
490 let body = serde_json::to_vec_pretty(&snap)?;
491 let path = self.state_dir.join("counters.json");
492 tokio::fs::write(path, body).await?;
493 Ok(())
494 }
495
496 pub async fn append_history(&self) -> Result<()> {
501 use tokio::io::AsyncWriteExt;
502 let now = SystemTime::now()
503 .duration_since(UNIX_EPOCH)
504 .map(|d| d.as_secs())
505 .unwrap_or(0);
506 let (handles_active, slots_active, pair_slots_open, streams_active) = {
507 let inner = self.inner.lock().await;
508 (
509 inner.handles.len(),
510 inner.slots.len(),
511 inner.pair_slots.len(),
512 inner.streams.values().map(Vec::len).sum::<usize>(),
513 )
514 };
515 let entry = HistoryEntry {
516 ts: now,
517 handles_active,
518 slots_active,
519 pair_slots_open,
520 streams_active,
521 handle_claims_total: self.counters.handle_claims_total.load(Ordering::Relaxed),
522 handle_first_claims_total: self
523 .counters
524 .handle_first_claims_total
525 .load(Ordering::Relaxed),
526 slot_allocations_total: self.counters.slot_allocations_total.load(Ordering::Relaxed),
527 pair_opens_total: self.counters.pair_opens_total.load(Ordering::Relaxed),
528 events_posted_total: self.counters.events_posted_total.load(Ordering::Relaxed),
529 };
530 let line = serde_json::to_vec(&entry)?;
531 let path = self.state_dir.join("stats-history.jsonl");
532 let mut f = tokio::fs::OpenOptions::new()
533 .create(true)
534 .append(true)
535 .open(&path)
536 .await?;
537 f.write_all(&line).await?;
538 f.write_all(b"\n").await?;
539 f.flush().await?;
540 Ok(())
541 }
542
543 pub fn spawn_counter_persister(&self) {
547 let me = self.clone();
548 tokio::spawn(async move {
549 let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
550 tick.tick().await;
553 loop {
554 tick.tick().await;
555 if let Err(e) = me.persist_counters().await {
556 eprintln!("counter persist failed: {e}");
557 }
558 if let Err(e) = me.append_history().await {
559 eprintln!("history append failed: {e}");
560 }
561 }
562 });
563 }
564
565 async fn persist_tokens(&self) -> Result<()> {
566 let body = {
567 let inner = self.inner.lock().await;
568 serde_json::to_string_pretty(&inner.tokens)?
569 };
570 let path = self.state_dir.join("tokens.json");
571 tokio::fs::write(path, body).await?;
572 Ok(())
573 }
574
575 async fn append_event_to_disk(&self, slot_id: &str, event: &Value) -> Result<()> {
576 if !is_valid_slot_id(slot_id) {
582 return Err(anyhow::anyhow!("invalid slot_id format: {slot_id:?}"));
583 }
584 let path = self
585 .state_dir
586 .join("slots")
587 .join(format!("{slot_id}.jsonl"));
588 let mut line = serde_json::to_vec(event)?;
589 line.push(b'\n');
590 use tokio::io::AsyncWriteExt;
591 let mut f = tokio::fs::OpenOptions::new()
592 .create(true)
593 .append(true)
594 .open(&path)
595 .await
596 .with_context(|| format!("opening {path:?}"))?;
597 f.write_all(&line).await?;
598 f.flush().await?;
599 Ok(())
600 }
601}
602
603async fn healthz() -> impl IntoResponse {
604 (StatusCode::OK, "ok\n")
605}
606
607async fn stats_history(
611 State(relay): State<Relay>,
612 Query(q): Query<StatsHistoryQuery>,
613) -> impl IntoResponse {
614 let hours = q.hours.unwrap_or(24).min(168);
615 let now = SystemTime::now()
616 .duration_since(UNIX_EPOCH)
617 .map(|d| d.as_secs())
618 .unwrap_or(0);
619 let cutoff = now.saturating_sub(hours * 3600);
620 let path = relay.state_dir.join("stats-history.jsonl");
621 let body = tokio::fs::read_to_string(&path).await.unwrap_or_default();
622 let entries: Vec<Value> = body
623 .lines()
624 .filter_map(|l| serde_json::from_str::<Value>(l).ok())
625 .filter(|v| {
626 v.get("ts")
627 .and_then(Value::as_u64)
628 .map(|t| t >= cutoff)
629 .unwrap_or(false)
630 })
631 .collect();
632 (
633 StatusCode::OK,
634 Json(json!({
635 "hours": hours,
636 "now_unix": now,
637 "count": entries.len(),
638 "entries": entries,
639 })),
640 )
641}
642
643async fn landing_stats_html() -> impl IntoResponse {
644 static STATS_HTML: &[u8] = include_bytes!("../landing/stats.html");
645 (
646 StatusCode::OK,
647 [
648 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
649 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
650 ],
651 STATS_HTML,
652 )
653}
654
655async fn landing_phonebook_html() -> impl IntoResponse {
656 static PHONEBOOK_HTML: &[u8] = include_bytes!("../landing/phonebook.html");
657 (
658 StatusCode::OK,
659 [
660 (axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8"),
661 (axum::http::header::CACHE_CONTROL, "public, max-age=60"),
662 ],
663 PHONEBOOK_HTML,
664 )
665}
666
667async fn stats_root(State(relay): State<Relay>, headers: HeaderMap) -> axum::response::Response {
672 let wants_html = headers
673 .get(axum::http::header::ACCEPT)
674 .and_then(|v| v.to_str().ok())
675 .unwrap_or("")
676 .contains("text/html");
677 if wants_html {
678 landing_stats_html().await.into_response()
679 } else {
680 stats_json(State(relay)).await.into_response()
681 }
682}
683
684async fn stats_json(State(relay): State<Relay>) -> impl IntoResponse {
685 let now = SystemTime::now()
686 .duration_since(UNIX_EPOCH)
687 .map(|d| d.as_secs())
688 .unwrap_or(0);
689 let inner = relay.inner.lock().await;
690 let streams_active: usize = inner.streams.values().map(Vec::len).sum();
691 let body = json!({
692 "version": env!("CARGO_PKG_VERSION"),
693 "uptime_seconds": now.saturating_sub(relay.counters.boot_unix),
694 "handles_active": inner.handles.len(),
695 "slots_active": inner.slots.len(),
696 "pair_slots_open": inner.pair_slots.len(),
697 "streams_active": streams_active,
698 "handle_claims_total": relay.counters.handle_claims_total.load(Ordering::Relaxed),
699 "handle_first_claims_total": relay.counters.handle_first_claims_total.load(Ordering::Relaxed),
700 "slot_allocations_total": relay.counters.slot_allocations_total.load(Ordering::Relaxed),
701 "pair_opens_total": relay.counters.pair_opens_total.load(Ordering::Relaxed),
702 "events_posted_total": relay.counters.events_posted_total.load(Ordering::Relaxed),
703 });
704 (StatusCode::OK, Json(body))
705}
706
707async fn landing_index() -> impl IntoResponse {
711 static INDEX_HTML: &[u8] = include_bytes!("../landing/index.html");
712 (
713 StatusCode::OK,
714 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
715 INDEX_HTML,
716 )
717}
718
719async fn landing_favicon() -> impl IntoResponse {
720 static FAVICON_SVG: &[u8] = include_bytes!("../landing/favicon.svg");
721 (
722 StatusCode::OK,
723 [(axum::http::header::CONTENT_TYPE, "image/svg+xml")],
724 FAVICON_SVG,
725 )
726}
727
728async fn landing_og() -> impl IntoResponse {
729 static OG_PNG: &[u8] = include_bytes!("../landing/og.png");
730 (
731 StatusCode::OK,
732 [
733 (axum::http::header::CONTENT_TYPE, "image/png"),
734 (axum::http::header::CACHE_CONTROL, "public, max-age=86400"),
735 ],
736 OG_PNG,
737 )
738}
739
740async fn landing_demo_cast() -> impl IntoResponse {
741 static DEMO_CAST: &[u8] = include_bytes!("../landing/demo.cast");
742 (
743 StatusCode::OK,
744 [
745 (axum::http::header::CONTENT_TYPE, "application/x-asciicast"),
746 (axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
747 ],
748 DEMO_CAST,
749 )
750}
751
752async fn landing_install_sh() -> impl IntoResponse {
753 static INSTALL_SH: &[u8] = include_bytes!("../landing/install.sh");
754 (
755 StatusCode::OK,
756 [
757 (
758 axum::http::header::CONTENT_TYPE,
759 "text/x-shellscript; charset=utf-8",
760 ),
761 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
762 ],
763 INSTALL_SH,
764 )
765}
766
767async fn landing_openshell_policy_sh() -> impl IntoResponse {
768 static POLICY_SH: &[u8] = include_bytes!("../landing/openshell-policy.sh");
769 (
770 StatusCode::OK,
771 [
772 (
773 axum::http::header::CONTENT_TYPE,
774 "text/x-shellscript; charset=utf-8",
775 ),
776 (axum::http::header::CACHE_CONTROL, "public, max-age=300"),
777 ],
778 POLICY_SH,
779 )
780}
781
782async fn allocate_slot(
783 State(relay): State<Relay>,
784 Json(_req): Json<AllocateRequest>,
785) -> impl IntoResponse {
786 let slot_id = random_hex(16);
787 let slot_token = random_hex(32);
788 {
789 let mut inner = relay.inner.lock().await;
790 inner.slots.insert(slot_id.clone(), Vec::new());
791 inner.tokens.insert(slot_id.clone(), slot_token.clone());
792 }
793 if let Err(e) = relay.persist_tokens().await {
794 return (
795 StatusCode::INTERNAL_SERVER_ERROR,
796 Json(json!({"error": format!("persist failed: {e}")})),
797 )
798 .into_response();
799 }
800 relay
801 .counters
802 .slot_allocations_total
803 .fetch_add(1, Ordering::Relaxed);
804 (
805 StatusCode::CREATED,
806 Json(json!({"slot_id": slot_id, "slot_token": slot_token})),
807 )
808 .into_response()
809}
810
811async fn post_event(
812 State(relay): State<Relay>,
813 Path(slot_id): Path<String>,
814 headers: HeaderMap,
815 Json(req): Json<PostEventRequest>,
816) -> impl IntoResponse {
817 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
818 return resp;
819 }
820 let body_bytes = match serde_json::to_vec(&req.event) {
822 Ok(b) => b,
823 Err(e) => {
824 return (
825 StatusCode::BAD_REQUEST,
826 Json(json!({"error": format!("event not serializable: {e}")})),
827 )
828 .into_response();
829 }
830 };
831 if body_bytes.len() > MAX_EVENT_BYTES {
832 return (
833 StatusCode::PAYLOAD_TOO_LARGE,
834 Json(json!({"error": "event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
835 )
836 .into_response();
837 }
838 {
840 let inner = relay.inner.lock().await;
841 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
842 if used + body_bytes.len() > MAX_SLOT_BYTES {
843 return (
844 StatusCode::PAYLOAD_TOO_LARGE,
845 Json(json!({
846 "error": "slot quota exceeded",
847 "slot_bytes_used": used,
848 "slot_bytes_max": MAX_SLOT_BYTES,
849 "remediation": "operator should `wire rotate-slot` to drain old slot",
850 })),
851 )
852 .into_response();
853 }
854 }
855 let event_id = req
856 .event
857 .get("event_id")
858 .and_then(Value::as_str)
859 .map(str::to_string);
860
861 let dup = {
863 let inner = relay.inner.lock().await;
864 let slot = inner.slots.get(&slot_id);
865 if let (Some(eid), Some(slot)) = (&event_id, slot) {
866 slot.iter()
867 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
868 } else {
869 false
870 }
871 };
872 if dup {
873 return (
874 StatusCode::OK,
875 Json(json!({"event_id": event_id, "status": "duplicate"})),
876 )
877 .into_response();
878 }
879
880 {
881 let mut inner = relay.inner.lock().await;
882 let event_size = body_bytes.len();
883 let slot = inner.slots.entry(slot_id.clone()).or_default();
884 slot.push(req.event.clone());
885 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
886 }
887 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
888 return (
889 StatusCode::INTERNAL_SERVER_ERROR,
890 Json(json!({"error": format!("persist failed: {e}")})),
891 )
892 .into_response();
893 }
894 relay
895 .counters
896 .events_posted_total
897 .fetch_add(1, Ordering::Relaxed);
898 {
903 let mut inner = relay.inner.lock().await;
904 if let Some(subs) = inner.streams.get_mut(&slot_id) {
905 subs.retain(|tx| tx.send(req.event.clone()).is_ok());
906 }
907 }
908 (
909 StatusCode::CREATED,
910 Json(json!({"event_id": event_id, "status": "stored"})),
911 )
912 .into_response()
913}
914
915async fn stream_events(
928 State(relay): State<Relay>,
929 Path(slot_id): Path<String>,
930 headers: HeaderMap,
931) -> axum::response::Response {
932 use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
933 use futures::stream::StreamExt;
934
935 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
936 return resp;
937 }
938
939 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
940 {
941 let mut inner = relay.inner.lock().await;
942 inner.streams.entry(slot_id.clone()).or_default().push(tx);
943 }
944
945 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|ev| {
946 SseEvent::default()
947 .json_data(&ev)
948 .map_err(|e| std::io::Error::other(e.to_string()))
949 });
950
951 Sse::new(stream)
952 .keep_alive(
953 KeepAlive::new()
954 .interval(std::time::Duration::from_secs(30))
955 .text("phyllis: still on the line"),
956 )
957 .into_response()
958}
959
960#[derive(Deserialize)]
963pub struct PairOpenRequest {
964 pub code_hash: String,
965 pub msg: String,
967 pub role: String, }
969
970#[derive(Deserialize)]
971pub struct PairBootstrapRequest {
972 pub role: String,
973 pub sealed: String,
974}
975
976#[derive(Deserialize)]
977pub struct PairAbandonRequest {
978 pub code_hash: String,
981}
982
983async fn pair_abandon(
989 State(relay): State<Relay>,
990 Json(req): Json<PairAbandonRequest>,
991) -> impl IntoResponse {
992 let mut inner = relay.inner.lock().await;
993 if let Some(pair_id) = inner.pair_lookup.remove(&req.code_hash) {
994 inner.pair_slots.remove(&pair_id);
995 }
996 StatusCode::NO_CONTENT.into_response()
997}
998
999async fn pair_open(
1000 State(relay): State<Relay>,
1001 Json(req): Json<PairOpenRequest>,
1002) -> impl IntoResponse {
1003 if req.role != "host" && req.role != "guest" {
1004 return (
1005 StatusCode::BAD_REQUEST,
1006 Json(json!({"error": "role must be 'host' or 'guest'"})),
1007 )
1008 .into_response();
1009 }
1010 relay.evict_expired_pair_slots().await;
1011 let mut inner = relay.inner.lock().await;
1012 let pair_id = match inner.pair_lookup.get(&req.code_hash).cloned() {
1013 Some(id) => id,
1014 None => {
1015 let new_id = random_hex(16);
1016 inner
1017 .pair_lookup
1018 .insert(req.code_hash.clone(), new_id.clone());
1019 inner.pair_slots.insert(new_id.clone(), PairSlot::default());
1020 relay
1021 .counters
1022 .pair_opens_total
1023 .fetch_add(1, Ordering::Relaxed);
1024 new_id
1025 }
1026 };
1027 let slot = inner.pair_slots.entry(pair_id.clone()).or_default();
1028 slot.last_touched = std::time::Instant::now();
1029 if req.role == "host" {
1030 if slot.host_msg.is_some() {
1031 return (
1032 StatusCode::CONFLICT,
1033 Json(json!({"error": "host already registered for this code"})),
1034 )
1035 .into_response();
1036 }
1037 slot.host_msg = Some(req.msg);
1038 } else {
1039 if slot.guest_msg.is_some() {
1040 return (
1041 StatusCode::CONFLICT,
1042 Json(json!({"error": "guest already registered for this code"})),
1043 )
1044 .into_response();
1045 }
1046 slot.guest_msg = Some(req.msg);
1047 }
1048 (StatusCode::CREATED, Json(json!({"pair_id": pair_id}))).into_response()
1049}
1050
1051#[derive(Deserialize)]
1052pub struct PairGetQuery {
1053 pub as_role: String,
1055}
1056
1057async fn pair_get(
1058 State(relay): State<Relay>,
1059 Path(pair_id): Path<String>,
1060 Query(q): Query<PairGetQuery>,
1061) -> impl IntoResponse {
1062 relay.evict_expired_pair_slots().await;
1063 let mut inner = relay.inner.lock().await;
1064 let slot = match inner.pair_slots.get_mut(&pair_id) {
1065 Some(s) => {
1066 s.last_touched = std::time::Instant::now();
1067 s.clone()
1068 }
1069 None => {
1070 return (
1071 StatusCode::NOT_FOUND,
1072 Json(json!({"error": "unknown pair_id"})),
1073 )
1074 .into_response();
1075 }
1076 };
1077 let (peer_msg, peer_bootstrap) = match q.as_role.as_str() {
1078 "host" => (slot.guest_msg, slot.guest_bootstrap),
1079 "guest" => (slot.host_msg, slot.host_bootstrap),
1080 _ => {
1081 return (
1082 StatusCode::BAD_REQUEST,
1083 Json(json!({"error": "as_role must be 'host' or 'guest'"})),
1084 )
1085 .into_response();
1086 }
1087 };
1088 (
1089 StatusCode::OK,
1090 Json(json!({"peer_msg": peer_msg, "peer_bootstrap": peer_bootstrap})),
1091 )
1092 .into_response()
1093}
1094
1095async fn pair_bootstrap(
1096 State(relay): State<Relay>,
1097 Path(pair_id): Path<String>,
1098 Json(req): Json<PairBootstrapRequest>,
1099) -> impl IntoResponse {
1100 relay.evict_expired_pair_slots().await;
1101 let mut inner = relay.inner.lock().await;
1102 let slot = match inner.pair_slots.get_mut(&pair_id) {
1103 Some(s) => s,
1104 None => {
1105 return (
1106 StatusCode::NOT_FOUND,
1107 Json(json!({"error": "unknown pair_id"})),
1108 )
1109 .into_response();
1110 }
1111 };
1112 slot.last_touched = std::time::Instant::now();
1113 match req.role.as_str() {
1114 "host" => slot.host_bootstrap = Some(req.sealed),
1115 "guest" => slot.guest_bootstrap = Some(req.sealed),
1116 _ => {
1117 return (
1118 StatusCode::BAD_REQUEST,
1119 Json(json!({"error": "role must be 'host' or 'guest'"})),
1120 )
1121 .into_response();
1122 }
1123 }
1124 (StatusCode::CREATED, Json(json!({"ok": true}))).into_response()
1125}
1126
1127#[derive(Deserialize)]
1130pub struct HandleClaimRequest {
1131 pub nick: String,
1134 pub slot_id: String,
1136 pub relay_url: Option<String>,
1140 pub card: Value,
1143}
1144
1145async fn handle_claim(
1152 State(relay): State<Relay>,
1153 headers: HeaderMap,
1154 Json(req): Json<HandleClaimRequest>,
1155) -> impl IntoResponse {
1156 if let Err(resp) = check_token(&relay, &headers, &req.slot_id).await {
1159 return resp;
1160 }
1161 if !crate::pair_profile::is_valid_nick(&req.nick) {
1163 return (
1164 StatusCode::BAD_REQUEST,
1165 Json(json!({
1166 "error": "phyllis: that handle won't fit in the books — nicks need 2-32 chars, lowercase [a-z0-9_-], not on the reserved list",
1167 "nick": req.nick,
1168 })),
1169 )
1170 .into_response();
1171 }
1172 if let Err(e) = crate::agent_card::verify_agent_card(&req.card) {
1174 return (
1175 StatusCode::BAD_REQUEST,
1176 Json(json!({"error": format!("card signature invalid: {e}")})),
1177 )
1178 .into_response();
1179 }
1180 let did = match req.card.get("did").and_then(Value::as_str) {
1181 Some(d) => d.to_string(),
1182 None => {
1183 return (
1184 StatusCode::BAD_REQUEST,
1185 Json(json!({"error": "card missing 'did' field"})),
1186 )
1187 .into_response();
1188 }
1189 };
1190
1191 let first_claim = {
1193 let inner = relay.inner.lock().await;
1194 match inner.handles.get(&req.nick) {
1195 Some(existing) if existing.did != did => {
1196 return (
1197 StatusCode::CONFLICT,
1198 Json(json!({
1199 "error": "phyllis: this line's already taken by someone else — pick another handle or buzz the rightful owner",
1200 "nick": req.nick,
1201 "claimed_by": existing.did,
1202 })),
1203 )
1204 .into_response();
1205 }
1206 Some(_) => false,
1207 None => true,
1208 }
1209 };
1210
1211 let now = time::OffsetDateTime::now_utc()
1212 .format(&time::format_description::well_known::Rfc3339)
1213 .unwrap_or_default();
1214 let record = HandleRecord {
1215 nick: req.nick.clone(),
1216 did: did.clone(),
1217 card: req.card.clone(),
1218 slot_id: req.slot_id.clone(),
1219 relay_url: req.relay_url.clone(),
1220 claimed_at: now,
1221 };
1222
1223 let path = relay
1225 .state_dir
1226 .join("handles")
1227 .join(format!("{}.json", req.nick));
1228 let body = match serde_json::to_vec_pretty(&record) {
1229 Ok(b) => b,
1230 Err(e) => {
1231 return (
1232 StatusCode::INTERNAL_SERVER_ERROR,
1233 Json(json!({"error": format!("serialize failed: {e}")})),
1234 )
1235 .into_response();
1236 }
1237 };
1238 if let Err(e) = tokio::fs::write(&path, &body).await {
1239 return (
1240 StatusCode::INTERNAL_SERVER_ERROR,
1241 Json(json!({"error": format!("persist failed: {e}")})),
1242 )
1243 .into_response();
1244 }
1245 {
1246 let mut inner = relay.inner.lock().await;
1247 inner.handles.insert(req.nick.clone(), record);
1248 }
1249 relay
1250 .counters
1251 .handle_claims_total
1252 .fetch_add(1, Ordering::Relaxed);
1253 if first_claim {
1254 relay
1255 .counters
1256 .handle_first_claims_total
1257 .fetch_add(1, Ordering::Relaxed);
1258 }
1259 (
1260 StatusCode::CREATED,
1261 Json(json!({
1262 "nick": req.nick,
1263 "did": did,
1264 "status": if first_claim { "claimed" } else { "re-claimed" },
1265 })),
1266 )
1267 .into_response()
1268}
1269
1270#[derive(Deserialize)]
1271pub struct WellKnownAgentQuery {
1272 pub handle: String,
1273}
1274
1275#[derive(Deserialize)]
1276pub struct HandlesDirectoryQuery {
1277 pub cursor: Option<String>,
1278 pub limit: Option<usize>,
1279 pub vibe: Option<String>,
1280}
1281
1282#[derive(Deserialize)]
1292pub struct InviteRegisterRequest {
1293 pub invite_url: String,
1295 #[serde(default)]
1297 pub ttl_seconds: Option<u64>,
1298 #[serde(default)]
1301 pub uses: Option<u32>,
1302}
1303
1304impl Relay {
1305 async fn persist_invite(&self, rec: &InviteRecord) -> Result<()> {
1307 use tokio::io::AsyncWriteExt;
1308 let mut line = serde_json::to_vec(rec)?;
1309 line.push(b'\n');
1310 let path = self.state_dir.join("invites.jsonl");
1311 let mut f = tokio::fs::OpenOptions::new()
1312 .create(true)
1313 .append(true)
1314 .open(&path)
1315 .await?;
1316 f.write_all(&line).await?;
1317 f.flush().await?;
1318 Ok(())
1319 }
1320}
1321
1322async fn invite_register(
1323 State(relay): State<Relay>,
1324 Json(req): Json<InviteRegisterRequest>,
1325) -> impl IntoResponse {
1326 if req.invite_url.is_empty() {
1327 return (
1328 StatusCode::BAD_REQUEST,
1329 Json(json!({"error": "invite_url required"})),
1330 )
1331 .into_response();
1332 }
1333 if req.invite_url.len() > 8_192 {
1335 return (
1336 StatusCode::PAYLOAD_TOO_LARGE,
1337 Json(json!({"error": "invite_url > 8 KiB"})),
1338 )
1339 .into_response();
1340 }
1341 let ttl = req.ttl_seconds.unwrap_or(86_400).clamp(60, 7 * 86_400);
1342 let now = SystemTime::now()
1343 .duration_since(UNIX_EPOCH)
1344 .map(|d| d.as_secs())
1345 .unwrap_or(0);
1346 let token = random_hex(3);
1349 let rec = InviteRecord {
1350 token: token.clone(),
1351 invite_url: req.invite_url,
1352 expires_unix: now + ttl,
1353 uses_remaining: req.uses,
1354 created_unix: now,
1355 };
1356 {
1357 let mut inner = relay.inner.lock().await;
1358 if inner.invites.contains_key(&token) {
1359 return (
1360 StatusCode::CONFLICT,
1361 Json(json!({"error": "token collision, retry"})),
1362 )
1363 .into_response();
1364 }
1365 inner.invites.insert(token.clone(), rec.clone());
1366 }
1367 if let Err(e) = relay.persist_invite(&rec).await {
1368 return (
1369 StatusCode::INTERNAL_SERVER_ERROR,
1370 Json(json!({"error": format!("persist failed: {e}")})),
1371 )
1372 .into_response();
1373 }
1374 (
1375 StatusCode::CREATED,
1376 Json(json!({
1377 "token": token,
1378 "path": format!("/i/{token}"),
1379 "expires_unix": rec.expires_unix,
1380 "uses_remaining": rec.uses_remaining,
1381 })),
1382 )
1383 .into_response()
1384}
1385
/// Query string for the invite script endpoint.
#[derive(Deserialize)]
pub struct InviteScriptQuery {
    /// Response format selector. The value `url` makes the handler return the
    /// raw invite URL as plain text without consuming a use; any other value
    /// (or none) yields the shell onboarding script.
    pub format: Option<String>,
}
1396
1397async fn invite_script(
1398 State(relay): State<Relay>,
1399 Path(token): Path<String>,
1400 Query(q): Query<InviteScriptQuery>,
1401) -> impl IntoResponse {
1402 if token.len() != 6 || !token.chars().all(|c| c.is_ascii_hexdigit()) {
1405 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1406 }
1407 let want_raw_url = q.format.as_deref() == Some("url");
1408 let now = SystemTime::now()
1409 .duration_since(UNIX_EPOCH)
1410 .map(|d| d.as_secs())
1411 .unwrap_or(0);
1412 let invite_url = {
1413 let mut inner = relay.inner.lock().await;
1414 let Some(rec) = inner.invites.get_mut(&token) else {
1415 return (StatusCode::NOT_FOUND, "not found\n").into_response();
1416 };
1417 if rec.expires_unix <= now {
1418 return (StatusCode::GONE, "this invite has expired\n").into_response();
1419 }
1420 if let Some(n) = rec.uses_remaining {
1421 if n == 0 {
1422 return (StatusCode::GONE, "this invite has been used up\n").into_response();
1423 }
1424 if !want_raw_url {
1428 rec.uses_remaining = Some(n - 1);
1429 }
1430 }
1431 rec.invite_url.clone()
1432 };
1433 if want_raw_url {
1434 return (
1435 StatusCode::OK,
1436 [
1437 (
1438 axum::http::header::CONTENT_TYPE,
1439 "text/plain; charset=utf-8",
1440 ),
1441 (
1442 axum::http::header::CACHE_CONTROL,
1443 "private, no-store, max-age=0",
1444 ),
1445 ],
1446 invite_url,
1447 )
1448 .into_response();
1449 }
1450 let escaped = invite_url.replace('\'', "'\\''");
1451 let script = format!(
1452 "#!/bin/sh\n\
1453 # wire — one-curl onboarding (install + pair in one shot)\n\
1454 # source: https://github.com/SlanchaAi/wire\n\
1455 set -eu\n\
1456 INVITE='{escaped}'\n\
1457 echo \"\u{2192} checking for wire CLI...\"\n\
1458 if ! command -v wire >/dev/null 2>&1; then\n \
1459 echo \"\u{2192} wire not installed; installing first...\"\n \
1460 curl -fsSL https://wireup.net/install.sh | sh\n \
1461 case \":$PATH:\" in\n \
1462 *:\"$HOME/.local/bin\":*) ;;\n \
1463 *) export PATH=\"$HOME/.local/bin:$PATH\" ;;\n \
1464 esac\n \
1465 if ! command -v wire >/dev/null 2>&1; then\n \
1466 echo \"\"\n \
1467 echo \"wire was installed to ~/.local/bin but it's not on \\$PATH yet.\"\n \
1468 echo \"Open a new shell, then run:\"\n \
1469 echo \" wire accept '$INVITE'\"\n \
1470 exit 0\n \
1471 fi\n\
1472 fi\n\
1473 echo \"\u{2192} accepting invite...\"\n\
1474 wire accept \"$INVITE\"\n"
1475 );
1476 (
1477 StatusCode::OK,
1478 [
1479 (
1480 axum::http::header::CONTENT_TYPE,
1481 "text/x-shellscript; charset=utf-8",
1482 ),
1483 (
1484 axum::http::header::CACHE_CONTROL,
1485 "private, no-store, max-age=0",
1486 ),
1487 ],
1488 script,
1489 )
1490 .into_response()
1491}
1492
1493async fn handles_directory(
1494 State(relay): State<Relay>,
1495 Query(q): Query<HandlesDirectoryQuery>,
1496) -> impl IntoResponse {
1497 let limit = q.limit.unwrap_or(100).clamp(1, 500);
1498 let vibe_filter = q.vibe.as_ref().map(|v| v.to_ascii_lowercase());
1499 let inner = relay.inner.lock().await;
1500 let mut records: Vec<HandleRecord> = inner.handles.values().cloned().collect();
1501 drop(inner);
1502 records.sort_by(|a, b| a.nick.cmp(&b.nick));
1503
1504 let cursor = q.cursor.as_deref();
1505 let mut eligible = Vec::new();
1506 for rec in records {
1507 if cursor.is_some_and(|c| rec.nick.as_str() <= c) {
1508 continue;
1509 }
1510 if rec.nick.starts_with("demo-") || rec.nick.starts_with("test-") {
1515 continue;
1516 }
1517 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1518 if profile
1519 .get("listed")
1520 .and_then(Value::as_bool)
1521 .is_some_and(|listed| !listed)
1522 {
1523 continue;
1524 }
1525 if let Some(want) = &vibe_filter {
1526 let matched = profile
1527 .get("vibe")
1528 .and_then(Value::as_array)
1529 .map(|arr| {
1530 arr.iter().any(|v| {
1531 v.as_str()
1532 .map(|s| s.eq_ignore_ascii_case(want))
1533 .unwrap_or(false)
1534 })
1535 })
1536 .unwrap_or(false);
1537 if !matched {
1538 continue;
1539 }
1540 }
1541 eligible.push((rec, profile));
1542 }
1543
1544 let has_more = eligible.len() > limit;
1545 let page = eligible.into_iter().take(limit).collect::<Vec<_>>();
1546 let next_cursor = if has_more {
1547 page.last().map(|(rec, _)| rec.nick.clone())
1548 } else {
1549 None
1550 };
1551 let handles: Vec<Value> = page
1552 .into_iter()
1553 .map(|(rec, profile)| {
1554 json!({
1555 "nick": rec.nick,
1556 "did": rec.did,
1557 "profile": {
1558 "emoji": profile.get("emoji").cloned().unwrap_or(Value::Null),
1559 "motto": profile.get("motto").cloned().unwrap_or(Value::Null),
1560 "vibe": profile.get("vibe").cloned().unwrap_or(Value::Null),
1561 "pronouns": profile.get("pronouns").cloned().unwrap_or(Value::Null),
1562 "now": profile.get("now").cloned().unwrap_or(Value::Null),
1563 },
1564 "claimed_at": rec.claimed_at,
1565 })
1566 })
1567 .collect();
1568 (
1569 StatusCode::OK,
1570 Json(json!({
1571 "handles": handles,
1572 "next_cursor": next_cursor,
1573 })),
1574 )
1575 .into_response()
1576}
1577
1578async fn handle_intro(
1592 State(relay): State<Relay>,
1593 Path(nick): Path<String>,
1594 Json(req): Json<PostEventRequest>,
1595) -> impl IntoResponse {
1596 let slot_id = {
1598 let inner = relay.inner.lock().await;
1599 match inner.handles.get(&nick) {
1600 Some(rec) => rec.slot_id.clone(),
1601 None => {
1602 return (
1603 StatusCode::NOT_FOUND,
1604 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1605 )
1606 .into_response();
1607 }
1608 }
1609 };
1610
1611 let kind = req.event.get("kind").and_then(Value::as_u64).unwrap_or(0);
1614 let type_str = req.event.get("type").and_then(Value::as_str).unwrap_or("");
1615 if kind != 1100 && type_str != "pair_drop" && type_str != "agent_card" {
1616 return (
1617 StatusCode::BAD_REQUEST,
1618 Json(json!({
1619 "error": "intro endpoint only accepts kind=1100 pair_drop / agent_card events",
1620 "got_kind": kind,
1621 "got_type": type_str,
1622 })),
1623 )
1624 .into_response();
1625 }
1626
1627 let embedded_card = match req.event.get("body").and_then(|b| b.get("card")) {
1629 Some(c) => c.clone(),
1630 None => {
1631 return (
1632 StatusCode::BAD_REQUEST,
1633 Json(json!({"error": "intro event body must embed 'card' field"})),
1634 )
1635 .into_response();
1636 }
1637 };
1638 if let Err(e) = crate::agent_card::verify_agent_card(&embedded_card) {
1639 return (
1640 StatusCode::BAD_REQUEST,
1641 Json(json!({"error": format!("embedded card signature invalid: {e}")})),
1642 )
1643 .into_response();
1644 }
1645
1646 let body_bytes = match serde_json::to_vec(&req.event) {
1648 Ok(b) => b,
1649 Err(e) => {
1650 return (
1651 StatusCode::BAD_REQUEST,
1652 Json(json!({"error": format!("event not serializable: {e}")})),
1653 )
1654 .into_response();
1655 }
1656 };
1657 if body_bytes.len() > MAX_EVENT_BYTES {
1658 return (
1659 StatusCode::PAYLOAD_TOO_LARGE,
1660 Json(json!({"error": "intro event exceeds 256 KiB", "max_bytes": MAX_EVENT_BYTES})),
1661 )
1662 .into_response();
1663 }
1664 {
1665 let inner = relay.inner.lock().await;
1666 let used = inner.slot_bytes.get(&slot_id).copied().unwrap_or(0);
1667 if used + body_bytes.len() > MAX_SLOT_BYTES {
1668 return (
1669 StatusCode::PAYLOAD_TOO_LARGE,
1670 Json(json!({
1671 "error": "target slot quota exceeded",
1672 "slot_bytes_used": used,
1673 "slot_bytes_max": MAX_SLOT_BYTES,
1674 })),
1675 )
1676 .into_response();
1677 }
1678 }
1679
1680 let event_id = req
1681 .event
1682 .get("event_id")
1683 .and_then(Value::as_str)
1684 .map(str::to_string);
1685
1686 let dup = {
1688 let inner = relay.inner.lock().await;
1689 let slot = inner.slots.get(&slot_id);
1690 if let (Some(eid), Some(slot)) = (&event_id, slot) {
1691 slot.iter()
1692 .any(|e| e.get("event_id").and_then(Value::as_str) == Some(eid))
1693 } else {
1694 false
1695 }
1696 };
1697 if dup {
1698 return (
1699 StatusCode::OK,
1700 Json(json!({"event_id": event_id, "status": "duplicate"})),
1701 )
1702 .into_response();
1703 }
1704
1705 {
1706 let mut inner = relay.inner.lock().await;
1707 let event_size = body_bytes.len();
1708 let slot = inner.slots.entry(slot_id.clone()).or_default();
1709 slot.push(req.event.clone());
1710 *inner.slot_bytes.entry(slot_id.clone()).or_insert(0) += event_size;
1711 }
1712 if let Err(e) = relay.append_event_to_disk(&slot_id, &req.event).await {
1713 return (
1714 StatusCode::INTERNAL_SERVER_ERROR,
1715 Json(json!({"error": format!("persist failed: {e}")})),
1716 )
1717 .into_response();
1718 }
1719 (
1720 StatusCode::CREATED,
1721 Json(json!({"event_id": event_id, "status": "dropped", "to_nick": nick})),
1722 )
1723 .into_response()
1724}
1725
1726async fn well_known_agent_card_a2a(
1742 State(relay): State<Relay>,
1743 Query(q): Query<WellKnownAgentQuery>,
1744) -> impl IntoResponse {
1745 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1746 if nick.is_empty() {
1747 return (
1748 StatusCode::BAD_REQUEST,
1749 Json(json!({"error": "handle missing nick"})),
1750 )
1751 .into_response();
1752 }
1753 let inner = relay.inner.lock().await;
1754 let rec = match inner.handles.get(&nick) {
1755 Some(r) => r.clone(),
1756 None => {
1757 return (
1758 StatusCode::NOT_FOUND,
1759 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1760 )
1761 .into_response();
1762 }
1763 };
1764 drop(inner);
1765
1766 let profile = rec.card.get("profile").cloned().unwrap_or(Value::Null);
1767 let description = profile
1768 .get("motto")
1769 .and_then(Value::as_str)
1770 .unwrap_or("")
1771 .to_string();
1772 let display_name = profile
1773 .get("display_name")
1774 .and_then(Value::as_str)
1775 .unwrap_or(&rec.nick)
1776 .to_string();
1777 let relay_url = rec.relay_url.clone().unwrap_or_default();
1778 let endpoint = if !relay_url.is_empty() {
1780 format!(
1781 "{}/v1/handle/intro/{}",
1782 relay_url.trim_end_matches('/'),
1783 rec.nick
1784 )
1785 } else {
1786 format!("/v1/handle/intro/{}", rec.nick)
1787 };
1788 let card_sig = rec.card.get("signature").cloned().unwrap_or(Value::Null);
1789
1790 let a2a_card = json!({
1794 "id": rec.did,
1795 "name": display_name,
1796 "description": description,
1797 "version": "wire/0.5",
1798 "endpoint": endpoint,
1799 "provider": {
1800 "name": "wire",
1801 "url": "https://github.com/SlanchaAi/wire"
1802 },
1803 "capabilities": {
1804 "streaming": false,
1805 "pushNotifications": false,
1806 "extendedAgentCard": true
1807 },
1808 "securitySchemes": {
1809 "ed25519-event-sig": {
1810 "type": "signature",
1811 "alg": "EdDSA",
1812 "description": "Wire-style signed events (kind=1100 pair_drop for intro; verify against embedded card pubkey)."
1813 }
1814 },
1815 "security": [{"ed25519-event-sig": []}],
1816 "skills": [],
1817 "extensions": [{
1818 "uri": "https://slancha.ai/wire/ext/v0.5",
1822 "description": "Wire-native fields: full signed agent-card, profile blob, DID, slot_id, mailbox relay coords.",
1823 "required": false,
1824 "params": {
1825 "did": rec.did,
1826 "handle": rec.nick,
1827 "slot_id": rec.slot_id,
1828 "relay_url": rec.relay_url,
1829 "card": rec.card,
1830 "profile": profile,
1831 "claimed_at": rec.claimed_at,
1832 }
1833 }],
1834 "signature": card_sig,
1835 });
1836 (StatusCode::OK, Json(a2a_card)).into_response()
1837}
1838
1839async fn well_known_agent(
1840 State(relay): State<Relay>,
1841 Query(q): Query<WellKnownAgentQuery>,
1842) -> impl IntoResponse {
1843 let nick = q.handle.split('@').next().unwrap_or("").to_string();
1844 if nick.is_empty() {
1845 return (
1846 StatusCode::BAD_REQUEST,
1847 Json(json!({"error": "handle missing nick"})),
1848 )
1849 .into_response();
1850 }
1851 let inner = relay.inner.lock().await;
1852 match inner.handles.get(&nick) {
1853 Some(rec) => (
1854 StatusCode::OK,
1855 Json(json!({
1856 "nick": rec.nick,
1857 "did": rec.did,
1858 "card": rec.card,
1859 "slot_id": rec.slot_id,
1860 "relay_url": rec.relay_url,
1861 "claimed_at": rec.claimed_at,
1862 })),
1863 )
1864 .into_response(),
1865 None => (
1866 StatusCode::NOT_FOUND,
1867 Json(json!({"error": format!("phyllis: that number's been disconnected — {nick:?} isn't claimed on this switchboard")})),
1868 )
1869 .into_response(),
1870 }
1871}
1872
1873async fn list_events(
1874 State(relay): State<Relay>,
1875 Path(slot_id): Path<String>,
1876 Query(q): Query<ListEventsQuery>,
1877 headers: HeaderMap,
1878) -> impl IntoResponse {
1879 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1880 return resp;
1881 }
1882 let limit = q.limit.unwrap_or(100).min(1000);
1883 let mut inner = relay.inner.lock().await;
1884 let now_unix = std::time::SystemTime::now()
1888 .duration_since(std::time::UNIX_EPOCH)
1889 .map(|d| d.as_secs())
1890 .unwrap_or(0);
1891 inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
1892 let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
1893 let start = match q.since {
1894 Some(eid) => events
1895 .iter()
1896 .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
1897 .map(|i| i + 1)
1898 .unwrap_or(0),
1899 None => 0,
1900 };
1901 let end = (start + limit).min(events.len());
1902 let slice = events[start..end].to_vec();
1903 (StatusCode::OK, Json(slice)).into_response()
1904}
1905
1906async fn slot_state(
1912 State(relay): State<Relay>,
1913 Path(slot_id): Path<String>,
1914 headers: HeaderMap,
1915) -> impl IntoResponse {
1916 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1917 return resp;
1918 }
1919 let inner = relay.inner.lock().await;
1920 let event_count = inner.slots.get(&slot_id).map(|v| v.len()).unwrap_or(0);
1921 let last_pull_at_unix = inner.last_pull_at_unix.get(&slot_id).copied();
1922 let responder_health = inner.responder_health.get(&slot_id).cloned();
1923 (
1924 StatusCode::OK,
1925 Json(json!({
1926 "slot_id": slot_id,
1927 "event_count": event_count,
1928 "last_pull_at_unix": last_pull_at_unix,
1929 "responder_health": responder_health,
1930 })),
1931 )
1932 .into_response()
1933}
1934
1935async fn responder_health_set(
1936 State(relay): State<Relay>,
1937 Path(slot_id): Path<String>,
1938 headers: HeaderMap,
1939 Json(record): Json<ResponderHealthRecord>,
1940) -> impl IntoResponse {
1941 if let Err(resp) = check_token(&relay, &headers, &slot_id).await {
1942 return resp;
1943 }
1944 let path = relay
1945 .state_dir
1946 .join("responder-health")
1947 .join(format!("{slot_id}.json"));
1948 let body = match serde_json::to_vec_pretty(&record) {
1949 Ok(b) => b,
1950 Err(e) => {
1951 return (
1952 StatusCode::INTERNAL_SERVER_ERROR,
1953 Json(json!({"error": format!("serialize failed: {e}")})),
1954 )
1955 .into_response();
1956 }
1957 };
1958 if let Err(e) = tokio::fs::write(&path, body).await {
1959 return (
1960 StatusCode::INTERNAL_SERVER_ERROR,
1961 Json(json!({"error": format!("persist failed: {e}")})),
1962 )
1963 .into_response();
1964 }
1965 {
1966 let mut inner = relay.inner.lock().await;
1967 inner
1968 .responder_health
1969 .insert(slot_id.clone(), record.clone());
1970 }
1971 (StatusCode::OK, Json(record)).into_response()
1972}
1973
1974async fn check_token(
1975 relay: &Relay,
1976 headers: &HeaderMap,
1977 slot_id: &str,
1978) -> std::result::Result<(), axum::response::Response> {
1979 let auth = headers
1980 .get(AUTHORIZATION)
1981 .and_then(|h| h.to_str().ok())
1982 .and_then(|s| s.strip_prefix("Bearer "))
1983 .map(str::to_string);
1984 let presented = match auth {
1985 Some(t) => t,
1986 None => {
1987 return Err((
1988 StatusCode::UNAUTHORIZED,
1989 Json(json!({"error": "missing Bearer token"})),
1990 )
1991 .into_response());
1992 }
1993 };
1994 let inner = relay.inner.lock().await;
1995 let expected = match inner.tokens.get(slot_id) {
1996 Some(t) => t.clone(),
1997 None => {
1998 return Err((
1999 StatusCode::NOT_FOUND,
2000 Json(json!({"error": "unknown slot"})),
2001 )
2002 .into_response());
2003 }
2004 };
2005 drop(inner);
2006 if !constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
2007 return Err((StatusCode::FORBIDDEN, Json(json!({"error": "bad token"}))).into_response());
2008 }
2009 Ok(())
2010}
2011
/// Compares two byte slices without short-circuiting on the first differing
/// byte.
///
/// Slices of different length compare unequal immediately. For equal lengths
/// every byte pair is XOR-ed and the differences are OR-ed together, so the
/// loop always visits all bytes.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let diff = a
        .iter()
        .zip(b)
        .fold(0u8, |bits, (lhs, rhs)| bits | (lhs ^ rhs));
    diff == 0
}
2022
/// Returns `true` only for strings made of exactly 32 lowercase hexadecimal
/// characters (`0-9`, `a-f`).
fn is_valid_slot_id(s: &str) -> bool {
    if s.len() != 32 {
        return false;
    }
    s.bytes()
        .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
}
2028
2029fn random_hex(n_bytes: usize) -> String {
2030 let mut buf = vec![0u8; n_bytes];
2031 rand::thread_rng().fill_bytes(&mut buf);
2032 hex::encode(buf)
2033}
2034
2035pub async fn serve(bind: &str, state_dir: PathBuf) -> Result<()> {
2037 serve_with_mode(bind, state_dir, ServerMode::default()).await
2038}
2039
2040pub async fn serve_with_mode(
2044 bind: &str,
2045 state_dir: PathBuf,
2046 mode: ServerMode,
2047) -> Result<()> {
2048 let relay = Relay::new(state_dir).await?;
2049 relay.spawn_pair_sweeper();
2050 relay.spawn_counter_persister();
2051 let app = relay.clone().router_with_mode(mode);
2052 let listener = tokio::net::TcpListener::bind(bind)
2053 .await
2054 .with_context(|| format!("binding {bind}"))?;
2055 if mode.local_only {
2056 eprintln!("wire relay-server (LOCAL-ONLY) listening on {bind} — phonebook + well-known endpoints disabled");
2057 } else {
2058 eprintln!("wire relay-server listening on {bind}");
2059 }
2060 let shutdown_relay = relay.clone();
2061 axum::serve(listener, app)
2062 .with_graceful_shutdown(async move {
2063 let _ = tokio::signal::ctrl_c().await;
2064 eprintln!("\nshutting down — final counter snapshot");
2065 if let Err(e) = shutdown_relay.persist_counters().await {
2066 eprintln!("final counter persist failed: {e}");
2067 }
2068 })
2069 .await?;
2070 Ok(())
2071}
2072
/// Options selecting how the relay server is run; passed to
/// `serve_with_mode`. The default has `local_only` set to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ServerMode {
    /// When `true`, the startup banner announces LOCAL-ONLY mode with the
    /// phonebook and well-known endpoints disabled.
    // NOTE(review): the actual route gating presumably happens in
    // `router_with_mode`, which is not visible here — confirm.
    pub local_only: bool,
}
2085
/// Unit tests for the helpers in this file and for pair-slot eviction.
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal slices match; a differing byte or a differing length does not.
    #[test]
    fn constant_time_eq_basic() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        // Length mismatch.
        assert!(!constant_time_eq(b"abc", b"abcd"));
    }

    /// `random_hex(n)` yields `2 * n` hexadecimal characters.
    #[test]
    fn random_hex_length() {
        let s = random_hex(16);
        // 16 bytes -> 32 hex characters.
        assert_eq!(s.len(), 32);
        assert!(s.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// A pair slot last touched more than `PAIR_SLOT_TTL_SECS` ago is evicted
    /// together with its lookup entry, while a fresh slot is kept.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn pair_slot_evicts_when_idle_past_ttl() {
        // Unique scratch state directory per run.
        let dir = std::env::temp_dir().join(format!("wire-evict-{}", random_hex(8)));
        let _ = std::fs::remove_dir_all(&dir);
        let relay = Relay::new(dir.clone()).await.unwrap();

        {
            let mut inner = relay.inner.lock().await;
            // Slot A: back-dated to 60 seconds past the TTL, so it is stale.
            inner
                .pair_lookup
                .insert("hash-A".to_string(), "id-A".to_string());
            inner.pair_slots.insert(
                "id-A".to_string(),
                PairSlot {
                    last_touched: std::time::Instant::now()
                        - std::time::Duration::from_secs(PAIR_SLOT_TTL_SECS + 60),
                    ..PairSlot::default()
                },
            );

            // Slot B: default-constructed, expected to survive the sweep.
            inner
                .pair_lookup
                .insert("hash-B".to_string(), "id-B".to_string());
            inner
                .pair_slots
                .insert("id-B".to_string(), PairSlot::default());

            assert_eq!(inner.pair_slots.len(), 2);
            assert_eq!(inner.pair_lookup.len(), 2);
        }

        relay.evict_expired_pair_slots().await;

        // Only slot B and its lookup entry remain.
        let inner = relay.inner.lock().await;
        assert_eq!(
            inner.pair_slots.len(),
            1,
            "expired slot should have been evicted"
        );
        assert!(inner.pair_slots.contains_key("id-B"));
        assert_eq!(inner.pair_lookup.len(), 1);
        assert!(inner.pair_lookup.contains_key("hash-B"));
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Only strings of exactly 32 lowercase hex characters are valid slot ids.
    #[test]
    fn slot_id_validator_accepts_only_lowercase_32hex() {
        assert!(is_valid_slot_id("0123456789abcdef0123456789abcdef"));
        assert!(is_valid_slot_id(&random_hex(16)));
        // Too short.
        assert!(!is_valid_slot_id("abc"));
        // 31 characters.
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcde"));
        // 33 characters.
        assert!(!is_valid_slot_id("0123456789abcdef0123456789abcdef0"));
        // Uppercase hex digits.
        assert!(!is_valid_slot_id("0123456789ABCDEF0123456789abcdef"));
        // Path-like inputs.
        assert!(!is_valid_slot_id("../etc/passwd0123456789abcdef0000"));
        assert!(!is_valid_slot_id("..%2Fetc%2Fpasswd00000000000000000"));
        assert!(!is_valid_slot_id("/absolute/path/that/looks/like/key"));
        // 32 bytes long, but contains an embedded NUL byte.
        assert!(!is_valid_slot_id(
            "0123456789abcdef\0\x31\x32\x33456789abcdef"
        ));
    }
}