1use anyhow::{Context, Result, anyhow, bail};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use thiserror::Error;
11
12#[derive(Clone)]
13pub struct RelayClient {
14 base_url: String,
15 client: reqwest::blocking::Client,
16}
17
18#[derive(Debug, Serialize, Deserialize)]
19pub struct AllocateResponse {
20 pub slot_id: String,
21 pub slot_token: String,
22}
23
24#[derive(Debug, Deserialize)]
25pub struct PostEventResponse {
26 pub event_id: Option<String>,
27 pub status: String,
28}
29
/// DID carried by the `did=` field of a wire DNS-TXT record, tagged by which
/// validator accepted it in `parse_wire_org_txt_record`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WireOrgTxtDid {
    /// DID accepted by the organisation-DID check.
    Org(String),
    /// DID accepted by the operator-DID check.
    Op(String),
}

impl WireOrgTxtDid {
    /// Returns the DID string regardless of which variant holds it.
    pub fn as_str(&self) -> &str {
        // Both variants wrap exactly one `String`, so a single irrefutable
        // or-pattern covers the whole enum.
        let (Self::Org(inner) | Self::Op(inner)) = self;
        inner.as_str()
    }
}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct WireOrgTxtRecord {
49 pub did: WireOrgTxtDid,
50 pub relay: Option<String>,
51 pub sso_iss: Option<String>,
52 pub sso_tenant: Option<String>,
53}
54
55#[derive(Debug, Error, PartialEq, Eq)]
56pub enum WireOrgTxtParseError {
57 #[error("DNS-TXT record missing required `did=` field")]
58 MissingDid,
59 #[error("DNS-TXT record missing required `v=` field")]
60 MissingVersion,
61 #[error("unsupported DNS-TXT record version `{0}`")]
62 UnsupportedVersion(String),
63 #[error("`did=` must be did:wire:org:* or did:wire:op:* with a long fingerprint suffix")]
64 InvalidDid(String),
65 #[error("duplicate DNS-TXT field `{0}`")]
66 DuplicateField(&'static str),
67 #[error("malformed DNS-TXT field `{0}`")]
68 MalformedField(String),
69}
70
71pub fn parse_wire_org_txt_record(record: &str) -> Result<WireOrgTxtRecord, WireOrgTxtParseError> {
79 let trimmed = record.trim();
80 let body = trimmed
81 .strip_prefix('"')
82 .and_then(|s| s.strip_suffix('"'))
83 .unwrap_or(trimmed);
84
85 let mut did: Option<String> = None;
86 let mut version: Option<String> = None;
87 let mut relay: Option<String> = None;
88 let mut sso_iss: Option<String> = None;
89 let mut sso_tenant: Option<String> = None;
90
91 fn set_once(
92 slot: &mut Option<String>,
93 field: &'static str,
94 value: &str,
95 ) -> Result<(), WireOrgTxtParseError> {
96 if slot.is_some() {
97 return Err(WireOrgTxtParseError::DuplicateField(field));
98 }
99 *slot = Some(value.trim().to_string());
100 Ok(())
101 }
102
103 for raw in body.split(';') {
104 let raw = raw.trim();
105 if raw.is_empty() {
106 continue;
107 }
108 let Some((key, value)) = raw.split_once('=') else {
109 return Err(WireOrgTxtParseError::MalformedField(raw.to_string()));
110 };
111 match key.trim() {
112 "did" => set_once(&mut did, "did", value)?,
113 "v" => set_once(&mut version, "v", value)?,
114 "relay" => set_once(&mut relay, "relay", value)?,
115 "sso_iss" => set_once(&mut sso_iss, "sso_iss", value)?,
116 "sso_tenant" => set_once(&mut sso_tenant, "sso_tenant", value)?,
117 _ => {
118 }
121 }
122 }
123
124 let version = version.ok_or(WireOrgTxtParseError::MissingVersion)?;
125 if version != "1" {
126 return Err(WireOrgTxtParseError::UnsupportedVersion(version));
127 }
128
129 let did = did.ok_or(WireOrgTxtParseError::MissingDid)?;
130 let did = if crate::agent_card::is_org_did(&did) {
131 WireOrgTxtDid::Org(did)
132 } else if crate::agent_card::is_op_did(&did) {
133 WireOrgTxtDid::Op(did)
134 } else {
135 return Err(WireOrgTxtParseError::InvalidDid(did));
136 };
137
138 Ok(WireOrgTxtRecord {
139 did,
140 relay,
141 sso_iss,
142 sso_tenant,
143 })
144}
145
/// Name of the environment variable that disables TLS certificate verification.
pub const INSECURE_SKIP_TLS_ENV: &str = "WIRE_INSECURE_SKIP_TLS_VERIFY";

/// Reports whether [`INSECURE_SKIP_TLS_ENV`] is set to a truthy value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, compared ASCII case-insensitively
/// and without trimming. An unset, unreadable, or any other value yields `false`.
fn insecure_skip_tls_verify() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    match std::env::var(INSECURE_SKIP_TLS_ENV) {
        Ok(raw) => TRUTHY.iter().any(|accepted| raw.eq_ignore_ascii_case(accepted)),
        // Unset or non-Unicode values fall back to the secure default.
        Err(_) => false,
    }
}
164
165fn maybe_emit_insecure_banner() {
170 static ONCE: std::sync::OnceLock<()> = std::sync::OnceLock::new();
171 if insecure_skip_tls_verify() {
172 ONCE.get_or_init(|| {
173 eprintln!(
174 "\x1b[1;31mwire: WARNING\x1b[0m {INSECURE_SKIP_TLS_ENV}=1 is set; TLS verification is DISABLED for all relay traffic. \
175 MITM attacks against the relay path are undetectable in this mode. Unset to restore default trust validation."
176 );
177 });
178 }
179}
180
181pub fn build_blocking_client(
205 timeout: Option<std::time::Duration>,
206) -> Result<reqwest::blocking::Client> {
207 let mut b = reqwest::blocking::Client::builder();
208 if let Some(t) = timeout {
209 b = b.timeout(t);
210 }
211 if insecure_skip_tls_verify() {
212 maybe_emit_insecure_banner();
213 b = b.danger_accept_invalid_certs(true);
214 } else {
215 let cfg = crate::tls::shared_client_config();
220 b = b.use_preconfigured_tls((*cfg).clone());
221 }
222 b.build()
223 .with_context(|| "constructing reqwest blocking client")
224}
225
226pub fn format_transport_error(err: &anyhow::Error) -> String {
234 let mut parts: Vec<String> = err.chain().map(|c| c.to_string()).collect();
235 let lower = parts
239 .iter()
240 .map(|p| p.to_ascii_lowercase())
241 .collect::<Vec<_>>();
242 let class = if lower.iter().any(|p| {
243 p.contains("invalid peer certificate")
244 || p.contains("certificate verification")
245 || p.contains("unknownissuer")
246 || p.contains("certificate is not valid")
247 || p.contains("tls handshake")
248 }) {
249 Some("TLS error")
250 } else if lower.iter().any(|p| {
251 p.contains("dns error")
252 || p.contains("nodename nor servname")
253 || p.contains("failed to lookup address")
254 }) {
255 Some("DNS error")
256 } else if lower
257 .iter()
258 .any(|p| p.contains("operation timed out") || p.contains("deadline has elapsed"))
259 {
260 Some("timeout")
261 } else if lower
262 .iter()
263 .any(|p| p.contains("connection refused") || p.contains("connection reset"))
264 {
265 Some("connect error")
266 } else {
267 None
268 };
269 if let Some(c) = class {
270 parts.insert(0, c.to_string());
271 }
272 parts.join(": ")
273}
274
275#[cfg(unix)]
287pub fn uds_request(
288 socket_path: &std::path::Path,
289 method: &str,
290 request_target: &str,
291 headers: &[(&str, &str)],
292 body: &[u8],
293) -> Result<(u16, Vec<u8>)> {
294 use std::io::{Read, Write};
295 use std::os::unix::net::UnixStream;
296 let mut stream =
297 UnixStream::connect(socket_path).with_context(|| format!("connect UDS {socket_path:?}"))?;
298 stream.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
299 stream.set_write_timeout(Some(std::time::Duration::from_secs(30)))?;
300 let mut req = String::with_capacity(256 + headers.len() * 32 + body.len());
301 req.push_str(method);
302 req.push(' ');
303 req.push_str(request_target);
304 req.push_str(" HTTP/1.1\r\n");
305 req.push_str("Host: localhost\r\n");
306 req.push_str("Connection: close\r\n");
307 req.push_str(&format!("Content-Length: {}\r\n", body.len()));
308 for (k, v) in headers {
309 req.push_str(k);
310 req.push_str(": ");
311 req.push_str(v);
312 req.push_str("\r\n");
313 }
314 req.push_str("\r\n");
315 stream.write_all(req.as_bytes())?;
316 if !body.is_empty() {
317 stream.write_all(body)?;
318 }
319 stream.flush()?;
320 let mut raw = Vec::new();
321 stream.read_to_end(&mut raw)?;
322 let split = raw
324 .windows(4)
325 .position(|w| w == b"\r\n\r\n")
326 .ok_or_else(|| anyhow!("UDS response missing header/body delimiter"))?;
327 let head = std::str::from_utf8(&raw[..split])
328 .map_err(|e| anyhow!("UDS response head not UTF-8: {e}"))?;
329 let body = raw[split + 4..].to_vec();
330 let status_line = head.lines().next().unwrap_or("");
331 let status: u16 = status_line
333 .split_whitespace()
334 .nth(1)
335 .and_then(|s| s.parse().ok())
336 .ok_or_else(|| anyhow!("UDS response missing status code: {status_line:?}"))?;
337 Ok((status, body))
338}
339
340pub fn post_event_to_endpoint(
351 endpoint: &crate::endpoints::Endpoint,
352 event: &Value,
353) -> Result<PostEventResponse> {
354 #[cfg(unix)]
355 if let Some(socket_path) = endpoint.relay_url.strip_prefix("unix://") {
356 let body = serde_json::json!({"event": event}).to_string();
357 let auth_header = format!("Bearer {}", endpoint.slot_token);
358 let (status, body) = uds_request(
359 std::path::Path::new(socket_path),
360 "POST",
361 &format!("/v1/events/{}", endpoint.slot_id),
362 &[
363 ("Content-Type", "application/json"),
364 ("Authorization", &auth_header),
365 ],
366 body.as_bytes(),
367 )?;
368 if !(200..300).contains(&status) {
369 return Err(anyhow!(
379 "post_event (uds {socket_path}) failed: {status}: {}",
380 String::from_utf8_lossy(&body)
381 ));
382 }
383 return Ok(serde_json::from_slice(&body)?);
384 }
385 let client = RelayClient::new(&endpoint.relay_url);
386 client.post_event(&endpoint.slot_id, &endpoint.slot_token, event)
387}
388
389pub fn try_post_event_with_failover<F>(
413 endpoints: &[crate::endpoints::Endpoint],
414 event: &Value,
415 mut poster: F,
416) -> Result<(crate::endpoints::Endpoint, PostEventResponse)>
417where
418 F: FnMut(&crate::endpoints::Endpoint, &Value) -> Result<PostEventResponse>,
419{
420 if endpoints.is_empty() {
421 bail!(
422 "no endpoints to deliver to — peer has no pinned endpoints in relay_state. \
423 Re-run the pair flow (or `wire dial <peer>@<relay>`) to re-pin the peer's \
424 advertised endpoints."
425 );
426 }
427 let mut errs: Vec<String> = Vec::with_capacity(endpoints.len());
428 for ep in endpoints {
429 match poster(ep, event) {
430 Ok(resp) => return Ok((ep.clone(), resp)),
431 Err(e) => errs.push(format!("{} ({:?}): {e}", ep.relay_url, ep.scope)),
432 }
433 }
434 bail!(
435 "all {n} endpoint(s) failed:\n • {reasons}",
436 n = endpoints.len(),
437 reasons = errs.join("\n • ")
438 )
439}
440
441impl RelayClient {
442 pub fn new(base_url: &str) -> Self {
443 let client = build_blocking_client(Some(std::time::Duration::from_secs(30)))
444 .expect("reqwest client construction is infallible with rustls + native roots");
445 Self {
446 base_url: base_url.trim_end_matches('/').to_string(),
447 client,
448 }
449 }
450
451 pub fn allocate_slot(&self, handle_hint: Option<&str>) -> Result<AllocateResponse> {
455 let body = serde_json::json!({"handle": handle_hint});
456 let resp = self
457 .client
458 .post(format!("{}/v1/slot/allocate", self.base_url))
459 .json(&body)
460 .send()
461 .with_context(|| format!("POST {}/v1/slot/allocate", self.base_url))?;
462 let status = resp.status();
463 if !status.is_success() {
464 let detail = resp.text().unwrap_or_default();
465 return Err(anyhow!("allocate failed: {status}: {detail}"));
466 }
467 Ok(resp.json()?)
468 }
469
470 pub fn post_event(
474 &self,
475 slot_id: &str,
476 slot_token: &str,
477 event: &Value,
478 ) -> Result<PostEventResponse> {
479 let body = serde_json::json!({"event": event});
480 let resp = self
481 .client
482 .post(format!("{}/v1/events/{slot_id}", self.base_url))
483 .bearer_auth(slot_token)
484 .json(&body)
485 .send()
486 .with_context(|| format!("POST {}/v1/events/{slot_id}", self.base_url))?;
487 let status = resp.status();
488 if !status.is_success() {
489 let detail = resp.text().unwrap_or_default();
490 return Err(anyhow!("post_event failed: {status}: {detail}"));
500 }
501 Ok(resp.json()?)
502 }
503
504 pub fn list_events(
507 &self,
508 slot_id: &str,
509 slot_token: &str,
510 since: Option<&str>,
511 limit: Option<usize>,
512 ) -> Result<Vec<Value>> {
513 let mut url = format!("{}/v1/events/{slot_id}", self.base_url);
514 let mut sep = '?';
515 if let Some(s) = since {
516 url.push(sep);
517 url.push_str(&format!("since={s}"));
518 sep = '&';
519 }
520 if let Some(n) = limit {
521 url.push(sep);
522 url.push_str(&format!("limit={n}"));
523 }
524 let resp = self
525 .client
526 .get(&url)
527 .bearer_auth(slot_token)
528 .send()
529 .with_context(|| format!("GET {url}"))?;
530 let status = resp.status();
531 if !status.is_success() {
532 let detail = resp.text().unwrap_or_default();
533 return Err(anyhow!("list_events failed: {status}: {detail}"));
534 }
535 Ok(resp.json()?)
536 }
537
538 pub fn slot_state(&self, slot_id: &str, slot_token: &str) -> Result<(usize, Option<u64>)> {
544 let url = format!("{}/v1/slot/{slot_id}/state", self.base_url);
545 let resp = match self.client.get(&url).bearer_auth(slot_token).send() {
546 Ok(r) => r,
547 Err(_) => return Ok((0, None)),
548 };
549 if !resp.status().is_success() {
550 return Ok((0, None));
551 }
552 let v: Value = resp.json().unwrap_or(Value::Null);
553 let count = v.get("event_count").and_then(Value::as_u64).unwrap_or(0) as usize;
554 let last = v.get("last_pull_at_unix").and_then(Value::as_u64);
555 Ok((count, last))
556 }
557
558 pub fn responder_health_set(
559 &self,
560 slot_id: &str,
561 slot_token: &str,
562 record: &Value,
563 ) -> Result<Value> {
564 let resp = self
565 .client
566 .post(format!(
567 "{}/v1/slot/{slot_id}/responder-health",
568 self.base_url
569 ))
570 .bearer_auth(slot_token)
571 .json(record)
572 .send()
573 .with_context(|| {
574 format!("POST {}/v1/slot/{slot_id}/responder-health", self.base_url)
575 })?;
576 let status = resp.status();
577 if !status.is_success() {
578 let detail = resp.text().unwrap_or_default();
579 return Err(anyhow!("responder_health_set failed: {status}: {detail}"));
580 }
581 Ok(resp.json()?)
582 }
583
584 pub fn responder_health_get(&self, slot_id: &str, slot_token: &str) -> Result<Value> {
585 let resp = self
586 .client
587 .get(format!("{}/v1/slot/{slot_id}/state", self.base_url))
588 .bearer_auth(slot_token)
589 .send()
590 .with_context(|| format!("GET {}/v1/slot/{slot_id}/state", self.base_url))?;
591 let status = resp.status();
592 if !status.is_success() {
593 let detail = resp.text().unwrap_or_default();
594 return Err(anyhow!("responder_health_get failed: {status}: {detail}"));
595 }
596 let state: Value = resp.json()?;
597 Ok(state
598 .get("responder_health")
599 .cloned()
600 .unwrap_or(Value::Null))
601 }
602
603 pub fn healthz(&self) -> Result<bool> {
604 let resp = self
605 .client
606 .get(format!("{}/healthz", self.base_url))
607 .send()?;
608 Ok(resp.status().is_success())
609 }
610
611 pub fn check_healthz(&self) -> anyhow::Result<()> {
616 match self.healthz() {
617 Ok(true) => Ok(()),
618 Ok(false) => anyhow::bail!(
619 "phyllis: silent line — {}/healthz returned non-200.\n\
620 the host is reachable but the relay isn't returning ok. test:\n \
621 curl -v {}/healthz",
622 self.base_url,
623 self.base_url
624 ),
625 Err(e) => anyhow::bail!(
626 "phyllis: silent line — couldn't reach {}/healthz: {e:#}.\n\
627 test reachability from this machine:\n curl -v {}/healthz\n\
628 if curl also fails, a sandbox / proxy / firewall is the usual cause.\n\
629 (OpenShell sandbox? run `curl -fsSL https://wireup.net/openshell-policy.sh | bash -s <sandbox-name>` on the host first.)",
630 self.base_url,
631 self.base_url
632 ),
633 }
634 }
635
636 pub fn pair_open(&self, code_hash: &str, msg_b64: &str, role: &str) -> Result<String> {
640 let body = serde_json::json!({"code_hash": code_hash, "msg": msg_b64, "role": role});
641 let resp = self
642 .client
643 .post(format!("{}/v1/pair", self.base_url))
644 .json(&body)
645 .send()?;
646 let status = resp.status();
647 if !status.is_success() {
648 let detail = resp.text().unwrap_or_default();
649 return Err(anyhow!("pair_open failed: {status}: {detail}"));
650 }
651 let v: Value = resp.json()?;
652 v.get("pair_id")
653 .and_then(Value::as_str)
654 .map(str::to_string)
655 .ok_or_else(|| anyhow!("pair_open response missing pair_id"))
656 }
657
658 pub fn pair_abandon(&self, code_hash: &str) -> Result<()> {
663 let body = serde_json::json!({"code_hash": code_hash});
664 let resp = self
665 .client
666 .post(format!("{}/v1/pair/abandon", self.base_url))
667 .json(&body)
668 .send()?;
669 let status = resp.status();
670 if !status.is_success() {
671 let detail = resp.text().unwrap_or_default();
672 return Err(anyhow!("pair_abandon failed: {status}: {detail}"));
673 }
674 Ok(())
675 }
676
677 pub fn pair_get(
679 &self,
680 pair_id: &str,
681 as_role: &str,
682 ) -> Result<(Option<String>, Option<String>)> {
683 let resp = self
684 .client
685 .get(format!(
686 "{}/v1/pair/{pair_id}?as_role={as_role}",
687 self.base_url
688 ))
689 .send()?;
690 let status = resp.status();
691 if !status.is_success() {
692 let detail = resp.text().unwrap_or_default();
693 return Err(anyhow!("pair_get failed: {status}: {detail}"));
694 }
695 let v: Value = resp.json()?;
696 let peer_msg = v
697 .get("peer_msg")
698 .and_then(Value::as_str)
699 .map(str::to_string);
700 let peer_bootstrap = v
701 .get("peer_bootstrap")
702 .and_then(Value::as_str)
703 .map(str::to_string);
704 Ok((peer_msg, peer_bootstrap))
705 }
706
707 pub fn pair_bootstrap(&self, pair_id: &str, role: &str, sealed_b64: &str) -> Result<()> {
709 let body = serde_json::json!({"role": role, "sealed": sealed_b64});
710 let resp = self
711 .client
712 .post(format!("{}/v1/pair/{pair_id}/bootstrap", self.base_url))
713 .json(&body)
714 .send()?;
715 if !resp.status().is_success() {
716 let s = resp.status();
717 let detail = resp.text().unwrap_or_default();
718 return Err(anyhow!("pair_bootstrap failed: {s}: {detail}"));
719 }
720 Ok(())
721 }
722
723 pub fn handle_claim(
729 &self,
730 nick: &str,
731 slot_id: &str,
732 slot_token: &str,
733 relay_url: Option<&str>,
734 card: &Value,
735 ) -> Result<Value> {
736 self.handle_claim_v2(nick, slot_id, slot_token, relay_url, card, None)
737 }
738
739 pub fn handle_claim_v2(
745 &self,
746 nick: &str,
747 slot_id: &str,
748 slot_token: &str,
749 relay_url: Option<&str>,
750 card: &Value,
751 discoverable: Option<bool>,
752 ) -> Result<Value> {
753 let mut body = serde_json::json!({
754 "nick": nick,
755 "slot_id": slot_id,
756 "relay_url": relay_url,
757 "card": card,
758 });
759 if let Some(d) = discoverable {
760 body["discoverable"] = serde_json::json!(d);
761 }
762 let resp = self
763 .client
764 .post(format!("{}/v1/handle/claim", self.base_url))
765 .bearer_auth(slot_token)
766 .json(&body)
767 .send()
768 .with_context(|| format!("POST {}/v1/handle/claim", self.base_url))?;
769 let status = resp.status();
770 if !status.is_success() {
771 let detail = resp.text().unwrap_or_default();
772 return Err(anyhow!("handle_claim failed: {status}: {detail}"));
773 }
774 Ok(resp.json()?)
775 }
776
777 pub fn handle_intro(&self, nick: &str, event: &Value) -> Result<Value> {
781 let body = serde_json::json!({"event": event});
782 let resp = self
783 .client
784 .post(format!("{}/v1/handle/intro/{nick}", self.base_url))
785 .json(&body)
786 .send()
787 .with_context(|| format!("POST {}/v1/handle/intro/{nick}", self.base_url))?;
788 let status = resp.status();
789 if !status.is_success() {
790 let detail = resp.text().unwrap_or_default();
791 return Err(anyhow!("handle_intro failed: {status}: {detail}"));
792 }
793 Ok(resp.json()?)
794 }
795
796 pub fn well_known_agent_card_a2a(&self, handle: &str) -> Result<Value> {
802 let resp = self
803 .client
804 .get(format!("{}/.well-known/agent-card.json", self.base_url))
805 .query(&[("handle", handle)])
806 .send()
807 .with_context(|| {
808 format!(
809 "GET {}/.well-known/agent-card.json?handle={handle}",
810 self.base_url
811 )
812 })?;
813 let status = resp.status();
814 if !status.is_success() {
815 let detail = resp.text().unwrap_or_default();
816 return Err(anyhow!(
817 "well_known_agent_card_a2a failed: {status}: {detail}"
818 ));
819 }
820 Ok(resp.json()?)
821 }
822
823 pub fn well_known_agent(&self, handle: &str) -> Result<Value> {
827 let resp = self
828 .client
829 .get(format!("{}/.well-known/wire/agent", self.base_url))
830 .query(&[("handle", handle)])
831 .send()
832 .with_context(|| {
833 format!(
834 "GET {}/.well-known/wire/agent?handle={handle}",
835 self.base_url
836 )
837 })?;
838 let status = resp.status();
839 if !status.is_success() {
840 let detail = resp.text().unwrap_or_default();
841 return Err(anyhow!("well_known_agent failed: {status}: {detail}"));
842 }
843 Ok(resp.json()?)
844 }
845}
846
/// Round-trip tests for `uds_request` against a canned in-process UDS server.
#[cfg(all(test, unix))]
mod uds_tests {
    use super::*;
    use std::io::{Read, Write};
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::path::PathBuf;
    use std::thread;

    /// Creates a fresh scratch directory and returns `(dir, dir/name)`.
    /// Nothing is created at the socket path itself.
    fn scratch_socket(name: &str) -> (PathBuf, PathBuf) {
        let dir = std::env::temp_dir().join(format!("wire-uds-test-{}", rand::random::<u32>()));
        std::fs::create_dir_all(&dir).unwrap();
        let sock = dir.join(name);
        let _ = std::fs::remove_file(&sock);
        (dir, sock)
    }

    /// Reads until the request head and its `Content-Length` bytes have arrived,
    /// so the reply is never sent while the client is still writing.
    fn drain_request(stream: &mut UnixStream) {
        // Bound the wait so a short request cannot hang the server thread forever.
        let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
        let mut seen = Vec::new();
        let mut chunk = [0u8; 4096];
        loop {
            if let Some(head_end) = seen.windows(4).position(|w| w == b"\r\n\r\n") {
                let head = String::from_utf8_lossy(&seen[..head_end]).into_owned();
                let expected_body = head
                    .lines()
                    .filter_map(|line| line.split_once(':'))
                    .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
                    .and_then(|(_, value)| value.trim().parse::<usize>().ok())
                    .unwrap_or(0);
                if seen.len() >= head_end + 4 + expected_body {
                    return;
                }
            }
            match stream.read(&mut chunk) {
                Ok(0) | Err(_) => return,
                Ok(n) => seen.extend_from_slice(&chunk[..n]),
            }
        }
    }

    /// Binds `socket_path` and serves exactly one connection with a fixed reply.
    ///
    /// The listener is bound before this returns, so a client may connect
    /// immediately; the connection waits in the backlog until the thread accepts it.
    fn spawn_canned_uds_server(socket_path: PathBuf, status: u16, body: &'static str) {
        let listener = UnixListener::bind(&socket_path).expect("bind canned UDS");
        thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept canned UDS");
            drain_request(&mut stream);
            let status_text = match status {
                200 => "OK",
                201 => "Created",
                400 => "Bad Request",
                _ => "Status",
            };
            let resp = format!(
                "HTTP/1.1 {status} {status_text}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
                body.len()
            );
            let _ = stream.write_all(resp.as_bytes());
        });
    }

    #[test]
    fn uds_request_round_trips_200_with_body() {
        let (dir, sock) = scratch_socket("rt.sock");
        spawn_canned_uds_server(sock.clone(), 200, r#"{"ok":true}"#);
        let (status, body) = uds_request(
            &sock,
            "POST",
            "/v1/test",
            &[("Content-Type", "application/json")],
            b"{}",
        )
        .expect("uds_request succeeds");
        assert_eq!(status, 200);
        assert_eq!(body, br#"{"ok":true}"#);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn uds_request_surfaces_non_2xx_status() {
        let (dir, sock) = scratch_socket("err.sock");
        spawn_canned_uds_server(sock.clone(), 400, r#"{"error":"bad"}"#);
        let (status, body) = uds_request(&sock, "GET", "/v1/test", &[], b"")
            .expect("uds_request succeeds even on 4xx");
        assert_eq!(status, 400);
        assert_eq!(body, br#"{"error":"bad"}"#);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn uds_request_fails_on_nonexistent_socket() {
        // A path inside a fresh scratch directory cannot collide with a real socket.
        let (dir, nope) = scratch_socket("nonexistent.sock");
        let err = uds_request(&nope, "GET", "/", &[], b"").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("connect UDS"),
            "expected connect error, got: {msg}"
        );
        let _ = std::fs::remove_dir_all(&dir);
    }
}
925
/// Unit and property tests for URL normalisation, transport-error formatting,
/// the insecure-TLS environment toggle, and DNS-TXT record parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Context line attached to the simulated transport errors below.
    const POST_CONTEXT: &str = "POST https://relay.example/v1/events/abc";
    /// Well-formed organisation DID fixture.
    const ORG_DID: &str = "did:wire:org:example-0123456789abcdef0123456789abcdef";
    /// Well-formed operator DID fixture.
    const OP_DID: &str = "did:wire:op:operator-abcdef0123456789abcdef0123456789";

    #[test]
    fn url_normalization_trims_trailing_slash() {
        for input in ["http://example.com/", "http://example.com"] {
            let client = RelayClient::new(input);
            assert_eq!(client.base_url, "http://example.com");
        }
    }

    #[test]
    fn format_transport_error_classifies_tls() {
        let err = anyhow!("invalid peer certificate: UnknownIssuer")
            .context("hyper send")
            .context("POST https://relay.example/v1/events/abc");
        let rendered = format_transport_error(&err);
        assert!(
            rendered.starts_with("TLS error:"),
            "expected TLS class prefix, got: {rendered}"
        );
        assert!(
            rendered.contains("UnknownIssuer"),
            "lost root cause: {rendered}"
        );
        assert!(
            rendered.contains("POST https://relay.example"),
            "lost context URL: {rendered}"
        );
    }

    #[test]
    fn format_transport_error_classifies_timeout() {
        let err = anyhow!("operation timed out").context(POST_CONTEXT);
        let rendered = format_transport_error(&err);
        assert!(rendered.starts_with("timeout:"), "got: {rendered}");
    }

    #[test]
    fn format_transport_error_classifies_dns() {
        let err = anyhow!("dns error: failed to lookup address").context(POST_CONTEXT);
        let rendered = format_transport_error(&err);
        assert!(rendered.starts_with("DNS error:"), "got: {rendered}");
    }

    #[test]
    fn format_transport_error_falls_back_to_chain_join() {
        let err = anyhow!("Refused to connect for non-standard reason xyz").context(POST_CONTEXT);
        let rendered = format_transport_error(&err);
        assert!(rendered.contains("Refused to connect"));
        assert!(rendered.contains("POST https://relay.example"));
    }

    #[test]
    fn insecure_env_recognizes_truthy_values_and_default_off() {
        use std::sync::{Mutex, OnceLock};
        static GUARD: OnceLock<Mutex<()>> = OnceLock::new();
        let _lock = GUARD.get_or_init(|| Mutex::new(())).lock().unwrap();

        // SAFETY: environment mutation is serialised through `GUARD` within this test.
        let clear = || unsafe { std::env::remove_var(INSECURE_SKIP_TLS_ENV) };
        // SAFETY: as above.
        let set = |value: &str| unsafe { std::env::set_var(INSECURE_SKIP_TLS_ENV, value) };

        clear();
        assert!(!insecure_skip_tls_verify(), "default must be secure");

        for v in ["1", "true", "yes", "on", "TRUE", "Yes"] {
            set(v);
            assert!(insecure_skip_tls_verify(), "value {v:?} should be truthy");
        }
        for v in ["0", "false", "no", "off", ""] {
            set(v);
            assert!(
                !insecure_skip_tls_verify(),
                "value {v:?} must not enable insecure mode"
            );
        }
        clear();
    }

    #[test]
    fn parse_wire_org_txt_record_dispatches_org_and_op_dids() {
        let org_record = format!(
            "did={}; relay=https://relay.example; sso_iss=https://issuer.example; sso_tenant=tenant; v=1",
            ORG_DID
        );
        let org = parse_wire_org_txt_record(&org_record).unwrap();
        assert_eq!(org.did, WireOrgTxtDid::Org(ORG_DID.to_string()));
        assert_eq!(org.relay.as_deref(), Some("https://relay.example"));
        assert_eq!(org.sso_iss.as_deref(), Some("https://issuer.example"));
        assert_eq!(org.sso_tenant.as_deref(), Some("tenant"));

        let op_record = format!("did={}; v=1", OP_DID);
        let op = parse_wire_org_txt_record(&op_record).unwrap();
        assert_eq!(op.did, WireOrgTxtDid::Op(OP_DID.to_string()));
        assert_eq!(op.relay, None);
    }

    #[test]
    fn parse_wire_org_txt_record_rejects_unknown_version_and_session_did() {
        let unknown_v = parse_wire_org_txt_record(&format!("did={}; v=2", ORG_DID)).unwrap_err();
        assert_eq!(
            unknown_v,
            WireOrgTxtParseError::UnsupportedVersion("2".into())
        );

        let session_did =
            parse_wire_org_txt_record("did=did:wire:session-01234567; v=1").unwrap_err();
        assert!(matches!(session_did, WireOrgTxtParseError::InvalidDid(_)));
    }

    #[test]
    fn parse_wire_org_txt_record_rejects_duplicate_known_fields() {
        let record = format!("did={}; v=1; v=1", ORG_DID);
        let err = parse_wire_org_txt_record(&record).unwrap_err();
        assert_eq!(err, WireOrgTxtParseError::DuplicateField("v"));
    }

    proptest! {
        #[test]
        fn parse_wire_org_txt_record_ignores_unknown_fields_at_v1(
            unknown_fields in prop::collection::vec(
                (
                    "[a-z_][a-z0-9_]{0,16}",
                    "[A-Za-z0-9._:/-]{0,64}"
                ),
                0..32
            )
        ) {
            let mut record = format!("did={}; v=1", ORG_DID);
            for (key, value) in unknown_fields {
                // Generated keys must not collide with the recognised ones.
                prop_assume!(!matches!(
                    key.as_str(),
                    "did" | "v" | "relay" | "sso_iss" | "sso_tenant"
                ));
                record.push_str("; ");
                record.push_str(&key);
                record.push('=');
                record.push_str(&value);
            }

            let parsed = parse_wire_org_txt_record(&record).unwrap();
            prop_assert_eq!(parsed.did, WireOrgTxtDid::Org(ORG_DID.to_string()));
        }

        #[test]
        fn parse_wire_org_txt_record_rejects_every_unknown_version(
            version in "[A-Za-z0-9._-]{1,16}"
        ) {
            prop_assume!(version != "1");
            let record = format!("did={}; v={version}; future=opaque", ORG_DID);
            let err = parse_wire_org_txt_record(&record).unwrap_err();
            prop_assert_eq!(err, WireOrgTxtParseError::UnsupportedVersion(version));
        }
    }
}
1107
/// Tests for `try_post_event_with_failover` using an injected poster closure, so
/// no network traffic is involved.
#[cfg(test)]
mod failover_tests {
    use super::*;
    use crate::endpoints::{Endpoint, EndpointScope};

    /// Federation-scope endpoint fixture.
    fn fed_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint::federation(url.to_string(), slot.to_string(), token.to_string())
    }

    /// Local-scope endpoint fixture.
    fn local_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint {
            relay_url: url.to_string(),
            slot_id: slot.to_string(),
            slot_token: token.to_string(),
            scope: EndpointScope::Local,
        }
    }

    /// Successful relay reply fixture.
    fn ok_resp() -> PostEventResponse {
        PostEventResponse {
            event_id: Some("evt-1".to_string()),
            status: "queued".to_string(),
        }
    }

    #[test]
    fn first_endpoint_succeeds_no_further_attempts() {
        let endpoints = vec![
            fed_ep("https://good.example", "slot1", "tok1"),
            fed_ep("https://other.example", "slot2", "tok2"),
        ];
        let event = serde_json::json!({});
        // The poster is `FnMut`, so it can record attempts in a plain Vec.
        let mut tried: Vec<String> = Vec::new();
        let (delivered, _resp) = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            tried.push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();
        assert_eq!(delivered.relay_url, "https://good.example");
        assert_eq!(
            tried,
            vec!["https://good.example".to_string()],
            "must NOT try the second endpoint after the first succeeds"
        );
    }

    #[test]
    fn skips_dead_endpoint_and_succeeds_on_next() {
        let endpoints = vec![
            fed_ep("https://copilot-agent@wireup.net", "slot-bad", "tok-bad"),
            fed_ep("https://wireup.net", "slot-good", "tok-good"),
        ];
        let event = serde_json::json!({});
        let mut tried: Vec<String> = Vec::new();
        let (delivered_ep, _resp) = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            tried.push(ep.relay_url.clone());
            if ep.relay_url.contains('@') {
                return Err(anyhow!("400 Bad Request (userinfo embedded)"));
            }
            Ok(ok_resp())
        })
        .unwrap();
        assert_eq!(
            delivered_ep.relay_url, "https://wireup.net",
            "the successful endpoint must be the one returned to the caller"
        );
        assert_eq!(
            tried,
            vec![
                "https://copilot-agent@wireup.net".to_string(),
                "https://wireup.net".to_string()
            ],
            "must try `bad` first, then fall over to `good`"
        );
    }

    #[test]
    fn respects_priority_order_caller_supplies() {
        let endpoints = vec![
            local_ep("http://127.0.0.1:8771", "loc1", "loctok"),
            fed_ep("https://wireup.net", "fed1", "fedtok"),
        ];
        let event = serde_json::json!({});
        let mut tried: Vec<String> = Vec::new();
        let _ = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            tried.push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();
        assert_eq!(
            tried[0],
            "http://127.0.0.1:8771",
            "Local-scope endpoint must be tried first (per the caller's priority order)"
        );
    }

    #[test]
    fn all_failures_returns_combined_error() {
        let urls = [
            "https://a.example",
            "https://b.example",
            "https://c.example",
        ];
        let endpoints: Vec<Endpoint> = urls.iter().map(|url| fed_ep(url, "s", "t")).collect();
        let err = try_post_event_with_failover(&endpoints, &serde_json::json!({}), |ep, _| {
            Err(anyhow!("simulated 500 from {}", ep.relay_url))
        })
        .unwrap_err()
        .to_string();
        assert!(
            err.contains("all 3 endpoint(s) failed"),
            "error must surface the total count: {err}"
        );
        for u in urls {
            assert!(
                err.contains(u),
                "combined error must include each failing endpoint URL ({u}): {err}"
            );
        }
    }

    #[test]
    fn empty_endpoints_returns_actionable_error() {
        let endpoints: Vec<Endpoint> = Vec::new();
        let err = try_post_event_with_failover(&endpoints, &serde_json::json!({}), |_, _| {
            unreachable!("poster must not be called when endpoint list is empty")
        })
        .unwrap_err()
        .to_string();
        assert!(
            err.contains("no endpoints to deliver to"),
            "empty-list error must be explicit: {err}"
        );
        let points_at_remedy = ["re-pin", "dial", "pair"]
            .iter()
            .any(|hint| err.contains(hint));
        assert!(
            points_at_remedy,
            "empty-list error must point at the remediation path: {err}"
        );
    }
}
1273}