1use anyhow::{Context, Result, anyhow, bail};
2use serde_json::{Value, json};
3
4use super::setup;
5use crate::{config, signing::sign_message_v31};
6
/// Entry point for the `mcp` subcommand.
///
/// Delegates straight to `crate::mcp::run` and returns whatever it returns;
/// no arguments are taken and no additional work is done here.
pub(super) fn cmd_mcp() -> Result<()> {
    crate::mcp::run()
}
12
13pub(super) fn cmd_relay_server(
14 bind: &str,
15 local_only: bool,
16 uds: Option<&std::path::Path>,
17) -> Result<()> {
18 if let Some(socket_path) = uds {
23 let base = if let Ok(home) = std::env::var("WIRE_HOME") {
24 std::path::PathBuf::from(home)
25 .join("state")
26 .join("wire-relay")
27 .join("uds")
28 } else {
29 dirs::state_dir()
30 .or_else(dirs::data_local_dir)
31 .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
32 .join("wire-relay")
33 .join("uds")
34 };
35 let runtime = tokio::runtime::Builder::new_multi_thread()
36 .enable_all()
37 .build()?;
38 return runtime.block_on(crate::relay_server::serve_uds(
39 socket_path.to_path_buf(),
40 base,
41 ));
42 }
43 if local_only {
47 validate_loopback_bind(bind)?;
48 }
49 let base = if let Ok(home) = std::env::var("WIRE_HOME") {
55 std::path::PathBuf::from(home)
56 .join("state")
57 .join("wire-relay")
58 } else {
59 dirs::state_dir()
60 .or_else(dirs::data_local_dir)
61 .ok_or_else(|| anyhow::anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))?
62 .join("wire-relay")
63 };
64 let state_dir = if local_only { base.join("local") } else { base };
65 let runtime = tokio::runtime::Builder::new_multi_thread()
66 .enable_all()
67 .build()?;
68 runtime.block_on(crate::relay_server::serve_with_mode(
69 bind,
70 state_dir,
71 crate::relay_server::ServerMode { local_only },
72 ))
73}
74
75fn validate_loopback_bind(bind: &str) -> Result<()> {
93 let host = if let Some(stripped) = bind.strip_prefix('[') {
95 let close = stripped
96 .find(']')
97 .ok_or_else(|| anyhow::anyhow!("malformed IPv6 bind {bind:?}"))?;
98 stripped[..close].to_string()
99 } else {
100 bind.rsplit_once(':')
101 .map(|(h, _)| h.to_string())
102 .unwrap_or_else(|| bind.to_string())
103 };
104 use std::net::{IpAddr, ToSocketAddrs};
105 let probe = format!("{host}:0");
106 let resolved: Vec<_> = probe
107 .to_socket_addrs()
108 .with_context(|| format!("resolving bind host {host:?}"))?
109 .collect();
110 if resolved.is_empty() {
111 bail!("--local-only: bind host {host:?} resolved to no addresses");
112 }
113 for addr in &resolved {
114 let ip = addr.ip();
115 let is_acceptable = match ip {
116 IpAddr::V4(v4) => {
117 v4.is_loopback() || v4.is_private() || {
118 let octets = v4.octets();
120 octets[0] == 100 && (64..=127).contains(&octets[1])
121 }
122 }
123 IpAddr::V6(v6) => v6.is_loopback(), };
125 if !is_acceptable {
126 bail!(
127 "--local-only refuses non-private bind: {host:?} resolves to {ip} \
128 which is not loopback (127/8, ::1), RFC 1918 private \
129 (10/8, 172.16/12, 192.168/16), or RFC 6598 CGNAT/Tailscale \
130 (100.64.0.0/10). Remove --local-only to bind publicly."
131 );
132 }
133 }
134 Ok(())
135}
136
137fn parse_scope(s: &str) -> Result<crate::endpoints::EndpointScope> {
140 use crate::endpoints::EndpointScope;
141 match s.to_lowercase().as_str() {
142 "federation" | "fed" => Ok(EndpointScope::Federation),
143 "local" => Ok(EndpointScope::Local),
144 "lan" => Ok(EndpointScope::Lan),
145 "uds" => Ok(EndpointScope::Uds),
146 other => bail!("unknown --scope `{other}` (expected federation|local|lan|uds)"),
147 }
148}
149
150pub(crate) fn cmd_bind_relay(
156 url: &str,
157 scope: Option<&str>,
158 replace: bool,
159 migrate_pinned: bool,
160 as_json: bool,
161) -> Result<()> {
162 use crate::endpoints::{Endpoint, self_endpoints};
163
164 if !config::is_initialized()? {
165 bail!("not initialized — run `wire up` first");
166 }
167 let card = config::read_agent_card()?;
168 let did = card.get("did").and_then(Value::as_str).unwrap_or("");
169 let handle = crate::agent_card::display_handle_from_did(did).to_string();
170
171 let normalized_raw = url.trim_end_matches('/');
172 let normalized_owned = setup::strip_relay_url_userinfo(normalized_raw);
176 let normalized = normalized_owned.as_str();
177 setup::assert_relay_url_clean_for_publish(normalized)?;
181 let new_scope = match scope {
182 Some(s) => parse_scope(s)?,
183 None => crate::endpoints::infer_scope_from_url(normalized),
184 };
185
186 let existing = config::read_relay_state().unwrap_or_else(|_| json!({}));
187 let pinned: Vec<String> = existing
188 .get("peers")
189 .and_then(|p| p.as_object())
190 .map(|o| o.keys().cloned().collect())
191 .unwrap_or_default();
192
193 let existing_eps = self_endpoints(&existing);
194 let is_rebind_same = existing_eps.iter().any(|e| e.relay_url == normalized);
195
196 let destructive = replace || is_rebind_same;
203 if destructive && !pinned.is_empty() && !migrate_pinned {
204 let list = pinned.join(", ");
205 let why = if replace {
206 "`--replace` drops your other slot(s)"
207 } else {
208 "re-binding the same relay rotates its slot"
209 };
210 bail!(
211 "bind-relay would black-hole {n} pinned peer(s): {list}. {why}; they are \
212 pinned to your CURRENT slot and would keep pushing to a slot you no longer \
213 read.\n\n\
214 SAFE PATHS:\n\
215 • Default (omit `--replace`) ADDITIVELY binds a NEW relay, keeping existing \
216 slots — no black-hole.\n\
217 • `wire rotate-slot` — same-relay rotation that emits wire_close to peers.\n\
218 • `wire bind-relay {url} --migrate-pinned` — proceed anyway; re-pair each \
219 peer out-of-band.\n\n\
220 Issue #7 (silent black-hole on relay change) caught this.",
221 n = pinned.len(),
222 );
223 }
224
225 let client = crate::relay_client::RelayClient::new(normalized);
226 client.check_healthz()?;
227 let alloc = client.allocate_slot(Some(&handle))?;
228
229 if destructive && !pinned.is_empty() {
230 eprintln!(
231 "wire bind-relay: {mode} with {n} pinned peer(s) — they will black-hole \
232 until they re-pin: {peers}",
233 mode = if replace { "replacing" } else { "rotating" },
234 n = pinned.len(),
235 peers = pinned.join(", "),
236 );
237 }
238
239 let mut state = existing;
243 if replace {
244 state["self"] = Value::Null;
245 }
246 crate::endpoints::upsert_self_endpoint(
247 &mut state,
248 Endpoint {
249 relay_url: normalized.to_string(),
250 slot_id: alloc.slot_id.clone(),
251 slot_token: alloc.slot_token.clone(),
252 scope: new_scope,
253 },
254 );
255 config::write_relay_state(&state)?;
256 let eps = self_endpoints(&state);
257
258 let scope_str = format!("{new_scope:?}").to_lowercase();
259 if as_json {
260 println!(
261 "{}",
262 serde_json::to_string(&json!({
263 "relay_url": normalized,
264 "slot_id": alloc.slot_id,
265 "scope": scope_str,
266 "endpoints": eps.len(),
267 "additive": !replace,
268 "slot_token_present": true,
269 }))?
270 );
271 } else {
272 println!(
273 "bound {scope_str} slot on {normalized} (slot {})",
274 alloc.slot_id
275 );
276 println!(
277 "self now has {n} endpoint(s): {list}",
278 n = eps.len(),
279 list = eps
280 .iter()
281 .map(|e| format!("{}({:?})", e.relay_url, e.scope))
282 .collect::<Vec<_>>()
283 .join(", "),
284 );
285 }
286 Ok(())
287}
288
289pub(super) fn cmd_add_peer_slot(
292 handle: &str,
293 url: &str,
294 slot_id: &str,
295 slot_token: &str,
296 as_json: bool,
297) -> Result<()> {
298 use crate::endpoints::{Endpoint, infer_scope_from_url, pin_peer_endpoints};
299 let mut state = config::read_relay_state()?;
300
301 let new_ep = Endpoint {
308 relay_url: url.to_string(),
309 slot_id: slot_id.to_string(),
310 slot_token: slot_token.to_string(),
311 scope: infer_scope_from_url(url),
312 };
313 let mut endpoints: Vec<Endpoint> = state
316 .get("peers")
317 .and_then(|p| p.get(handle))
318 .and_then(|e| e.get("endpoints"))
319 .and_then(|a| serde_json::from_value::<Vec<Endpoint>>(a.clone()).ok())
320 .unwrap_or_default();
321 if let Some(existing) = endpoints
323 .iter_mut()
324 .find(|e| e.relay_url == new_ep.relay_url)
325 {
326 *existing = new_ep;
327 } else {
328 endpoints.push(new_ep);
329 }
330 let n = endpoints.len();
331 pin_peer_endpoints(&mut state, handle, &endpoints)?;
332 config::write_relay_state(&state)?;
333 if as_json {
334 println!(
335 "{}",
336 serde_json::to_string(&json!({
337 "handle": handle,
338 "relay_url": url,
339 "slot_id": slot_id,
340 "added": true,
341 "endpoint_count": n,
342 }))?
343 );
344 } else {
345 println!(
346 "pinned peer slot for {handle} at {url} ({slot_id}) — peer now has {n} endpoint(s)"
347 );
348 }
349 Ok(())
350}
351
/// Posts every event queued in the outbox to the pinned peers and prints a
/// summary of what was pushed and what was skipped.
///
/// * `peer_filter` — when set, only the peer whose handle equals it is pushed.
/// * `as_json` — print `{"pushed": [...], "skipped": [...]}` instead of a
///   one-line text summary.
///
/// For each peer with a `<handle>.jsonl` outbox file, each line that parses
/// as JSON is posted through `try_post_event_with_failover` over the peer's
/// endpoints in priority order. When every endpoint fails,
/// `try_reresolve_peer_on_slot_4xx` is consulted and, if it reports `true`,
/// the post is retried once with the refreshed endpoint list.
///
/// # Errors
/// Fails when the relay state cannot be read, no peers are pinned, the outbox
/// directory cannot be resolved or listed, an outbox file cannot be read, or
/// the JSON summary cannot be serialized. Per-event delivery failures are
/// recorded in `skipped` rather than returned.
pub(super) fn cmd_push(peer_filter: Option<&str>, as_json: bool) -> Result<()> {
    let mut state = config::read_relay_state()?;
    let peers = state["peers"].as_object().cloned().unwrap_or_default();
    if peers.is_empty() {
        bail!(
            "no peer slots pinned — run `wire add-peer-slot <handle> <url> <slot_id> <token>` first"
        );
    }
    let outbox_dir = config::outbox_dir()?;
    if outbox_dir.exists() {
        // Warn about outbox files that will NOT be pushed below: a `.jsonl`
        // file whose stem is not a pinned handle, but whose `bare_handle`
        // form is. Unreadable directory entries are skipped by `flatten()`.
        let pinned: std::collections::HashSet<String> = peers.keys().cloned().collect();
        for entry in std::fs::read_dir(&outbox_dir)?.flatten() {
            let path = entry.path();
            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
                continue;
            }
            let stem = match path.file_stem().and_then(|s| s.to_str()) {
                Some(s) => s.to_string(),
                None => continue,
            };
            if pinned.contains(&stem) {
                continue;
            }
            let bare = crate::agent_card::bare_handle(&stem);
            if pinned.contains(bare) {
                eprintln!(
                    "wire push: WARN stale outbox file `{}.jsonl` not enumerated (pinned peer is `{bare}`). \
                     Merge with: `cat {} >> {}` then delete the FQDN file.",
                    stem,
                    path.display(),
                    outbox_dir.join(format!("{bare}.jsonl")).display(),
                );
            }
        }
    }
    if !outbox_dir.exists() {
        // No outbox directory at all: report an empty result and succeed.
        if as_json {
            println!(
                "{}",
                serde_json::to_string(&json!({"pushed": [], "skipped": []}))?
            );
        } else {
            println!("phyllis: nothing to dial out — write a message first with `wire send`");
        }
        return Ok(());
    }

    let mut pushed = Vec::new();
    let mut skipped = Vec::new();

    // Peers for which a re-resolve has already been attempted in this run
    // (inserted on both success and error below). Passed to
    // `try_reresolve_peer_on_slot_4xx` — presumably so it does not re-resolve
    // the same peer twice; confirm against that function.
    let mut rotated_this_push: std::collections::HashSet<String> = std::collections::HashSet::new();
    // Set once `state` has been changed by a successful re-resolve, so it is
    // written back exactly once after the loop.
    let mut state_dirty = false;

    for (peer_handle, _) in peers.iter() {
        if let Some(want) = peer_filter
            && peer_handle != want
        {
            continue;
        }
        let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
        if !outbox.exists() {
            continue;
        }
        let mut ordered_endpoints =
            crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
        if ordered_endpoints.is_empty() {
            // Nothing to post to: record every parseable queued event as
            // skipped. A read failure here yields no entries rather than an
            // error.
            for line in std::fs::read_to_string(&outbox).unwrap_or_default().lines() {
                let event: Value = match serde_json::from_str(line) {
                    Ok(v) => v,
                    Err(_) => continue,
                };
                let event_id = event
                    .get("event_id")
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .to_string();
                skipped.push(json!({
                    "peer": peer_handle,
                    "event_id": event_id,
                    "reason": "no reachable endpoint pinned for peer",
                }));
            }
            continue;
        }
        let body = std::fs::read_to_string(&outbox)?;
        for line in body.lines() {
            // Lines that are not valid JSON are silently ignored.
            let event: Value = match serde_json::from_str(line) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let event_id = event
                .get("event_id")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string();

            // Most recent per-endpoint transport error, captured from inside
            // the post closure; a `RefCell` because the closure only borrows
            // it while this scope also reads and resets it.
            let last_err: std::cell::RefCell<Option<String>> = std::cell::RefCell::new(None);
            match crate::relay_client::try_post_event_with_failover(
                &ordered_endpoints,
                &event,
                |endpoint, ev| {
                    let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
                    match client.post_event(&endpoint.slot_id, &endpoint.slot_token, ev) {
                        Ok(resp) => Ok(resp),
                        Err(e) => {
                            *last_err.borrow_mut() =
                                Some(crate::relay_client::format_transport_error(&e));
                            Err(e)
                        }
                    }
                },
            ) {
                Ok((endpoint, resp)) => {
                    // A "duplicate" status is reported under `skipped`, not
                    // `pushed`.
                    if resp.status == "duplicate" {
                        skipped.push(json!({
                            "peer": peer_handle,
                            "event_id": event_id,
                            "reason": "duplicate",
                            "endpoint": endpoint.relay_url,
                            "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
                        }));
                    } else {
                        pushed.push(json!({
                            "peer": peer_handle,
                            "event_id": event_id,
                            "endpoint": endpoint.relay_url,
                            "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
                        }));
                    }
                }
                Err(_) => {
                    // Every endpoint failed. Hand the last transport error to
                    // the re-resolve helper and act on its verdict.
                    let last_err_text = last_err.borrow().clone().unwrap_or_default();
                    let mut delivered_via_retry: Option<(crate::endpoints::Endpoint, _)> = None;
                    match try_reresolve_peer_on_slot_4xx(
                        &mut state,
                        peer_handle,
                        &last_err_text,
                        &rotated_this_push,
                    ) {
                        Ok(true) => {
                            // `state` was handed over mutably and is treated
                            // as changed: reload this peer's endpoints and
                            // retry the same event once.
                            rotated_this_push.insert(peer_handle.clone());
                            state_dirty = true;
                            ordered_endpoints = crate::endpoints::peer_endpoints_in_priority_order(
                                &state,
                                peer_handle,
                            );
                            *last_err.borrow_mut() = None;
                            if let Ok((endpoint, resp)) =
                                crate::relay_client::try_post_event_with_failover(
                                    &ordered_endpoints,
                                    &event,
                                    |endpoint, ev| {
                                        let client = crate::relay_client::RelayClient::new(
                                            &endpoint.relay_url,
                                        );
                                        match client.post_event(
                                            &endpoint.slot_id,
                                            &endpoint.slot_token,
                                            ev,
                                        ) {
                                            Ok(resp) => Ok(resp),
                                            Err(e) => {
                                                *last_err.borrow_mut() = Some(
                                                    crate::relay_client::format_transport_error(&e),
                                                );
                                                Err(e)
                                            }
                                        }
                                    },
                                )
                            {
                                delivered_via_retry = Some((endpoint, resp));
                            }
                        }
                        // No re-resolve happened; fall through and report the
                        // original failure.
                        Ok(false) => {}
                        Err(e) => {
                            // Keep the original transport error and append
                            // the re-resolve failure to it.
                            *last_err.borrow_mut() = Some(format!(
                                "{}; re-resolve also failed: {e:#}",
                                last_err.borrow().clone().unwrap_or_default()
                            ));
                            rotated_this_push.insert(peer_handle.clone());
                        }
                    }
                    if let Some((endpoint, resp)) = delivered_via_retry {
                        // Same reporting as the first-try path, tagged with
                        // `via` so the retry is visible in the output.
                        if resp.status == "duplicate" {
                            skipped.push(json!({
                                "peer": peer_handle,
                                "event_id": event_id,
                                "reason": "duplicate",
                                "endpoint": endpoint.relay_url,
                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
                                "via": "slot_reresolve_retry",
                            }));
                        } else {
                            pushed.push(json!({
                                "peer": peer_handle,
                                "event_id": event_id,
                                "endpoint": endpoint.relay_url,
                                "scope": serde_json::to_value(endpoint.scope).unwrap_or(json!("?")),
                                "via": "slot_reresolve_retry",
                            }));
                        }
                    } else {
                        skipped.push(json!({
                            "peer": peer_handle,
                            "event_id": event_id,
                            "reason": last_err
                                .borrow()
                                .clone()
                                .unwrap_or_else(|| "all endpoints failed".to_string()),
                        }));
                    }
                }
            }
        }
    }

    // Persisting the refreshed slots is best-effort: a write failure is
    // logged and the push result is still reported.
    if state_dirty && let Err(e) = config::write_relay_state(&state) {
        eprintln!(
            "wire push: WARN failed to persist rotated peer slots: {e:#}. \
             Slot rotation will be re-attempted on next push."
        );
    }

    if as_json {
        println!(
            "{}",
            serde_json::to_string(&json!({"pushed": pushed, "skipped": skipped}))?
        );
    } else {
        println!(
            "pushed {} event(s); skipped {} ({})",
            pushed.len(),
            skipped.len(),
            if skipped.is_empty() {
                "none"
            } else {
                "see --json for detail"
            }
        );
    }
    Ok(())
}
656
657pub(super) fn cmd_pull(as_json: bool) -> Result<()> {
660 let state = config::read_relay_state()?;
661 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
662 if self_state.is_null() {
663 bail!("self slot not bound — run `wire bind-relay <url>` first");
664 }
665
666 let endpoints = crate::endpoints::self_endpoints(&state);
675 if endpoints.is_empty() {
676 bail!("self.relay_url / slot_id / slot_token missing in relay_state.json");
677 }
678
679 let inbox_dir = config::inbox_dir()?;
680 config::ensure_dirs()?;
681
682 let mut total_seen = 0usize;
683 let mut all_written: Vec<Value> = Vec::new();
684 let mut all_rejected: Vec<Value> = Vec::new();
685 let mut all_blocked = false;
686 let mut all_advance_cursor_to: Option<String> = None;
687
688 for endpoint in &endpoints {
689 let cursor_key = endpoint_cursor_key(endpoint.scope);
690 let last_event_id = self_state
691 .get(&cursor_key)
692 .and_then(Value::as_str)
693 .map(str::to_string);
694 let client = crate::relay_client::RelayClient::new(&endpoint.relay_url);
695 let events = match client.list_events(
696 &endpoint.slot_id,
697 &endpoint.slot_token,
698 last_event_id.as_deref(),
699 Some(1000),
700 ) {
701 Ok(ev) => ev,
702 Err(e) => {
703 eprintln!(
707 "wire pull: endpoint {} ({:?}) errored: {}; continuing",
708 endpoint.relay_url,
709 endpoint.scope,
710 crate::relay_client::format_transport_error(&e),
711 );
712 continue;
713 }
714 };
715 total_seen += events.len();
716 let result = crate::pull::process_events(&events, last_event_id.clone(), &inbox_dir)?;
717 all_written.extend(result.written.iter().cloned());
718 all_rejected.extend(result.rejected.iter().cloned());
719 if result.blocked {
720 all_blocked = true;
721 }
722 if let Some(eid) = result.advance_cursor_to.clone() {
725 if endpoint.scope == crate::endpoints::EndpointScope::Federation {
726 all_advance_cursor_to = Some(eid.clone());
727 }
728 let key = cursor_key.clone();
729 config::update_relay_state(|state| {
730 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
731 self_obj.insert(key, Value::String(eid));
732 }
733 Ok(())
734 })?;
735 }
736 }
737
738 let result = crate::pull::PullResult {
743 written: all_written,
744 rejected: all_rejected,
745 blocked: all_blocked,
746 advance_cursor_to: all_advance_cursor_to,
747 };
748 let events_len = total_seen;
749
750 if as_json {
754 println!(
755 "{}",
756 serde_json::to_string(&json!({
757 "written": result.written,
758 "rejected": result.rejected,
759 "total_seen": events_len,
760 "cursor_blocked": result.blocked,
761 "cursor_advanced_to": result.advance_cursor_to,
762 }))?
763 );
764 } else {
765 let blocking = result
766 .rejected
767 .iter()
768 .filter(|r| r.get("blocks_cursor").and_then(Value::as_bool) == Some(true))
769 .count();
770 if blocking > 0 {
771 println!(
772 "pulled {} event(s); wrote {}; rejected {} ({} BLOCKING cursor — see `wire pull --json`)",
773 events_len,
774 result.written.len(),
775 result.rejected.len(),
776 blocking,
777 );
778 } else {
779 println!(
780 "pulled {} event(s); wrote {}; rejected {}",
781 events_len,
782 result.written.len(),
783 result.rejected.len(),
784 );
785 }
786 }
787 Ok(())
788}
789
790fn endpoint_cursor_key(scope: crate::endpoints::EndpointScope) -> String {
795 match scope {
796 crate::endpoints::EndpointScope::Federation => "last_pulled_event_id".to_string(),
797 crate::endpoints::EndpointScope::Local => "last_pulled_event_id_local".to_string(),
798 crate::endpoints::EndpointScope::Lan => "last_pulled_event_id_lan".to_string(),
799 crate::endpoints::EndpointScope::Uds => "last_pulled_event_id_uds".to_string(),
800 }
801}
802
803pub(super) fn cmd_rotate_slot(no_announce: bool, as_json: bool) -> Result<()> {
806 if !config::is_initialized()? {
807 bail!("not initialized — run `wire up` first");
808 }
809 let mut state = config::read_relay_state()?;
810 let self_state = state.get("self").cloned().unwrap_or(Value::Null);
811 if self_state.is_null() {
812 bail!("self slot not bound — run `wire bind-relay <url>` first (nothing to rotate)");
813 }
814 let primary = crate::endpoints::self_primary_endpoint(&state)
818 .ok_or_else(|| anyhow!("self has no resolvable inbound endpoint to rotate"))?;
819 let url = primary.relay_url.clone();
820 let old_slot_id = primary.slot_id.clone();
821 let old_slot_token = primary.slot_token.clone();
822
823 let card = config::read_agent_card()?;
825 let did = card
826 .get("did")
827 .and_then(Value::as_str)
828 .unwrap_or("")
829 .to_string();
830 let handle = crate::agent_card::display_handle_from_did(&did).to_string();
831 let pk_b64 = card
832 .get("verify_keys")
833 .and_then(Value::as_object)
834 .and_then(|m| m.values().next())
835 .and_then(|v| v.get("key"))
836 .and_then(Value::as_str)
837 .ok_or_else(|| anyhow!("agent-card missing verify_keys[*].key"))?
838 .to_string();
839 let pk_bytes = crate::signing::b64decode(&pk_b64)?;
840 let sk_seed = config::read_private_key()?;
841
842 let normalized = url.trim_end_matches('/').to_string();
844 let client = crate::relay_client::RelayClient::new(&normalized);
845 client
846 .check_healthz()
847 .context("aborting rotation; old slot still valid")?;
848 let alloc = client.allocate_slot(Some(&handle))?;
849 let new_slot_id = alloc.slot_id.clone();
850 let new_slot_token = alloc.slot_token.clone();
851
852 let mut announced: Vec<String> = Vec::new();
859 if !no_announce {
860 let now = time::OffsetDateTime::now_utc()
861 .format(&time::format_description::well_known::Rfc3339)
862 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
863 let body = json!({
864 "reason": "operator-initiated slot rotation",
865 "new_relay_url": url,
866 "new_slot_id": new_slot_id,
867 });
871 let peers = state["peers"].as_object().cloned().unwrap_or_default();
872 for (peer_handle, _peer_info) in peers.iter() {
873 let event = json!({
874 "schema_version": crate::signing::EVENT_SCHEMA_VERSION,
875 "timestamp": now.clone(),
876 "from": did,
877 "to": format!("did:wire:{peer_handle}"),
878 "type": "wire_close",
879 "kind": 1201,
880 "body": body.clone(),
881 });
882 let signed = match sign_message_v31(&event, &sk_seed, &pk_bytes, &handle) {
883 Ok(s) => s,
884 Err(e) => {
885 eprintln!("warn: could not sign wire_close for {peer_handle}: {e}");
886 continue;
887 }
888 };
889 let peer_info = match state["peers"].get(peer_handle) {
894 Some(p) => p.clone(),
895 None => continue,
896 };
897 let peer_url = peer_info["relay_url"].as_str().unwrap_or(&url);
898 let peer_slot_id = peer_info["slot_id"].as_str().unwrap_or("");
899 let peer_slot_token = peer_info["slot_token"].as_str().unwrap_or("");
900 if peer_slot_id.is_empty() || peer_slot_token.is_empty() {
901 continue;
902 }
903 let peer_client = if peer_url == url {
904 client.clone()
905 } else {
906 crate::relay_client::RelayClient::new(peer_url)
907 };
908 match peer_client.post_event(peer_slot_id, peer_slot_token, &signed) {
909 Ok(_) => announced.push(peer_handle.clone()),
910 Err(e) => eprintln!("warn: announce to {peer_handle} failed: {e}"),
911 }
912 }
913 }
914
915 state["self"] = json!({
917 "relay_url": url,
918 "slot_id": new_slot_id,
919 "slot_token": new_slot_token,
920 });
921 config::write_relay_state(&state)?;
922
923 if as_json {
924 println!(
925 "{}",
926 serde_json::to_string(&json!({
927 "rotated": true,
928 "old_slot_id": old_slot_id,
929 "new_slot_id": new_slot_id,
930 "relay_url": url,
931 "announced_to": announced,
932 }))?
933 );
934 } else {
935 println!("rotated slot on {url}");
936 println!(
937 " old slot_id: {old_slot_id} (orphaned — abusive bearer-holders lose their leverage)"
938 );
939 println!(" new slot_id: {new_slot_id}");
940 if !announced.is_empty() {
941 println!(
942 " announced wire_close (kind=1201) to: {}",
943 announced.join(", ")
944 );
945 }
946 println!();
947 println!("next steps:");
948 println!(" - peers see the wire_close event in their next `wire pull`");
949 println!(
950 " - paired peers must re-issue: tell them to run `wire add-peer-slot {handle} {url} {new_slot_id} <new-token>`"
951 );
952 println!(" (or full re-pair via `wire dial <handle>@<relay>`)");
953 println!(" - until they do, you'll receive but they won't be able to reach you");
954 let _ = old_slot_token;
956 }
957 Ok(())
958}
959
960pub(super) fn cmd_forget_peer(handle: &str, purge: bool, as_json: bool) -> Result<()> {
963 let mut trust = config::read_trust()?;
964 let mut removed_from_trust = false;
965 if let Some(agents) = trust.get_mut("agents").and_then(Value::as_object_mut)
966 && agents.remove(handle).is_some()
967 {
968 removed_from_trust = true;
969 }
970 config::write_trust(&trust)?;
971
972 let mut state = config::read_relay_state()?;
973 let mut removed_from_relay = false;
974 if let Some(peers) = state.get_mut("peers").and_then(Value::as_object_mut)
975 && peers.remove(handle).is_some()
976 {
977 removed_from_relay = true;
978 }
979 config::write_relay_state(&state)?;
980
981 let mut purged: Vec<String> = Vec::new();
982 if purge {
983 for dir in [config::inbox_dir()?, config::outbox_dir()?] {
984 let path = dir.join(format!("{handle}.jsonl"));
985 if path.exists() {
986 std::fs::remove_file(&path).with_context(|| format!("removing {path:?}"))?;
987 purged.push(path.to_string_lossy().into());
988 }
989 }
990 }
991
992 if !removed_from_trust && !removed_from_relay {
993 if as_json {
994 println!(
995 "{}",
996 serde_json::to_string(&json!({
997 "removed": false,
998 "reason": format!("peer {handle:?} not pinned"),
999 }))?
1000 );
1001 } else {
1002 eprintln!("peer {handle:?} not found in trust or relay state — nothing to forget");
1003 }
1004 return Ok(());
1005 }
1006
1007 if as_json {
1008 println!(
1009 "{}",
1010 serde_json::to_string(&json!({
1011 "handle": handle,
1012 "removed_from_trust": removed_from_trust,
1013 "removed_from_relay_state": removed_from_relay,
1014 "purged_files": purged,
1015 }))?
1016 );
1017 } else {
1018 println!("forgot peer {handle:?}");
1019 if removed_from_trust {
1020 println!(" - removed from trust.json");
1021 }
1022 if removed_from_relay {
1023 println!(" - removed from relay.json");
1024 }
1025 if !purged.is_empty() {
1026 for p in &purged {
1027 println!(" - deleted {p}");
1028 }
1029 } else if !purge {
1030 println!(" (inbox/outbox files preserved; pass --purge to delete them)");
1031 }
1032 }
1033 Ok(())
1034}
1035
/// Runs the sync daemon: repeated push + pull cycles, either once or forever.
///
/// * `interval_secs` — seconds between cycles; values below 1 are raised to 1
///   for the actual wait.
/// * `once` — run a single cycle and return.
/// * `all_sessions` — hand over to `daemon_supervisor::run_supervisor`
///   instead of running a loop here. Mutually exclusive with `once` and
///   `session`.
/// * `session` — resolve this session's home directory and set `WIRE_HOME`
///   to it for the rest of the process.
/// * `as_json` — print one JSON object per cycle on stdout instead of text
///   status lines on stderr.
///
/// # Errors
/// Fails on conflicting flags, an unknown session, an uninitialized agent, a
/// failed singleton claim, or a failed state-dir lookup or JSON
/// serialization. Errors from an individual push or pull cycle are logged and
/// folded into that cycle's report instead of being returned.
pub(super) fn cmd_daemon(
    interval_secs: u64,
    once: bool,
    all_sessions: bool,
    session: Option<String>,
    as_json: bool,
) -> Result<()> {
    if all_sessions {
        if once {
            bail!("--all-sessions and --once are mutually exclusive (supervisor runs forever)");
        }
        if session.is_some() {
            bail!(
                "--all-sessions and --session are mutually exclusive (supervisor manages every session, not a single named one)"
            );
        }
        return crate::daemon_supervisor::run_supervisor(interval_secs, as_json);
    }
    if let Some(ref name) = session {
        let home = crate::session::find_session_home_by_name(name)
            .with_context(|| format!("resolving session home for --session {name}"))?
            .ok_or_else(|| {
                anyhow!(
                    "session '{name}' not found — run `wire session list` to see initialized sessions"
                )
            })?;
        // SAFETY: NOTE(review) — mutating the process environment is only
        // sound while no other thread reads or writes it. This runs before
        // the stream subscriber is started below; confirm that no other
        // threads exist at this point.
        unsafe {
            std::env::set_var("WIRE_HOME", &home);
        }
        if !as_json {
            eprintln!(
                "wire daemon: pinned to session '{name}' (WIRE_HOME={})",
                home.display()
            );
        }
    }
    if !config::is_initialized()? {
        bail!("not initialized — run `wire up` first");
    }
    // For a long-running daemon, refuse to start when another daemon already
    // holds the singleton, unless WIRE_DAEMON_NO_SINGLETON is set. The guard
    // is held in `_pid_guard` for the lifetime of this function.
    let _pid_guard = if !once && std::env::var("WIRE_DAEMON_NO_SINGLETON").is_err() {
        if let Some(holder_pid) = crate::ensure_up::daemon_singleton_holder() {
            if as_json {
                println!(
                    "{}",
                    serde_json::to_string(&json!({
                        "status": "skipped",
                        "reason": "daemon already running",
                        "holder_pid": holder_pid,
                    }))?
                );
            } else {
                eprintln!(
                    "wire daemon: another daemon is already running (pid {holder_pid}); not starting a second polling loop. Set WIRE_DAEMON_NO_SINGLETON=1 to override."
                );
            }
            // Already running is not an error for the caller.
            return Ok(());
        }
        Some(crate::ensure_up::claim_daemon_singleton()?)
    } else {
        None
    };
    if !once {
        crate::session::warn_on_identity_collision(std::process::id(), "daemon");
    }
    // Clamp to at least one second so the loop below never spins.
    let interval = std::time::Duration::from_secs(interval_secs.max(1));

    if !as_json {
        if once {
            eprintln!("wire daemon: single sync cycle, then exit");
        } else {
            eprintln!("wire daemon: syncing every {interval_secs}s. SIGINT to stop.");
        }
    }

    // A pidfile failure is logged but does not stop the daemon.
    if let Err(e) = crate::ensure_up::write_self_daemon_pid() {
        eprintln!("daemon: pidfile write error: {e:#}");
    }

    // Wake channel: the stream subscriber receives the sender, and a message
    // on it ends the inter-cycle wait early (see the `recv_timeout` below).
    let (wake_tx, wake_rx) = std::sync::mpsc::channel::<()>();
    if !once {
        crate::daemon_stream::spawn_stream_subscriber(wake_tx);
    }

    // Inbox watcher plus its cursor file, used for notifications after each
    // cycle. Disabled for `once`, and disabled (with a log line) when the
    // watcher cannot be initialized.
    let mut notify_state: Option<(crate::inbox_watch::InboxWatcher, std::path::PathBuf)> = if once {
        None
    } else {
        let cursor_path = config::state_dir()?.join("notify.cursor");
        match crate::inbox_watch::InboxWatcher::from_cursor_file(&cursor_path) {
            Ok(w) => Some((w, cursor_path)),
            Err(e) => {
                eprintln!("daemon: notify watcher init failed, toasts disabled: {e:#}");
                None
            }
        }
    };

    loop {
        // A failed push or pull is logged and replaced by a placeholder
        // report carrying the error text, so one bad cycle never ends the
        // daemon.
        let pushed = run_sync_push().unwrap_or_else(|e| {
            eprintln!("daemon: push error: {e:#}");
            json!({"pushed": [], "skipped": [{"error": e.to_string()}]})
        });
        let pulled = run_sync_pull().unwrap_or_else(|e| {
            eprintln!("daemon: pull error: {e:#}");
            json!({"written": [], "rejected": [], "total_seen": 0, "error": e.to_string()})
        });

        if let Some((ref mut watcher, ref cursor_path)) = notify_state {
            match super::comms::notify_sweep_new_events(watcher, cursor_path) {
                Ok(events) => super::comms::toast_inbox_events(&events),
                Err(e) => eprintln!("daemon: notify sweep error: {e:#}"),
            }
        }

        // Per-cycle counts, recorded via `write_last_sync_record` and used
        // for the text status line.
        let cycle_push_n = pushed["pushed"].as_array().map(|a| a.len()).unwrap_or(0);
        let cycle_pull_n = pulled["written"].as_array().map(|a| a.len()).unwrap_or(0);
        let cycle_rejected_n = pulled["rejected"].as_array().map(|a| a.len()).unwrap_or(0);
        crate::ensure_up::write_last_sync_record(cycle_push_n, cycle_pull_n, cycle_rejected_n);

        if as_json {
            println!(
                "{}",
                serde_json::to_string(&json!({
                    "ts": time::OffsetDateTime::now_utc()
                        .format(&time::format_description::well_known::Rfc3339)
                        .unwrap_or_default(),
                    "push": pushed,
                    "pull": pulled,
                }))?
            );
        } else if cycle_push_n > 0 || cycle_pull_n > 0 || cycle_rejected_n > 0 {
            // Text mode stays quiet on cycles where nothing happened.
            eprintln!(
                "daemon: pushed={cycle_push_n} pulled={cycle_pull_n} rejected={cycle_rejected_n}"
            );
        }

        if once {
            return Ok(());
        }
        // Wait for the next cycle: either a wake message arrives or the
        // interval elapses.
        match wake_rx.recv_timeout(interval) {
            Ok(()) | Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                // The sender is gone, so `recv_timeout` returns at once;
                // sleep the interval here to keep plain polling.
                std::thread::sleep(interval);
            }
        }
        // Drain any wake messages that queued up so they collapse into one
        // cycle.
        while wake_rx.try_recv().is_ok() {}
    }
}
1250
1251pub fn run_sync_push() -> Result<Value> {
1254 let state = config::read_relay_state()?;
1255 let peers = state["peers"].as_object().cloned().unwrap_or_default();
1256 if peers.is_empty() {
1257 return Ok(json!({"pushed": [], "skipped": []}));
1258 }
1259 let outbox_dir = config::outbox_dir()?;
1260 if !outbox_dir.exists() {
1261 return Ok(json!({"pushed": [], "skipped": []}));
1262 }
1263 let mut pushed = Vec::new();
1264 let mut skipped = Vec::new();
1265 for (peer_handle, slot_info) in peers.iter() {
1266 let outbox = outbox_dir.join(format!("{peer_handle}.jsonl"));
1267 if !outbox.exists() {
1268 continue;
1269 }
1270 let url = slot_info["relay_url"].as_str().unwrap_or("");
1271 let slot_id = slot_info["slot_id"].as_str().unwrap_or("");
1272 let slot_token = slot_info["slot_token"].as_str().unwrap_or("");
1273 if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
1274 continue;
1275 }
1276 let client = crate::relay_client::RelayClient::new(url);
1277 let body = std::fs::read_to_string(&outbox)?;
1278 for line in body.lines() {
1279 let event: Value = match serde_json::from_str(line) {
1280 Ok(v) => v,
1281 Err(_) => continue,
1282 };
1283 let event_id = event
1284 .get("event_id")
1285 .and_then(Value::as_str)
1286 .unwrap_or("")
1287 .to_string();
1288 match client.post_event(slot_id, slot_token, &event) {
1289 Ok(resp) => {
1290 let now = time::OffsetDateTime::now_utc()
1299 .format(&time::format_description::well_known::Rfc3339)
1300 .unwrap_or_default();
1301 if let Err(e) = config::append_pushed_log(peer_handle, &event_id, &now) {
1302 eprintln!(
1303 "daemon: pushed-log append for {peer_handle}/{event_id} failed (non-fatal): {e:#}"
1304 );
1305 }
1306 if resp.status == "duplicate" {
1307 skipped.push(json!({"peer": peer_handle, "event_id": event_id, "reason": "duplicate"}));
1308 } else {
1309 pushed.push(json!({"peer": peer_handle, "event_id": event_id}));
1310 }
1311 }
1312 Err(e) => {
1313 let reason = crate::relay_client::format_transport_error(&e);
1317 skipped
1318 .push(json!({"peer": peer_handle, "event_id": event_id, "reason": reason}));
1319 }
1320 }
1321 }
1322 }
1323 Ok(json!({"pushed": pushed, "skipped": skipped}))
1324}
1325
1326pub fn run_sync_pull() -> Result<Value> {
1334 let state = config::read_relay_state()?;
1335 if state.get("self").map(Value::is_null).unwrap_or(true) {
1336 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
1337 }
1338 let endpoints = crate::endpoints::self_endpoints(&state);
1345 if endpoints.is_empty() {
1346 return Ok(json!({"written": [], "rejected": [], "total_seen": 0}));
1347 }
1348 let inbox_dir = config::inbox_dir()?;
1349 config::ensure_dirs()?;
1350
1351 let self_obj = state.get("self").cloned().unwrap_or(Value::Null);
1356 let legacy_cursor = self_obj
1357 .get("last_pulled_event_id")
1358 .and_then(Value::as_str)
1359 .map(str::to_string);
1360 let primary_slot = crate::endpoints::self_primary_endpoint(&state).map(|e| e.slot_id);
1361 let mut cursors: serde_json::Map<String, Value> = self_obj
1362 .get("cursors")
1363 .and_then(Value::as_object)
1364 .cloned()
1365 .unwrap_or_default();
1366
1367 let mut all_written: Vec<Value> = Vec::new();
1368 let mut all_rejected: Vec<Value> = Vec::new();
1369 let mut total_seen = 0usize;
1370 let mut blocked_any = false;
1371
1372 for ep in &endpoints {
1373 if ep.relay_url.is_empty() {
1374 continue;
1375 }
1376 let cursor = cursors
1377 .get(&ep.slot_id)
1378 .and_then(Value::as_str)
1379 .map(str::to_string)
1380 .or_else(|| {
1381 if Some(&ep.slot_id) == primary_slot.as_ref() {
1382 legacy_cursor.clone()
1383 } else {
1384 None
1385 }
1386 });
1387 let client = crate::relay_client::RelayClient::new(&ep.relay_url);
1388 let events =
1391 match client.list_events(&ep.slot_id, &ep.slot_token, cursor.as_deref(), Some(1000)) {
1392 Ok(e) => e,
1393 Err(e) => {
1394 eprintln!(
1395 "daemon: pull error on {} slot {} (continuing): {e:#}",
1396 ep.relay_url, ep.slot_id
1397 );
1398 continue;
1399 }
1400 };
1401 total_seen += events.len();
1402 let result = crate::pull::process_events(&events, cursor, &inbox_dir)?;
1405 if let Some(eid) = &result.advance_cursor_to {
1406 cursors.insert(ep.slot_id.clone(), Value::String(eid.clone()));
1407 }
1408 blocked_any |= result.blocked;
1409 all_written.extend(result.written);
1410 all_rejected.extend(result.rejected);
1411 }
1412
1413 let primary_cursor = primary_slot
1417 .as_ref()
1418 .and_then(|s| cursors.get(s))
1419 .and_then(Value::as_str)
1420 .map(str::to_string);
1421 let mut latest_inbound: std::collections::HashMap<String, String> =
1429 std::collections::HashMap::new();
1430 for w in &all_written {
1431 let from = match w.get("from").and_then(Value::as_str) {
1432 Some(s) => s.to_string(),
1433 None => continue,
1434 };
1435 let ts = match w.get("timestamp").and_then(Value::as_str) {
1436 Some(s) if !s.is_empty() => s.to_string(),
1437 _ => continue,
1438 };
1439 latest_inbound
1440 .entry(from)
1441 .and_modify(|existing| {
1442 if ts > *existing {
1443 *existing = ts.clone();
1444 }
1445 })
1446 .or_insert(ts);
1447 }
1448 config::update_relay_state(|state| {
1449 if let Some(self_obj) = state.get_mut("self").and_then(Value::as_object_mut) {
1450 self_obj.insert("cursors".into(), Value::Object(cursors.clone()));
1451 if let Some(pc) = &primary_cursor {
1452 self_obj.insert("last_pulled_event_id".into(), Value::String(pc.clone()));
1453 }
1454 }
1455 if !latest_inbound.is_empty()
1456 && let Some(peers_obj) = state.get_mut("peers").and_then(Value::as_object_mut)
1457 {
1458 for (handle, ts) in &latest_inbound {
1459 let entry = peers_obj.entry(handle.clone()).or_insert_with(|| json!({}));
1460 if let Some(obj) = entry.as_object_mut() {
1461 obj.insert("last_inbound_event_at".into(), Value::String(ts.clone()));
1462 }
1463 }
1464 }
1465 Ok(())
1466 })?;
1467
1468 Ok(json!({
1469 "written": all_written,
1470 "rejected": all_rejected,
1471 "total_seen": total_seen,
1472 "cursor_blocked": blocked_any,
1473 "endpoints_pulled": endpoints.len(),
1474 }))
1475}
1476
/// Reports whether an error message carries a standalone `410` or `404`
/// status token.
///
/// The message is cut at every space, colon, tab, line feed and carriage
/// return; the result is `true` only when one of the resulting pieces is
/// exactly `410` or exactly `404`. Digits embedded in a longer run (`4101`,
/// `request_id=410abc`, `abc410def`) therefore never match, and neither do
/// other status codes such as `401`, `403` or `411`.
pub fn error_smells_like_slot_4xx(last_err: &str) -> bool {
    /// Characters that delimit a status-code token.
    fn is_separator(c: char) -> bool {
        matches!(c, ' ' | ':' | '\t' | '\n' | '\r')
    }

    last_err
        .split(is_separator)
        .any(|token| matches!(token, "410" | "404"))
}
1529
1530fn try_reresolve_peer_on_slot_4xx(
1565 state: &mut Value,
1566 peer_handle: &str,
1567 last_err: &str,
1568 already_tried: &std::collections::HashSet<String>,
1569) -> Result<bool> {
1570 if !error_smells_like_slot_4xx(last_err) {
1571 return Ok(false);
1573 }
1574 if already_tried.contains(peer_handle) {
1575 return Ok(false);
1577 }
1578 let peer_entry = state
1580 .get("peers")
1581 .and_then(|p| p.get(peer_handle))
1582 .ok_or_else(|| anyhow!("peer `{peer_handle}` not in relay_state"))?;
1583 let peer_relay = peer_entry
1584 .get("endpoints")
1585 .and_then(Value::as_array)
1586 .and_then(|arr| {
1587 arr.iter().find(|e| {
1588 e.get("scope").and_then(Value::as_str) == Some("federation")
1589 || e.get("scope").and_then(Value::as_str) == Some("Federation")
1590 })
1591 })
1592 .and_then(|e| e.get("relay_url").and_then(Value::as_str))
1593 .or_else(|| peer_entry.get("relay_url").and_then(Value::as_str))
1594 .ok_or_else(|| {
1595 anyhow!("peer `{peer_handle}` has no federation endpoint to re-resolve against")
1596 })?
1597 .to_string();
1598 let domain = peer_relay
1601 .trim_start_matches("https://")
1602 .trim_start_matches("http://")
1603 .split('/')
1604 .next()
1605 .unwrap_or(&peer_relay)
1606 .to_string();
1607 let handle = crate::pair_profile::Handle {
1608 nick: peer_handle.to_string(),
1609 domain,
1610 };
1611 let resolved = crate::pair_profile::resolve_handle(&handle, Some(&peer_relay))?;
1612 let new_slot_id = resolved
1613 .get("slot_id")
1614 .and_then(Value::as_str)
1615 .ok_or_else(|| anyhow!("re-resolved payload missing slot_id"))?
1616 .to_string();
1617 let peers = state
1619 .get_mut("peers")
1620 .and_then(Value::as_object_mut)
1621 .ok_or_else(|| anyhow!("relay_state.peers missing or wrong shape"))?;
1622 let peer_entry = peers
1623 .get_mut(peer_handle)
1624 .ok_or_else(|| anyhow!("peer `{peer_handle}` disappeared from state mid-resolve"))?;
1625 let current_slot_id = peer_entry
1626 .get("endpoints")
1627 .and_then(Value::as_array)
1628 .and_then(|arr| {
1629 arr.iter().find(|e| {
1630 let scope = e.get("scope").and_then(Value::as_str);
1631 scope == Some("federation") || scope == Some("Federation")
1632 })
1633 })
1634 .and_then(|e| e.get("slot_id").and_then(Value::as_str))
1635 .unwrap_or("")
1636 .to_string();
1637 if current_slot_id == new_slot_id {
1638 return Ok(false);
1640 }
1641 if let Some(endpoints) = peer_entry
1650 .get_mut("endpoints")
1651 .and_then(Value::as_array_mut)
1652 {
1653 for ep in endpoints.iter_mut() {
1654 let scope = ep.get("scope").and_then(Value::as_str);
1655 if scope == Some("federation") || scope == Some("Federation") {
1656 ep["slot_id"] = Value::String(new_slot_id.clone());
1657 ep["slot_token"] = Value::String(String::new());
1658 }
1659 }
1660 }
1661 peer_entry["slot_id"] = Value::String(new_slot_id.clone());
1664 peer_entry["slot_token"] = Value::String(String::new());
1665 eprintln!(
1666 "wire push: peer `{peer_handle}` rotated their relay slot (was `{current_slot_id}`, \
1667 now `{new_slot_id}`); pin updated in place. Re-pair via `wire add \
1668 {peer_handle}@<relay>` to refresh the slot_token."
1669 );
1670 Ok(true)
1671}
1672
#[cfg(test)]
mod slot_reresolve_tests {
    use super::*;
    use std::collections::HashSet;

    /// Relay state holding one peer, `some-peer`, with an empty endpoint list.
    fn single_peer_state() -> Value {
        json!({"peers": {"some-peer": {"endpoints": []}}})
    }

    /// Errors that are not 404/410-shaped must short-circuit to `Ok(false)`.
    #[test]
    fn try_reresolve_skips_when_error_is_not_4xx_shape() {
        let mut state = single_peer_state();
        let already: HashSet<String> = HashSet::new();
        let cases = [
            ("post failed: 502", "502 must NOT trigger a re-resolve"),
            (
                "connection refused",
                "transport errors must NOT trigger a re-resolve",
            ),
            (
                "post failed: 401 Unauthorized",
                "401 (auth) is a token problem, not a slot rotation — must NOT trigger a re-resolve",
            ),
        ];
        for (err_text, why) in cases {
            let triggered =
                try_reresolve_peer_on_slot_4xx(&mut state, "some-peer", err_text, &already)
                    .unwrap();
            assert!(!triggered, "{why}");
        }
    }

    /// A peer already listed in `already_tried` is not re-resolved again.
    #[test]
    fn try_reresolve_rate_limits_one_attempt_per_peer_per_push() {
        let mut state = single_peer_state();
        let already: HashSet<String> = HashSet::from(["some-peer".to_string()]);
        let triggered = try_reresolve_peer_on_slot_4xx(
            &mut state,
            "some-peer",
            "post failed: 410 Gone",
            &already,
        )
        .unwrap();
        assert!(
            !triggered,
            "peer already in `already_tried` must NOT trigger another re-resolve in the same push"
        );
    }

    /// An unknown peer yields an error that names the peer and the cause.
    #[test]
    fn try_reresolve_errors_when_peer_missing_from_state() {
        let mut state = json!({"peers": {}});
        let already: HashSet<String> = HashSet::new();
        let outcome = try_reresolve_peer_on_slot_4xx(
            &mut state,
            "missing-peer",
            "post failed: 410 Gone",
            &already,
        );
        let err = outcome.unwrap_err().to_string();
        let names_peer = err.contains("missing-peer");
        let names_cause = err.contains("not in relay_state");
        assert!(
            names_peer && names_cause,
            "missing-peer error must name the peer + the failure: {err}"
        );
    }

    /// A peer with only a `Local` endpoint and no legacy `relay_url` has
    /// nothing to re-resolve against.
    #[test]
    fn try_reresolve_errors_when_peer_has_no_federation_endpoint() {
        let local_endpoint = json!({
            "scope": "Local",
            "relay_url": "http://127.0.0.1:8771",
            "slot_id": "loc",
            "slot_token": "tok"
        });
        let mut state = json!({
            "peers": {
                "local-only": {
                    "endpoints": [local_endpoint]
                }
            }
        });
        let already: HashSet<String> = HashSet::new();
        let outcome = try_reresolve_peer_on_slot_4xx(
            &mut state,
            "local-only",
            "post failed: 410 Gone",
            &already,
        );
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("federation endpoint"),
            "no-federation error must name the problem: {err}"
        );
    }

    /// `<code> <reason phrase>` form of the status in the message.
    #[test]
    fn error_smells_like_slot_4xx_matches_reqwest_status_display_shape() {
        for msg in [
            "post_event failed: 410 Gone: slot rotated by peer",
            "post_event failed: 404 Not Found: handle no longer claimed",
        ] {
            assert!(error_smells_like_slot_4xx(msg), "expected match: {msg:?}");
        }
    }

    /// Bare numeric code followed directly by a colon.
    #[test]
    fn error_smells_like_slot_4xx_matches_uds_bare_u16_shape() {
        for msg in [
            "post_event (uds /tmp/wire-relay.sock) failed: 410: gone",
            "post_event (uds /tmp/wire-relay.sock) failed: 404: not found",
        ] {
            assert!(error_smells_like_slot_4xx(msg), "expected match: {msg:?}");
        }
    }

    /// `410` / `404` embedded in longer tokens must not count.
    #[test]
    fn error_smells_like_slot_4xx_rejects_substring_lookalikes() {
        let false_positives = [
            "push aborted: slot 4101 expired",
            "post_event failed: 502 Bad Gateway: request_id=410abc-deadbeef",
            "post_event failed: 500: received 4040 bytes, expected envelope",
            "post_event failed: 500: event 0x4104 malformed",
            "post_event failed: 503: backlog=4102 entries pending",
            "post_event failed: 500: tx_id=4044beef",
            "post_event failed: 500: hash=abc410def",
        ];
        false_positives.iter().for_each(|case| {
            assert!(
                !error_smells_like_slot_4xx(case),
                "must NOT trigger re-resolve on substring lookalike: {case:?}"
            );
        });
    }

    /// Codes at the start/end of the message, whitespace separators, the
    /// empty message, and neighbouring 4xx codes.
    #[test]
    fn error_smells_like_slot_4xx_handles_edge_positions() {
        let should_match = [
            "410 Gone",
            "404 Not Found",
            "got 410",
            "got 404",
            "post_event failed:\t410\tGone",
            "post_event failed:\n410\nGone",
            "410",
            "404",
        ];
        for msg in should_match {
            assert!(error_smells_like_slot_4xx(msg), "expected match: {msg:?}");
        }

        let should_not_match = [
            "",
            "no relevant status",
            "post_event failed: 401 Unauthorized",
            "post_event failed: 403 Forbidden",
            "post_event failed: 411 Length Required",
        ];
        for msg in should_not_match {
            assert!(!error_smells_like_slot_4xx(msg), "unexpected match: {msg:?}");
        }
    }
}