Skip to main content

wire/
relay_client.rs

1//! HTTP client for `wire-relay-server`.
2//!
3//! Sync wrapper around `reqwest::blocking` so CLI commands stay synchronous —
4//! the only async surface in the crate is `relay_server::serve`. Async clients
5//! land in v0.2 if a long-running daemon needs them.
6
7use anyhow::{Context, Result, anyhow, bail};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use thiserror::Error;
11
12#[derive(Clone)]
13pub struct RelayClient {
14    base_url: String,
15    client: reqwest::blocking::Client,
16}
17
18#[derive(Debug, Serialize, Deserialize)]
19pub struct AllocateResponse {
20    pub slot_id: String,
21    pub slot_token: String,
22}
23
24#[derive(Debug, Deserialize)]
25pub struct PostEventResponse {
26    pub event_id: Option<String>,
27    pub status: String,
28}
29
/// DID carried by an RFC-003 §2 DNS-TXT binding anchor.
///
/// `_wire-org.<domain>` records share one field grammar across org-tier
/// (`did:wire:org:*`) and personal-tier (`did:wire:op:*`) deployments;
/// receivers tell the two apart by the DID prefix, which is what the
/// variants encode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireOrgTxtDid {
    /// Org-tier DID (`did:wire:org:*`).
    Org(String),
    /// Personal-tier DID (`did:wire:op:*`).
    Op(String),
}

impl WireOrgTxtDid {
    /// Borrow the full DID string, regardless of tier.
    pub fn as_str(&self) -> &str {
        // Both variants wrap a single `String`, so one irrefutable
        // or-pattern covers the whole enum.
        let (Self::Org(did) | Self::Op(did)) = self;
        did
    }
}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct WireOrgTxtRecord {
49    pub did: WireOrgTxtDid,
50    pub relay: Option<String>,
51    pub sso_iss: Option<String>,
52    pub sso_tenant: Option<String>,
53}
54
55#[derive(Debug, Error, PartialEq, Eq)]
56pub enum WireOrgTxtParseError {
57    #[error("DNS-TXT record missing required `did=` field")]
58    MissingDid,
59    #[error("DNS-TXT record missing required `v=` field")]
60    MissingVersion,
61    #[error("unsupported DNS-TXT record version `{0}`")]
62    UnsupportedVersion(String),
63    #[error("`did=` must be did:wire:org:* or did:wire:op:* with a long fingerprint suffix")]
64    InvalidDid(String),
65    #[error("duplicate DNS-TXT field `{0}`")]
66    DuplicateField(&'static str),
67    #[error("malformed DNS-TXT field `{0}`")]
68    MalformedField(String),
69}
70
71/// Parse the field grammar used by RFC-003 §2:
72///
73/// `_wire-org.<domain> TXT "did=<wire-DID>; relay=<url>; sso_iss=<iss>; sso_tenant=<tenant>; v=1"`
74///
75/// Field-additive evolution rule: at known `v=1`, unknown fields are ignored
76/// so future records remain forward-compatible. Unknown `v` values are rejected
77/// at parse time because they may change existing-field semantics.
78pub fn parse_wire_org_txt_record(record: &str) -> Result<WireOrgTxtRecord, WireOrgTxtParseError> {
79    let trimmed = record.trim();
80    let body = trimmed
81        .strip_prefix('"')
82        .and_then(|s| s.strip_suffix('"'))
83        .unwrap_or(trimmed);
84
85    let mut did: Option<String> = None;
86    let mut version: Option<String> = None;
87    let mut relay: Option<String> = None;
88    let mut sso_iss: Option<String> = None;
89    let mut sso_tenant: Option<String> = None;
90
91    fn set_once(
92        slot: &mut Option<String>,
93        field: &'static str,
94        value: &str,
95    ) -> Result<(), WireOrgTxtParseError> {
96        if slot.is_some() {
97            return Err(WireOrgTxtParseError::DuplicateField(field));
98        }
99        *slot = Some(value.trim().to_string());
100        Ok(())
101    }
102
103    for raw in body.split(';') {
104        let raw = raw.trim();
105        if raw.is_empty() {
106            continue;
107        }
108        let Some((key, value)) = raw.split_once('=') else {
109            return Err(WireOrgTxtParseError::MalformedField(raw.to_string()));
110        };
111        match key.trim() {
112            "did" => set_once(&mut did, "did", value)?,
113            "v" => set_once(&mut version, "v", value)?,
114            "relay" => set_once(&mut relay, "relay", value)?,
115            "sso_iss" => set_once(&mut sso_iss, "sso_iss", value)?,
116            "sso_tenant" => set_once(&mut sso_tenant, "sso_tenant", value)?,
117            _ => {
118                // RFC-003 §2 / RFC-001 §A: field-additive evolution at a
119                // known version. Unknown fields are opaque, not fatal.
120            }
121        }
122    }
123
124    let version = version.ok_or(WireOrgTxtParseError::MissingVersion)?;
125    if version != "1" {
126        return Err(WireOrgTxtParseError::UnsupportedVersion(version));
127    }
128
129    let did = did.ok_or(WireOrgTxtParseError::MissingDid)?;
130    let did = if crate::agent_card::is_org_did(&did) {
131        WireOrgTxtDid::Org(did)
132    } else if crate::agent_card::is_op_did(&did) {
133        WireOrgTxtDid::Op(did)
134    } else {
135        return Err(WireOrgTxtParseError::InvalidDid(did));
136    };
137
138    Ok(WireOrgTxtRecord {
139        did,
140        relay,
141        sso_iss,
142        sso_tenant,
143    })
144}
145
/// Env var: when set to a truthy value (`1`, `true`, `yes`, `on`; matched
/// ASCII-case-insensitively), certificate verification is disabled on
/// every HTTPS client built through `build_blocking_client`. Intended
/// as an emergency-only operator override for environments behind a
/// TLS-intercepting middlebox (corporate proxy, AV product like Avast
/// re-signing certs with its own root, captive portal). A loud stderr
/// banner is printed once per process when a client is built with the
/// override active. **Do not set this in production.** Documented in
/// THREAT_MODEL.md + README.
pub const INSECURE_SKIP_TLS_ENV: &str = "WIRE_INSECURE_SKIP_TLS_VERIFY";
154
155fn insecure_skip_tls_verify() -> bool {
156    matches!(
157        std::env::var(INSECURE_SKIP_TLS_ENV)
158            .unwrap_or_default()
159            .to_ascii_lowercase()
160            .as_str(),
161        "1" | "true" | "yes" | "on"
162    )
163}
164
165/// v0.5.13: emit the loud-fail banner exactly once per process so we
166/// don't spam a hundred lines per `wire push`. Per-process `OnceLock`
167/// guards the emission. The banner goes to stderr; never stdout (we
168/// must not corrupt the `--json` machine-readable contract).
169fn maybe_emit_insecure_banner() {
170    static ONCE: std::sync::OnceLock<()> = std::sync::OnceLock::new();
171    if insecure_skip_tls_verify() {
172        ONCE.get_or_init(|| {
173            eprintln!(
174                "\x1b[1;31mwire: WARNING\x1b[0m {INSECURE_SKIP_TLS_ENV}=1 is set; TLS verification is DISABLED for all relay traffic. \
175                 MITM attacks against the relay path are undetectable in this mode. Unset to restore default trust validation."
176            );
177        });
178    }
179}
180
181/// Centralized builder for blocking HTTPS clients across wire. Uses
182/// rustls + Mozilla webpki-roots bundled CA set
183/// (`rustls-tls-webpki-roots` reqwest feature). Honors the
184/// [`INSECURE_SKIP_TLS_ENV`] escape hatch for the corporate-proxy
185/// emergency case.
186///
187/// v0.14.2: previously this used `rustls-tls-native-roots` (native OS
188/// trust store via `rustls-native-certs`) so corp CAs / AV-resign
189/// products validated transparently. That broke catastrophically when
190/// #170's `--all-sessions` supervisor moved wire daemons into launchd:
191/// launchd-spawned processes don't inherit the user's Aqua session
192/// keychain context on macOS, so `rustls-native-certs` returned zero
193/// roots → every wireup.net request failed with "UnknownIssuer" and
194/// the daemon silently no-op'd push/pull (84 events queued, 0 pushed,
195/// SSE stream errored on every reconnect). Same binary worked fine
196/// from a shell because the operator's Aqua session had keychain
197/// access.
198///
199/// Switching to bundled webpki-roots removes the OS dependency at the
200/// cost of corp CA support; operators behind a corporate proxy that
201/// resigns certs should set `WIRE_INSECURE_SKIP_TLS_VERIFY=1`. A
202/// proper dual-roots verifier (native + webpki via
203/// `rustls-platform-verifier`) is filed for follow-up.
204pub fn build_blocking_client(
205    timeout: Option<std::time::Duration>,
206) -> Result<reqwest::blocking::Client> {
207    let mut b = reqwest::blocking::Client::builder();
208    if let Some(t) = timeout {
209        b = b.timeout(t);
210    }
211    if insecure_skip_tls_verify() {
212        maybe_emit_insecure_banner();
213        b = b.danger_accept_invalid_certs(true);
214    } else {
215        // v0.14.2 #177: dual-roots TLS — webpki bundled + OS native
216        // when accessible (corp CAs / AV-resign / on-prem). Replaces
217        // #176's webpki-only emergency fallback. See `tls.rs` for
218        // the why.
219        let cfg = crate::tls::shared_client_config();
220        b = b.use_preconfigured_tls((*cfg).clone());
221    }
222    b.build()
223        .with_context(|| "constructing reqwest blocking client")
224}
225
226/// Flatten an `anyhow::Error` source chain into a single human-readable
227/// transport-error line for the `reason` field in `wire push --json` and
228/// for stderr surfaces. Classifies the topmost cause (`TLS error`,
229/// `DNS error`, `connect timeout`, `read timeout`, `HTTP error`) so a
230/// silent failure no longer leaks past the user as a bare URL.
231///
232/// v0.5.13 rule 1 of the network-resilience doctrine — see issue #6.
233pub fn format_transport_error(err: &anyhow::Error) -> String {
234    let mut parts: Vec<String> = err.chain().map(|c| c.to_string()).collect();
235    // Heuristic classification — search the chain for the lowest-level
236    // descriptor and prefix the message so the reader sees the kind
237    // even when the topmost context is just the URL.
238    let lower = parts
239        .iter()
240        .map(|p| p.to_ascii_lowercase())
241        .collect::<Vec<_>>();
242    let class = if lower.iter().any(|p| {
243        p.contains("invalid peer certificate")
244            || p.contains("certificate verification")
245            || p.contains("unknownissuer")
246            || p.contains("certificate is not valid")
247            || p.contains("tls handshake")
248    }) {
249        Some("TLS error")
250    } else if lower.iter().any(|p| {
251        p.contains("dns error")
252            || p.contains("nodename nor servname")
253            || p.contains("failed to lookup address")
254    }) {
255        Some("DNS error")
256    } else if lower
257        .iter()
258        .any(|p| p.contains("operation timed out") || p.contains("deadline has elapsed"))
259    {
260        Some("timeout")
261    } else if lower
262        .iter()
263        .any(|p| p.contains("connection refused") || p.contains("connection reset"))
264    {
265        Some("connect error")
266    } else {
267        None
268    };
269    if let Some(c) = class {
270        parts.insert(0, c.to_string());
271    }
272    parts.join(": ")
273}
274
275/// v0.7.0-alpha.17: minimal blocking HTTP/1.1 client over Unix Domain
276/// Socket. Used by callers that detect a `unix://` scheme on a relay
277/// endpoint URL and route around reqwest (which has no UDS support).
278///
279/// Connects to `socket_path`, writes a single HTTP/1.1 request, parses
280/// status + Content-Length + body. Closes the connection (no keep-
281/// alive). Sufficient for wire's request shape: single POST or GET per
282/// call, JSON in + JSON out, small payloads.
283///
284/// Returns `(status_code, body_bytes)`. Caller decodes body per the
285/// endpoint's content type.
286#[cfg(unix)]
287pub fn uds_request(
288    socket_path: &std::path::Path,
289    method: &str,
290    request_target: &str,
291    headers: &[(&str, &str)],
292    body: &[u8],
293) -> Result<(u16, Vec<u8>)> {
294    use std::io::{Read, Write};
295    use std::os::unix::net::UnixStream;
296    let mut stream =
297        UnixStream::connect(socket_path).with_context(|| format!("connect UDS {socket_path:?}"))?;
298    stream.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
299    stream.set_write_timeout(Some(std::time::Duration::from_secs(30)))?;
300    let mut req = String::with_capacity(256 + headers.len() * 32 + body.len());
301    req.push_str(method);
302    req.push(' ');
303    req.push_str(request_target);
304    req.push_str(" HTTP/1.1\r\n");
305    req.push_str("Host: localhost\r\n");
306    req.push_str("Connection: close\r\n");
307    req.push_str(&format!("Content-Length: {}\r\n", body.len()));
308    for (k, v) in headers {
309        req.push_str(k);
310        req.push_str(": ");
311        req.push_str(v);
312        req.push_str("\r\n");
313    }
314    req.push_str("\r\n");
315    stream.write_all(req.as_bytes())?;
316    if !body.is_empty() {
317        stream.write_all(body)?;
318    }
319    stream.flush()?;
320    let mut raw = Vec::new();
321    stream.read_to_end(&mut raw)?;
322    // Parse HTTP/1.1 response: status line + headers + \r\n\r\n + body.
323    let split = raw
324        .windows(4)
325        .position(|w| w == b"\r\n\r\n")
326        .ok_or_else(|| anyhow!("UDS response missing header/body delimiter"))?;
327    let head = std::str::from_utf8(&raw[..split])
328        .map_err(|e| anyhow!("UDS response head not UTF-8: {e}"))?;
329    let body = raw[split + 4..].to_vec();
330    let status_line = head.lines().next().unwrap_or("");
331    // "HTTP/1.1 200 OK"
332    let status: u16 = status_line
333        .split_whitespace()
334        .nth(1)
335        .and_then(|s| s.parse().ok())
336        .ok_or_else(|| anyhow!("UDS response missing status code: {status_line:?}"))?;
337    Ok((status, body))
338}
339
340/// v0.7.0-alpha.19: scheme-aware POST helper that dispatches to either
341/// reqwest (for `http(s)://...`) OR the hand-rolled `uds_request` (for
342/// `unix:///path/to/sock`). Lets the daemon + cmd_send walk a peer's
343/// pinned endpoints uniformly without each call site having to detect
344/// scheme + branch.
345///
346/// Used by the routing layer to send signed events to a peer's slot
347/// regardless of which transport scope the peer is reachable on. UDS
348/// path uses the alpha.17 client; TCP path uses the existing
349/// RelayClient::post_event flow.
350pub fn post_event_to_endpoint(
351    endpoint: &crate::endpoints::Endpoint,
352    event: &Value,
353) -> Result<PostEventResponse> {
354    #[cfg(unix)]
355    if let Some(socket_path) = endpoint.relay_url.strip_prefix("unix://") {
356        let body = serde_json::json!({"event": event}).to_string();
357        let auth_header = format!("Bearer {}", endpoint.slot_token);
358        let (status, body) = uds_request(
359            std::path::Path::new(socket_path),
360            "POST",
361            &format!("/v1/events/{}", endpoint.slot_id),
362            &[
363                ("Content-Type", "application/json"),
364                ("Authorization", &auth_header),
365            ],
366            body.as_bytes(),
367        )?;
368        if !(200..300).contains(&status) {
369            // Format constraint: `cli::error_smells_like_slot_4xx` parses
370            // this error string to gate slot-rotation re-resolves. It
371            // matches `<status>` as a whole token bordered by space/colon
372            // (see issue #69). The current shape `: {status}: {body}`
373            // satisfies that — if you wrap the status differently
374            // (commas/brackets, `status=410`, JSON-encode it), update
375            // `error_smells_like_slot_4xx` in lockstep and add the new
376            // shape to its `slot_reresolve_tests` cases or peer slot
377            // rotations will silently stop auto-recovering.
378            return Err(anyhow!(
379                "post_event (uds {socket_path}) failed: {status}: {}",
380                String::from_utf8_lossy(&body)
381            ));
382        }
383        return Ok(serde_json::from_slice(&body)?);
384    }
385    let client = RelayClient::new(&endpoint.relay_url);
386    client.post_event(&endpoint.slot_id, &endpoint.slot_token, event)
387}
388
389/// Try posting `event` to each endpoint in priority order; return the first
390/// success. Generic over the poster so tests can inject a deterministic mock
391/// without spinning up an HTTP server. In production callers pass
392/// `post_event_to_endpoint`.
393///
394/// Bug 2 (P1, federation reachability) this implements: before this helper,
395/// the bilateral-pair ack path (`send_pair_drop_ack`) only ever POSTed to the
396/// FIRST endpoint in the peer's card. A peer whose first endpoint 4xx'd (e.g.
397/// the userinfo-malformed URL surfaced in Bug 1) was unreachable even when
398/// they advertised a perfectly good second endpoint. Surfaced when
399/// `coral-weasel`'s `wire accept swift-harbor` 400'd on the malformed first
400/// endpoint while a clean `https://wireup.net` endpoint sat behind it
401/// untouched.
402///
403/// Failover ordering is the priority order supplied by the caller (typically
404/// `peer_endpoints_in_priority_order` / `self_endpoints` — UDS / Local / LAN
405/// / Federation, lowest-friction first), so this respects the existing
406/// transport-preference contract.
407///
408/// Returns `Ok((endpoint, response))` on the first success — the caller can
409/// log which endpoint actually accepted the event. Returns `Err` if and only
410/// if every endpoint failed; the error string includes the per-endpoint
411/// reasons so the operator can diagnose without re-tracing.
412pub fn try_post_event_with_failover<F>(
413    endpoints: &[crate::endpoints::Endpoint],
414    event: &Value,
415    mut poster: F,
416) -> Result<(crate::endpoints::Endpoint, PostEventResponse)>
417where
418    F: FnMut(&crate::endpoints::Endpoint, &Value) -> Result<PostEventResponse>,
419{
420    if endpoints.is_empty() {
421        bail!(
422            "no endpoints to deliver to — peer has no pinned endpoints in relay_state. \
423             Re-run the pair flow (or `wire dial <peer>@<relay>`) to re-pin the peer's \
424             advertised endpoints."
425        );
426    }
427    let mut errs: Vec<String> = Vec::with_capacity(endpoints.len());
428    for ep in endpoints {
429        match poster(ep, event) {
430            Ok(resp) => return Ok((ep.clone(), resp)),
431            Err(e) => errs.push(format!("{} ({:?}): {e}", ep.relay_url, ep.scope)),
432        }
433    }
434    bail!(
435        "all {n} endpoint(s) failed:\n  • {reasons}",
436        n = endpoints.len(),
437        reasons = errs.join("\n  • ")
438    )
439}
440
441impl RelayClient {
442    pub fn new(base_url: &str) -> Self {
443        let client = build_blocking_client(Some(std::time::Duration::from_secs(30)))
444            .expect("reqwest client construction is infallible with rustls + native roots");
445        Self {
446            base_url: base_url.trim_end_matches('/').to_string(),
447            client,
448        }
449    }
450
451    /// Allocate a fresh slot. Returns `(slot_id, slot_token)` — caller MUST
452    /// persist `slot_token` somewhere safe (mode 0600 file); it grants both
453    /// read and write access to the slot.
454    pub fn allocate_slot(&self, handle_hint: Option<&str>) -> Result<AllocateResponse> {
455        let body = serde_json::json!({"handle": handle_hint});
456        let resp = self
457            .client
458            .post(format!("{}/v1/slot/allocate", self.base_url))
459            .json(&body)
460            .send()
461            .with_context(|| format!("POST {}/v1/slot/allocate", self.base_url))?;
462        let status = resp.status();
463        if !status.is_success() {
464            let detail = resp.text().unwrap_or_default();
465            return Err(anyhow!("allocate failed: {status}: {detail}"));
466        }
467        Ok(resp.json()?)
468    }
469
470    /// POST a signed event to a slot. Caller passes the slot's bearer token
471    /// (the relay model in v0.1 is "shared slot token between paired peers" —
472    /// see iter 9 SPAKE2 for how this token gets exchanged).
473    pub fn post_event(
474        &self,
475        slot_id: &str,
476        slot_token: &str,
477        event: &Value,
478    ) -> Result<PostEventResponse> {
479        let body = serde_json::json!({"event": event});
480        let resp = self
481            .client
482            .post(format!("{}/v1/events/{slot_id}", self.base_url))
483            .bearer_auth(slot_token)
484            .json(&body)
485            .send()
486            .with_context(|| format!("POST {}/v1/events/{slot_id}", self.base_url))?;
487        let status = resp.status();
488        if !status.is_success() {
489            let detail = resp.text().unwrap_or_default();
490            // Format constraint: `cli::error_smells_like_slot_4xx` parses
491            // this error string to gate slot-rotation re-resolves. It
492            // matches `<status>` as a whole token bordered by space/colon
493            // (see issue #69) — `reqwest::StatusCode` Display gives
494            // `"410 Gone"` which satisfies that. If you change the
495            // wrapping (commas/brackets, `status=410`, JSON-encode it),
496            // update `error_smells_like_slot_4xx` in lockstep and add the
497            // new shape to its `slot_reresolve_tests` cases or peer slot
498            // rotations will silently stop auto-recovering.
499            return Err(anyhow!("post_event failed: {status}: {detail}"));
500        }
501        Ok(resp.json()?)
502    }
503
504    /// GET events from a slot. `since` is an event_id cursor (exclusive); pass
505    /// `None` for the full slot snapshot. `limit` defaults to 100, max 1000.
506    pub fn list_events(
507        &self,
508        slot_id: &str,
509        slot_token: &str,
510        since: Option<&str>,
511        limit: Option<usize>,
512    ) -> Result<Vec<Value>> {
513        let mut url = format!("{}/v1/events/{slot_id}", self.base_url);
514        let mut sep = '?';
515        if let Some(s) = since {
516            url.push(sep);
517            url.push_str(&format!("since={s}"));
518            sep = '&';
519        }
520        if let Some(n) = limit {
521            url.push(sep);
522            url.push_str(&format!("limit={n}"));
523        }
524        let resp = self
525            .client
526            .get(&url)
527            .bearer_auth(slot_token)
528            .send()
529            .with_context(|| format!("GET {url}"))?;
530        let status = resp.status();
531        if !status.is_success() {
532            let detail = resp.text().unwrap_or_default();
533            return Err(anyhow!("list_events failed: {status}: {detail}"));
534        }
535        Ok(resp.json()?)
536    }
537
538    /// R4 — probe slot attentiveness. Returns `(event_count, last_pull_at_unix)`
539    /// — the relay's view of the slot's owner's most recent poll. `None` for
540    /// `last_pull_at_unix` means the slot has not been pulled since relay
541    /// restart. Best-effort: any HTTP failure returns `Ok((0, None))` so the
542    /// caller's pre-flight check degrades to "no signal" rather than abort.
543    pub fn slot_state(&self, slot_id: &str, slot_token: &str) -> Result<(usize, Option<u64>)> {
544        let url = format!("{}/v1/slot/{slot_id}/state", self.base_url);
545        let resp = match self.client.get(&url).bearer_auth(slot_token).send() {
546            Ok(r) => r,
547            Err(_) => return Ok((0, None)),
548        };
549        if !resp.status().is_success() {
550            return Ok((0, None));
551        }
552        let v: Value = resp.json().unwrap_or(Value::Null);
553        let count = v.get("event_count").and_then(Value::as_u64).unwrap_or(0) as usize;
554        let last = v.get("last_pull_at_unix").and_then(Value::as_u64);
555        Ok((count, last))
556    }
557
558    pub fn responder_health_set(
559        &self,
560        slot_id: &str,
561        slot_token: &str,
562        record: &Value,
563    ) -> Result<Value> {
564        let resp = self
565            .client
566            .post(format!(
567                "{}/v1/slot/{slot_id}/responder-health",
568                self.base_url
569            ))
570            .bearer_auth(slot_token)
571            .json(record)
572            .send()
573            .with_context(|| {
574                format!("POST {}/v1/slot/{slot_id}/responder-health", self.base_url)
575            })?;
576        let status = resp.status();
577        if !status.is_success() {
578            let detail = resp.text().unwrap_or_default();
579            return Err(anyhow!("responder_health_set failed: {status}: {detail}"));
580        }
581        Ok(resp.json()?)
582    }
583
584    pub fn responder_health_get(&self, slot_id: &str, slot_token: &str) -> Result<Value> {
585        let resp = self
586            .client
587            .get(format!("{}/v1/slot/{slot_id}/state", self.base_url))
588            .bearer_auth(slot_token)
589            .send()
590            .with_context(|| format!("GET {}/v1/slot/{slot_id}/state", self.base_url))?;
591        let status = resp.status();
592        if !status.is_success() {
593            let detail = resp.text().unwrap_or_default();
594            return Err(anyhow!("responder_health_get failed: {status}: {detail}"));
595        }
596        let state: Value = resp.json()?;
597        Ok(state
598            .get("responder_health")
599            .cloned()
600            .unwrap_or(Value::Null))
601    }
602
603    pub fn healthz(&self) -> Result<bool> {
604        let resp = self
605            .client
606            .get(format!("{}/healthz", self.base_url))
607            .send()?;
608        Ok(resp.status().is_success())
609    }
610
611    /// Healthz pre-flight that surfaces the underlying reqwest error in its
612    /// own message. Use at every "is the relay reachable before we mutate
613    /// state" site. The three possible failure modes (network error, 5xx
614    /// from a reachable host, healthy) each get a distinct diagnostic line.
615    pub fn check_healthz(&self) -> anyhow::Result<()> {
616        match self.healthz() {
617            Ok(true) => Ok(()),
618            Ok(false) => anyhow::bail!(
619                "phyllis: silent line — {}/healthz returned non-200.\n\
620                 the host is reachable but the relay isn't returning ok. test:\n  \
621                 curl -v {}/healthz",
622                self.base_url,
623                self.base_url
624            ),
625            Err(e) => anyhow::bail!(
626                "phyllis: silent line — couldn't reach {}/healthz: {e:#}.\n\
627                 test reachability from this machine:\n  curl -v {}/healthz\n\
628                 if curl also fails, a sandbox / proxy / firewall is the usual cause.\n\
629                 (OpenShell sandbox? run `curl -fsSL https://wireup.net/openshell-policy.sh | bash -s <sandbox-name>` on the host first.)",
630                self.base_url,
631                self.base_url
632            ),
633        }
634    }
635
636    /// Open or join a pair-slot. Returns the relay-assigned `pair_id`.
637    /// `role` must be `"host"` or `"guest"`. The host calls first; the guest
638    /// uses the same `code_hash` and finds the existing slot.
639    pub fn pair_open(&self, code_hash: &str, msg_b64: &str, role: &str) -> Result<String> {
640        let body = serde_json::json!({"code_hash": code_hash, "msg": msg_b64, "role": role});
641        let resp = self
642            .client
643            .post(format!("{}/v1/pair", self.base_url))
644            .json(&body)
645            .send()?;
646        let status = resp.status();
647        if !status.is_success() {
648            let detail = resp.text().unwrap_or_default();
649            return Err(anyhow!("pair_open failed: {status}: {detail}"));
650        }
651        let v: Value = resp.json()?;
652        v.get("pair_id")
653            .and_then(Value::as_str)
654            .map(str::to_string)
655            .ok_or_else(|| anyhow!("pair_open response missing pair_id"))
656    }
657
658    /// Forget the pair-slot at this code_hash on the relay. Either side can call;
659    /// knowledge of the code is the only auth. Idempotent — succeeds even if the
660    /// slot doesn't exist. Use after a client crash mid-handshake so the host
661    /// doesn't stay locked out until TTL.
662    pub fn pair_abandon(&self, code_hash: &str) -> Result<()> {
663        let body = serde_json::json!({"code_hash": code_hash});
664        let resp = self
665            .client
666            .post(format!("{}/v1/pair/abandon", self.base_url))
667            .json(&body)
668            .send()?;
669        let status = resp.status();
670        if !status.is_success() {
671            let detail = resp.text().unwrap_or_default();
672            return Err(anyhow!("pair_abandon failed: {status}: {detail}"));
673        }
674        Ok(())
675    }
676
677    /// Read peer's SPAKE2 message + (eventually) sealed bootstrap from a pair-slot.
678    pub fn pair_get(
679        &self,
680        pair_id: &str,
681        as_role: &str,
682    ) -> Result<(Option<String>, Option<String>)> {
683        let resp = self
684            .client
685            .get(format!(
686                "{}/v1/pair/{pair_id}?as_role={as_role}",
687                self.base_url
688            ))
689            .send()?;
690        let status = resp.status();
691        if !status.is_success() {
692            let detail = resp.text().unwrap_or_default();
693            return Err(anyhow!("pair_get failed: {status}: {detail}"));
694        }
695        let v: Value = resp.json()?;
696        let peer_msg = v
697            .get("peer_msg")
698            .and_then(Value::as_str)
699            .map(str::to_string);
700        let peer_bootstrap = v
701            .get("peer_bootstrap")
702            .and_then(Value::as_str)
703            .map(str::to_string);
704        Ok((peer_msg, peer_bootstrap))
705    }
706
707    /// POST a sealed bootstrap payload to the pair-slot.
708    pub fn pair_bootstrap(&self, pair_id: &str, role: &str, sealed_b64: &str) -> Result<()> {
709        let body = serde_json::json!({"role": role, "sealed": sealed_b64});
710        let resp = self
711            .client
712            .post(format!("{}/v1/pair/{pair_id}/bootstrap", self.base_url))
713            .json(&body)
714            .send()?;
715        if !resp.status().is_success() {
716            let s = resp.status();
717            let detail = resp.text().unwrap_or_default();
718            return Err(anyhow!("pair_bootstrap failed: {s}: {detail}"));
719        }
720        Ok(())
721    }
722
723    /// Claim a `nick@<this-relay-domain>` handle (v0.5). Caller must hold
724    /// the bearer token for `slot_id`. FCFS on nick; same-DID re-claims OK.
725    ///
726    /// Back-compat wrapper around `handle_claim_v2` that omits the
727    /// `discoverable` field (relay defaults to discoverable on absence).
728    pub fn handle_claim(
729        &self,
730        nick: &str,
731        slot_id: &str,
732        slot_token: &str,
733        relay_url: Option<&str>,
734        card: &Value,
735    ) -> Result<Value> {
736        self.handle_claim_v2(nick, slot_id, slot_token, relay_url, card, None)
737    }
738
739    /// v0.5.19 (#9.1) variant accepting the optional `discoverable`
740    /// flag. `None` = relay default (= true, back-compat).
741    /// `Some(false)` = opt out of `/v1/handles` bulk listing while
742    /// keeping direct `.well-known/wire/agent` resolution working.
743    /// Relays older than v0.5.19 ignore the field — safe to always send.
744    pub fn handle_claim_v2(
745        &self,
746        nick: &str,
747        slot_id: &str,
748        slot_token: &str,
749        relay_url: Option<&str>,
750        card: &Value,
751        discoverable: Option<bool>,
752    ) -> Result<Value> {
753        let mut body = serde_json::json!({
754            "nick": nick,
755            "slot_id": slot_id,
756            "relay_url": relay_url,
757            "card": card,
758        });
759        if let Some(d) = discoverable {
760            body["discoverable"] = serde_json::json!(d);
761        }
762        let resp = self
763            .client
764            .post(format!("{}/v1/handle/claim", self.base_url))
765            .bearer_auth(slot_token)
766            .json(&body)
767            .send()
768            .with_context(|| format!("POST {}/v1/handle/claim", self.base_url))?;
769        let status = resp.status();
770        if !status.is_success() {
771            let detail = resp.text().unwrap_or_default();
772            return Err(anyhow!("handle_claim failed: {status}: {detail}"));
773        }
774        Ok(resp.json()?)
775    }
776
777    /// POST an intro (zero-paste pair-drop) event to a known nick's slot
778    /// without holding that slot's bearer token. Relay validates the event
779    /// is kind=1100 with an embedded signed agent-card; otherwise refuses.
780    pub fn handle_intro(&self, nick: &str, event: &Value) -> Result<Value> {
781        let body = serde_json::json!({"event": event});
782        let resp = self
783            .client
784            .post(format!("{}/v1/handle/intro/{nick}", self.base_url))
785            .json(&body)
786            .send()
787            .with_context(|| format!("POST {}/v1/handle/intro/{nick}", self.base_url))?;
788        let status = resp.status();
789        if !status.is_success() {
790            let detail = resp.text().unwrap_or_default();
791            return Err(anyhow!("handle_intro failed: {status}: {detail}"));
792        }
793        Ok(resp.json()?)
794    }
795
796    /// Resolve a handle on this relay via A2A v1.0 `.well-known/agent-card.json?handle=<nick>`.
797    /// Returns the parsed AgentCard JSON. Wire-served relays embed wire-native
798    /// fields (DID, slot_id, profile, raw card) under `extensions[0].params`.
799    /// Foreign A2A agents return their A2A card without wire ext — useful for
800    /// `wire whois` even when full mailbox pairing isn't possible.
801    pub fn well_known_agent_card_a2a(&self, handle: &str) -> Result<Value> {
802        let resp = self
803            .client
804            .get(format!("{}/.well-known/agent-card.json", self.base_url))
805            .query(&[("handle", handle)])
806            .send()
807            .with_context(|| {
808                format!(
809                    "GET {}/.well-known/agent-card.json?handle={handle}",
810                    self.base_url
811                )
812            })?;
813        let status = resp.status();
814        if !status.is_success() {
815            let detail = resp.text().unwrap_or_default();
816            return Err(anyhow!(
817                "well_known_agent_card_a2a failed: {status}: {detail}"
818            ));
819        }
820        Ok(resp.json()?)
821    }
822
823    /// Resolve a handle on this relay via `.well-known/wire/agent?handle=<nick>`.
824    /// Caller passes either the full `nick@domain` or just `<nick>` — the
825    /// server only uses the local part.
826    pub fn well_known_agent(&self, handle: &str) -> Result<Value> {
827        let resp = self
828            .client
829            .get(format!("{}/.well-known/wire/agent", self.base_url))
830            .query(&[("handle", handle)])
831            .send()
832            .with_context(|| {
833                format!(
834                    "GET {}/.well-known/wire/agent?handle={handle}",
835                    self.base_url
836                )
837            })?;
838        let status = resp.status();
839        if !status.is_success() {
840            let detail = resp.text().unwrap_or_default();
841            return Err(anyhow!("well_known_agent failed: {status}: {detail}"));
842        }
843        Ok(resp.json()?)
844    }
845}
846
#[cfg(all(test, unix))]
mod uds_tests {
    use super::*;
    use std::io::{Read, Write};
    use std::os::unix::net::UnixListener;
    use std::path::PathBuf;
    use std::thread;

    /// Per-test scratch directory under the system temp dir.
    ///
    /// The directory (and any socket file left inside it) is removed when
    /// the value is dropped, so a test cleans up after itself even when one
    /// of its assertions panics.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        /// Create a fresh, randomly named scratch directory.
        fn new() -> Self {
            let name = format!("wire-uds-test-{}", rand::random::<u32>());
            let dir = std::env::temp_dir().join(name);
            std::fs::create_dir_all(&dir).expect("create UDS scratch dir");
            Self(dir)
        }

        /// Path for a socket called `name` inside the scratch directory.
        /// A stale file at that path is removed first so a later `bind`
        /// cannot trip over it.
        fn socket(&self, name: &str) -> PathBuf {
            let path = self.0.join(name);
            let _ = std::fs::remove_file(&path);
            path
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort: a leftover temp dir must never fail a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Bind a UDS listener at `socket_path` and serve exactly one canned
    /// HTTP/1.1 response (`status` line + `body`) from a background thread.
    ///
    /// The bind completes before this function returns, so the caller can
    /// connect right away without waiting for the thread to reach
    /// `accept`. Nothing is returned; the caller owns `socket_path` and is
    /// responsible for removing it.
    fn spawn_canned_uds_server(socket_path: PathBuf, status: u16, body: &'static str) {
        let listener = UnixListener::bind(&socket_path).expect("bind canned UDS");
        thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept canned UDS");
            // The request content is irrelevant to the canned reply; a
            // single read is issued and its result ignored.
            let mut req_buf = [0u8; 4096];
            let _ = stream.read(&mut req_buf);
            let status_text = match status {
                200 => "OK",
                201 => "Created",
                400 => "Bad Request",
                _ => "Status",
            };
            let resp = format!(
                "HTTP/1.1 {status} {status_text}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
                body.len()
            );
            let _ = stream.write_all(resp.as_bytes());
        });
    }

    /// A 200 response's status and body come back unchanged.
    #[test]
    fn uds_request_round_trips_200_with_body() {
        let scratch = ScratchDir::new();
        let sock = scratch.socket("rt.sock");
        // The listener is already bound when this returns; no settle delay.
        spawn_canned_uds_server(sock.clone(), 200, r#"{"ok":true}"#);
        let (status, body) = uds_request(
            &sock,
            "POST",
            "/v1/test",
            &[("Content-Type", "application/json")],
            b"{}",
        )
        .expect("uds_request succeeds");
        assert_eq!(status, 200);
        assert_eq!(body, br#"{"ok":true}"#);
    }

    /// A non-2xx status is reported as data, not as an `Err`.
    #[test]
    fn uds_request_surfaces_non_2xx_status() {
        let scratch = ScratchDir::new();
        let sock = scratch.socket("err.sock");
        // The listener is already bound when this returns; no settle delay.
        spawn_canned_uds_server(sock.clone(), 400, r#"{"error":"bad"}"#);
        let (status, body) = uds_request(&sock, "GET", "/v1/test", &[], b"")
            .expect("uds_request succeeds even on 4xx");
        assert_eq!(status, 400);
        assert_eq!(body, br#"{"error":"bad"}"#);
    }

    /// Connecting to a path with no socket yields a "connect UDS" error.
    #[test]
    fn uds_request_fails_on_nonexistent_socket() {
        let scratch = ScratchDir::new();
        // A path inside a fresh private directory: nothing listens there,
        // and no concurrent test run shares the location.
        let nope = scratch.socket("nonexistent.sock");
        let err = uds_request(&nope, "GET", "/", &[], b"").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("connect UDS"),
            "expected connect error, got: {msg}"
        );
    }
}
925
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Context string shared by the transport-error classification tests.
    const POST_CONTEXT: &str = "POST https://relay.example/v1/events/abc";

    /// Build a two-level error chain: `root_cause` wrapped in the POST
    /// context URL.
    fn post_failure(root_cause: &'static str) -> anyhow::Error {
        anyhow::Error::msg(root_cause).context(POST_CONTEXT)
    }

    /// The stored base URL has no trailing slash, whether or not the input
    /// carried one.
    #[test]
    fn url_normalization_trims_trailing_slash() {
        for raw in ["http://example.com/", "http://example.com"] {
            let client = RelayClient::new(raw);
            assert_eq!(client.base_url, "http://example.com");
        }
    }

    #[test]
    fn format_transport_error_classifies_tls() {
        // Models the Avast/corp-proxy class from issue #6: a rustls
        // UnknownIssuer wrapped by a hyper error wrapped by a context URL.
        let chain = anyhow!("invalid peer certificate: UnknownIssuer")
            .context("hyper send")
            .context(POST_CONTEXT);
        let rendered = format_transport_error(&chain);
        assert!(
            rendered.starts_with("TLS error:"),
            "expected TLS class prefix, got: {rendered}"
        );
        assert!(
            rendered.contains("UnknownIssuer"),
            "lost root cause: {rendered}"
        );
        assert!(
            rendered.contains("POST https://relay.example"),
            "lost context URL: {rendered}"
        );
    }

    #[test]
    fn format_transport_error_classifies_timeout() {
        let rendered = format_transport_error(&post_failure("operation timed out"));
        assert!(rendered.starts_with("timeout:"), "got: {rendered}");
    }

    #[test]
    fn format_transport_error_classifies_dns() {
        let rendered =
            format_transport_error(&post_failure("dns error: failed to lookup address"));
        assert!(rendered.starts_with("DNS error:"), "got: {rendered}");
    }

    #[test]
    fn format_transport_error_falls_back_to_chain_join() {
        // An unrecognized class gets no prefix, only the joined chain.
        // Every cause MUST still be surfaced (the loud-fail invariant).
        let rendered = format_transport_error(&post_failure(
            "Refused to connect for non-standard reason xyz",
        ));
        assert!(rendered.contains("Refused to connect"));
        assert!(rendered.contains("POST https://relay.example"));
    }

    #[test]
    fn insecure_env_recognizes_truthy_values_and_default_off() {
        // The env var is process-global, so this must stay ONE test rather
        // than two (parallel cargo-test threads would race otherwise). This
        // single test owns the variable's lifecycle: unset → truthy values
        // → falsy values → unset.
        use std::sync::{Mutex, OnceLock};
        static GUARD: OnceLock<Mutex<()>> = OnceLock::new();
        let _serialized = GUARD.get_or_init(|| Mutex::new(())).lock().unwrap();

        // (value, whether insecure mode is expected to be on), in the order
        // the values are applied.
        let cases: &[(&str, bool)] = &[
            ("1", true),
            ("true", true),
            ("yes", true),
            ("on", true),
            ("TRUE", true),
            ("Yes", true),
            ("0", false),
            ("false", false),
            ("no", false),
            ("off", false),
            ("", false),
        ];

        // SAFETY: env mutation here is serialized by the GUARD mutex;
        // other tests in this module do not touch INSECURE_SKIP_TLS_ENV.
        unsafe {
            std::env::remove_var(INSECURE_SKIP_TLS_ENV);
        }
        assert!(!insecure_skip_tls_verify(), "default must be secure");

        for &(v, expect_insecure) in cases {
            // SAFETY: same serialization argument as above.
            unsafe {
                std::env::set_var(INSECURE_SKIP_TLS_ENV, v);
            }
            if expect_insecure {
                assert!(insecure_skip_tls_verify(), "value {v:?} should be truthy");
            } else {
                assert!(
                    !insecure_skip_tls_verify(),
                    "value {v:?} must not enable insecure mode"
                );
            }
        }

        // Leave the process environment as it was found: unset.
        // SAFETY: same serialization argument as above.
        unsafe {
            std::env::remove_var(INSECURE_SKIP_TLS_ENV);
        }
    }

    /// Fixture: a well-formed org-tier DID.
    fn org_did() -> &'static str {
        "did:wire:org:example-0123456789abcdef0123456789abcdef"
    }

    /// Fixture: a well-formed personal-tier (operator) DID.
    fn op_did() -> &'static str {
        "did:wire:op:operator-abcdef0123456789abcdef0123456789"
    }

    #[test]
    fn parse_wire_org_txt_record_dispatches_org_and_op_dids() {
        let org_txt = format!(
            "did={}; relay=https://relay.example; sso_iss=https://issuer.example; sso_tenant=tenant; v=1",
            org_did()
        );
        let org_record = parse_wire_org_txt_record(&org_txt).unwrap();
        assert_eq!(org_record.did, WireOrgTxtDid::Org(org_did().to_string()));
        assert_eq!(org_record.relay.as_deref(), Some("https://relay.example"));
        assert_eq!(
            org_record.sso_iss.as_deref(),
            Some("https://issuer.example")
        );
        assert_eq!(org_record.sso_tenant.as_deref(), Some("tenant"));

        let op_txt = format!("did={}; v=1", op_did());
        let op_record = parse_wire_org_txt_record(&op_txt).unwrap();
        assert_eq!(op_record.did, WireOrgTxtDid::Op(op_did().to_string()));
        assert_eq!(op_record.relay, None);
    }

    #[test]
    fn parse_wire_org_txt_record_rejects_unknown_version_and_session_did() {
        let v2_txt = format!("did={}; v=2", org_did());
        let version_err = parse_wire_org_txt_record(&v2_txt).unwrap_err();
        assert_eq!(
            version_err,
            WireOrgTxtParseError::UnsupportedVersion("2".into())
        );

        let did_err =
            parse_wire_org_txt_record("did=did:wire:session-01234567; v=1").unwrap_err();
        assert!(matches!(did_err, WireOrgTxtParseError::InvalidDid(_)));
    }

    #[test]
    fn parse_wire_org_txt_record_rejects_duplicate_known_fields() {
        let doubled = format!("did={}; v=1; v=1", org_did());
        let err = parse_wire_org_txt_record(&doubled).unwrap_err();
        assert_eq!(err, WireOrgTxtParseError::DuplicateField("v"));
    }

    proptest! {
        /// Arbitrary extra `key=value` fields appended to a valid v1 record
        /// do not stop it from parsing to the same org DID.
        #[test]
        fn parse_wire_org_txt_record_ignores_unknown_fields_at_v1(
            extras in prop::collection::vec(
                (
                    "[a-z_][a-z0-9_]{0,16}",
                    "[A-Za-z0-9._:/-]{0,64}"
                ),
                0..32
            )
        ) {
            let mut txt = format!("did={}; v=1", org_did());
            for (name, content) in extras {
                // Discard the case when a generated key collides with a
                // field name the record grammar already defines.
                prop_assume!(!matches!(
                    name.as_str(),
                    "did" | "v" | "relay" | "sso_iss" | "sso_tenant"
                ));
                txt += &format!("; {name}={content}");
            }

            let record = parse_wire_org_txt_record(&txt).unwrap();
            prop_assert_eq!(record.did, WireOrgTxtDid::Org(org_did().to_string()));
        }

        /// Any `v=` value other than `1` is rejected as unsupported, even
        /// when followed by an extra field.
        #[test]
        fn parse_wire_org_txt_record_rejects_every_unknown_version(
            version in "[A-Za-z0-9._-]{1,16}"
        ) {
            prop_assume!(version != "1");
            let txt = format!("did={}; v={version}; future=opaque", org_did());
            let outcome = parse_wire_org_txt_record(&txt).unwrap_err();
            prop_assert_eq!(outcome, WireOrgTxtParseError::UnsupportedVersion(version));
        }
    }
}
1107
#[cfg(test)]
mod failover_tests {
    use super::*;
    use crate::endpoints::{Endpoint, EndpointScope};
    use std::sync::Mutex;

    /// Fixture: a federation-scope endpoint.
    fn fed_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint::federation(url.to_owned(), slot.to_owned(), token.to_owned())
    }

    /// Fixture: a local-scope endpoint.
    fn local_ep(url: &str, slot: &str, token: &str) -> Endpoint {
        Endpoint {
            scope: EndpointScope::Local,
            relay_url: url.to_owned(),
            slot_id: slot.to_owned(),
            slot_token: token.to_owned(),
        }
    }

    /// Fixture: the response a relay gives for an accepted event.
    fn ok_resp() -> PostEventResponse {
        PostEventResponse {
            status: String::from("queued"),
            event_id: Some(String::from("evt-1")),
        }
    }

    /// Copy out the relay URLs recorded so far, in attempt order.
    fn snapshot(log: &Mutex<Vec<String>>) -> Vec<String> {
        log.lock().unwrap().clone()
    }

    #[test]
    fn first_endpoint_succeeds_no_further_attempts() {
        // Happy path: the first endpoint accepts, so later endpoints are
        // never contacted. Pins that failover adds no extra RTTs while the
        // primary works.
        let event = serde_json::json!({});
        let candidates = vec![
            fed_ep("https://good.example", "slot1", "tok1"),
            fed_ep("https://other.example", "slot2", "tok2"),
        ];
        let tried: Mutex<Vec<String>> = Mutex::new(Vec::new());

        let outcome = try_post_event_with_failover(&candidates, &event, |ep, _| {
            tried.lock().unwrap().push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();

        assert_eq!(outcome.0.relay_url, "https://good.example");
        assert_eq!(
            snapshot(&tried),
            ["https://good.example"],
            "must NOT try the second endpoint after the first succeeds"
        );
    }

    #[test]
    fn skips_dead_endpoint_and_succeeds_on_next() {
        // Bug 2 regression case: a peer advertises [bad, good]. Before the
        // fix, send_pair_drop_ack would 4xx on `bad` and give up, leaving
        // the bilateral pair unreachable. The failover helper now tries
        // `bad`, records the error, tries `good`, and succeeds. Mirrors the
        // swift-harbor ↔ coral-weasel incident exactly.
        let event = serde_json::json!({});
        let candidates = vec![
            // Bad first endpoint (models the userinfo-malformed URL from
            // Bug 1 / the federation 400 coral-weasel hit on accept).
            fed_ep("https://copilot-agent@wireup.net", "slot-bad", "tok-bad"),
            // Clean second endpoint that actually works.
            fed_ep("https://wireup.net", "slot-good", "tok-good"),
        ];
        let tried: Mutex<Vec<String>> = Mutex::new(Vec::new());

        let (winner, _resp) = try_post_event_with_failover(&candidates, &event, |ep, _| {
            tried.lock().unwrap().push(ep.relay_url.clone());
            if ep.relay_url.contains('@') {
                return Err(anyhow!("400 Bad Request (userinfo embedded)"));
            }
            Ok(ok_resp())
        })
        .unwrap();

        assert_eq!(
            winner.relay_url, "https://wireup.net",
            "the successful endpoint must be the one returned to the caller"
        );
        assert_eq!(
            snapshot(&tried),
            ["https://copilot-agent@wireup.net", "https://wireup.net"],
            "must try `bad` first, then fall over to `good`"
        );
    }

    #[test]
    fn respects_priority_order_caller_supplies() {
        // No re-sorting happens; the caller's order is honored. Typical
        // input is `peer_endpoints_in_priority_order` (UDS / Local / LAN /
        // Federation), so "first tried" encodes the existing transport-
        // preference contract. Here: Local precedes Federation in the input,
        // therefore Local is tried first.
        let event = serde_json::json!({});
        let candidates = vec![
            local_ep("http://127.0.0.1:8771", "loc1", "loctok"),
            fed_ep("https://wireup.net", "fed1", "fedtok"),
        ];
        let tried: Mutex<Vec<String>> = Mutex::new(Vec::new());

        let _ = try_post_event_with_failover(&candidates, &event, |ep, _| {
            tried.lock().unwrap().push(ep.relay_url.clone());
            Ok(ok_resp())
        })
        .unwrap();

        let log = tried.lock().unwrap();
        assert_eq!(
            log[0],
            "http://127.0.0.1:8771",
            "Local-scope endpoint must be tried first (per the caller's priority order)"
        );
    }

    #[test]
    fn all_failures_returns_combined_error() {
        // Every endpoint fails: the helper must fold the per-endpoint
        // reasons into one error so the operator can diagnose without
        // re-tracing — same shape as cmd_push's failure logging.
        let event = serde_json::json!({});
        let urls = [
            "https://a.example",
            "https://b.example",
            "https://c.example",
        ];
        let candidates: Vec<Endpoint> = urls.iter().map(|u| fed_ep(u, "s", "t")).collect();

        let err = try_post_event_with_failover(&candidates, &event, |ep, _| {
            Err(anyhow!("simulated 500 from {}", ep.relay_url))
        })
        .unwrap_err()
        .to_string();

        assert!(
            err.contains("all 3 endpoint(s) failed"),
            "error must surface the total count: {err}"
        );
        // Each endpoint URL shows up in the combined error, so every
        // failure is attributable.
        for u in urls {
            assert!(
                err.contains(u),
                "combined error must include each failing endpoint URL ({u}): {err}"
            );
        }
    }

    #[test]
    fn empty_endpoints_returns_actionable_error() {
        // A peer with no pinned endpoints is unreachable by definition. The
        // helper must say so explicitly (never a silent Ok) and point at the
        // re-pair remediation.
        let event = serde_json::json!({});
        let candidates: Vec<Endpoint> = Vec::new();

        let err = try_post_event_with_failover(&candidates, &event, |_, _| {
            unreachable!("poster must not be called when endpoint list is empty")
        })
        .unwrap_err()
        .to_string();

        assert!(
            err.contains("no endpoints to deliver to"),
            "empty-list error must be explicit: {err}"
        );
        let names_remediation = ["re-pin", "dial", "pair"]
            .iter()
            .any(|hint| err.contains(hint));
        assert!(
            names_remediation,
            "empty-list error must point at the remediation path: {err}"
        );
    }
}