Skip to main content

wire/cli/
relay.rs

1use anyhow::{Context, Result, anyhow, bail};
2use serde_json::{Value, json};
3
4use super::setup;
5use crate::{config, signing::sign_message_v31};
6
7// ---------- mcp / relay-server stubs ----------
8
/// CLI entry point for the `mcp` subcommand.
///
/// Thin delegation: all work happens in `crate::mcp::run`, whose result is
/// returned unchanged.
pub(super) fn cmd_mcp() -> Result<()> {
    crate::mcp::run()
}
12
13pub(super) fn cmd_relay_server(
14    bind: &str,
15    local_only: bool,
16    uds: Option<&std::path::Path>,
17) -> Result<()> {
18    // v0.7.0-alpha.16: --uds <path> takes the UDS transport path,
19    // overriding --bind. Implies --local-only semantics. Routed to a
20    // separate serve_uds entry point with a manual hyper accept loop
21    // (axum 0.7's `serve` is TcpListener-only).
22    if let Some(socket_path) = uds {
23        let base = if let Ok(home) = std::env::var("WIRE_HOME") {
24            std::path::PathBuf::from(home)
25                .join("state")
26                .join("wire-relay")
27                .join("uds")
28        } else {
29            dirs::state_dir()
30                .or_else(dirs::data_local_dir)
31                .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
32                .join("wire-relay")
33                .join("uds")
34        };
35        let runtime = tokio::runtime::Builder::new_multi_thread()
36            .enable_all()
37            .build()?;
38        return runtime.block_on(crate::relay_server::serve_uds(
39            socket_path.to_path_buf(),
40            base,
41        ));
42    }
43    // v0.5.17: --local-only refuses non-loopback binds. Catches the
44    // "wait did I just bind a publicly-reachable local-only relay" mistake
45    // at startup rather than discovering it via an empty phonebook later.
46    if local_only {
47        validate_loopback_bind(bind)?;
48    }
49    // Default state dir for the relay process: $WIRE_HOME/state/wire-relay
50    // (or `dirs::state_dir()/wire-relay`). Distinct from the CLI's state dir
51    // so a single user can run both client and server on one machine.
52    // For --local-only, suffix with /local so a single operator can run
53    // both a federation relay and a local-only relay without state collision.
54    let base = if let Ok(home) = std::env::var("WIRE_HOME") {
55        std::path::PathBuf::from(home)
56            .join("state")
57            .join("wire-relay")
58    } else {
59        dirs::state_dir()
60            .or_else(dirs::data_local_dir)
61            .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
62            .join("wire-relay")
63    };
64    let state_dir = if local_only { base.join("local") } else { base };
65    let runtime = tokio::runtime::Builder::new_multi_thread()
66        .enable_all()
67        .build()?;
68    runtime.block_on(crate::relay_server::serve_with_mode(
69        bind,
70        state_dir,
71        crate::relay_server::ServerMode { local_only },
72    ))
73}
74
75/// v0.5.17 loopback-bind guard. Refuses any address whose host portion
76/// resolves to something outside `127.0.0.0/8` or `::1`.
77///
78/// v0.7.0-alpha.11: relaxed to also accept RFC 1918 private IPv4
79/// (10/8, 172.16/12, 192.168/16) so `wire relay-server --bind
80/// <LAN-IP>:8772 --local-only` works for the alpha.9 LAN feature.
81///
82/// v0.7.0-alpha.15: also accept RFC 6598 CGNAT (100.64.0.0/10), which
83/// is the IP range Tailscale uses for tailnet addresses. Lets operators
84/// pair wire across machines using their tailnet IPs (e.g. Mac at
85/// 100.96.234.16, Spark at 100.91.57.17) — Tailscale handles
86/// auth + encryption + NAT traversal, wire handles protocol + identity.
87/// Sidesteps host firewall config entirely (utun interface bypass).
88///
89/// Still refuses: public IPv4/IPv6, wildcards (0.0.0.0/::), link-local,
90/// multicast, broadcast. Those would publish a "local-only" relay to
91/// the global internet — the v0.5.17 security gate's whole point.
92fn validate_loopback_bind(bind: &str) -> Result<()> {
93    // Split host:port. IPv6 literals use `[::]:port` form.
94    let host = if let Some(stripped) = bind.strip_prefix('[') {
95        let close = stripped
96            .find(']')
97            .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
98        stripped[..close].to_string()
99    } else {
100        bind.rsplit_once(':')
101            .map(|(h, _)| h.to_string())
102            .unwrap_or_else(|| bind.to_string())
103    };
104    use std::net::{IpAddr, ToSocketAddrs};
105    let probe = format!("{host}:0");
106    let resolved: Vec<_> = probe
107        .to_socket_addrs()
108        .with_context(|| format!("resolving bind host {host:?}"))?
109        .collect();
110    if resolved.is_empty() {
111        bail!("--local-only: bind host {host:?} resolved to no addresses");
112    }
113    for addr in &resolved {
114        let ip = addr.ip();
115        let is_acceptable = match ip {
116            IpAddr::V4(v4) => {
117                v4.is_loopback() || v4.is_private() || {
118                    // RFC 6598 CGNAT / Tailscale range: 100.64.0.0/10
119                    let octets = v4.octets();
120                    octets[0] == 100 && (64..=127).contains(&octets[1])
121                }
122            }
123            IpAddr::V6(v6) => v6.is_loopback(), // ULA + Tailscale-v6 deferred
124        };
125        if !is_acceptable {
126            bail!(
127                "--local-only refuses non-private bind: {host:?} resolves to {ip} \
128                 which is not loopback (127/8, ::1), RFC 1918 private \
129                 (10/8, 172.16/12, 192.168/16), or RFC 6598 CGNAT/Tailscale \
130                 (100.64.0.0/10). Remove --local-only to bind publicly."
131            );
132        }
133    }
134    Ok(())
135}
136
137// ---------- bind-relay ----------
138
139fn parse_scope(s: &str) -> Result<crate::endpoints::EndpointScope> {
140    use crate::endpoints::EndpointScope;
141    match s.to_lowercase().as_str() {
142        "federation" | "fed" => Ok(EndpointScope::Federation),
143        "local" => Ok(EndpointScope::Local),
144        "lan" => Ok(EndpointScope::Lan),
145        "uds" => Ok(EndpointScope::Uds),
146        other => bail!("unknown --scope `{other}` (expected federation|local|lan|uds)"),
147    }
148}
149
/// v0.12: bind a relay slot. ADDITIVE by default — the new slot is
/// appended to `self.endpoints[]`, keeping any existing slots so an agent
/// can hold a local relay AND a federation relay simultaneously without
/// black-holing pinned peers. `--replace` restores the pre-v0.12
/// destructive single-slot behavior (guarded by issue #7).
///
/// # Parameters
///
/// * `url` — relay base URL; trailing slashes are trimmed and any userinfo
///   is stripped before the URL is used or persisted.
/// * `scope` — explicit scope string (see `parse_scope`); when `None` the
///   scope is inferred from the normalized URL.
/// * `replace` — clear the existing `self` entry so only the new slot remains.
/// * `migrate_pinned` — acknowledge that a destructive bind may black-hole
///   already-pinned peers and proceed anyway.
/// * `as_json` — print a single JSON object instead of human-readable lines.
///
/// # Errors
///
/// Fails when the agent is not initialized, when the URL is not clean for
/// publishing, when `scope` is unrecognized, when a destructive bind would
/// affect pinned peers and `migrate_pinned` is not set, when the relay
/// health check or slot allocation fails, or when the relay state cannot be
/// written.
pub(crate) fn cmd_bind_relay(
    url: &str,
    scope: Option<&str>,
    replace: bool,
    migrate_pinned: bool,
    as_json: bool,
) -> Result<()> {
    use crate::endpoints::{Endpoint, self_endpoints};

    if !config::is_initialized()? {
        bail!("not initialized — run `wire up` first");
    }
    // The display handle derived from the card's DID is sent along with the
    // slot-allocation request below. A card without a string `did` yields an
    // empty DID here rather than an error.
    let card = config::read_agent_card()?;
    let did = card.get("did").and_then(Value::as_str).unwrap_or("");
    let handle = crate::agent_card::display_handle_from_did(did).to_string();

    let normalized_raw = url.trim_end_matches('/');
    // Refuse to record/publish a relay endpoint that embeds userinfo —
    // `https://<handle>@<host>` 4xxes every inbound event POST. Strip and
    // warn so operators learn the right shape without losing the call.
    let normalized_owned = setup::strip_relay_url_userinfo(normalized_raw);
    let normalized = normalized_owned.as_str();
    // Belt-and-suspenders: confirm the post-strip URL is clean before any
    // persist / publish. A future code path that bypasses the strip filter
    // MUST NOT be able to leak userinfo into the signed agent-card.
    setup::assert_relay_url_clean_for_publish(normalized)?;
    let new_scope = match scope {
        Some(s) => parse_scope(s)?,
        None => crate::endpoints::infer_scope_from_url(normalized),
    };

    // A relay state that cannot be read is treated as empty, so a first-time
    // bind works without a pre-existing state file.
    let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
    // Handles of every pinned peer (the keys of the `peers` object).
    let pinned: Vec<String> = existing
        .get("peers")
        .and_then(|p| p.as_object())
        .map(|o| o.keys().cloned().collect())
        .unwrap_or_default();

    // Binding a relay URL we already hold a slot on counts as a rotation.
    let existing_eps = self_endpoints(&existing);
    let is_rebind_same = existing_eps.iter().any(|e| e.relay_url == normalized);

    // Destructive paths that black-hole pinned peers (issue #7):
    //   • `--replace` drops every other slot.
    //   • re-binding the SAME relay rotates that slot in place.
    // An additive bind of a NEW relay keeps existing slots, so peers stay
    // reachable — no acknowledgement required. This is the v0.12 default
    // that unblocks simultaneous local + remote.
    let destructive = replace || is_rebind_same;
    if destructive && !pinned.is_empty() && !migrate_pinned {
        let list = pinned.join(", ");
        let why = if replace {
            "`--replace` drops your other slot(s)"
        } else {
            "re-binding the same relay rotates its slot"
        };
        bail!(
            "bind-relay would black-hole {n} pinned peer(s): {list}. {why}; they are \
             pinned to your CURRENT slot and would keep pushing to a slot you no longer \
             read.\n\n\
             SAFE PATHS:\n\
             • Default (omit `--replace`) ADDITIVELY binds a NEW relay, keeping existing \
             slots — no black-hole.\n\
             • `wire rotate-slot` — same-relay rotation that emits wire_close to peers.\n\
             • `wire bind-relay {url} --migrate-pinned` — proceed anyway; re-pair each \
             peer out-of-band.\n\n\
             Issue #7 (silent black-hole on relay change) caught this.",
            n = pinned.len(),
        );
    }

    // Network calls happen only after the local guard above has passed:
    // health check first, then slot allocation.
    let client = crate::relay_client::RelayClient::new(normalized);
    client.check_healthz()?;
    let alloc = client.allocate_slot(Some(&handle))?;

    // The operator acknowledged the risk with --migrate-pinned; still say
    // out loud which peers are affected.
    if destructive && !pinned.is_empty() {
        eprintln!(
            "wire bind-relay: {mode} with {n} pinned peer(s) — they will black-hole \
             until they re-pin: {peers}",
            mode = if replace { "replacing" } else { "rotating" },
            n = pinned.len(),
            peers = pinned.join(", "),
        );
    }

    // Write the new slot via the single source of truth for the self-slot
    // shape. Additive by default; --replace starts from an empty self so
    // only this slot remains.
    let mut state = existing;
    if replace {
        state["self"] = Value::Null;
    }
    crate::endpoints::upsert_self_endpoint(
        &mut state,
        Endpoint {
            relay_url: normalized.to_string(),
            slot_id: alloc.slot_id.clone(),
            slot_token: alloc.slot_token.clone(),
            scope: new_scope,
        },
    );
    config::write_relay_state(&state)?;
    // Re-read the endpoint list from the state just written so the report
    // reflects what was persisted.
    let eps = self_endpoints(&state);

    // Scope label is the lower-cased Debug rendering of the enum variant.
    let scope_str = format!("{new_scope:?}").to_lowercase();
    if as_json {
        // The slot token itself is never printed; only a constant
        // `slot_token_present: true` marker is emitted.
        println!(
            "{}",
            serde_json::to_string(&json!({
                "relay_url": normalized,
                "slot_id": alloc.slot_id,
                "scope": scope_str,
                "endpoints": eps.len(),
                "additive": !replace,
                "slot_token_present": true,
            }))?
        );
    } else {
        println!(
            "bound {scope_str} slot on {normalized} (slot {})",
            alloc.slot_id
        );
        println!(
            "self now has {n} endpoint(s): {list}",
            n = eps.len(),
            list = eps
                .iter()
                .map(|e| format!("{}({:?})", e.relay_url, e.scope))
                .collect::<Vec<_>>()
                .join(", "),
        );
    }
    Ok(())
}
288
289// ---------- add-peer-slot ----------
290
291pub(super) fn cmd_add_peer_slot(
292    handle: &str,
293    url: &str,
294    slot_id: &str,
295    slot_token: &str,
296    as_json: bool,
297) -> Result<()> {
298    use crate::endpoints::{Endpoint, infer_scope_from_url, pin_peer_endpoints};
299    let mut state = config::read_relay_state()?;
300
301    // E3 (v0.13.2): ADD this slot to the peer's endpoint set — don't REPLACE
302    // the whole entry. The old flat `peers.insert` clobbered an existing
303    // peer's federation endpoint when pinning a local slot, silently dropping
304    // the federation route (glossy-magnolia + wisp-blossom repro: pinning a
305    // loopback slot made the peer flat loopback-only). Mirror bind-relay's
306    // additive semantics: upsert by relay_url into the peer's endpoints[].
307    let new_ep = Endpoint {
308        relay_url: url.to_string(),
309        slot_id: slot_id.to_string(),
310        slot_token: slot_token.to_string(),
311        scope: infer_scope_from_url(url),
312    };
313    // RFC-006 Part B: `endpoints[]` is the single peer-routing source — no flat
314    // fallback (every pin carries `endpoints[]`).
315    let mut endpoints: Vec<Endpoint> = state
316        .get("peers")
317        .and_then(|p| p.get(handle))
318        .and_then(|e| e.get("endpoints"))
319        .and_then(|a| serde_json::from_value::<Vec<Endpoint>>(a.clone()).ok())
320        .unwrap_or_default();
321    // Upsert by relay_url: refresh in place if already pinned, else append.
322    if let Some(existing) = endpoints
323        .iter_mut()
324        .find(|e| e.relay_url == new_ep.relay_url)
325    {
326        *existing = new_ep;
327    } else {
328        endpoints.push(new_ep);
329    }
330    let n = endpoints.len();
331    pin_peer_endpoints(&mut state, handle, &endpoints)?;
332    config::write_relay_state(&state)?;
333    if as_json {
334        println!(
335            "{}",
336            serde_json::to_string(&json!({
337                "handle": handle,
338                "relay_url": url,
339                "slot_id": slot_id,
340                "added": true,
341                "endpoint_count": n,
342            }))?
343        );
344    } else {
345        println!(
346            "pinned peer slot for {handle} at {url} ({slot_id}) — peer now has {n} endpoint(s)"
347        );
348    }
349    Ok(())
350}
351
352// ---------- push ----------
353
354pub(super) fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
355    let mut state = config::read_relay_state()?;
356    let peers = state["peers"].as_object().cloned().unwrap_or_default();
357    if peers.is_empty() {
358        bail!(
359            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
360        );
361    }
362    let outbox_dir = config::outbox_dir()?;
363    // v0.5.13 loud-fail: warn on outbox files that don't match a pinned peer.
364    // Pre-v0.5.13 `wire send peer@relay` wrote to `peer@relay.jsonl` while
365    // push only enumerated bare-handle files. After upgrade, stale FQDN-named
366    // files sit on disk forever; warn so operator can `cat fqdn.jsonl >> handle.jsonl`.
367    if outbox_dir.exists() {
368        let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
369        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
370            let path = entry.path();
371            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
372                continue;
373            }
374            let stem = match path.file_stem().and_then(|s| s.to_str()) {
375                Some(s) => s.to_string(),
376                None => continue,
377            };
378            if pinned.contains(&stem) {
379                continue;
380            }
381            // Try the bare-handle of the orphaned stem — if THAT matches a
382            // pinned peer, the stem is a stale FQDN-suffixed file.
383            let bare = crate::agent_card::bare_handle(&stem);
384            if pinned.contains(bare) {
385                eprintln!(
386                    "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
387                     Merge with: `cat {} >> {}` then delete the FQDN file.",
388                    stem,
389                    path.display(),
390                    outbox_dir.join(format!("{bare}.jsonl")).display(),
391                );
392            }
393        }
394    }
395    if !outbox_dir.exists() {
396        if as_json {
397            println!(
398                "{}",
399                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
400            );
401        } else {
402            println!("phyllis: nothing to dial out — write a message first with `wire send`");
403        }
404        return Ok(());
405    }
406
407    let mut pushed = Vec::new();
408    let mut skipped = Vec::new();
409
410    // Issue #15: track which peers we've already re-resolved this push call
411    // so we don't whois more than once per peer per push (the rate limit the
412    // issue specifies). Lifetime is the whole `cmd_push` invocation; clears
413    // every time the operator (or daemon) runs `wire push` again.
414    let mut rotated_this_push: std::collections::HashSet<String> = std::collections::HashSet::new();
415    // Track whether we mutated `state` so we can write it back exactly
416    // once at the end (avoids a write per peer).
417    let mut state_dirty = false;
418
419    // v0.5.17: walk each peer's pinned endpoints in priority order (local
420    // first if we share a local relay, federation second). Try POST on the
421    // first endpoint; on transport failure, fall through to the next.
422    // Falls back to the v0.5.16 legacy single-endpoint code path when the
423    // peer record carries no `endpoints[]` array (back-compat).
424    for (peer_handle, _) in peers.iter() {
425        if let Some(want) = peer_filter
426            && peer_handle != want
427        {
428            continue;
429        }
430        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
431        if !outbox.exists() {
432            continue;
433        }
434        let mut ordered_endpoints =
435            crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
436        if ordered_endpoints.is_empty() {
437            // Unreachable peer (no federation endpoint AND our local
438            // relay doesn't match the peer's). Skip with a loud reason
439            // rather than silently dropping events.
440            for line in std::fs::read_to_string(&outbox).unwrap_or_default().lines() {
441                let event: Value = match serde_json::from_str(line) {
442                    Ok(v) => v,
443                    Err(_) => continue,
444                };
445                let event_id = event
446                    .get("event_id")
447                    .and_then(Value::as_str)
448                    .unwrap_or("")
449                    .to_string();
450                skipped.push(json!({
451                    "peer": peer_handle,
452                    "event_id": event_id,
453                    "reason": "no reachable endpoint pinned for peer",
454                }));
455            }
456            continue;
457        }
458        let body = std::fs::read_to_string(&outbox)?;
459        for line in body.lines() {
460            let event: Value = match serde_json::from_str(line) {
461                Ok(v) => v,
462                Err(_) => continue,
463            };
464            let event_id = event
465                .get("event_id")
466                .and_then(Value::as_str)
467                .unwrap_or("")
468                .to_string();
469
470            // Capture the most recent per-endpoint error reason via a RefCell
471            // so we can preserve cmd_push's pre-existing "last-error wins"
472            // semantics for the skipped-with-reason path. The shared
473            // try_post_event_with_failover helper (from #62) handles iteration,
474            // priority order, and early-return on first success; the closure
475            // applies the existing `format_transport_error` formatting on
476            // each individual error so the operator sees the same diagnostic
477            // text as before the dedup.
478            let last_err: std::cell::RefCell<Option<String>> = std::cell::RefCell::new(None);
479            match crate::relay_client::try_post_event_with_failover(
480                &ordered_endpoints,
481                &event,
482                |endpoint, ev| {
483                    let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
484                    match client.post_event(&endpoint.slot_id, &endpoint.slot_token, ev) {
485                        Ok(resp) => Ok(resp),
486                        Err(e) => {
487                            *last_err.borrow_mut() =
488                                Some(crate::relay_client::format_transport_error(&e));
489                            Err(e)
490                        }
491                    }
492                },
493            ) {
494                Ok((endpoint, resp)) => {
495                    if resp.status == "duplicate" {
496                        skipped.push(json!({
497                            "peer": peer_handle,
498                            "event_id": event_id,
499                            "reason": "duplicate",
500                            "endpoint": endpoint.relay_url,
501                            "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
502                        }));
503                    } else {
504                        pushed.push(json!({
505                            "peer": peer_handle,
506                            "event_id": event_id,
507                            "endpoint": endpoint.relay_url,
508                            "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
509                        }));
510                    }
511                }
512                Err(_) => {
513                    // Issue #15: before reporting the event as skipped, see
514                    // if the failure smelled like a slot-rotation (4xx 404 /
515                    // 410). If yes AND we haven't already re-resolved this
516                    // peer in this push call, attempt one whois lookup. On
517                    // a real rotation, the helper updates `state.peers[peer]`
518                    // in place; we refresh `ordered_endpoints` from the
519                    // mutated state and retry the same event once. Composes
520                    // with the doctor #14 staleness check from PR #68: #14
521                    // surfaces the symptom, #15 closes the loop.
522                    let last_err_text = last_err.borrow().clone().unwrap_or_default();
523                    let mut delivered_via_retry: Option<(crate::endpoints::Endpoint, _)> = None;
524                    match try_reresolve_peer_on_slot_4xx(
525                        &mut state,
526                        peer_handle,
527                        &last_err_text,
528                        &rotated_this_push,
529                    ) {
530                        Ok(true) => {
531                            // Mark this peer as already re-resolved this push.
532                            rotated_this_push.insert(peer_handle.clone());
533                            state_dirty = true;
534                            // Refresh endpoints from the updated state and
535                            // retry exactly once. last_err is also reset so
536                            // the retry's error (if any) replaces the prior
537                            // one in the eventual skipped reason.
538                            ordered_endpoints = crate::endpoints::peer_endpoints_in_priority_order(
539                                &state,
540                                peer_handle,
541                            );
542                            *last_err.borrow_mut() = None;
543                            if let Ok((endpoint, resp)) =
544                                crate::relay_client::try_post_event_with_failover(
545                                    &ordered_endpoints,
546                                    &event,
547                                    |endpoint, ev| {
548                                        let client = crate::relay_client::RelayClient::new(
549                                            &endpoint.relay_url,
550                                        );
551                                        match client.post_event(
552                                            &endpoint.slot_id,
553                                            &endpoint.slot_token,
554                                            ev,
555                                        ) {
556                                            Ok(resp) => Ok(resp),
557                                            Err(e) => {
558                                                *last_err.borrow_mut() = Some(
559                                                    crate::relay_client::format_transport_error(&e),
560                                                );
561                                                Err(e)
562                                            }
563                                        }
564                                    },
565                                )
566                            {
567                                delivered_via_retry = Some((endpoint, resp));
568                            }
569                        }
570                        Ok(false) => {
571                            // Either not a slot-rotation shape, or already
572                            // re-resolved this push, or slot id unchanged —
573                            // fall through to the original skipped path.
574                        }
575                        Err(e) => {
576                            // Re-resolve itself failed (DNS down, relay 5xx,
577                            // handle unclaimed, etc.). Don't fail the push —
578                            // fall through to skipped with the resolve error
579                            // appended for diagnostic context.
580                            *last_err.borrow_mut() = Some(format!(
581                                "{}; re-resolve also failed: {e:#}",
582                                last_err.borrow().clone().unwrap_or_default()
583                            ));
584                            // Mark as tried so we don't loop on the next event.
585                            rotated_this_push.insert(peer_handle.clone());
586                        }
587                    }
588                    if let Some((endpoint, resp)) = delivered_via_retry {
589                        if resp.status == "duplicate" {
590                            skipped.push(json!({
591                                "peer": peer_handle,
592                                "event_id": event_id,
593                                "reason": "duplicate",
594                                "endpoint": endpoint.relay_url,
595                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
596                                "via": "slot_reresolve_retry",
597                            }));
598                        } else {
599                            pushed.push(json!({
600                                "peer": peer_handle,
601                                "event_id": event_id,
602                                "endpoint": endpoint.relay_url,
603                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
604                                "via": "slot_reresolve_retry",
605                            }));
606                        }
607                    } else {
608                        // Every endpoint failed even after (any) retry.
609                        // Preserve the prior "last reason is what gets
610                        // reported" UX (the closure captured the last per-
611                        // endpoint error via `last_err`).
612                        skipped.push(json!({
613                            "peer": peer_handle,
614                            "event_id": event_id,
615                            "reason": last_err
616                                .borrow()
617                                .clone()
618                                .unwrap_or_else(|| "all endpoints failed".to_string()),
619                        }));
620                    }
621                }
622            }
623        }
624    }
625
626    // Issue #15: persist any in-place slot rotations from the per-peer loop
627    // exactly once at the end. Best-effort: if the write fails the operator
628    // still gets a valid push report, and the next push will re-attempt the
629    // resolve (cheap) before retrying delivery.
630    if state_dirty && let Err(e) = config::write_relay_state(&state) {
631        eprintln!(
632            "wire push: WARN failed to persist rotated peer slots: {e:#}. \
633             Slot rotation will be re-attempted on next push."
634        );
635    }
636
637    if as_json {
638        println!(
639            "{}",
640            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
641        );
642    } else {
643        println!(
644            "pushed {} event(s); skipped {} ({})",
645            pushed.len(),
646            skipped.len(),
647            if skipped.is_empty() {
648                "none"
649            } else {
650                "see --json for detail"
651            }
652        );
653    }
654    Ok(())
655}
656
657// ---------- pull ----------
658
659pub(super) fn cmd_pull(as_json: bool) -> Result<()> {
660    let state = config::read_relay_state()?;
661    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
662    if self_state.is_null() {
663        bail!("self slot not bound — run `wire bind-relay <url>` first");
664    }
665
666    // v0.5.17: pull from every endpoint in self.endpoints (federation +
667    // optional local). Each endpoint has its own per-scope cursor so we
668    // don't re-pull events we've already seen on that path. Events from
669    // all endpoints feed into the same inbox JSONL via process_events;
670    // dedup by event_id is the last line of defense.
671    // Falls back to a single federation endpoint synthesized from the
672    // top-level legacy fields when self.endpoints is absent (v0.5.16
673    // back-compat).
674    let endpoints = crate::endpoints::self_endpoints(&state);
675    if endpoints.is_empty() {
676        bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
677    }
678
679    let inbox_dir = config::inbox_dir()?;
680    config::ensure_dirs()?;
681
682    let mut total_seen = 0usize;
683    let mut all_written: Vec<Value> = Vec::new();
684    let mut all_rejected: Vec<Value> = Vec::new();
685    let mut all_blocked = false;
686    let mut all_advance_cursor_to: Option<String> = None;
687
688    for endpoint in &endpoints {
689        let cursor_key = endpoint_cursor_key(endpoint.scope);
690        let last_event_id = self_state
691            .get(&cursor_key)
692            .and_then(Value::as_str)
693            .map(str::to_string);
694        let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
695        let events = match client.list_events(
696            &endpoint.slot_id,
697            &endpoint.slot_token,
698            last_event_id.as_deref(),
699            Some(1000),
700        ) {
701            Ok(ev) => ev,
702            Err(e) => {
703                // One endpoint's failure shouldn't kill the whole pull.
704                // The local-relay-down case in particular needs to
705                // gracefully continue against federation.
706                eprintln!(
707                    "wire pull: endpoint {} ({:?}) errored: {}; continuing",
708                    endpoint.relay_url,
709                    endpoint.scope,
710                    crate::relay_client::format_transport_error(&e),
711                );
712                continue;
713            }
714        };
715        total_seen += events.len();
716        let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
717        all_written.extend(result.written.iter().cloned());
718        all_rejected.extend(result.rejected.iter().cloned());
719        if result.blocked {
720            all_blocked = true;
721        }
722        // Advance per-endpoint cursor. The cursor key is scope-specific
723        // so federation and local don't trample each other.
724        if let Some(eid) = result.advance_cursor_to.clone() {
725            if endpoint.scope == crate::endpoints::EndpointScope::Federation {
726                all_advance_cursor_to = Some(eid.clone());
727            }
728            let key = cursor_key.clone();
729            config::update_relay_state(|state| {
730                if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
731                    self_obj.insert(key, Value::String(eid));
732                }
733                Ok(())
734            })?;
735        }
736    }
737
738    // Compatibility shim for the legacy single-cursor code paths below:
739    // `result` used to come from one process_events call; we now have
740    // per-endpoint results aggregated into the all_* accumulators.
741    // Reconstruct a synthetic result for the remaining display logic.
742    let result = crate::pull::PullResult {
743        written: all_written,
744        rejected: all_rejected,
745        blocked: all_blocked,
746        advance_cursor_to: all_advance_cursor_to,
747    };
748    let events_len = total_seen;
749
750    // Cursor advance happened per-endpoint above; no aggregate cursor
751    // write needed here.
752
753    if as_json {
754        println!(
755            "{}",
756            serde_json::to_string(&json!({
757                "written": result.written,
758                "rejected": result.rejected,
759                "total_seen": events_len,
760                "cursor_blocked": result.blocked,
761                "cursor_advanced_to": result.advance_cursor_to,
762            }))?
763        );
764    } else {
765        let blocking = result
766            .rejected
767            .iter()
768            .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
769            .count();
770        if blocking > 0 {
771            println!(
772                "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
773                events_len,
774                result.written.len(),
775                result.rejected.len(),
776                blocking,
777            );
778        } else {
779            println!(
780                "pulled {} event(s); wrote {}; rejected {}",
781                events_len,
782                result.written.len(),
783                result.rejected.len(),
784            );
785        }
786    }
787    Ok(())
788}
789
790/// v0.5.17: cursor key for an endpoint's per-scope read position.
791/// Federation keeps the v0.5.16 legacy key `last_pulled_event_id` for
792/// back-compat with on-disk relay_state files; local uses a
793/// `_local` suffix.
794fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
795    match scope {
796        crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
797        crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
798        crate::endpoints::EndpointScope::Lan => "last_pulled_event_id_lan".to_string(),
799        crate::endpoints::EndpointScope::Uds => "last_pulled_event_id_uds".to_string(),
800    }
801}
802
803// ---------- rotate-slot ----------
804
805pub(super) fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
806    if !config::is_initialized()? {
807        bail!("not initialized — run `wire up` first");
808    }
809    let mut state = config::read_relay_state()?;
810    let self_state = state.get("self").cloned().unwrap_or(Value::Null);
811    if self_state.is_null() {
812        bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
813    }
814    // v0.9: route through self_primary_endpoint so v0.5.17+ sessions
815    // (which write only self.endpoints[]) can rotate. Pre-v0.9 read
816    // top-level legacy fields directly and bailed for those sessions.
817    let primary = crate::endpoints::self_primary_endpoint(&state)
818        .ok_or_else(|| anyhow!("self has no resolvable inbound endpoint to rotate"))?;
819    let url = primary.relay_url.clone();
820    let old_slot_id = primary.slot_id.clone();
821    let old_slot_token = primary.slot_token.clone();
822
823    // Read identity to sign the announcement.
824    let card = config::read_agent_card()?;
825    let did = card
826        .get("did")
827        .and_then(Value::as_str)
828        .unwrap_or("")
829        .to_string();
830    let handle = crate::agent_card::display_handle_from_did(&did).to_string();
831    let pk_b64 = card
832        .get("verify_keys")
833        .and_then(Value::as_object)
834        .and_then(|m| m.values().next())
835        .and_then(|v| v.get("key"))
836        .and_then(Value::as_str)
837        .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
838        .to_string();
839    let pk_bytes = crate::signing::b64decode(&pk_b64)?;
840    let sk_seed = config::read_private_key()?;
841
842    // Allocate new slot on the same relay.
843    let normalized = url.trim_end_matches('/').to_string();
844    let client = crate::relay_client::RelayClient::new(&normalized);
845    client
846        .check_healthz()
847        .context("aborting rotation; old slot still valid")?;
848    let alloc = client.allocate_slot(Some(&handle))?;
849    let new_slot_id = alloc.slot_id.clone();
850    let new_slot_token = alloc.slot_token.clone();
851
852    // Optionally announce the rotation to every paired peer via the OLD slot.
853    // Each peer's recipient-side `wire pull` will pick up this event before
854    // their daemon next polls the new slot — but auto-update of peer's
855    // relay.json from a wire_close event is a v0.2 daemon feature; for now
856    // peers see the event and an operator must manually `add-peer-slot` the
857    // new coords, OR re-pair via SAS.
858    let mut announced: Vec<String> = Vec::new();
859    if !no_announce {
860        let now = time::OffsetDateTime::now_utc()
861            .format(&time::format_description::well_known::Rfc3339)
862            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
863        let body = json!({
864            "reason": "operator-initiated slot rotation",
865            "new_relay_url": url,
866            "new_slot_id": new_slot_id,
867            // NOTE: new_slot_token deliberately NOT shared in the broadcast.
868            // In v0.1 slot tokens are bilateral-shared, so peer can post via
869            // existing add-peer-slot flow if operator chooses to re-issue.
870        });
871        let peers = state["peers"].as_object().cloned().unwrap_or_default();
872        for (peer_handle, _peer_info) in peers.iter() {
873            let event = json!({
874                "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
875                "timestamp": now.clone(),
876                "from": did,
877                "to": format!("did:wire:{peer_handle}"),
878                "type": "wire_close",
879                "kind": 1201,
880                "body": body.clone(),
881            });
882            let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
883                Ok(s) => s,
884                Err(e) => {
885                    eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
886                    continue;
887                }
888            };
889            // Post to OUR old slot (we're announcing on our own slot, NOT
890            // peer's slot — peer reads from us). Wait, this is wrong: peers
891            // read from THEIR OWN slot via wire pull. To reach peer A, we
892            // post to peer A's slot. Use the existing per-peer slot mapping.
893            let peer_info = match state["peers"].get(peer_handle) {
894                Some(p) => p.clone(),
895                None => continue,
896            };
897            let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
898            let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
899            let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
900            if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
901                continue;
902            }
903            let peer_client = if peer_url == url {
904                client.clone()
905            } else {
906                crate::relay_client::RelayClient::new(peer_url)
907            };
908            match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
909                Ok(_) => announced.push(peer_handle.clone()),
910                Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
911            }
912        }
913    }
914
915    // Swap the self-slot to the new one.
916    state["self"] = json!({
917        "relay_url": url,
918        "slot_id": new_slot_id,
919        "slot_token": new_slot_token,
920    });
921    config::write_relay_state(&state)?;
922
923    if as_json {
924        println!(
925            "{}",
926            serde_json::to_string(&json!({
927                "rotated": true,
928                "old_slot_id": old_slot_id,
929                "new_slot_id": new_slot_id,
930                "relay_url": url,
931                "announced_to": announced,
932            }))?
933        );
934    } else {
935        println!("rotated slot on {url}");
936        println!(
937            "  old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
938        );
939        println!("  new slot_id: {new_slot_id}");
940        if !announced.is_empty() {
941            println!(
942                "  announced wire_close (kind=1201) to: {}",
943                announced.join(", ")
944            );
945        }
946        println!();
947        println!("next steps:");
948        println!("  - peers see the wire_close event in their next `wire pull`");
949        println!(
950            "  - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
951        );
952        println!("    (or full re-pair via `wire dial <handle>@<relay>`)");
953        println!("  - until they do, you'll receive but they won't be able to reach you");
954        // Suppress unused warning
955        let _ = old_slot_token;
956    }
957    Ok(())
958}
959
960// ---------- forget-peer ----------
961
962pub(super) fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
963    let mut trust = config::read_trust()?;
964    let mut removed_from_trust = false;
965    if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
966        && agents.remove(handle).is_some()
967    {
968        removed_from_trust = true;
969    }
970    config::write_trust(&trust)?;
971
972    let mut state = config::read_relay_state()?;
973    let mut removed_from_relay = false;
974    if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
975        && peers.remove(handle).is_some()
976    {
977        removed_from_relay = true;
978    }
979    config::write_relay_state(&state)?;
980
981    let mut purged: Vec<String> = Vec::new();
982    if purge {
983        for dir in [config::inbox_dir()?, config::outbox_dir()?] {
984            let path = dir.join(format!("{handle}.jsonl"));
985            if path.exists() {
986                std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
987                purged.push(path.to_string_lossy().into());
988            }
989        }
990    }
991
992    if !removed_from_trust && !removed_from_relay {
993        if as_json {
994            println!(
995                "{}",
996                serde_json::to_string(&json!({
997                    "removed": false,
998                    "reason": format!("peer {handle:?} not pinned"),
999                }))?
1000            );
1001        } else {
1002            eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
1003        }
1004        return Ok(());
1005    }
1006
1007    if as_json {
1008        println!(
1009            "{}",
1010            serde_json::to_string(&json!({
1011                "handle": handle,
1012                "removed_from_trust": removed_from_trust,
1013                "removed_from_relay_state": removed_from_relay,
1014                "purged_files": purged,
1015            }))?
1016        );
1017    } else {
1018        println!("forgot peer {handle:?}");
1019        if removed_from_trust {
1020            println!("  - removed from trust.json");
1021        }
1022        if removed_from_relay {
1023            println!("  - removed from relay.json");
1024        }
1025        if !purged.is_empty() {
1026            for p in &purged {
1027                println!("  - deleted {p}");
1028            }
1029        } else if !purge {
1030            println!("  (inbox/outbox files preserved; pass --purge to delete them)");
1031        }
1032    }
1033    Ok(())
1034}
1035
1036// ---------- daemon (long-lived push+pull sync) ----------
1037
1038pub(super) fn cmd_daemon(
1039    interval_secs: u64,
1040    once: bool,
1041    all_sessions: bool,
1042    session: Option<String>,
1043    as_json: bool,
1044) -> Result<()> {
1045    // v0.14.2 (#162): supervisor mode is mutually exclusive with --once and
1046    // --session — the supervisor IS the multi-session orchestrator, and
1047    // --once is a single-cycle exit (no supervision). Surface loudly
1048    // rather than silently picking one branch.
1049    if all_sessions {
1050        if once {
1051            bail!("--all-sessions and --once are mutually exclusive (supervisor runs forever)");
1052        }
1053        if session.is_some() {
1054            bail!(
1055                "--all-sessions and --session are mutually exclusive (supervisor manages every session, not a single named one)"
1056            );
1057        }
1058        return crate::daemon_supervisor::run_supervisor(interval_secs, as_json);
1059    }
1060    // v0.14.2 (#162): pin this process's WIRE_HOME to the named session's
1061    // home dir BEFORE any config read. Used by the supervisor when it
1062    // fork-execs children, and operator-facing when running a one-session
1063    // foreground daemon outside launchd.
1064    if let Some(ref name) = session {
1065        // v0.14.2 #44: resolve via the layout-aware helper so v0.13
1066        // by-key sessions (where the on-disk dir is a hash and the
1067        // operator-typed name is the persona handle, e.g.
1068        // "coral-weasel") work as well as legacy v0.6 top-level
1069        // sessions. Pre-fix: `session_dir(name)` only resolved the
1070        // legacy form → operator running `wire daemon --session
1071        // coral-weasel` in a tmux pane saw "session not found" even
1072        // though `wire session list` clearly enumerated it.
1073        let home = crate::session::find_session_home_by_name(name)
1074            .with_context(|| format!("resolving session home for --session {name}"))?
1075            .ok_or_else(|| {
1076                anyhow!(
1077                    "session '{name}' not found — run `wire session list` to see initialized sessions"
1078                )
1079            })?;
1080        // SAFETY: cmd_daemon is the one process-lifetime entrypoint that
1081        // chooses a session. No other thread reads WIRE_HOME yet.
1082        unsafe {
1083            std::env::set_var("WIRE_HOME", &home);
1084        }
1085        if !as_json {
1086            eprintln!(
1087                "wire daemon: pinned to session '{name}' (WIRE_HOME={})",
1088                home.display()
1089            );
1090        }
1091    }
1092    if !config::is_initialized()? {
1093        bail!("not initialized — run `wire up` first");
1094    }
1095    // v0.14.2 (#162): pidfile singleton on the persistent daemon. If
1096    // another live `wire daemon` already owns the pidfile, exit 0 with a
1097    // human/JSON message instead of starting a second polling loop —
1098    // honey-pine's report observed 3 concurrent daemons polling the same
1099    // slot, wasteful and a possible source of duplicate-pull races.
1100    // `--once` is a single sync cycle and doesn't own the cursor; the
1101    // singleton check is skipped for it (matches the existing collision
1102    // warning's `--once` carve-out). Test escape hatch:
1103    // `WIRE_DAEMON_NO_SINGLETON=1`.
1104    let _pid_guard = if !once && std::env::var("WIRE_DAEMON_NO_SINGLETON").is_err() {
1105        if let Some(holder_pid) = crate::ensure_up::daemon_singleton_holder() {
1106            if as_json {
1107                println!(
1108                    "{}",
1109                    serde_json::to_string(&json!({
1110                        "status": "skipped",
1111                        "reason": "daemon already running",
1112                        "holder_pid": holder_pid,
1113                    }))?
1114                );
1115            } else {
1116                eprintln!(
1117                    "wire daemon: another daemon is already running (pid {holder_pid}); not starting a second polling loop. Set WIRE_DAEMON_NO_SINGLETON=1 to override."
1118                );
1119            }
1120            return Ok(());
1121        }
1122        Some(crate::ensure_up::claim_daemon_singleton()?)
1123    } else {
1124        None
1125    };
1126    // v0.13.x identity work: a long-running daemon racing another wire
1127    // process for the same inbox cursor silently loses messages. Surface
1128    // the collision the same way `wire mcp` does. Skipped under `--once`:
1129    // a single sync cycle is atomic and doesn't own the cursor.
1130    if !once {
1131        crate::session::warn_on_identity_collision(std::process::id(), "daemon");
1132    }
1133    let interval = std::time::Duration::from_secs(interval_secs.max(1));
1134
1135    if !as_json {
1136        if once {
1137            eprintln!("wire daemon: single sync cycle, then exit");
1138        } else {
1139            eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
1140        }
1141    }
1142
1143    // Claim the daemon pidfile for this process so `wire status` / doctor /
1144    // the singleton guard can see us when started directly (not via
1145    // ensure_background). Best-effort.
1146    if let Err(e) = crate::ensure_up::write_self_daemon_pid() {
1147        eprintln!("daemon: pidfile write error: {e:#}");
1148    }
1149
1150    // R1 phase 2: spawn the SSE stream subscriber. On every event pushed
1151    // to our slot, the subscriber signals `wake_rx`; we use it as the
1152    // sleep-or-wake gate of the polling loop. Polling stays as the
1153    // safety net — stream errors fall back transparently to the existing
1154    // interval-based cadence.
1155    let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
1156    if !once {
1157        crate::daemon_stream::spawn_stream_subscriber(wake_tx);
1158    }
1159
1160    // Arm inbound-message OS toasts inside the always-on daemon: fold the
1161    // `wire notify` sweep into the sync loop so the default `wire up` path
1162    // delivers toasts for incoming messages (previously nothing ever started
1163    // a notify sweep — inbound messages arrived silently). `--once` is a
1164    // single atomic cycle that doesn't own the cursor, so it stays opt-out.
1165    let mut notify_state: Option<(crate::inbox_watch::InboxWatcher, std::path::PathBuf)> = if once {
1166        None
1167    } else {
1168        let cursor_path = config::state_dir()?.join("notify.cursor");
1169        match crate::inbox_watch::InboxWatcher::from_cursor_file(&cursor_path) {
1170            Ok(w) => Some((w, cursor_path)),
1171            Err(e) => {
1172                // Non-fatal: the sync loop is the daemon's core job; toasts
1173                // are a side channel. Degrade to no toasts, keep syncing.
1174                eprintln!("daemon: notify watcher init failed, toasts disabled: {e:#}");
1175                None
1176            }
1177        }
1178    };
1179
1180    loop {
1181        let pushed = run_sync_push().unwrap_or_else(|e| {
1182            eprintln!("daemon: push error: {e:#}");
1183            json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
1184        });
1185        let pulled = run_sync_pull().unwrap_or_else(|e| {
1186            eprintln!("daemon: pull error: {e:#}");
1187            json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
1188        });
1189
1190        // Toast any newly-arrived inbox events (folded-in `wire notify`).
1191        if let Some((ref mut watcher, ref cursor_path)) = notify_state {
1192            match super::comms::notify_sweep_new_events(watcher, cursor_path) {
1193                Ok(events) => super::comms::toast_inbox_events(&events),
1194                Err(e) => eprintln!("daemon: notify sweep error: {e:#}"),
1195            }
1196        }
1197
1198        // v0.14.2 (#162): persist a `last_sync.json` record after every
1199        // cycle (including --once + cycles that pushed/pulled zero events
1200        // — the "idle daemon is alive" signal is exactly what the
1201        // detection layers need). Readers: `wire status`,
1202        // `mcp__wire__wire_status`, `mcp__wire__wire_send` annotations.
1203        // Best-effort: errors log + don't abort the loop.
1204        let cycle_push_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
1205        let cycle_pull_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
1206        let cycle_rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
1207        crate::ensure_up::write_last_sync_record(cycle_push_n, cycle_pull_n, cycle_rejected_n);
1208
1209        if as_json {
1210            println!(
1211                "{}",
1212                serde_json::to_string(&json!({
1213                    "ts": time::OffsetDateTime::now_utc()
1214                        .format(&time::format_description::well_known::Rfc3339)
1215                        .unwrap_or_default(),
1216                    "push": pushed,
1217                    "pull": pulled,
1218                }))?
1219            );
1220        } else if cycle_push_n > 0 || cycle_pull_n > 0 || cycle_rejected_n > 0 {
1221            eprintln!(
1222                "daemon: pushed={cycle_push_n} pulled={cycle_pull_n} rejected={cycle_rejected_n}"
1223            );
1224        }
1225
1226        if once {
1227            return Ok(());
1228        }
1229        // Wait either for the next poll-interval tick OR for a stream
1230        // wake signal — whichever comes first. Drain any additional
1231        // wake-ups that accumulated during the previous cycle since one
1232        // pull catches up everything.
1233        //
1234        // v0.13.2 (wisp-blossom): if the stream subscriber thread has gone
1235        // away, `wake_rx` is Disconnected and `recv_timeout` returns
1236        // INSTANTLY — which would busy-spin the sync loop (hammering push/pull
1237        // + the relay with zero delay). Fall back to a plain sleep so a dead
1238        // stream degrades to normal polling and never kills or pegs the
1239        // daemon. (Realizes the "decouple stream from sync" hardening — a
1240        // stream failure must never affect the push/pull loop.)
1241        match wake_rx.recv_timeout(interval) {
1242            Ok(()) | Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
1243            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1244                std::thread::sleep(interval);
1245            }
1246        }
1247        while wake_rx.try_recv().is_ok() {}
1248    }
1249}
1250
1251/// Programmatic push (no stdout, no exit on errors). Returns the same JSON
1252/// shape `wire push --json` emits.
1253pub fn run_sync_push() -> Result<Value> {
1254    let state = config::read_relay_state()?;
1255    let peers = state["peers"].as_object().cloned().unwrap_or_default();
1256    if peers.is_empty() {
1257        return Ok(json!({"pushed": [], "skipped": []}));
1258    }
1259    let outbox_dir = config::outbox_dir()?;
1260    if !outbox_dir.exists() {
1261        return Ok(json!({"pushed": [], "skipped": []}));
1262    }
1263    let mut pushed = Vec::new();
1264    let mut skipped = Vec::new();
1265    for (peer_handle, slot_info) in peers.iter() {
1266        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
1267        if !outbox.exists() {
1268            continue;
1269        }
1270        let url = slot_info["relay_url"].as_str().unwrap_or("");
1271        let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
1272        let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
1273        if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
1274            continue;
1275        }
1276        let client = crate::relay_client::RelayClient::new(url);
1277        let body = std::fs::read_to_string(&outbox)?;
1278        for line in body.lines() {
1279            let event: Value = match serde_json::from_str(line) {
1280                Ok(v) => v,
1281                Err(_) => continue,
1282            };
1283            let event_id = event
1284                .get("event_id")
1285                .and_then(Value::as_str)
1286                .unwrap_or("")
1287                .to_string();
1288            match client.post_event(slot_id, slot_token, &event) {
1289                Ok(resp) => {
1290                    // v0.14.2 (#162 fix #2): record the queued → pushed
1291                    // transition in the per-peer lifecycle log. Both
1292                    // `ok` and `duplicate` count as pushed — the relay
1293                    // has the event either way, and an operator who
1294                    // hits the dedup path didn't lose the event. Failure
1295                    // here is non-fatal: the sync loop must keep
1296                    // running even if the lifecycle log can't be
1297                    // appended.
1298                    let now = time::OffsetDateTime::now_utc()
1299                        .format(&time::format_description::well_known::Rfc3339)
1300                        .unwrap_or_default();
1301                    if let Err(e) = config::append_pushed_log(peer_handle, &event_id, &now) {
1302                        eprintln!(
1303                            "daemon: pushed-log append for {peer_handle}/{event_id} failed (non-fatal): {e:#}"
1304                        );
1305                    }
1306                    if resp.status == "duplicate" {
1307                        skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
1308                    } else {
1309                        pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
1310                    }
1311                }
1312                Err(e) => {
1313                    // v0.5.13: flatten the anyhow chain so TLS / DNS / timeout
1314                    // errors aren't hidden behind the topmost-context URL string.
1315                    // Issue #6 highest-impact silent-fail fix.
1316                    let reason = crate::relay_client::format_transport_error(&e);
1317                    skipped
1318                        .push(json!({"peer": peer_handle, "event_id": event_id, "reason": reason}));
1319                }
1320            }
1321        }
1322    }
1323    Ok(json!({"pushed": pushed, "skipped": skipped}))
1324}
1325
1326/// Programmatic pull. Same shape as `wire pull --json`.
1327///
1328/// v0.9: routes through `endpoints::self_primary_endpoint` so sessions
1329/// created via `wire session new --with-local` (which only writes
1330/// `self.endpoints[]`, not the legacy top-level fields) actually pull.
1331/// Pre-v0.9 this function read only the top-level fields and silently
1332/// returned `{}` for any v0.5.17+ session.
1333pub fn run_sync_pull() -> Result<Value> {
1334    let state = config::read_relay_state()?;
1335    if state.get("self").map(Value::is_null).unwrap_or(true) {
1336        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
1337    }
1338    // E2 (v0.13.2): pull EVERY self endpoint, not just the primary. A session
1339    // that bound a local slot (additive) alongside its federation slot used to
1340    // have the daemon pull ONLY the primary (federation) endpoint — the local
1341    // slot was never serviced, so same-box loopback delivery silently never
1342    // happened until a manual restart re-seeded the (startup-only) stream
1343    // subscriber. Now each endpoint is pulled with its OWN cursor.
1344    let endpoints = crate::endpoints::self_endpoints(&state);
1345    if endpoints.is_empty() {
1346        return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
1347    }
1348    let inbox_dir = config::inbox_dir()?;
1349    config::ensure_dirs()?;
1350
1351    // Per-slot cursors live at `self.cursors.<slot_id>`. The legacy global
1352    // `self.last_pulled_event_id` is migrated as the cursor for the PRIMARY
1353    // slot only (a federation event id won't match a local slot's log); other
1354    // slots start from None and `process_events` dedups against the inbox.
1355    let self_obj = state.get("self").cloned().unwrap_or(Value::Null);
1356    let legacy_cursor = self_obj
1357        .get("last_pulled_event_id")
1358        .and_then(Value::as_str)
1359        .map(str::to_string);
1360    let primary_slot = crate::endpoints::self_primary_endpoint(&state).map(|e| e.slot_id);
1361    let mut cursors: serde_json::Map<String, Value> = self_obj
1362        .get("cursors")
1363        .and_then(Value::as_object)
1364        .cloned()
1365        .unwrap_or_default();
1366
1367    let mut all_written: Vec<Value> = Vec::new();
1368    let mut all_rejected: Vec<Value> = Vec::new();
1369    let mut total_seen = 0usize;
1370    let mut blocked_any = false;
1371
1372    for ep in &endpoints {
1373        if ep.relay_url.is_empty() {
1374            continue;
1375        }
1376        let cursor = cursors
1377            .get(&ep.slot_id)
1378            .and_then(Value::as_str)
1379            .map(str::to_string)
1380            .or_else(|| {
1381                if Some(&ep.slot_id) == primary_slot.as_ref() {
1382                    legacy_cursor.clone()
1383                } else {
1384                    None
1385                }
1386            });
1387        let client = crate::relay_client::RelayClient::new(&ep.relay_url);
1388        // One endpoint erroring (relay down, slot gone) must NOT stop the
1389        // others — a dead local relay shouldn't black-hole federation pulls.
1390        let events =
1391            match client.list_events(&ep.slot_id, &ep.slot_token, cursor.as_deref(), Some(1000)) {
1392                Ok(e) => e,
1393                Err(e) => {
1394                    eprintln!(
1395                        "daemon: pull error on {} slot {} (continuing): {e:#}",
1396                        ep.relay_url, ep.slot_id
1397                    );
1398                    continue;
1399                }
1400            };
1401        total_seen += events.len();
1402        // P0.1 shared cursor-blocking logic (matches `wire pull`). A block on
1403        // one slot only stalls THAT slot's cursor; other slots keep flowing.
1404        let result = crate::pull::process_events(&events, cursor, &inbox_dir)?;
1405        if let Some(eid) = &result.advance_cursor_to {
1406            cursors.insert(ep.slot_id.clone(), Value::String(eid.clone()));
1407        }
1408        blocked_any |= result.blocked;
1409        all_written.extend(result.written);
1410        all_rejected.extend(result.rejected);
1411    }
1412
1413    // P0.3 flock-protected RMW: persist per-slot cursors + keep the legacy
1414    // global cursor in sync with the primary slot for back-compat with older
1415    // binaries that only read `last_pulled_event_id`.
1416    let primary_cursor = primary_slot
1417        .as_ref()
1418        .and_then(|s| cursors.get(s))
1419        .and_then(Value::as_str)
1420        .map(str::to_string);
1421    // v0.14.3 (#14): group `written` by sender handle, take max
1422    // timestamp, write to `peers[<handle>].last_inbound_event_at`.
1423    // RFC3339-comparable as lex sort (same offset, ISO 8601). This
1424    // is the daemon-written signal `check_peer_staleness` needs —
1425    // robust against backup/restore/`touch` that breaks inbox-mtime
1426    // detection. Additive field: pre-v0.14.3 readers ignore it,
1427    // older daemons just don't write it.
1428    let mut latest_inbound: std::collections::HashMap<String, String> =
1429        std::collections::HashMap::new();
1430    for w in &all_written {
1431        let from = match w.get("from").and_then(Value::as_str) {
1432            Some(s) => s.to_string(),
1433            None => continue,
1434        };
1435        let ts = match w.get("timestamp").and_then(Value::as_str) {
1436            Some(s) if !s.is_empty() => s.to_string(),
1437            _ => continue,
1438        };
1439        latest_inbound
1440            .entry(from)
1441            .and_modify(|existing| {
1442                if ts > *existing {
1443                    *existing = ts.clone();
1444                }
1445            })
1446            .or_insert(ts);
1447    }
1448    config::update_relay_state(|state| {
1449        if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
1450            self_obj.insert("cursors".into(), Value::Object(cursors.clone()));
1451            if let Some(pc) = &primary_cursor {
1452                self_obj.insert("last_pulled_event_id".into(), Value::String(pc.clone()));
1453            }
1454        }
1455        if !latest_inbound.is_empty()
1456            && let Some(peers_obj) = state.get_mut("peers").and_then(Value::as_object_mut)
1457        {
1458            for (handle, ts) in &latest_inbound {
1459                let entry = peers_obj.entry(handle.clone()).or_insert_with(|| json!({}));
1460                if let Some(obj) = entry.as_object_mut() {
1461                    obj.insert("last_inbound_event_at".into(), Value::String(ts.clone()));
1462                }
1463            }
1464        }
1465        Ok(())
1466    })?;
1467
1468    Ok(json!({
1469        "written": all_written,
1470        "rejected": all_rejected,
1471        "total_seen": total_seen,
1472        "cursor_blocked": blocked_any,
1473        "endpoints_pulled": endpoints.len(),
1474    }))
1475}
1476
/// Issue #69 (follow-up to #15): decides whether a push error message looks
/// like a slot-rotation 4xx, i.e. whether `try_reresolve_peer_on_slot_4xx`
/// should spend a whois round trip on re-resolving the peer.
///
/// The first #15 version was a plain `contains("410") || contains("404")`,
/// which also fired on unrelated text such as `"slot 4101 expired"`,
/// `"request_id=410abc..."` or `"received 4040 bytes"`. Each false hit cost
/// one wasted whois per peer per push call (capped by `already_tried`) and
/// polluted the doctor diagnostic with bogus "peer slot rotated" lines.
///
/// Here a status code only counts when it stands alone as a token: the byte
/// before it must be the start of the string or one of space, colon, tab,
/// line feed, carriage return, and the byte after it must be the end of the
/// string or one of the same separators. Both error shapes produced by
/// `post_event` in `relay_client.rs` satisfy that:
///
/// - `reqwest::StatusCode` Display: `"post_event failed: 410 Gone: <body>"`
///   (code followed by a space).
/// - UDS bare `u16` Display:
///   `"post_event (uds /tmp/...sock) failed: 410: <body>"` (code followed by
///   a colon).
///
/// Returns `true` as soon as any standalone `410` or `404` is found and
/// `false` otherwise (including for the empty string).
pub fn error_smells_like_slot_4xx(last_err: &str) -> bool {
    // Status codes that indicate "the slot is gone / not found".
    const ROTATION_CODES: [&[u8]; 2] = [b"410", b"404"];

    let is_separator = |byte: u8| matches!(byte, b' ' | b':' | b'\t' | b'\n' | b'\r');
    let haystack = last_err.as_bytes();

    ROTATION_CODES.iter().any(|code| {
        // Look at every byte offset, so overlapping candidates are never
        // skipped. The codes are pure ASCII and therefore can only match on
        // UTF-8 character boundaries.
        haystack
            .windows(code.len())
            .enumerate()
            .any(|(start, window)| {
                if window != *code {
                    return false;
                }
                let left_ok = match start.checked_sub(1) {
                    Some(prev) => is_separator(haystack[prev]),
                    None => true,
                };
                let right_ok = match haystack.get(start + code.len()) {
                    Some(&next) => is_separator(next),
                    None => true,
                };
                left_ok && right_ok
            })
    })
}
1529
1530/// Issue #15: detect a 4xx-shaped push failure that smells like "slot
1531/// rotated by peer" and update the peer's pin in place with the freshly
1532/// resolved slot from the relay's handle directory.
1533///
1534/// Returns:
1535/// - `Ok(true)` — peer's pin was rotated; caller should refresh
1536///   `peer_endpoints_in_priority_order(&state, ...)` and retry.
1537/// - `Ok(false)` — re-resolve completed but the slot id was unchanged
1538///   (false-alarm 4xx, e.g. throttling); caller should NOT retry.
1539/// - `Err(e)` — re-resolve itself failed (network down, relay 5xx,
1540///   handle no longer claimed, etc.); caller should fall through to the
1541///   existing "skipped" path.
1542///
1543/// Only triggers when:
1544///   - The error string carries a 4xx slot-rotation status token (`410`/`404`)
1545///     as a *whole token* — preceded by start/space/colon/tab/newline and
1546///     followed by end/space/colon/tab/newline. This matches both the
1547///     `reqwest::StatusCode` Display shape (`": 410 Gone"`) and the UDS
1548///     bare-`u16` shape (`": 410:"`) emitted by `post_event` in
1549///     `src/relay_client.rs`, while rejecting substring false-positives
1550///     like `"slot 4101 expired"` or `"request_id=410abc..."`. See
1551///     `error_smells_like_slot_4xx` below.
1552///   - The peer has a pinned `relay_url` we can parse a handle@domain from.
1553///   - The caller hasn't already re-resolved this peer in the current push
1554///     call (caller's responsibility — pass `already_tried` from a set kept
1555///     in the outer per-peer loop). One whois per peer per push call,
1556///     exactly the rate limit the issue specifies.
1557///
1558/// Updates `state.peers[peer_handle]` in place (rotates the federation
1559/// endpoint's slot_id + slot_token to the fresh resolve), and emits a
1560/// stderr WARN so the operator can see the rotation event in their
1561/// terminal alongside the unrelated `wire push` output. Caller is
1562/// responsible for persisting `state` back to disk via
1563/// `config::write_relay_state` after all per-peer re-resolves settle.
1564fn try_reresolve_peer_on_slot_4xx(
1565    state: &mut Value,
1566    peer_handle: &str,
1567    last_err: &str,
1568    already_tried: &std::collections::HashSet<String>,
1569) -> Result<bool> {
1570    if !error_smells_like_slot_4xx(last_err) {
1571        // Not the slot-rotation shape. Don't waste a whois on this.
1572        return Ok(false);
1573    }
1574    if already_tried.contains(peer_handle) {
1575        // Rate limit: at most one whois per peer per push call.
1576        return Ok(false);
1577    }
1578    // Find the peer's pinned federation endpoint to re-resolve against.
1579    let peer_entry = state
1580        .get("peers")
1581        .and_then(|p| p.get(peer_handle))
1582        .ok_or_else(|| anyhow!("peer `{peer_handle}` not in relay_state"))?;
1583    let peer_relay = peer_entry
1584        .get("endpoints")
1585        .and_then(Value::as_array)
1586        .and_then(|arr| {
1587            arr.iter().find(|e| {
1588                e.get("scope").and_then(Value::as_str) == Some("federation")
1589                    || e.get("scope").and_then(Value::as_str) == Some("Federation")
1590            })
1591        })
1592        .and_then(|e| e.get("relay_url").and_then(Value::as_str))
1593        .or_else(|| peer_entry.get("relay_url").and_then(Value::as_str))
1594        .ok_or_else(|| {
1595            anyhow!("peer `{peer_handle}` has no federation endpoint to re-resolve against")
1596        })?
1597        .to_string();
1598    // Strip scheme + path to get the relay domain. Same shape parse used by
1599    // pair_profile::resolve_handle's input contract.
1600    let domain = peer_relay
1601        .trim_start_matches("https://")
1602        .trim_start_matches("http://")
1603        .split('/')
1604        .next()
1605        .unwrap_or(&peer_relay)
1606        .to_string();
1607    let handle = crate::pair_profile::Handle {
1608        nick: peer_handle.to_string(),
1609        domain,
1610    };
1611    let resolved = crate::pair_profile::resolve_handle(&handle, Some(&peer_relay))?;
1612    let new_slot_id = resolved
1613        .get("slot_id")
1614        .and_then(Value::as_str)
1615        .ok_or_else(|| anyhow!("re-resolved payload missing slot_id"))?
1616        .to_string();
1617    // Compare against the currently-pinned federation slot.
1618    let peers = state
1619        .get_mut("peers")
1620        .and_then(Value::as_object_mut)
1621        .ok_or_else(|| anyhow!("relay_state.peers missing or wrong shape"))?;
1622    let peer_entry = peers
1623        .get_mut(peer_handle)
1624        .ok_or_else(|| anyhow!("peer `{peer_handle}` disappeared from state mid-resolve"))?;
1625    let current_slot_id = peer_entry
1626        .get("endpoints")
1627        .and_then(Value::as_array)
1628        .and_then(|arr| {
1629            arr.iter().find(|e| {
1630                let scope = e.get("scope").and_then(Value::as_str);
1631                scope == Some("federation") || scope == Some("Federation")
1632            })
1633        })
1634        .and_then(|e| e.get("slot_id").and_then(Value::as_str))
1635        .unwrap_or("")
1636        .to_string();
1637    if current_slot_id == new_slot_id {
1638        // Same slot — the 4xx was something else (rate limit, server burp).
1639        return Ok(false);
1640    }
1641    // Rotate in place. We update slot_id but DROP the slot_token: only the
1642    // peer's freshly-issued slot_token (which arrives via a new pair_drop_ack)
1643    // is valid. Sending against the new slot without a fresh token gets 401,
1644    // so the operator will see one more "skipped: 401" and the next pair
1645    // cycle (or a manual `wire add <peer>@<relay>` per the doctor #14 fix)
1646    // refreshes the token. This is the same trade-off the issue spells out:
1647    // auto-rotation closes the slot mismatch; token refresh still needs the
1648    // bilateral pair gate.
1649    if let Some(endpoints) = peer_entry
1650        .get_mut("endpoints")
1651        .and_then(Value::as_array_mut)
1652    {
1653        for ep in endpoints.iter_mut() {
1654            let scope = ep.get("scope").and_then(Value::as_str);
1655            if scope == Some("federation") || scope == Some("Federation") {
1656                ep["slot_id"] = Value::String(new_slot_id.clone());
1657                ep["slot_token"] = Value::String(String::new());
1658            }
1659        }
1660    }
1661    // Also update the legacy top-level fields for v0.5.16-era readers (the
1662    // same back-compat surface pair_drop_ack uses).
1663    peer_entry["slot_id"] = Value::String(new_slot_id.clone());
1664    peer_entry["slot_token"] = Value::String(String::new());
1665    eprintln!(
1666        "wire push: peer `{peer_handle}` rotated their relay slot (was `{current_slot_id}`, \
1667         now `{new_slot_id}`); pin updated in place. Re-pair via `wire add \
1668         {peer_handle}@<relay>` to refresh the slot_token."
1669    );
1670    Ok(true)
1671}
1672
#[cfg(test)]
mod slot_reresolve_tests {
    use super::*;
    use std::collections::HashSet;

    // Issue #15: `try_reresolve_peer_on_slot_4xx` has to bail out BEFORE any
    // network call when the error is not slot-rotation shaped, when the peer
    // was already re-resolved during this push, or when the peer entry gives
    // it nothing to resolve against. Those exits need no mock relay and are
    // pinned below; the real whois + slot comparison needs a live or mocked
    // transport and is not exercised in this module.
    //
    // Pinned here:
    //   - 502 / transport / 401 errors do NOT trigger a re-resolve.
    //   - A peer already listed in `already_tried` is not re-resolved again.
    //   - A peer missing from state is an explicit error, not a silent skip.
    //   - A peer without a federation endpoint is an explicit error.

    /// Relay state holding one peer, `some-peer`, with an empty endpoint list.
    fn state_with_bare_peer() -> Value {
        json!({"peers": {"some-peer": {"endpoints": []}}})
    }

    #[test]
    fn try_reresolve_skips_when_error_is_not_4xx_shape() {
        // Negative filter: an error string without a standalone "404"/"410"
        // must be a no-op.
        let mut state = state_with_bare_peer();
        let untried = HashSet::new();
        let cases = [
            ("post failed: 502", "502 must NOT trigger a re-resolve"),
            (
                "connection refused",
                "transport errors must NOT trigger a re-resolve",
            ),
            (
                "post failed: 401 Unauthorized",
                "401 (auth) is a token problem, not a slot rotation — must NOT trigger a re-resolve",
            ),
        ];
        for (error_text, why) in cases {
            let rotated =
                try_reresolve_peer_on_slot_4xx(&mut state, "some-peer", error_text, &untried)
                    .unwrap();
            assert!(!rotated, "{why}");
        }
    }

    #[test]
    fn try_reresolve_rate_limits_one_attempt_per_peer_per_push() {
        // "At most one whois per peer per push call": the caller records
        // attempts in `already_tried`, and the helper has to respect that
        // set before doing any I/O.
        let mut state = state_with_bare_peer();
        let tried: HashSet<String> = HashSet::from(["some-peer".to_string()]);
        let rotated =
            try_reresolve_peer_on_slot_4xx(&mut state, "some-peer", "post failed: 410 Gone", &tried)
                .unwrap();
        assert!(
            !rotated,
            "peer already in `already_tried` must NOT trigger another re-resolve in the same push"
        );
    }

    #[test]
    fn try_reresolve_errors_when_peer_missing_from_state() {
        // A peer that is absent from relay_state must produce a visible
        // error rather than Ok(false).
        let mut state = json!({"peers": {}});
        let untried = HashSet::new();
        let outcome = try_reresolve_peer_on_slot_4xx(
            &mut state,
            "missing-peer",
            "post failed: 410 Gone",
            &untried,
        );
        let err = outcome.unwrap_err().to_string();
        let names_peer = err.contains("missing-peer");
        let names_failure = err.contains("not in relay_state");
        assert!(
            names_peer && names_failure,
            "missing-peer error must name the peer + the failure: {err}"
        );
    }

    #[test]
    fn try_reresolve_errors_when_peer_has_no_federation_endpoint() {
        // Only a local-scope endpoint is pinned, so there is no relay
        // domain to whois against; the helper must report that instead of
        // skipping silently.
        let local_endpoint = json!({
            "scope": "Local",
            "relay_url": "http://127.0.0.1:8771",
            "slot_id": "loc",
            "slot_token": "tok"
        });
        let mut state = json!({"peers": {"local-only": {"endpoints": [local_endpoint]}}});
        let untried = HashSet::new();
        let outcome = try_reresolve_peer_on_slot_4xx(
            &mut state,
            "local-only",
            "post failed: 410 Gone",
            &untried,
        );
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("federation endpoint"),
            "no-federation error must name the problem: {err}"
        );
    }

    // Issue #69: the tests below pin the whole-token behaviour of
    // `error_smells_like_slot_4xx`. The earlier bare
    // `contains("410") || contains("404")` check fired on any error text
    // that merely contained those digits (slot ids, request ids, byte
    // counts, ...).
    //
    // Three classes are covered:
    //   - reqwest StatusCode Display shapes (`": 410 Gone"`,
    //     `": 404 Not Found"`) trigger.
    //   - UDS bare-`u16` shapes (`": 410:"`, `": 404:"`) trigger.
    //   - Substring lookalikes (`"slot 4101 expired"`,
    //     `"request_id=410abc"`, `"received 4040 bytes"`,
    //     `"event 0x4104"`) do NOT trigger.

    #[test]
    fn error_smells_like_slot_4xx_matches_reqwest_status_display_shape() {
        // reqwest::StatusCode displays as "<u16> <reason>" and lands in the
        // message as "...failed: <status>: <detail>".
        for message in [
            "post_event failed: 410 Gone: slot rotated by peer",
            "post_event failed: 404 Not Found: handle no longer claimed",
        ] {
            assert!(
                error_smells_like_slot_4xx(message),
                "reqwest-shaped status must trigger: {message:?}"
            );
        }
    }

    #[test]
    fn error_smells_like_slot_4xx_matches_uds_bare_u16_shape() {
        // The UDS path prints the status as a bare u16: "...failed: 410:
        // <detail>", i.e. a space before the code and a colon after it,
        // with no reason phrase.
        for message in [
            "post_event (uds /tmp/wire-relay.sock) failed: 410: gone",
            "post_event (uds /tmp/wire-relay.sock) failed: 404: not found",
        ] {
            assert!(
                error_smells_like_slot_4xx(message),
                "UDS-shaped status must trigger: {message:?}"
            );
        }
    }

    #[test]
    fn error_smells_like_slot_4xx_rejects_substring_lookalikes() {
        // Every entry contains "410" or "404" only as part of a longer
        // token, which the old substring predicate wrongly accepted.
        let lookalikes = [
            "push aborted: slot 4101 expired",
            "post_event failed: 502 Bad Gateway: request_id=410abc-deadbeef",
            "post_event failed: 500: received 4040 bytes, expected envelope",
            "post_event failed: 500: event 0x4104 malformed",
            "post_event failed: 503: backlog=4102 entries pending",
            // "404" embedded at the start of a longer identifier.
            "post_event failed: 500: tx_id=4044beef",
            // Digits buried inside an identifier / hash.
            "post_event failed: 500: hash=abc410def",
        ];
        for case in lookalikes {
            assert!(
                !error_smells_like_slot_4xx(case),
                "must NOT trigger re-resolve on substring lookalike: {case:?}"
            );
        }
    }

    #[test]
    fn error_smells_like_slot_4xx_handles_edge_positions() {
        let must_match = [
            // Token at the very start of the string.
            "410 Gone",
            "404 Not Found",
            // Token at the very end of the string.
            "got 410",
            "got 404",
            // Tab and newline separators.
            "post_event failed:\t410\tGone",
            "post_event failed:\n410\nGone",
            // The input is nothing but the code.
            "410",
            "404",
        ];
        for input in must_match {
            assert!(
                error_smells_like_slot_4xx(input),
                "expected a match for {input:?}"
            );
        }

        let must_not_match = [
            // Empty / unrelated text.
            "",
            "no relevant status",
            // Other 4xx codes are not the slot-rotation shape (issue #15
            // covers only 410/404).
            "post_event failed: 401 Unauthorized",
            "post_event failed: 403 Forbidden",
            "post_event failed: 411 Length Required",
        ];
        for input in must_not_match {
            assert!(
                !error_smells_like_slot_4xx(input),
                "expected no match for {input:?}"
            );
        }
    }
}