Skip to main content

wire/
relay_client.rs

1//! HTTP client for `wire-relay-server`.
2//!
3//! Sync wrapper around `reqwest::blocking` so CLI commands stay synchronous —
4//! the only async surface in the crate is `relay_server::serve`. Async clients
5//! land in v0.2 if a long-running daemon needs them.
6
7use anyhow::{Context, Result, anyhow, bail};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
/// Blocking HTTP client bound to a single relay base URL.
///
/// Cloning copies the base URL string and clones the reqwest client handle.
#[derive(Clone)]
pub struct RelayClient {
    /// Relay base URL; `RelayClient::new` stores it with trailing `/` trimmed.
    base_url: String,
    /// Blocking reqwest client every method of this type sends through.
    client: reqwest::blocking::Client,
}
16
/// JSON body returned by [`RelayClient::allocate_slot`] on success.
#[derive(Debug, Serialize, Deserialize)]
pub struct AllocateResponse {
    /// Identifier of the newly allocated slot.
    pub slot_id: String,
    /// Bearer token for the slot. Per `allocate_slot`'s docs it grants both
    /// read and write access, so callers must persist it somewhere safe.
    pub slot_token: String,
}
22
/// JSON body deserialized from a successful event POST (see
/// [`RelayClient::post_event`] and [`post_event_to_endpoint`]).
#[derive(Debug, Deserialize)]
pub struct PostEventResponse {
    /// Event id reported by the relay; may be absent from the response.
    pub event_id: Option<String>,
    /// Status string reported by the relay.
    /// NOTE(review): the set of possible values is not visible in this file.
    pub status: String,
}
28
/// Env var: when set to a truthy value (`1`, `true`, `yes`, `on`; ASCII
/// case-insensitive), certificate verification is disabled on every
/// client built by [`build_blocking_client`]. Intended as an
/// emergency-only operator override for environments behind a
/// TLS-intercepting middlebox (corporate proxy, AV product like Avast
/// re-signing certs with its own root, captive portal). A loud stderr
/// banner is printed once per process when a client is built with the
/// override active. **Do not set this in production.** Documented in
/// THREAT_MODEL.md + README.
pub const INSECURE_SKIP_TLS_ENV: &str = "WIRE_INSECURE_SKIP_TLS_VERIFY";

/// Reports whether [`INSECURE_SKIP_TLS_ENV`] currently holds a truthy value.
///
/// An unset (or non-Unicode) variable reads as the empty string and is
/// therefore falsy. The value is lowercased (ASCII) but not trimmed, so
/// surrounding whitespace makes it falsy.
fn insecure_skip_tls_verify() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    let raw = std::env::var(INSECURE_SKIP_TLS_ENV).unwrap_or_default();
    let normalized = raw.to_ascii_lowercase();
    TRUTHY.contains(&normalized.as_str())
}
47
48/// v0.5.13: emit the loud-fail banner exactly once per process so we
49/// don't spam a hundred lines per `wire push`. Per-process `OnceLock`
50/// guards the emission. The banner goes to stderr; never stdout (we
51/// must not corrupt the `--json` machine-readable contract).
52fn maybe_emit_insecure_banner() {
53    static ONCE: std::sync::OnceLock<()> = std::sync::OnceLock::new();
54    if insecure_skip_tls_verify() {
55        ONCE.get_or_init(|| {
56            eprintln!(
57                "\x1b[1;31mwire: WARNING\x1b[0m {INSECURE_SKIP_TLS_ENV}=1 is set; TLS verification is DISABLED for all relay traffic. \
58                 MITM attacks against the relay path are undetectable in this mode. Unset to restore default trust validation."
59            );
60        });
61    }
62}
63
64/// Centralized builder for blocking HTTPS clients across wire. Loads
65/// the OS native trust store (rustls-tls-native-roots) so corporate
66/// proxies, AV cert-resign products, and on-prem CAs validate. Honors
67/// the [`INSECURE_SKIP_TLS_ENV`] escape hatch for the corporate-proxy
68/// emergency case.
69pub fn build_blocking_client(
70    timeout: Option<std::time::Duration>,
71) -> Result<reqwest::blocking::Client> {
72    let mut b = reqwest::blocking::Client::builder();
73    if let Some(t) = timeout {
74        b = b.timeout(t);
75    }
76    if insecure_skip_tls_verify() {
77        maybe_emit_insecure_banner();
78        b = b.danger_accept_invalid_certs(true);
79    }
80    b.build()
81        .with_context(|| "constructing reqwest blocking client")
82}
83
84/// Flatten an `anyhow::Error` source chain into a single human-readable
85/// transport-error line for the `reason` field in `wire push --json` and
86/// for stderr surfaces. Classifies the topmost cause (`TLS error`,
87/// `DNS error`, `connect timeout`, `read timeout`, `HTTP error`) so a
88/// silent failure no longer leaks past the user as a bare URL.
89///
90/// v0.5.13 rule 1 of the network-resilience doctrine — see issue #6.
91pub fn format_transport_error(err: &anyhow::Error) -> String {
92    let mut parts: Vec<String> = err.chain().map(|c| c.to_string()).collect();
93    // Heuristic classification — search the chain for the lowest-level
94    // descriptor and prefix the message so the reader sees the kind
95    // even when the topmost context is just the URL.
96    let lower = parts
97        .iter()
98        .map(|p| p.to_ascii_lowercase())
99        .collect::<Vec<_>>();
100    let class = if lower.iter().any(|p| {
101        p.contains("invalid peer certificate")
102            || p.contains("certificate verification")
103            || p.contains("unknownissuer")
104            || p.contains("certificate is not valid")
105            || p.contains("tls handshake")
106    }) {
107        Some("TLS error")
108    } else if lower.iter().any(|p| {
109        p.contains("dns error")
110            || p.contains("nodename nor servname")
111            || p.contains("failed to lookup address")
112    }) {
113        Some("DNS error")
114    } else if lower
115        .iter()
116        .any(|p| p.contains("operation timed out") || p.contains("deadline has elapsed"))
117    {
118        Some("timeout")
119    } else if lower
120        .iter()
121        .any(|p| p.contains("connection refused") || p.contains("connection reset"))
122    {
123        Some("connect error")
124    } else {
125        None
126    };
127    if let Some(c) = class {
128        parts.insert(0, c.to_string());
129    }
130    parts.join(": ")
131}
132
133/// v0.7.0-alpha.17: minimal blocking HTTP/1.1 client over Unix Domain
134/// Socket. Used by callers that detect a `unix://` scheme on a relay
135/// endpoint URL and route around reqwest (which has no UDS support).
136///
137/// Connects to `socket_path`, writes a single HTTP/1.1 request, parses
138/// status + Content-Length + body. Closes the connection (no keep-
139/// alive). Sufficient for wire's request shape: single POST or GET per
140/// call, JSON in + JSON out, small payloads.
141///
142/// Returns `(status_code, body_bytes)`. Caller decodes body per the
143/// endpoint's content type.
144#[cfg(unix)]
145pub fn uds_request(
146    socket_path: &std::path::Path,
147    method: &str,
148    request_target: &str,
149    headers: &[(&str, &str)],
150    body: &[u8],
151) -> Result<(u16, Vec<u8>)> {
152    use std::io::{Read, Write};
153    use std::os::unix::net::UnixStream;
154    let mut stream =
155        UnixStream::connect(socket_path).with_context(|| format!("connect UDS {socket_path:?}"))?;
156    stream.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
157    stream.set_write_timeout(Some(std::time::Duration::from_secs(30)))?;
158    let mut req = String::with_capacity(256 + headers.len() * 32 + body.len());
159    req.push_str(method);
160    req.push(' ');
161    req.push_str(request_target);
162    req.push_str(" HTTP/1.1\r\n");
163    req.push_str("Host: localhost\r\n");
164    req.push_str("Connection: close\r\n");
165    req.push_str(&format!("Content-Length: {}\r\n", body.len()));
166    for (k, v) in headers {
167        req.push_str(k);
168        req.push_str(": ");
169        req.push_str(v);
170        req.push_str("\r\n");
171    }
172    req.push_str("\r\n");
173    stream.write_all(req.as_bytes())?;
174    if !body.is_empty() {
175        stream.write_all(body)?;
176    }
177    stream.flush()?;
178    let mut raw = Vec::new();
179    stream.read_to_end(&mut raw)?;
180    // Parse HTTP/1.1 response: status line + headers + \r\n\r\n + body.
181    let split = raw
182        .windows(4)
183        .position(|w| w == b"\r\n\r\n")
184        .ok_or_else(|| anyhow!("UDS response missing header/body delimiter"))?;
185    let head = std::str::from_utf8(&raw[..split])
186        .map_err(|e| anyhow!("UDS response head not UTF-8: {e}"))?;
187    let body = raw[split + 4..].to_vec();
188    let status_line = head.lines().next().unwrap_or("");
189    // "HTTP/1.1 200 OK"
190    let status: u16 = status_line
191        .split_whitespace()
192        .nth(1)
193        .and_then(|s| s.parse().ok())
194        .ok_or_else(|| anyhow!("UDS response missing status code: {status_line:?}"))?;
195    Ok((status, body))
196}
197
198/// v0.7.0-alpha.19: scheme-aware POST helper that dispatches to either
199/// reqwest (for `http(s)://...`) OR the hand-rolled `uds_request` (for
200/// `unix:///path/to/sock`). Lets the daemon + cmd_send walk a peer's
201/// pinned endpoints uniformly without each call site having to detect
202/// scheme + branch.
203///
204/// Used by the routing layer to send signed events to a peer's slot
205/// regardless of which transport scope the peer is reachable on. UDS
206/// path uses the alpha.17 client; TCP path uses the existing
207/// RelayClient::post_event flow.
208pub fn post_event_to_endpoint(
209    endpoint: &crate::endpoints::Endpoint,
210    event: &Value,
211) -> Result<PostEventResponse> {
212    #[cfg(unix)]
213    if let Some(socket_path) = endpoint.relay_url.strip_prefix("unix://") {
214        let body = serde_json::json!({"event": event}).to_string();
215        let auth_header = format!("Bearer {}", endpoint.slot_token);
216        let (status, body) = uds_request(
217            std::path::Path::new(socket_path),
218            "POST",
219            &format!("/v1/events/{}", endpoint.slot_id),
220            &[
221                ("Content-Type", "application/json"),
222                ("Authorization", &auth_header),
223            ],
224            body.as_bytes(),
225        )?;
226        if !(200..300).contains(&status) {
227            // Format constraint: `cli::error_smells_like_slot_4xx` parses
228            // this error string to gate slot-rotation re-resolves. It
229            // matches `<status>` as a whole token bordered by space/colon
230            // (see issue #69). The current shape `: {status}: {body}`
231            // satisfies that — if you wrap the status differently
232            // (commas/brackets, `status=410`, JSON-encode it), update
233            // `error_smells_like_slot_4xx` in lockstep and add the new
234            // shape to its `slot_reresolve_tests` cases or peer slot
235            // rotations will silently stop auto-recovering.
236            return Err(anyhow!(
237                "post_event (uds {socket_path}) failed: {status}: {}",
238                String::from_utf8_lossy(&body)
239            ));
240        }
241        return Ok(serde_json::from_slice(&body)?);
242    }
243    let client = RelayClient::new(&endpoint.relay_url);
244    client.post_event(&endpoint.slot_id, &endpoint.slot_token, event)
245}
246
247/// Try posting `event` to each endpoint in priority order; return the first
248/// success. Generic over the poster so tests can inject a deterministic mock
249/// without spinning up an HTTP server. In production callers pass
250/// `post_event_to_endpoint`.
251///
252/// Bug 2 (P1, federation reachability) this implements: before this helper,
253/// the bilateral-pair ack path (`send_pair_drop_ack`) only ever POSTed to the
254/// FIRST endpoint in the peer's card. A peer whose first endpoint 4xx'd (e.g.
255/// the userinfo-malformed URL surfaced in Bug 1) was unreachable even when
256/// they advertised a perfectly good second endpoint. Surfaced when
257/// `coral-weasel`'s `wire accept swift-harbor` 400'd on the malformed first
258/// endpoint while a clean `https://wireup.net` endpoint sat behind it
259/// untouched.
260///
261/// Failover ordering is the priority order supplied by the caller (typically
262/// `peer_endpoints_in_priority_order` / `self_endpoints` — UDS / Local / LAN
263/// / Federation, lowest-friction first), so this respects the existing
264/// transport-preference contract.
265///
266/// Returns `Ok((endpoint, response))` on the first success — the caller can
267/// log which endpoint actually accepted the event. Returns `Err` if and only
268/// if every endpoint failed; the error string includes the per-endpoint
269/// reasons so the operator can diagnose without re-tracing.
270pub fn try_post_event_with_failover<F>(
271    endpoints: &[crate::endpoints::Endpoint],
272    event: &Value,
273    mut poster: F,
274) -> Result<(crate::endpoints::Endpoint, PostEventResponse)>
275where
276    F: FnMut(&crate::endpoints::Endpoint, &Value) -> Result<PostEventResponse>,
277{
278    if endpoints.is_empty() {
279        bail!(
280            "no endpoints to deliver to — peer has no pinned endpoints in relay_state. \
281             Re-run the pair flow (or `wire dial <peer>@<relay>`) to re-pin the peer's \
282             advertised endpoints."
283        );
284    }
285    let mut errs: Vec<String> = Vec::with_capacity(endpoints.len());
286    for ep in endpoints {
287        match poster(ep, event) {
288            Ok(resp) => return Ok((ep.clone(), resp)),
289            Err(e) => errs.push(format!("{} ({:?}): {e}", ep.relay_url, ep.scope)),
290        }
291    }
292    bail!(
293        "all {n} endpoint(s) failed:\n  • {reasons}",
294        n = endpoints.len(),
295        reasons = errs.join("\n  • ")
296    )
297}
298
299impl RelayClient {
300    pub fn new(base_url: &str) -> Self {
301        let client = build_blocking_client(Some(std::time::Duration::from_secs(30)))
302            .expect("reqwest client construction is infallible with rustls + native roots");
303        Self {
304            base_url: base_url.trim_end_matches('/').to_string(),
305            client,
306        }
307    }
308
309    /// Allocate a fresh slot. Returns `(slot_id, slot_token)` — caller MUST
310    /// persist `slot_token` somewhere safe (mode 0600 file); it grants both
311    /// read and write access to the slot.
312    pub fn allocate_slot(&self, handle_hint: Option<&str>) -> Result<AllocateResponse> {
313        let body = serde_json::json!({"handle": handle_hint});
314        let resp = self
315            .client
316            .post(format!("{}/v1/slot/allocate", self.base_url))
317            .json(&body)
318            .send()
319            .with_context(|| format!("POST {}/v1/slot/allocate", self.base_url))?;
320        let status = resp.status();
321        if !status.is_success() {
322            let detail = resp.text().unwrap_or_default();
323            return Err(anyhow!("allocate failed: {status}: {detail}"));
324        }
325        Ok(resp.json()?)
326    }
327
328    /// POST a signed event to a slot. Caller passes the slot's bearer token
329    /// (the relay model in v0.1 is "shared slot token between paired peers" —
330    /// see iter 9 SPAKE2 for how this token gets exchanged).
331    pub fn post_event(
332        &self,
333        slot_id: &str,
334        slot_token: &str,
335        event: &Value,
336    ) -> Result<PostEventResponse> {
337        let body = serde_json::json!({"event": event});
338        let resp = self
339            .client
340            .post(format!("{}/v1/events/{slot_id}", self.base_url))
341            .bearer_auth(slot_token)
342            .json(&body)
343            .send()
344            .with_context(|| format!("POST {}/v1/events/{slot_id}", self.base_url))?;
345        let status = resp.status();
346        if !status.is_success() {
347            let detail = resp.text().unwrap_or_default();
348            // Format constraint: `cli::error_smells_like_slot_4xx` parses
349            // this error string to gate slot-rotation re-resolves. It
350            // matches `<status>` as a whole token bordered by space/colon
351            // (see issue #69) — `reqwest::StatusCode` Display gives
352            // `"410 Gone"` which satisfies that. If you change the
353            // wrapping (commas/brackets, `status=410`, JSON-encode it),
354            // update `error_smells_like_slot_4xx` in lockstep and add the
355            // new shape to its `slot_reresolve_tests` cases or peer slot
356            // rotations will silently stop auto-recovering.
357            return Err(anyhow!("post_event failed: {status}: {detail}"));
358        }
359        Ok(resp.json()?)
360    }
361
362    /// GET events from a slot. `since` is an event_id cursor (exclusive); pass
363    /// `None` for the full slot snapshot. `limit` defaults to 100, max 1000.
364    pub fn list_events(
365        &self,
366        slot_id: &str,
367        slot_token: &str,
368        since: Option<&str>,
369        limit: Option<usize>,
370    ) -> Result<Vec<Value>> {
371        let mut url = format!("{}/v1/events/{slot_id}", self.base_url);
372        let mut sep = '?';
373        if let Some(s) = since {
374            url.push(sep);
375            url.push_str(&format!("since={s}"));
376            sep = '&';
377        }
378        if let Some(n) = limit {
379            url.push(sep);
380            url.push_str(&format!("limit={n}"));
381        }
382        let resp = self
383            .client
384            .get(&url)
385            .bearer_auth(slot_token)
386            .send()
387            .with_context(|| format!("GET {url}"))?;
388        let status = resp.status();
389        if !status.is_success() {
390            let detail = resp.text().unwrap_or_default();
391            return Err(anyhow!("list_events failed: {status}: {detail}"));
392        }
393        Ok(resp.json()?)
394    }
395
396    /// R4 — probe slot attentiveness. Returns `(event_count, last_pull_at_unix)`
397    /// — the relay's view of the slot's owner's most recent poll. `None` for
398    /// `last_pull_at_unix` means the slot has not been pulled since relay
399    /// restart. Best-effort: any HTTP failure returns `Ok((0, None))` so the
400    /// caller's pre-flight check degrades to "no signal" rather than abort.
401    pub fn slot_state(&self, slot_id: &str, slot_token: &str) -> Result<(usize, Option<u64>)> {
402        let url = format!("{}/v1/slot/{slot_id}/state", self.base_url);
403        let resp = match self.client.get(&url).bearer_auth(slot_token).send() {
404            Ok(r) => r,
405            Err(_) => return Ok((0, None)),
406        };
407        if !resp.status().is_success() {
408            return Ok((0, None));
409        }
410        let v: Value = resp.json().unwrap_or(Value::Null);
411        let count = v.get("event_count").and_then(Value::as_u64).unwrap_or(0) as usize;
412        let last = v.get("last_pull_at_unix").and_then(Value::as_u64);
413        Ok((count, last))
414    }
415
416    pub fn responder_health_set(
417        &self,
418        slot_id: &str,
419        slot_token: &str,
420        record: &Value,
421    ) -> Result<Value> {
422        let resp = self
423            .client
424            .post(format!(
425                "{}/v1/slot/{slot_id}/responder-health",
426                self.base_url
427            ))
428            .bearer_auth(slot_token)
429            .json(record)
430            .send()
431            .with_context(|| {
432                format!("POST {}/v1/slot/{slot_id}/responder-health", self.base_url)
433            })?;
434        let status = resp.status();
435        if !status.is_success() {
436            let detail = resp.text().unwrap_or_default();
437            return Err(anyhow!("responder_health_set failed: {status}: {detail}"));
438        }
439        Ok(resp.json()?)
440    }
441
442    pub fn responder_health_get(&self, slot_id: &str, slot_token: &str) -> Result<Value> {
443        let resp = self
444            .client
445            .get(format!("{}/v1/slot/{slot_id}/state", self.base_url))
446            .bearer_auth(slot_token)
447            .send()
448            .with_context(|| format!("GET {}/v1/slot/{slot_id}/state", self.base_url))?;
449        let status = resp.status();
450        if !status.is_success() {
451            let detail = resp.text().unwrap_or_default();
452            return Err(anyhow!("responder_health_get failed: {status}: {detail}"));
453        }
454        let state: Value = resp.json()?;
455        Ok(state
456            .get("responder_health")
457            .cloned()
458            .unwrap_or(Value::Null))
459    }
460
461    pub fn healthz(&self) -> Result<bool> {
462        let resp = self
463            .client
464            .get(format!("{}/healthz", self.base_url))
465            .send()?;
466        Ok(resp.status().is_success())
467    }
468
469    /// Healthz pre-flight that surfaces the underlying reqwest error in its
470    /// own message. Use at every "is the relay reachable before we mutate
471    /// state" site. The three possible failure modes (network error, 5xx
472    /// from a reachable host, healthy) each get a distinct diagnostic line.
473    pub fn check_healthz(&self) -> anyhow::Result<()> {
474        match self.healthz() {
475            Ok(true) => Ok(()),
476            Ok(false) => anyhow::bail!(
477                "phyllis: silent line — {}/healthz returned non-200.\n\
478                 the host is reachable but the relay isn't returning ok. test:\n  \
479                 curl -v {}/healthz",
480                self.base_url,
481                self.base_url
482            ),
483            Err(e) => anyhow::bail!(
484                "phyllis: silent line — couldn't reach {}/healthz: {e:#}.\n\
485                 test reachability from this machine:\n  curl -v {}/healthz\n\
486                 if curl also fails, a sandbox / proxy / firewall is the usual cause.\n\
487                 (OpenShell sandbox? run `curl -fsSL https://wireup.net/openshell-policy.sh | bash -s <sandbox-name>` on the host first.)",
488                self.base_url,
489                self.base_url
490            ),
491        }
492    }
493
494    /// Open or join a pair-slot. Returns the relay-assigned `pair_id`.
495    /// `role` must be `"host"` or `"guest"`. The host calls first; the guest
496    /// uses the same `code_hash` and finds the existing slot.
497    pub fn pair_open(&self, code_hash: &str, msg_b64: &str, role: &str) -> Result<String> {
498        let body = serde_json::json!({"code_hash": code_hash, "msg": msg_b64, "role": role});
499        let resp = self
500            .client
501            .post(format!("{}/v1/pair", self.base_url))
502            .json(&body)
503            .send()?;
504        let status = resp.status();
505        if !status.is_success() {
506            let detail = resp.text().unwrap_or_default();
507            return Err(anyhow!("pair_open failed: {status}: {detail}"));
508        }
509        let v: Value = resp.json()?;
510        v.get("pair_id")
511            .and_then(Value::as_str)
512            .map(str::to_string)
513            .ok_or_else(|| anyhow!("pair_open response missing pair_id"))
514    }
515
516    /// Forget the pair-slot at this code_hash on the relay. Either side can call;
517    /// knowledge of the code is the only auth. Idempotent — succeeds even if the
518    /// slot doesn't exist. Use after a client crash mid-handshake so the host
519    /// doesn't stay locked out until TTL.
520    pub fn pair_abandon(&self, code_hash: &str) -> Result<()> {
521        let body = serde_json::json!({"code_hash": code_hash});
522        let resp = self
523            .client
524            .post(format!("{}/v1/pair/abandon", self.base_url))
525            .json(&body)
526            .send()?;
527        let status = resp.status();
528        if !status.is_success() {
529            let detail = resp.text().unwrap_or_default();
530            return Err(anyhow!("pair_abandon failed: {status}: {detail}"));
531        }
532        Ok(())
533    }
534
535    /// Read peer's SPAKE2 message + (eventually) sealed bootstrap from a pair-slot.
536    pub fn pair_get(
537        &self,
538        pair_id: &str,
539        as_role: &str,
540    ) -> Result<(Option<String>, Option<String>)> {
541        let resp = self
542            .client
543            .get(format!(
544                "{}/v1/pair/{pair_id}?as_role={as_role}",
545                self.base_url
546            ))
547            .send()?;
548        let status = resp.status();
549        if !status.is_success() {
550            let detail = resp.text().unwrap_or_default();
551            return Err(anyhow!("pair_get failed: {status}: {detail}"));
552        }
553        let v: Value = resp.json()?;
554        let peer_msg = v
555            .get("peer_msg")
556            .and_then(Value::as_str)
557            .map(str::to_string);
558        let peer_bootstrap = v
559            .get("peer_bootstrap")
560            .and_then(Value::as_str)
561            .map(str::to_string);
562        Ok((peer_msg, peer_bootstrap))
563    }
564
565    /// POST a sealed bootstrap payload to the pair-slot.
566    pub fn pair_bootstrap(&self, pair_id: &str, role: &str, sealed_b64: &str) -> Result<()> {
567        let body = serde_json::json!({"role": role, "sealed": sealed_b64});
568        let resp = self
569            .client
570            .post(format!("{}/v1/pair/{pair_id}/bootstrap", self.base_url))
571            .json(&body)
572            .send()?;
573        if !resp.status().is_success() {
574            let s = resp.status();
575            let detail = resp.text().unwrap_or_default();
576            return Err(anyhow!("pair_bootstrap failed: {s}: {detail}"));
577        }
578        Ok(())
579    }
580
581    /// Claim a `nick@<this-relay-domain>` handle (v0.5). Caller must hold
582    /// the bearer token for `slot_id`. FCFS on nick; same-DID re-claims OK.
583    ///
584    /// Back-compat wrapper around `handle_claim_v2` that omits the
585    /// `discoverable` field (relay defaults to discoverable on absence).
586    pub fn handle_claim(
587        &self,
588        nick: &str,
589        slot_id: &str,
590        slot_token: &str,
591        relay_url: Option<&str>,
592        card: &Value,
593    ) -> Result<Value> {
594        self.handle_claim_v2(nick, slot_id, slot_token, relay_url, card, None)
595    }
596
597    /// v0.5.19 (#9.1) variant accepting the optional `discoverable`
598    /// flag. `None` = relay default (= true, back-compat).
599    /// `Some(false)` = opt out of `/v1/handles` bulk listing while
600    /// keeping direct `.well-known/wire/agent` resolution working.
601    /// Relays older than v0.5.19 ignore the field — safe to always send.
602    pub fn handle_claim_v2(
603        &self,
604        nick: &str,
605        slot_id: &str,
606        slot_token: &str,
607        relay_url: Option<&str>,
608        card: &Value,
609        discoverable: Option<bool>,
610    ) -> Result<Value> {
611        let mut body = serde_json::json!({
612            "nick": nick,
613            "slot_id": slot_id,
614            "relay_url": relay_url,
615            "card": card,
616        });
617        if let Some(d) = discoverable {
618            body["discoverable"] = serde_json::json!(d);
619        }
620        let resp = self
621            .client
622            .post(format!("{}/v1/handle/claim", self.base_url))
623            .bearer_auth(slot_token)
624            .json(&body)
625            .send()
626            .with_context(|| format!("POST {}/v1/handle/claim", self.base_url))?;
627        let status = resp.status();
628        if !status.is_success() {
629            let detail = resp.text().unwrap_or_default();
630            return Err(anyhow!("handle_claim failed: {status}: {detail}"));
631        }
632        Ok(resp.json()?)
633    }
634
635    /// POST an intro (zero-paste pair-drop) event to a known nick's slot
636    /// without holding that slot's bearer token. Relay validates the event
637    /// is kind=1100 with an embedded signed agent-card; otherwise refuses.
638    pub fn handle_intro(&self, nick: &str, event: &Value) -> Result<Value> {
639        let body = serde_json::json!({"event": event});
640        let resp = self
641            .client
642            .post(format!("{}/v1/handle/intro/{nick}", self.base_url))
643            .json(&body)
644            .send()
645            .with_context(|| format!("POST {}/v1/handle/intro/{nick}", self.base_url))?;
646        let status = resp.status();
647        if !status.is_success() {
648            let detail = resp.text().unwrap_or_default();
649            return Err(anyhow!("handle_intro failed: {status}: {detail}"));
650        }
651        Ok(resp.json()?)
652    }
653
654    /// Resolve a handle on this relay via A2A v1.0 `.well-known/agent-card.json?handle=<nick>`.
655    /// Returns the parsed AgentCard JSON. Wire-served relays embed wire-native
656    /// fields (DID, slot_id, profile, raw card) under `extensions[0].params`.
657    /// Foreign A2A agents return their A2A card without wire ext — useful for
658    /// `wire whois` even when full mailbox pairing isn't possible.
659    pub fn well_known_agent_card_a2a(&self, handle: &str) -> Result<Value> {
660        let resp = self
661            .client
662            .get(format!("{}/.well-known/agent-card.json", self.base_url))
663            .query(&[("handle", handle)])
664            .send()
665            .with_context(|| {
666                format!(
667                    "GET {}/.well-known/agent-card.json?handle={handle}",
668                    self.base_url
669                )
670            })?;
671        let status = resp.status();
672        if !status.is_success() {
673            let detail = resp.text().unwrap_or_default();
674            return Err(anyhow!(
675                "well_known_agent_card_a2a failed: {status}: {detail}"
676            ));
677        }
678        Ok(resp.json()?)
679    }
680
681    /// Resolve a handle on this relay via `.well-known/wire/agent?handle=<nick>`.
682    /// Caller passes either the full `nick@domain` or just `<nick>` — the
683    /// server only uses the local part.
684    pub fn well_known_agent(&self, handle: &str) -> Result<Value> {
685        let resp = self
686            .client
687            .get(format!("{}/.well-known/wire/agent", self.base_url))
688            .query(&[("handle", handle)])
689            .send()
690            .with_context(|| {
691                format!(
692                    "GET {}/.well-known/wire/agent?handle={handle}",
693                    self.base_url
694                )
695            })?;
696        let status = resp.status();
697        if !status.is_success() {
698            let detail = resp.text().unwrap_or_default();
699            return Err(anyhow!("well_known_agent failed: {status}: {detail}"));
700        }
701        Ok(resp.json()?)
702    }
703}
704
#[cfg(all(test, unix))]
mod uds_tests {
    use super::*;
    use std::io::{Read, Write};
    use std::os::unix::net::UnixListener;
    use std::thread;

    /// Bind `socket_path` and spawn a one-shot UDS HTTP/1.1 server that
    /// answers the first connection with a canned response.
    ///
    /// The listener is bound before this function returns, so callers can
    /// connect immediately (the connection waits in the accept queue).
    /// Cleanup of the socket's directory is the caller's job.
    fn spawn_canned_uds_server(socket_path: std::path::PathBuf, status: u16, body: &'static str) {
        let listener = UnixListener::bind(&socket_path).expect("bind canned UDS");
        thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept canned UDS");
            let mut req_buf = [0u8; 4096];
            let _ = stream.read(&mut req_buf);
            let body_bytes = body.as_bytes();
            let status_text = match status {
                200 => "OK",
                201 => "Created",
                400 => "Bad Request",
                _ => "Status",
            };
            let resp = format!(
                "HTTP/1.1 {status} {status_text}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
                body_bytes.len()
            );
            let _ = stream.write_all(resp.as_bytes());
            // Half-close so the client sees EOF, then drain whatever part
            // of the request we have not read yet. Closing with unread
            // bytes pending can reset the connection and fail the
            // client's read intermittently.
            let _ = stream.shutdown(std::net::Shutdown::Write);
            let mut leftover = Vec::new();
            let _ = stream.read_to_end(&mut leftover);
        });
    }

    /// Create a unique scratch directory and return it with a socket path inside.
    fn scratch_socket(file_name: &str) -> (std::path::PathBuf, std::path::PathBuf) {
        let dir = std::env::temp_dir().join(format!("wire-uds-test-{}", rand::random::<u32>()));
        std::fs::create_dir_all(&dir).unwrap();
        let sock = dir.join(file_name);
        let _ = std::fs::remove_file(&sock);
        (dir, sock)
    }

    #[test]
    fn uds_request_round_trips_200_with_body() {
        let (tmpdir, sock) = scratch_socket("rt.sock");
        spawn_canned_uds_server(sock.clone(), 200, r#"{"ok":true}"#);
        let (status, body) = uds_request(
            &sock,
            "POST",
            "/v1/test",
            &[("Content-Type", "application/json")],
            b"{}",
        )
        .expect("uds_request succeeds");
        assert_eq!(status, 200);
        assert_eq!(body, br#"{"ok":true}"#);
        let _ = std::fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn uds_request_surfaces_non_2xx_status() {
        let (tmpdir, sock) = scratch_socket("err.sock");
        spawn_canned_uds_server(sock.clone(), 400, r#"{"error":"bad"}"#);
        let (status, body) = uds_request(&sock, "GET", "/v1/test", &[], b"")
            .expect("uds_request succeeds even on 4xx");
        assert_eq!(status, 400);
        assert_eq!(body, br#"{"error":"bad"}"#);
        let _ = std::fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn uds_request_fails_on_nonexistent_socket() {
        let nope = std::env::temp_dir().join(format!(
            "wire-uds-nonexistent-socket-{}.sock",
            rand::random::<u32>()
        ));
        let _ = std::fs::remove_file(&nope);
        let err = uds_request(&nope, "GET", "/", &[], b"").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("connect UDS"),
            "expected connect error, got: {msg}"
        );
    }
}
783
/// Unit tests for base-URL normalization, transport-error classification
/// and the insecure-TLS environment override.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an error whose root cause is `cause`, wrapped in each entry of
    /// `contexts` in turn (the last entry ends up outermost).
    fn wrapped(cause: &str, contexts: &[&'static str]) -> anyhow::Error {
        contexts
            .iter()
            .fold(anyhow!("{cause}"), |err, ctx| err.context(*ctx))
    }

    #[test]
    fn url_normalization_trims_trailing_slash() {
        // With or without the trailing slash, the stored base URL is the same.
        for raw in ["http://example.com/", "http://example.com"] {
            let client = RelayClient::new(raw);
            assert_eq!(client.base_url, "http://example.com");
        }
    }

    #[test]
    fn format_transport_error_classifies_tls() {
        // Models the Avast / corporate-proxy case from issue #6: a rustls
        // UnknownIssuer at the bottom, a hyper layer in the middle, and the
        // request URL as the outermost context.
        let top = wrapped(
            "invalid peer certificate: UnknownIssuer",
            &["hyper send", "POST https://relay.example/v1/events/abc"],
        );
        let formatted = format_transport_error(&top);
        assert!(
            formatted.starts_with("TLS error:"),
            "expected TLS class prefix, got: {formatted}"
        );
        assert!(
            formatted.contains("UnknownIssuer"),
            "lost root cause: {formatted}"
        );
        assert!(
            formatted.contains("POST https://relay.example"),
            "lost context URL: {formatted}"
        );
    }

    #[test]
    fn format_transport_error_classifies_timeout() {
        let top = wrapped(
            "operation timed out",
            &["POST https://relay.example/v1/events/abc"],
        );
        let formatted = format_transport_error(&top);
        assert!(formatted.starts_with("timeout:"), "got: {formatted}");
    }

    #[test]
    fn format_transport_error_classifies_dns() {
        let top = wrapped(
            "dns error: failed to lookup address",
            &["POST https://relay.example/v1/events/abc"],
        );
        let formatted = format_transport_error(&top);
        assert!(formatted.starts_with("DNS error:"), "got: {formatted}");
    }

    #[test]
    fn format_transport_error_falls_back_to_chain_join() {
        // An unrecognized error class gets no prefix, but every link of the
        // chain must still appear in the output (the loud-fail invariant).
        let top = wrapped(
            "Refused to connect for non-standard reason xyz",
            &["POST https://relay.example/v1/events/abc"],
        );
        let formatted = format_transport_error(&top);
        assert!(formatted.contains("Refused to connect"));
        assert!(formatted.contains("POST https://relay.example"));
    }

    #[test]
    fn insecure_env_recognizes_truthy_values_and_default_off() {
        // The env var is process-global, so the whole unset → truthy →
        // falsy → unset lifecycle lives in this one test; splitting it
        // would let parallel cargo-test threads race on the variable.
        use std::sync::{Mutex, OnceLock};
        static GUARD: OnceLock<Mutex<()>> = OnceLock::new();
        let _held = GUARD.get_or_init(|| Mutex::new(())).lock().unwrap();

        let set = |value: &str| {
            // SAFETY: env mutation here is serialized by the GUARD mutex;
            // other tests in this module do not touch INSECURE_SKIP_TLS_ENV.
            unsafe { std::env::set_var(INSECURE_SKIP_TLS_ENV, value) }
        };
        let clear = || {
            // SAFETY: same serialization argument as for `set` above.
            unsafe { std::env::remove_var(INSECURE_SKIP_TLS_ENV) }
        };

        const TRUTHY: [&str; 6] = ["1", "true", "yes", "on", "TRUE", "Yes"];
        const FALSY: [&str; 5] = ["0", "false", "no", "off", ""];

        clear();
        assert!(!insecure_skip_tls_verify(), "default must be secure");

        for v in TRUTHY {
            set(v);
            assert!(insecure_skip_tls_verify(), "value {v:?} should be truthy");
        }

        // Falsy values, including the empty string, switch back to secure.
        for v in FALSY {
            set(v);
            assert!(
                !insecure_skip_tls_verify(),
                "value {v:?} must not enable insecure mode"
            );
        }

        // Leave the variable unset for whatever runs next.
        clear();
    }
}
882
/// Tests for `try_post_event_with_failover`. Each test supplies a closure
/// as the poster, so no network traffic is involved.
#[cfg(test)]
mod failover_tests {
    use super::*;
    use crate::endpoints::{Endpoint, EndpointScope};
    use std::sync::Mutex;

    /// Endpoint built through `Endpoint::federation`.
    fn fed_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint::federation(url.to_owned(), slot.to_owned(), token.to_owned())
    }

    /// Endpoint with `EndpointScope::Local`, built field by field.
    fn local_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint {
            relay_url: String::from(url),
            slot_id: String::from(slot),
            slot_token: String::from(token),
            scope: EndpointScope::Local,
        }
    }

    /// Canned successful relay response.
    fn ok_resp() -> PostEventResponse {
        PostEventResponse {
            event_id: Some(String::from("evt-1")),
            status: String::from("queued"),
        }
    }

    #[test]
    fn first_endpoint_succeeds_no_further_attempts() {
        // Happy path: once the first endpoint accepts, the remaining ones
        // are left alone — failover must not spend extra round-trips when
        // the primary works.
        let endpoints = vec![
            fed_ep("https://good.example", "slot1", "tok1"),
            fed_ep("https://other.example", "slot2", "tok2"),
        ];
        let payload = serde_json::json!({});
        let tried: Mutex<Vec<String>> = Mutex::new(Vec::new());

        let (delivered, _resp) = try_post_event_with_failover(&endpoints, &payload, |ep, _| {
            tried.lock().unwrap().push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();

        assert_eq!(delivered.relay_url, "https://good.example");
        assert_eq!(
            *tried.lock().unwrap(),
            ["https://good.example"],
            "must NOT try the second endpoint after the first succeeds"
        );
    }

    #[test]
    fn skips_dead_endpoint_and_succeeds_on_next() {
        // Regression case for Bug 2: the peer advertises [bad, good].
        // Before the fix, send_pair_drop_ack got a 4xx from `bad` and gave
        // up, leaving the bilateral pair unreachable. The failover helper
        // now records the failure on `bad`, moves on to `good`, and
        // succeeds — the swift-harbor ↔ coral-weasel incident, replayed.
        let endpoints = vec![
            // Broken first entry: the userinfo-malformed URL from Bug 1,
            // i.e. the federation 400 coral-weasel hit on accept.
            fed_ep("https://copilot-agent@wireup.net", "slot-bad", "tok-bad"),
            // Clean second entry that actually works.
            fed_ep("https://wireup.net", "slot-good", "tok-good"),
        ];
        let payload = serde_json::json!({});
        let tried: Mutex<Vec<String>> = Mutex::new(Vec::new());

        let outcome = try_post_event_with_failover(&endpoints, &payload, |ep, _| {
            tried.lock().unwrap().push(ep.relay_url.clone());
            if ep.relay_url.contains('@') {
                return Err(anyhow!("400 Bad Request (userinfo embedded)"));
            }
            Ok(ok_resp())
        })
        .unwrap();

        assert_eq!(
            outcome.0.relay_url, "https://wireup.net",
            "the successful endpoint must be the one returned to the caller"
        );
        assert_eq!(
            *tried.lock().unwrap(),
            ["https://copilot-agent@wireup.net", "https://wireup.net"],
            "must try `bad` first, then fall over to `good`"
        );
    }

    #[test]
    fn respects_priority_order_caller_supplies() {
        // The helper never re-sorts; the caller's order is the contract.
        // Typical input comes from `peer_endpoints_in_priority_order`
        // (UDS / Local / LAN / Federation), so "first tried" encodes the
        // existing transport preference. Here: Local listed before
        // Federation → Local is attempted first.
        let endpoints = vec![
            local_ep("http://127.0.0.1:8771", "loc1", "loctok"),
            fed_ep("https://wireup.net", "fed1", "fedtok"),
        ];
        let payload = serde_json::json!({});
        let tried: Mutex<Vec<String>> = Mutex::new(Vec::new());

        let _ = try_post_event_with_failover(&endpoints, &payload, |ep, _| {
            tried.lock().unwrap().push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();

        let order = tried.lock().unwrap();
        assert_eq!(
            order[0],
            "http://127.0.0.1:8771",
            "Local-scope endpoint must be tried first (per the caller's priority order)"
        );
    }

    #[test]
    fn all_failures_returns_combined_error() {
        // When every endpoint fails, the per-endpoint reasons must be
        // merged into one error so an operator can diagnose without
        // re-tracing — the same shape as cmd_push's failure logging.
        let urls = [
            "https://a.example",
            "https://b.example",
            "https://c.example",
        ];
        let endpoints: Vec<Endpoint> = urls.iter().map(|url| fed_ep(url, "s", "t")).collect();
        let payload = serde_json::json!({});

        let err = try_post_event_with_failover(&endpoints, &payload, |ep, _| {
            Err(anyhow!("simulated 500 from {}", ep.relay_url))
        })
        .unwrap_err()
        .to_string();

        assert!(
            err.contains("all 3 endpoint(s) failed"),
            "error must surface the total count: {err}"
        );
        // Each URL has to show up so every failure is attributable.
        for u in urls {
            assert!(
                err.contains(u),
                "combined error must include each failing endpoint URL ({u}): {err}"
            );
        }
    }

    #[test]
    fn empty_endpoints_returns_actionable_error() {
        // A peer with no pinned endpoints cannot be reached at all. The
        // helper has to report that explicitly (never a silent Ok) and
        // point at the re-pair remediation.
        let endpoints: Vec<Endpoint> = Vec::new();
        let payload = serde_json::json!({});

        let err = try_post_event_with_failover(&endpoints, &payload, |_, _| {
            unreachable!("poster must not be called when endpoint list is empty")
        })
        .unwrap_err()
        .to_string();

        assert!(
            err.contains("no endpoints to deliver to"),
            "empty-list error must be explicit: {err}"
        );
        let mentions_remedy = ["re-pin", "dial", "pair"]
            .iter()
            .any(|hint| err.contains(hint));
        assert!(
            mentions_remedy,
            "empty-list error must point at the remediation path: {err}"
        );
    }
}
1048}