use anyhow::{Context, Result, anyhow, bail};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
/// Blocking HTTP client bound to a single relay base URL.
#[derive(Clone)]
pub struct RelayClient {
// Relay base URL; `RelayClient::new` stores it with trailing `/` characters removed.
base_url: String,
// Blocking reqwest client used for every request issued by this value.
client: reqwest::blocking::Client,
}
/// JSON body returned by `RelayClient::allocate_slot`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AllocateResponse {
/// Identifier of the allocated slot.
pub slot_id: String,
/// Token issued together with the slot.
/// NOTE(review): presumably the bearer token the other slot calls expect — confirm against the relay API.
pub slot_token: String,
}
/// JSON body returned by the relay when an event is posted.
#[derive(Debug, Deserialize)]
pub struct PostEventResponse {
/// Event identifier, when the response carries one.
pub event_id: Option<String>,
/// Status string reported by the relay (the tests in this file use `"queued"`).
pub status: String,
}
/// DID taken from the `did=` field of a wire DNS-TXT record, tagged by kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireOrgTxtDid {
    /// DID classified as an organisation DID.
    Org(String),
    /// DID classified as an operator DID.
    Op(String),
}

impl WireOrgTxtDid {
    /// Returns the wrapped DID string, whichever variant holds it.
    pub fn as_str(&self) -> &str {
        // Both variants carry exactly one `String`, so the or-pattern is irrefutable.
        let (Self::Org(did) | Self::Op(did)) = self;
        did
    }
}
/// Parsed form of a wire DNS-TXT record, as produced by `parse_wire_org_txt_record`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireOrgTxtRecord {
/// Value of the required `did=` field, classified as org or op.
pub did: WireOrgTxtDid,
/// Value of the optional `relay=` field.
pub relay: Option<String>,
/// Value of the optional `sso_iss=` field.
pub sso_iss: Option<String>,
/// Value of the optional `sso_tenant=` field.
pub sso_tenant: Option<String>,
}
/// Reasons `parse_wire_org_txt_record` can reject a record.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum WireOrgTxtParseError {
/// No `did=` field was present.
#[error("DNS-TXT record missing required `did=` field")]
MissingDid,
/// No `v=` field was present.
#[error("DNS-TXT record missing required `v=` field")]
MissingVersion,
/// The `v=` field held something other than `1`; carries the offending value.
#[error("unsupported DNS-TXT record version `{0}`")]
UnsupportedVersion(String),
/// The `did=` value was neither an org DID nor an op DID; carries the offending value
/// (the value is not included in the display message).
#[error("`did=` must be did:wire:org:* or did:wire:op:* with a long fingerprint suffix")]
InvalidDid(String),
/// A recognised field appeared more than once; carries the field name.
#[error("duplicate DNS-TXT field `{0}`")]
DuplicateField(&'static str),
/// A non-empty `;`-separated segment had no `=`; carries the trimmed segment.
#[error("malformed DNS-TXT field `{0}`")]
MalformedField(String),
}
/// Parses a `;`-separated `key=value` DNS-TXT record into a [`WireOrgTxtRecord`].
///
/// Surrounding whitespace is trimmed and one enclosing pair of double quotes is
/// removed when both are present. Empty segments are skipped, keys and values
/// are trimmed, and unrecognised keys are ignored.
///
/// # Errors
///
/// * [`WireOrgTxtParseError::MalformedField`] — a non-empty segment has no `=`.
/// * [`WireOrgTxtParseError::DuplicateField`] — a recognised key appears twice.
/// * [`WireOrgTxtParseError::MissingVersion`] / [`WireOrgTxtParseError::UnsupportedVersion`]
///   — `v=` is absent or not `1` (checked before the DID).
/// * [`WireOrgTxtParseError::MissingDid`] / [`WireOrgTxtParseError::InvalidDid`]
///   — `did=` is absent or is neither an org nor an op DID.
pub fn parse_wire_org_txt_record(record: &str) -> Result<WireOrgTxtRecord, WireOrgTxtParseError> {
    let unquoted = record.trim();
    let unquoted = unquoted
        .strip_prefix('"')
        .and_then(|inner| inner.strip_suffix('"'))
        .unwrap_or(unquoted);

    let mut did: Option<String> = None;
    let mut version: Option<String> = None;
    let mut relay: Option<String> = None;
    let mut sso_iss: Option<String> = None;
    let mut sso_tenant: Option<String> = None;

    for segment in unquoted.split(';').map(str::trim) {
        if segment.is_empty() {
            continue;
        }
        let (key, value) = segment
            .split_once('=')
            .ok_or_else(|| WireOrgTxtParseError::MalformedField(segment.to_string()))?;

        // Pick the destination for this key; anything unrecognised is skipped.
        let (slot, field): (&mut Option<String>, &'static str) = match key.trim() {
            "did" => (&mut did, "did"),
            "v" => (&mut version, "v"),
            "relay" => (&mut relay, "relay"),
            "sso_iss" => (&mut sso_iss, "sso_iss"),
            "sso_tenant" => (&mut sso_tenant, "sso_tenant"),
            _ => continue,
        };
        if slot.is_some() {
            return Err(WireOrgTxtParseError::DuplicateField(field));
        }
        *slot = Some(value.trim().to_string());
    }

    // The version gate runs before any DID validation.
    match version {
        None => return Err(WireOrgTxtParseError::MissingVersion),
        Some(found) if found != "1" => {
            return Err(WireOrgTxtParseError::UnsupportedVersion(found));
        }
        Some(_) => {}
    }

    let did = match did.ok_or(WireOrgTxtParseError::MissingDid)? {
        candidate if crate::agent_card::is_org_did(&candidate) => WireOrgTxtDid::Org(candidate),
        candidate if crate::agent_card::is_op_did(&candidate) => WireOrgTxtDid::Op(candidate),
        other => return Err(WireOrgTxtParseError::InvalidDid(other)),
    };

    Ok(WireOrgTxtRecord {
        did,
        relay,
        sso_iss,
        sso_tenant,
    })
}
/// Name of the environment variable that switches off TLS certificate verification.
pub const INSECURE_SKIP_TLS_ENV: &str = "WIRE_INSECURE_SKIP_TLS_VERIFY";

/// Reports whether [`INSECURE_SKIP_TLS_ENV`] is set to `1`, `true`, `yes` or `on`
/// (ASCII case-insensitive). An unset or unreadable variable counts as off.
fn insecure_skip_tls_verify() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    std::env::var(INSECURE_SKIP_TLS_ENV)
        .map(|raw| TRUTHY.iter().any(|accepted| raw.eq_ignore_ascii_case(accepted)))
        .unwrap_or(false)
}
/// Prints a red warning to stderr, at most once per process, when TLS
/// verification has been disabled through the environment. Does nothing otherwise.
fn maybe_emit_insecure_banner() {
    static BANNER_SHOWN: std::sync::OnceLock<()> = std::sync::OnceLock::new();

    if !insecure_skip_tls_verify() {
        return;
    }
    // `get_or_init` runs the closure only for the first caller.
    BANNER_SHOWN.get_or_init(|| {
        eprintln!(
            "\x1b[1;31mwire: WARNING\x1b[0m {INSECURE_SKIP_TLS_ENV}=1 is set; TLS verification is DISABLED for all relay traffic. \
             MITM attacks against the relay path are undetectable in this mode. Unset to restore default trust validation."
        );
    });
}
/// Builds the blocking reqwest client used for relay traffic.
///
/// `timeout`, when given, is applied to the builder. When the insecure
/// environment switch is on, the warning banner is emitted and invalid
/// certificates are accepted; otherwise the client uses a clone of
/// `crate::tls::shared_client_config()`.
///
/// # Errors
///
/// Returns the builder's error wrapped with the context
/// `constructing reqwest blocking client`.
pub fn build_blocking_client(
    timeout: Option<std::time::Duration>,
) -> Result<reqwest::blocking::Client> {
    let builder = reqwest::blocking::Client::builder();
    let builder = match timeout {
        Some(limit) => builder.timeout(limit),
        None => builder,
    };
    let builder = if insecure_skip_tls_verify() {
        maybe_emit_insecure_banner();
        builder.danger_accept_invalid_certs(true)
    } else {
        let tls_config = crate::tls::shared_client_config();
        builder.use_preconfigured_tls((*tls_config).clone())
    };
    builder
        .build()
        .context("constructing reqwest blocking client")
}
/// Renders an error chain as `cause: cause: ...`, prefixed with a coarse class
/// label when any link of the chain matches a known transport failure.
///
/// Matching is an ASCII-lowercased substring search over every message in the
/// chain. Classes are tried in the order TLS, DNS, timeout, connect; the first
/// match wins, and no prefix is added when nothing matches.
pub fn format_transport_error(err: &anyhow::Error) -> String {
    // (label, needles) pairs in priority order.
    const CLASSES: &[(&str, &[&str])] = &[
        (
            "TLS error",
            &[
                "invalid peer certificate",
                "certificate verification",
                "unknownissuer",
                "certificate is not valid",
                "tls handshake",
            ],
        ),
        (
            "DNS error",
            &[
                "dns error",
                "nodename nor servname",
                "failed to lookup address",
            ],
        ),
        ("timeout", &["operation timed out", "deadline has elapsed"]),
        ("connect error", &["connection refused", "connection reset"]),
    ];

    let mut parts: Vec<String> = err.chain().map(|cause| cause.to_string()).collect();
    let lowered: Vec<String> = parts.iter().map(|p| p.to_ascii_lowercase()).collect();

    let class = CLASSES
        .iter()
        .find(|entry| {
            let needles: &[&str] = entry.1;
            lowered
                .iter()
                .any(|msg| needles.iter().any(|needle| msg.contains(*needle)))
        })
        .map(|entry| entry.0);

    if let Some(label) = class {
        parts.insert(0, label.to_string());
    }
    parts.join(": ")
}
#[cfg(unix)]
pub fn uds_request(
socket_path: &std::path::Path,
method: &str,
request_target: &str,
headers: &[(&str, &str)],
body: &[u8],
) -> Result<(u16, Vec<u8>)> {
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
let mut stream =
UnixStream::connect(socket_path).with_context(|| format!("connect UDS {socket_path:?}"))?;
stream.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
stream.set_write_timeout(Some(std::time::Duration::from_secs(30)))?;
let mut req = String::with_capacity(256 + headers.len() * 32 + body.len());
req.push_str(method);
req.push(' ');
req.push_str(request_target);
req.push_str(" HTTP/1.1\r\n");
req.push_str("Host: localhost\r\n");
req.push_str("Connection: close\r\n");
req.push_str(&format!("Content-Length: {}\r\n", body.len()));
for (k, v) in headers {
req.push_str(k);
req.push_str(": ");
req.push_str(v);
req.push_str("\r\n");
}
req.push_str("\r\n");
stream.write_all(req.as_bytes())?;
if !body.is_empty() {
stream.write_all(body)?;
}
stream.flush()?;
let mut raw = Vec::new();
stream.read_to_end(&mut raw)?;
let split = raw
.windows(4)
.position(|w| w == b"\r\n\r\n")
.ok_or_else(|| anyhow!("UDS response missing header/body delimiter"))?;
let head = std::str::from_utf8(&raw[..split])
.map_err(|e| anyhow!("UDS response head not UTF-8: {e}"))?;
let body = raw[split + 4..].to_vec();
let status_line = head.lines().next().unwrap_or("");
let status: u16 = status_line
.split_whitespace()
.nth(1)
.and_then(|s| s.parse().ok())
.ok_or_else(|| anyhow!("UDS response missing status code: {status_line:?}"))?;
Ok((status, body))
}
pub fn post_event_to_endpoint(
endpoint: &crate::endpoints::Endpoint,
event: &Value,
) -> Result<PostEventResponse> {
#[cfg(unix)]
if let Some(socket_path) = endpoint.relay_url.strip_prefix("unix://") {
let body = serde_json::json!({"event": event}).to_string();
let auth_header = format!("Bearer {}", endpoint.slot_token);
let (status, body) = uds_request(
std::path::Path::new(socket_path),
"POST",
&format!("/v1/events/{}", endpoint.slot_id),
&[
("Content-Type", "application/json"),
("Authorization", &auth_header),
],
body.as_bytes(),
)?;
if !(200..300).contains(&status) {
return Err(anyhow!(
"post_event (uds {socket_path}) failed: {status}: {}",
String::from_utf8_lossy(&body)
));
}
return Ok(serde_json::from_slice(&body)?);
}
let client = RelayClient::new(&endpoint.relay_url);
client.post_event(&endpoint.slot_id, &endpoint.slot_token, event)
}
pub fn try_post_event_with_failover<F>(
endpoints: &[crate::endpoints::Endpoint],
event: &Value,
mut poster: F,
) -> Result<(crate::endpoints::Endpoint, PostEventResponse)>
where
F: FnMut(&crate::endpoints::Endpoint, &Value) -> Result<PostEventResponse>,
{
if endpoints.is_empty() {
bail!(
"no endpoints to deliver to — peer has no pinned endpoints in relay_state. \
Re-run the pair flow (or `wire dial <peer>@<relay>`) to re-pin the peer's \
advertised endpoints."
);
}
let mut errs: Vec<String> = Vec::with_capacity(endpoints.len());
for ep in endpoints {
match poster(ep, event) {
Ok(resp) => return Ok((ep.clone(), resp)),
Err(e) => errs.push(format!("{} ({:?}): {e}", ep.relay_url, ep.scope)),
}
}
bail!(
"all {n} endpoint(s) failed:\n • {reasons}",
n = endpoints.len(),
reasons = errs.join("\n • ")
)
}
impl RelayClient {
/// Creates a client for `base_url` with a 30-second request timeout.
///
/// Trailing `/` characters are removed from `base_url` before it is stored.
///
/// # Panics
///
/// Panics if [`build_blocking_client`] returns an error.
pub fn new(base_url: &str) -> Self {
    let timeout = std::time::Duration::from_secs(30);
    let client = build_blocking_client(Some(timeout))
        .expect("reqwest client construction is infallible with rustls + native roots");
    let base_url = base_url.trim_end_matches('/').to_string();
    Self { base_url, client }
}
pub fn allocate_slot(&self, handle_hint: Option<&str>) -> Result<AllocateResponse> {
let body = serde_json::json!({"handle": handle_hint});
let resp = self
.client
.post(format!("{}/v1/slot/allocate", self.base_url))
.json(&body)
.send()
.with_context(|| format!("POST {}/v1/slot/allocate", self.base_url))?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("allocate failed: {status}: {detail}"));
}
Ok(resp.json()?)
}
pub fn post_event(
&self,
slot_id: &str,
slot_token: &str,
event: &Value,
) -> Result<PostEventResponse> {
let body = serde_json::json!({"event": event});
let resp = self
.client
.post(format!("{}/v1/events/{slot_id}", self.base_url))
.bearer_auth(slot_token)
.json(&body)
.send()
.with_context(|| format!("POST {}/v1/events/{slot_id}", self.base_url))?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("post_event failed: {status}: {detail}"));
}
Ok(resp.json()?)
}
/// Lists events from `GET /v1/events/<slot_id>`, authenticating with
/// `slot_token` as a bearer token.
///
/// `since` and `limit`, when present, are sent as the `since` and `limit`
/// query parameters. They are attached through the request builder so the
/// values are percent-encoded; a `since` value containing `&`, `+`, `#` or
/// spaces therefore cannot corrupt the query string.
///
/// # Errors
///
/// Fails on a transport error, a non-success status (the error includes the
/// status and response text), or an undecodable response body.
pub fn list_events(
    &self,
    slot_id: &str,
    slot_token: &str,
    since: Option<&str>,
    limit: Option<usize>,
) -> Result<Vec<Value>> {
    let url = format!("{}/v1/events/{slot_id}", self.base_url);
    let mut request = self.client.get(&url).bearer_auth(slot_token);
    if let Some(cursor) = since {
        request = request.query(&[("since", cursor)]);
    }
    if let Some(max) = limit {
        request = request.query(&[("limit", max)]);
    }
    let resp = request
        .send()
        .with_context(|| format!("GET {url} (since={since:?}, limit={limit:?})"))?;

    let status = resp.status();
    if !status.is_success() {
        let detail = resp.text().unwrap_or_default();
        return Err(anyhow!("list_events failed: {status}: {detail}"));
    }
    Ok(resp.json()?)
}
/// Best-effort read of `GET /v1/slot/<slot_id>/state`, returning
/// `(event_count, last_pull_at_unix)`.
///
/// Never returns `Err`: a transport failure or non-success status yields
/// `(0, None)`, an undecodable body is treated as JSON `null`, and a missing
/// or non-integer `event_count` counts as `0`.
pub fn slot_state(&self, slot_id: &str, slot_token: &str) -> Result<(usize, Option<u64>)> {
    const UNKNOWN: (usize, Option<u64>) = (0, None);

    let url = format!("{}/v1/slot/{slot_id}/state", self.base_url);
    let Ok(resp) = self.client.get(&url).bearer_auth(slot_token).send() else {
        return Ok(UNKNOWN);
    };
    if !resp.status().is_success() {
        return Ok(UNKNOWN);
    }

    let state = resp.json::<Value>().unwrap_or(Value::Null);
    let read_u64 = |key: &str| state.get(key).and_then(Value::as_u64);
    let count = read_u64("event_count").unwrap_or(0) as usize;
    Ok((count, read_u64("last_pull_at_unix")))
}
pub fn responder_health_set(
&self,
slot_id: &str,
slot_token: &str,
record: &Value,
) -> Result<Value> {
let resp = self
.client
.post(format!(
"{}/v1/slot/{slot_id}/responder-health",
self.base_url
))
.bearer_auth(slot_token)
.json(record)
.send()
.with_context(|| {
format!("POST {}/v1/slot/{slot_id}/responder-health", self.base_url)
})?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("responder_health_set failed: {status}: {detail}"));
}
Ok(resp.json()?)
}
pub fn responder_health_get(&self, slot_id: &str, slot_token: &str) -> Result<Value> {
let resp = self
.client
.get(format!("{}/v1/slot/{slot_id}/state", self.base_url))
.bearer_auth(slot_token)
.send()
.with_context(|| format!("GET {}/v1/slot/{slot_id}/state", self.base_url))?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("responder_health_get failed: {status}: {detail}"));
}
let state: Value = resp.json()?;
Ok(state
.get("responder_health")
.cloned()
.unwrap_or(Value::Null))
}
/// Calls `GET /healthz` and reports whether the status was a success code.
///
/// # Errors
///
/// Returns the transport error when the request cannot be sent.
pub fn healthz(&self) -> Result<bool> {
    let url = format!("{}/healthz", self.base_url);
    let status = self.client.get(url).send()?.status();
    Ok(status.is_success())
}
pub fn check_healthz(&self) -> anyhow::Result<()> {
match self.healthz() {
Ok(true) => Ok(()),
Ok(false) => anyhow::bail!(
"phyllis: silent line — {}/healthz returned non-200.\n\
the host is reachable but the relay isn't returning ok. test:\n \
curl -v {}/healthz",
self.base_url,
self.base_url
),
Err(e) => anyhow::bail!(
"phyllis: silent line — couldn't reach {}/healthz: {e:#}.\n\
test reachability from this machine:\n curl -v {}/healthz\n\
if curl also fails, a sandbox / proxy / firewall is the usual cause.\n\
(OpenShell sandbox? run `curl -fsSL https://wireup.net/openshell-policy.sh | bash -s <sandbox-name>` on the host first.)",
self.base_url,
self.base_url
),
}
}
pub fn pair_open(&self, code_hash: &str, msg_b64: &str, role: &str) -> Result<String> {
let body = serde_json::json!({"code_hash": code_hash, "msg": msg_b64, "role": role});
let resp = self
.client
.post(format!("{}/v1/pair", self.base_url))
.json(&body)
.send()?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("pair_open failed: {status}: {detail}"));
}
let v: Value = resp.json()?;
v.get("pair_id")
.and_then(Value::as_str)
.map(str::to_string)
.ok_or_else(|| anyhow!("pair_open response missing pair_id"))
}
pub fn pair_abandon(&self, code_hash: &str) -> Result<()> {
let body = serde_json::json!({"code_hash": code_hash});
let resp = self
.client
.post(format!("{}/v1/pair/abandon", self.base_url))
.json(&body)
.send()?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("pair_abandon failed: {status}: {detail}"));
}
Ok(())
}
/// Reads pairing state from `GET /v1/pair/<pair_id>` and returns
/// `(peer_msg, peer_bootstrap)`; each is `None` when the reply has no string
/// under that key.
///
/// `as_role` is sent as the `as_role` query parameter through the request
/// builder, so the value is percent-encoded rather than spliced into the URL.
///
/// # Errors
///
/// Fails on a transport error, a non-success status (the error includes the
/// status and response text), or an undecodable response body.
pub fn pair_get(
    &self,
    pair_id: &str,
    as_role: &str,
) -> Result<(Option<String>, Option<String>)> {
    let resp = self
        .client
        .get(format!("{}/v1/pair/{pair_id}", self.base_url))
        .query(&[("as_role", as_role)])
        .send()?;
    let status = resp.status();
    if !status.is_success() {
        let detail = resp.text().unwrap_or_default();
        return Err(anyhow!("pair_get failed: {status}: {detail}"));
    }
    let v: Value = resp.json()?;
    // Missing or non-string members both map to `None`.
    let text_field = |key: &str| v.get(key).and_then(Value::as_str).map(str::to_string);
    Ok((text_field("peer_msg"), text_field("peer_bootstrap")))
}
pub fn pair_bootstrap(&self, pair_id: &str, role: &str, sealed_b64: &str) -> Result<()> {
let body = serde_json::json!({"role": role, "sealed": sealed_b64});
let resp = self
.client
.post(format!("{}/v1/pair/{pair_id}/bootstrap", self.base_url))
.json(&body)
.send()?;
if !resp.status().is_success() {
let s = resp.status();
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("pair_bootstrap failed: {s}: {detail}"));
}
Ok(())
}
/// Claims a handle without sending a `discoverable` preference.
///
/// Delegates to [`Self::handle_claim_v2`] with `discoverable` set to `None`;
/// see that method for the request shape and error conditions.
pub fn handle_claim(
    &self,
    nick: &str,
    slot_id: &str,
    slot_token: &str,
    relay_url: Option<&str>,
    card: &Value,
) -> Result<Value> {
    // `None` keeps the `discoverable` key out of the request body.
    let discoverable: Option<bool> = None;
    self.handle_claim_v2(nick, slot_id, slot_token, relay_url, card, discoverable)
}
pub fn handle_claim_v2(
&self,
nick: &str,
slot_id: &str,
slot_token: &str,
relay_url: Option<&str>,
card: &Value,
discoverable: Option<bool>,
) -> Result<Value> {
let mut body = serde_json::json!({
"nick": nick,
"slot_id": slot_id,
"relay_url": relay_url,
"card": card,
});
if let Some(d) = discoverable {
body["discoverable"] = serde_json::json!(d);
}
let resp = self
.client
.post(format!("{}/v1/handle/claim", self.base_url))
.bearer_auth(slot_token)
.json(&body)
.send()
.with_context(|| format!("POST {}/v1/handle/claim", self.base_url))?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("handle_claim failed: {status}: {detail}"));
}
Ok(resp.json()?)
}
pub fn handle_intro(&self, nick: &str, event: &Value) -> Result<Value> {
let body = serde_json::json!({"event": event});
let resp = self
.client
.post(format!("{}/v1/handle/intro/{nick}", self.base_url))
.json(&body)
.send()
.with_context(|| format!("POST {}/v1/handle/intro/{nick}", self.base_url))?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("handle_intro failed: {status}: {detail}"));
}
Ok(resp.json()?)
}
pub fn well_known_agent_card_a2a(&self, handle: &str) -> Result<Value> {
let resp = self
.client
.get(format!("{}/.well-known/agent-card.json", self.base_url))
.query(&[("handle", handle)])
.send()
.with_context(|| {
format!(
"GET {}/.well-known/agent-card.json?handle={handle}",
self.base_url
)
})?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!(
"well_known_agent_card_a2a failed: {status}: {detail}"
));
}
Ok(resp.json()?)
}
pub fn well_known_agent(&self, handle: &str) -> Result<Value> {
let resp = self
.client
.get(format!("{}/.well-known/wire/agent", self.base_url))
.query(&[("handle", handle)])
.send()
.with_context(|| {
format!(
"GET {}/.well-known/wire/agent?handle={handle}",
self.base_url
)
})?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().unwrap_or_default();
return Err(anyhow!("well_known_agent failed: {status}: {detail}"));
}
Ok(resp.json()?)
}
}
/// Tests for `uds_request` against a one-shot canned server on a Unix socket.
#[cfg(all(test, unix))]
mod uds_tests {
    use super::*;
    use std::io::{Read, Write};
    use std::os::unix::net::UnixListener;
    use std::thread;

    /// Binds `socket_path` and spawns a thread that accepts one connection,
    /// reads a full request (head plus `Content-Length` bytes of body), and
    /// answers with `status` and `body`.
    ///
    /// The listener is bound before this function returns, so a caller may
    /// connect immediately; the connection waits in the backlog until the
    /// thread reaches `accept`.
    fn spawn_canned_uds_server(socket_path: std::path::PathBuf, status: u16, body: &'static str) {
        let listener = UnixListener::bind(&socket_path).expect("bind canned UDS");
        thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept canned UDS");
            let mut req: Vec<u8> = Vec::new();
            let mut chunk = [0u8; 4096];
            loop {
                let n = match stream.read(&mut chunk) {
                    Ok(0) | Err(_) => break,
                    Ok(n) => n,
                };
                req.extend_from_slice(&chunk[..n]);
                // Stop once the head and the advertised body length have arrived.
                if let Some(split) = req.windows(4).position(|w| w == b"\r\n\r\n") {
                    let head = String::from_utf8_lossy(&req[..split]);
                    let content_length: usize = head
                        .lines()
                        .find_map(|l| {
                            l.to_ascii_lowercase()
                                .strip_prefix("content-length:")
                                .map(|v| v.trim().parse().unwrap_or(0))
                        })
                        .unwrap_or(0);
                    if req.len() >= split + 4 + content_length {
                        break;
                    }
                }
            }
            let body_bytes = body.as_bytes();
            let status_text = match status {
                200 => "OK",
                201 => "Created",
                400 => "Bad Request",
                _ => "Status",
            };
            let resp = format!(
                "HTTP/1.1 {status} {status_text}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
                body_bytes.len()
            );
            let _ = stream.write_all(resp.as_bytes());
        });
    }

    /// Creates a uniquely named scratch directory under the system temp dir and
    /// returns `(directory, socket_path)` with no file at `socket_path`.
    fn scratch_socket(file_name: &str) -> (std::path::PathBuf, std::path::PathBuf) {
        let dir = std::env::temp_dir().join(format!("wire-uds-test-{}", rand::random::<u32>()));
        std::fs::create_dir_all(&dir).unwrap();
        let sock = dir.join(file_name);
        let _ = std::fs::remove_file(&sock);
        (dir, sock)
    }

    #[test]
    fn uds_request_round_trips_200_with_body() {
        let (tmpdir, sock) = scratch_socket("rt.sock");
        spawn_canned_uds_server(sock.clone(), 200, r#"{"ok":true}"#);
        let (status, body) = uds_request(
            &sock,
            "POST",
            "/v1/test",
            &[("Content-Type", "application/json")],
            b"{}",
        )
        .expect("uds_request succeeds");
        assert_eq!(status, 200);
        assert_eq!(body, br#"{"ok":true}"#);
        // Best-effort cleanup of the scratch directory and socket file.
        let _ = std::fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn uds_request_surfaces_non_2xx_status() {
        let (tmpdir, sock) = scratch_socket("err.sock");
        spawn_canned_uds_server(sock.clone(), 400, r#"{"error":"bad"}"#);
        let (status, body) = uds_request(&sock, "GET", "/v1/test", &[], b"")
            .expect("uds_request succeeds even on 4xx");
        assert_eq!(status, 400);
        assert_eq!(body, br#"{"error":"bad"}"#);
        // Best-effort cleanup of the scratch directory and socket file.
        let _ = std::fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn uds_request_fails_on_nonexistent_socket() {
        // A random name under the temp dir avoids touching a fixed shared path.
        let nope = std::env::temp_dir().join(format!(
            "wire-uds-nonexistent-socket-{}.sock",
            rand::random::<u32>()
        ));
        let _ = std::fs::remove_file(&nope);
        let err = uds_request(&nope, "GET", "/", &[], b"").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("connect UDS"),
            "expected connect error, got: {msg}"
        );
    }
}
// Unit and property tests for URL normalisation, transport-error
// classification, the insecure-TLS environment switch, and DNS-TXT parsing.
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
// `RelayClient::new` must strip a trailing slash and leave a clean URL untouched.
#[test]
fn url_normalization_trims_trailing_slash() {
let c = RelayClient::new("http://example.com/");
assert_eq!(c.base_url, "http://example.com");
let c = RelayClient::new("http://example.com");
assert_eq!(c.base_url, "http://example.com");
}
// A certificate failure deep in the chain yields the `TLS error:` prefix while
// keeping both the root cause and the outer request context in the output.
#[test]
fn format_transport_error_classifies_tls() {
let inner = anyhow!("invalid peer certificate: UnknownIssuer");
let middle: anyhow::Error = inner.context("hyper send");
let top = middle.context("POST https://relay.example/v1/events/abc");
let formatted = format_transport_error(&top);
assert!(
formatted.starts_with("TLS error:"),
"expected TLS class prefix, got: {formatted}"
);
assert!(
formatted.contains("UnknownIssuer"),
"lost root cause: {formatted}"
);
assert!(
formatted.contains("POST https://relay.example"),
"lost context URL: {formatted}"
);
}
#[test]
fn format_transport_error_classifies_timeout() {
let inner = anyhow!("operation timed out");
let top = inner.context("POST https://relay.example/v1/events/abc");
let formatted = format_transport_error(&top);
assert!(formatted.starts_with("timeout:"), "got: {formatted}");
}
#[test]
fn format_transport_error_classifies_dns() {
let inner = anyhow!("dns error: failed to lookup address");
let top = inner.context("POST https://relay.example/v1/events/abc");
let formatted = format_transport_error(&top);
assert!(formatted.starts_with("DNS error:"), "got: {formatted}");
}
// A message matching no known class is still rendered as the joined chain.
#[test]
fn format_transport_error_falls_back_to_chain_join() {
let inner = anyhow!("Refused to connect for non-standard reason xyz");
let top = inner.context("POST https://relay.example/v1/events/abc");
let formatted = format_transport_error(&top);
assert!(formatted.contains("Refused to connect"));
assert!(formatted.contains("POST https://relay.example"));
}
// Exercises the environment switch: unset is off, the four truthy spellings
// (any ASCII case) are on, everything else is off.
#[test]
fn insecure_env_recognizes_truthy_values_and_default_off() {
use std::sync::{Mutex, OnceLock};
// NOTE(review): this lock is a function-local static, so it only serialises
// this test against itself. Other tests that read the variable (e.g. through
// `RelayClient::new` -> `build_blocking_client`) are not excluded — confirm
// whether the suite is expected to run single-threaded.
static GUARD: OnceLock<Mutex<()>> = OnceLock::new();
let _lock = GUARD.get_or_init(|| Mutex::new(())).lock().unwrap();
// SAFETY: NOTE(review) — mutating the environment is only sound when no other
// thread accesses it concurrently; see the note on `GUARD` above.
unsafe {
std::env::remove_var(INSECURE_SKIP_TLS_ENV);
}
assert!(!insecure_skip_tls_verify(), "default must be secure");
for v in ["1", "true", "yes", "on", "TRUE", "Yes"] {
// SAFETY: same caveat as the `remove_var` call above.
unsafe {
std::env::set_var(INSECURE_SKIP_TLS_ENV, v);
}
assert!(insecure_skip_tls_verify(), "value {v:?} should be truthy");
}
for v in ["0", "false", "no", "off", ""] {
// SAFETY: same caveat as the `remove_var` call above.
unsafe {
std::env::set_var(INSECURE_SKIP_TLS_ENV, v);
}
assert!(
!insecure_skip_tls_verify(),
"value {v:?} must not enable insecure mode"
);
}
// Leave the variable unset so later readers see the secure default.
// SAFETY: same caveat as the `remove_var` call above.
unsafe {
std::env::remove_var(INSECURE_SKIP_TLS_ENV);
}
}
// Fixture: a DID the parser tests expect to be classified as `Org`.
fn org_did() -> &'static str {
"did:wire:org:example-0123456789abcdef0123456789abcdef"
}
// Fixture: a DID the parser tests expect to be classified as `Op`.
fn op_did() -> &'static str {
"did:wire:op:operator-abcdef0123456789abcdef0123456789"
}
// Field order is irrelevant (`v=1` comes last here) and optional fields are
// surfaced when present, `None` when absent.
#[test]
fn parse_wire_org_txt_record_dispatches_org_and_op_dids() {
let org = parse_wire_org_txt_record(&format!(
"did={}; relay=https://relay.example; sso_iss=https://issuer.example; sso_tenant=tenant; v=1",
org_did()
))
.unwrap();
assert_eq!(org.did, WireOrgTxtDid::Org(org_did().to_string()));
assert_eq!(org.relay.as_deref(), Some("https://relay.example"));
assert_eq!(org.sso_iss.as_deref(), Some("https://issuer.example"));
assert_eq!(org.sso_tenant.as_deref(), Some("tenant"));
let op = parse_wire_org_txt_record(&format!("did={}; v=1", op_did())).unwrap();
assert_eq!(op.did, WireOrgTxtDid::Op(op_did().to_string()));
assert_eq!(op.relay, None);
}
#[test]
fn parse_wire_org_txt_record_rejects_unknown_version_and_session_did() {
let unknown_v = parse_wire_org_txt_record(&format!("did={}; v=2", org_did())).unwrap_err();
assert_eq!(
unknown_v,
WireOrgTxtParseError::UnsupportedVersion("2".into())
);
let session_did =
parse_wire_org_txt_record("did=did:wire:session-01234567; v=1").unwrap_err();
assert!(matches!(session_did, WireOrgTxtParseError::InvalidDid(_)));
}
#[test]
fn parse_wire_org_txt_record_rejects_duplicate_known_fields() {
let err = parse_wire_org_txt_record(&format!("did={}; v=1; v=1", org_did())).unwrap_err();
assert_eq!(err, WireOrgTxtParseError::DuplicateField("v"));
}
proptest! {
// Any number of unrecognised `key=value` fields must not affect a valid v1 record.
#[test]
fn parse_wire_org_txt_record_ignores_unknown_fields_at_v1(
unknown_fields in prop::collection::vec(
(
"[a-z_][a-z0-9_]{0,16}",
"[A-Za-z0-9._:/-]{0,64}"
),
0..32
)
) {
let mut record = format!("did={}; v=1", org_did());
for (key, value) in unknown_fields {
// Discard cases where the generated key collides with a recognised field.
prop_assume!(!matches!(
key.as_str(),
"did" | "v" | "relay" | "sso_iss" | "sso_tenant"
));
record.push_str("; ");
record.push_str(&key);
record.push('=');
record.push_str(&value);
}
let parsed = parse_wire_org_txt_record(&record).unwrap();
prop_assert_eq!(parsed.did, WireOrgTxtDid::Org(org_did().to_string()));
}
// Every version string other than `1` is rejected and echoed back in the error.
#[test]
fn parse_wire_org_txt_record_rejects_every_unknown_version(
version in "[A-Za-z0-9._-]{1,16}"
) {
prop_assume!(version != "1");
let record = format!("did={}; v={version}; future=opaque", org_did());
let err = parse_wire_org_txt_record(&record).unwrap_err();
prop_assert_eq!(err, WireOrgTxtParseError::UnsupportedVersion(version));
}
}
}
/// Tests for `try_post_event_with_failover` using stub posters that record
/// which endpoints were attempted.
#[cfg(test)]
mod failover_tests {
    use super::*;
    use crate::endpoints::{Endpoint, EndpointScope};
    use std::sync::Mutex;

    /// Builds an endpoint through `Endpoint::federation`.
    fn fed_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint::federation(url.to_owned(), slot.to_owned(), token.to_owned())
    }

    /// Builds an endpoint whose scope is `EndpointScope::Local`.
    fn local_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint {
            relay_url: url.to_owned(),
            slot_id: slot.to_owned(),
            slot_token: token.to_owned(),
            scope: EndpointScope::Local,
        }
    }

    /// Canned successful relay response.
    fn ok_resp() -> PostEventResponse {
        PostEventResponse {
            event_id: Some(String::from("evt-1")),
            status: String::from("queued"),
        }
    }

    #[test]
    fn first_endpoint_succeeds_no_further_attempts() {
        let endpoints = [
            fed_ep("https://good.example", "slot1", "tok1"),
            fed_ep("https://other.example", "slot2", "tok2"),
        ];
        let attempts: Mutex<Vec<String>> = Mutex::new(Vec::new());
        let event = serde_json::json!({});

        let (delivered, _resp) = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            attempts.lock().unwrap().push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();

        assert_eq!(delivered.relay_url, "https://good.example");
        let tried = attempts.into_inner().unwrap();
        assert_eq!(
            tried,
            ["https://good.example"],
            "must NOT try the second endpoint after the first succeeds"
        );
    }

    #[test]
    fn skips_dead_endpoint_and_succeeds_on_next() {
        let endpoints = [
            fed_ep("https://copilot-agent@wireup.net", "slot-bad", "tok-bad"),
            fed_ep("https://wireup.net", "slot-good", "tok-good"),
        ];
        let attempts: Mutex<Vec<String>> = Mutex::new(Vec::new());
        let event = serde_json::json!({});

        // The stub rejects any URL carrying userinfo and accepts the rest.
        let (delivered_ep, _resp) = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            attempts.lock().unwrap().push(ep.relay_url.clone());
            if !ep.relay_url.contains('@') {
                return Ok(ok_resp());
            }
            Err(anyhow!("400 Bad Request (userinfo embedded)"))
        })
        .unwrap();

        assert_eq!(
            delivered_ep.relay_url, "https://wireup.net",
            "the successful endpoint must be the one returned to the caller"
        );
        let tried = attempts.into_inner().unwrap();
        assert_eq!(
            tried,
            ["https://copilot-agent@wireup.net", "https://wireup.net"],
            "must try `bad` first, then fall over to `good`"
        );
    }

    #[test]
    fn respects_priority_order_caller_supplies() {
        let endpoints = [
            local_ep("http://127.0.0.1:8771", "loc1", "loctok"),
            fed_ep("https://wireup.net", "fed1", "fedtok"),
        ];
        let attempts: Mutex<Vec<String>> = Mutex::new(Vec::new());
        let event = serde_json::json!({});

        let _ = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            attempts.lock().unwrap().push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();

        let tried = attempts.into_inner().unwrap();
        assert_eq!(
            tried[0], "http://127.0.0.1:8771",
            "Local-scope endpoint must be tried first (per the caller's priority order)"
        );
    }

    #[test]
    fn all_failures_returns_combined_error() {
        let urls = [
            "https://a.example",
            "https://b.example",
            "https://c.example",
        ];
        let endpoints: Vec<Endpoint> = urls.iter().map(|url| fed_ep(url, "s", "t")).collect();
        let event = serde_json::json!({});

        let err = try_post_event_with_failover(&endpoints, &event, |ep, _| {
            Err(anyhow!("simulated 500 from {}", ep.relay_url))
        })
        .unwrap_err()
        .to_string();

        assert!(
            err.contains("all 3 endpoint(s) failed"),
            "error must surface the total count: {err}"
        );
        for u in urls {
            assert!(
                err.contains(u),
                "combined error must include each failing endpoint URL ({u}): {err}"
            );
        }
    }

    #[test]
    fn empty_endpoints_returns_actionable_error() {
        let endpoints: [Endpoint; 0] = [];
        let event = serde_json::json!({});

        let err = try_post_event_with_failover(&endpoints, &event, |_, _| {
            unreachable!("poster must not be called when endpoint list is empty")
        })
        .unwrap_err()
        .to_string();

        assert!(
            err.contains("no endpoints to deliver to"),
            "empty-list error must be explicit: {err}"
        );
        let names_remedy = ["re-pin", "dial", "pair"]
            .iter()
            .any(|hint| err.contains(hint));
        assert!(
            names_remedy,
            "empty-list error must point at the remediation path: {err}"
        );
    }
}