wire/send.rs
1//! Synchronous event delivery — collapses the legacy
2//! `wire send → outbox → daemon push → relay` 3-step into a single
3//! direct relay POST.
4//!
5//! ## Why this exists
6//!
7//! Paul (2026-06-01): *"Why are we dealing with this whole outbox
8//! queued delivered thing it's a headache and always breaks can we
9//! streamline and collapse steps."*
10//!
11//! Pre-fix, every `wire send` (CLI and MCP) wrote to
12//! `<outbox_dir>/<peer>.jsonl` and returned `status: "queued"`. The
13//! daemon's 5s push loop later POSTed the event to the relay. Three
14//! distinct silent-drop classes hide in those steps:
15//!
16//! 1. **outbox write succeeds, daemon never pushes** — daemon dead,
17//! daemon on wrong WIRE_HOME, TLS broken (the #176 → #183 saga),
18//! operator never ran `wire push`. `queued` looked like success
19//! but no byte ever left the box.
20//! 2. **daemon pushed, peer's relay slot stale** — earlier
21//! half-paired state, peer rotated slot, slot_token expired (the
22//! brisk-iris case). Push got 4xx, marked as skipped in the daemon
23//! log, operator never sees it from the `wire send` side.
24//! 3. **content-hash dedup blocks retries** — `event_id` is
25//! `sha256(canonical(body))`. Sending the same body twice produces
26//! the same event_id; relay drops the second as `duplicate`. Retry
27//! feels like success but never reaches the peer.
28//!
29//! ## The new contract
30//!
//! - **Default** (`wire send`, `tool_send`): synchronous POST to the
//!   peer's pinned relay slot. Returns the verdict inline: `Delivered` /
//!   `Duplicate` when the relay has the event, `PeerUnknown` /
//!   `SlotStale` / `TransportError` when it does not. No outbox write
//!   on the happy path. Operator sees the actual verdict, not a fake
//!   `queued`.
35//!
36//! - **`--queue` opt-in** (CLI flag; MCP `queue: true` arg):
37//! preserves the legacy outbox-write path for explicit batching /
38//! offline-buffer / pre-pair queue use cases. The daemon's
39//! `run_sync_push` loop continues to drain the outbox so anything
40//! written via this path still delivers.
41//!
42//! - **Peer not pinned**: the relay coords are unknown — sync POST
43//! is impossible. We error explicitly with a hint to run
44//! `wire dial <peer>` (or pass `--queue` if the operator wants
45//! pre-pair queueing). Pre-fix this case silently wrote to outbox
46//! and the daemon would never push it; now it's loud.
47//!
//! - **Stale slot (4xx from relay)**: return `SlotStale` with the slot
//!   error string. The existing `cli::error_smells_like_slot_4xx`
//!   classifier already detects this shape; the caller surfaces the
//!   re-resolve hint. We do NOT auto-re-pair without the operator's
//!   consent (that's `wire dial`'s job).
53
54use anyhow::{Context, Result};
55use serde::Serialize;
56use serde_json::{Value, json};
57
58/// Result of attempting a synchronous delivery to a peer.
59#[derive(Debug, Clone, Serialize)]
60#[serde(tag = "status", rename_all = "snake_case")]
61pub enum SyncDelivery {
62 /// Relay accepted the event. First-time landing on the peer's slot.
63 Delivered {
64 event_id: String,
65 relay_url: String,
66 slot_id: String,
67 },
68 /// Relay said `duplicate` — same `event_id` already on the slot.
69 /// Not a failure: the relay HAS the event, the peer can pull it.
70 /// Surfaced distinctly so the caller can decide whether to nudge
71 /// content uniqueness on the next attempt.
72 Duplicate {
73 event_id: String,
74 relay_url: String,
75 slot_id: String,
76 },
77 /// Peer isn't in `relay_state.peers` — no slot coords to POST to.
78 /// This is the explicit "you haven't paired yet" case. The
79 /// caller should either suggest `wire dial <peer>` or write
80 /// to outbox via the `--queue` opt-in.
81 PeerUnknown { event_id: String },
82 /// Relay returned a 4xx/410 — slot has rotated, token expired,
83 /// peer half-paired and never completed bilateral. The caller
84 /// surfaces a hint to `wire dial <peer>`.
85 SlotStale {
86 event_id: String,
87 relay_url: String,
88 slot_id: String,
89 detail: String,
90 },
91 /// Transport failure (TLS, DNS, connect timeout, 5xx). The
92 /// caller decides whether to fall back to `--queue` or surface
93 /// the error.
94 TransportError {
95 event_id: String,
96 relay_url: String,
97 slot_id: String,
98 detail: String,
99 },
100}
101
102impl SyncDelivery {
103 /// Compact status string for callers that just want the verdict.
104 /// Same shape as the JSON `status` field.
105 pub fn status_str(&self) -> &'static str {
106 match self {
107 SyncDelivery::Delivered { .. } => "delivered",
108 SyncDelivery::Duplicate { .. } => "duplicate",
109 SyncDelivery::PeerUnknown { .. } => "peer_unknown",
110 SyncDelivery::SlotStale { .. } => "slot_stale",
111 SyncDelivery::TransportError { .. } => "transport_error",
112 }
113 }
114
115 /// True when the event reached the relay (Delivered or
116 /// Duplicate). Both states mean the peer CAN pull it.
117 pub fn reached_relay(&self) -> bool {
118 matches!(
119 self,
120 SyncDelivery::Delivered { .. } | SyncDelivery::Duplicate { .. }
121 )
122 }
123
124 pub fn event_id(&self) -> &str {
125 match self {
126 SyncDelivery::Delivered { event_id, .. }
127 | SyncDelivery::Duplicate { event_id, .. }
128 | SyncDelivery::PeerUnknown { event_id }
129 | SyncDelivery::SlotStale { event_id, .. }
130 | SyncDelivery::TransportError { event_id, .. } => event_id,
131 }
132 }
133}
134
135/// Attempt synchronous delivery of `signed_event` to `peer_handle`.
136///
137/// Reads the peer's slot coords from `relay_state.peers`, builds a
138/// `RelayClient`, POSTs the event. Maps every observable outcome onto
139/// a [`SyncDelivery`] variant.
140///
141/// On success (`Delivered` or `Duplicate`), appends a row to the
142/// per-peer pushed log (`<outbox_dir>/<peer>.pushed.jsonl`) so the
143/// `pending_push_count` counter in `wire status` stays accurate
144/// across both code paths (sync send + legacy daemon push).
145pub fn attempt_deliver(peer_handle: &str, signed_event: &Value) -> Result<SyncDelivery> {
146 let event_id = signed_event
147 .get("event_id")
148 .and_then(Value::as_str)
149 .unwrap_or("")
150 .to_string();
151
152 // RFC-006 Part B: resolve the peer's reachable endpoints from `endpoints[]`
153 // — the single peer-routing source — highest-priority first (UDS → local →
154 // LAN → federation). No pinned endpoints → PeerUnknown so the caller can
155 // act. We try each in order and return on the first that reaches the relay
156 // (priority failover — e.g. a sister's local relay first, federation as
157 // backup); if all fail, the last failure verdict is returned.
158 let state = crate::config::read_relay_state().context("reading relay state")?;
159 let endpoints = crate::endpoints::peer_endpoints_in_priority_order(&state, peer_handle);
160 if endpoints.is_empty() {
161 return Ok(SyncDelivery::PeerUnknown { event_id });
162 }
163
164 let mut last_failure: Option<SyncDelivery> = None;
165 for ep in endpoints {
166 if ep.relay_url.is_empty() || ep.slot_id.is_empty() || ep.slot_token.is_empty() {
167 continue;
168 }
169 let client = crate::relay_client::RelayClient::new(&ep.relay_url);
170 match client.post_event(&ep.slot_id, &ep.slot_token, signed_event) {
171 Ok(resp) => {
172 // Append a row to the per-peer pushed log so
173 // `pending_push_count` decrements regardless of whether the
174 // event reached the relay via sync send (this path) or via
175 // daemon push. Non-fatal on append failure.
176 let now = time::OffsetDateTime::now_utc()
177 .format(&time::format_description::well_known::Rfc3339)
178 .unwrap_or_default();
179 if let Err(e) = crate::config::append_pushed_log(peer_handle, &event_id, &now) {
180 eprintln!(
181 "wire send: pushed-log append for {peer_handle}/{event_id} failed (non-fatal): {e:#}"
182 );
183 }
184 return Ok(if resp.status == "duplicate" {
185 SyncDelivery::Duplicate {
186 event_id,
187 relay_url: ep.relay_url,
188 slot_id: ep.slot_id,
189 }
190 } else {
191 SyncDelivery::Delivered {
192 event_id,
193 relay_url: ep.relay_url,
194 slot_id: ep.slot_id,
195 }
196 });
197 }
198 Err(e) => {
199 let detail = crate::relay_client::format_transport_error(&e);
200 // Classify 4xx/410 (stale slot) distinctly from transport
201 // errors; reuse the relay's error-text classifier so both
202 // paths agree. Keep as last_failure and try the next endpoint.
203 last_failure = Some(if crate::cli::error_smells_like_slot_4xx(&detail) {
204 SyncDelivery::SlotStale {
205 event_id: event_id.clone(),
206 relay_url: ep.relay_url,
207 slot_id: ep.slot_id,
208 detail,
209 }
210 } else {
211 SyncDelivery::TransportError {
212 event_id: event_id.clone(),
213 relay_url: ep.relay_url,
214 slot_id: ep.slot_id,
215 detail,
216 }
217 });
218 }
219 }
220 }
221
222 // Every endpoint failed (or all carried empty coords).
223 Ok(last_failure.unwrap_or(SyncDelivery::PeerUnknown { event_id }))
224}
225
226/// Render a `SyncDelivery` as the JSON value `wire send --json` /
227/// `tool_send` return. Fields are flat (no nested struct) so JSON
228/// consumers can read `.status` + `.event_id` directly without
229/// pattern-matching the variant tag.
230pub fn delivery_json(d: &SyncDelivery, peer: &str) -> Value {
231 let base = json!({
232 "status": d.status_str(),
233 "peer": peer,
234 "event_id": d.event_id(),
235 });
236 let mut obj = base.as_object().cloned().unwrap_or_default();
237 match d {
238 SyncDelivery::Delivered {
239 relay_url, slot_id, ..
240 }
241 | SyncDelivery::Duplicate {
242 relay_url, slot_id, ..
243 } => {
244 obj.insert("relay_url".into(), json!(relay_url));
245 obj.insert("slot_id".into(), json!(slot_id));
246 }
247 SyncDelivery::SlotStale {
248 relay_url,
249 slot_id,
250 detail,
251 ..
252 }
253 | SyncDelivery::TransportError {
254 relay_url,
255 slot_id,
256 detail,
257 ..
258 } => {
259 obj.insert("relay_url".into(), json!(relay_url));
260 obj.insert("slot_id".into(), json!(slot_id));
261 obj.insert("reason".into(), json!(detail));
262 }
263 SyncDelivery::PeerUnknown { .. } => {
264 obj.insert(
265 "reason".into(),
266 json!(format!(
267 "peer '{peer}' not pinned — run `wire dial {peer}` to pair, or pass --queue (CLI) / queue:true (MCP) to write to outbox for the daemon to attempt later"
268 )),
269 );
270 }
271 }
272 Value::Object(obj)
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// One value per variant, each with its own `event_id`, for checks
    /// that must hold across the whole enum.
    fn every_variant() -> Vec<SyncDelivery> {
        vec![
            SyncDelivery::Delivered {
                event_id: "e-delivered".into(),
                relay_url: "https://r".into(),
                slot_id: "s".into(),
            },
            SyncDelivery::Duplicate {
                event_id: "e-duplicate".into(),
                relay_url: "https://r".into(),
                slot_id: "s".into(),
            },
            SyncDelivery::PeerUnknown {
                event_id: "e-unknown".into(),
            },
            SyncDelivery::SlotStale {
                event_id: "e-stale".into(),
                relay_url: "https://r".into(),
                slot_id: "s".into(),
                detail: "410".into(),
            },
            SyncDelivery::TransportError {
                event_id: "e-transport".into(),
                relay_url: "https://r".into(),
                slot_id: "s".into(),
                detail: "tls".into(),
            },
        ]
    }

    #[test]
    fn status_str_matches_variant() {
        let d = SyncDelivery::Delivered {
            event_id: "x".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
        };
        assert_eq!(d.status_str(), "delivered");
        assert!(d.reached_relay());

        let d = SyncDelivery::Duplicate {
            event_id: "x".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
        };
        assert_eq!(d.status_str(), "duplicate");
        assert!(
            d.reached_relay(),
            "duplicate counts as relay-reached: peer can pull it"
        );

        let d = SyncDelivery::PeerUnknown {
            event_id: "x".into(),
        };
        assert_eq!(d.status_str(), "peer_unknown");
        assert!(!d.reached_relay());

        let d = SyncDelivery::SlotStale {
            event_id: "x".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
            detail: "410".into(),
        };
        assert_eq!(d.status_str(), "slot_stale");
        assert!(!d.reached_relay());

        let d = SyncDelivery::TransportError {
            event_id: "x".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
            detail: "tls".into(),
        };
        assert_eq!(d.status_str(), "transport_error");
        assert!(!d.reached_relay());
    }

    /// `status_str()` is hand-written while the serialized tag comes
    /// from `#[serde(rename_all = "snake_case")]`; this pins the two
    /// together so a renamed or added variant cannot make them drift.
    #[test]
    fn status_str_matches_serialized_tag() {
        for d in every_variant() {
            let wire = serde_json::to_value(&d).expect("SyncDelivery serializes to JSON");
            assert_eq!(
                wire["status"],
                d.status_str(),
                "serde tag and status_str() disagree for {d:?}"
            );
            assert_eq!(wire["event_id"], d.event_id());
        }
    }

    #[test]
    fn event_id_is_exposed_for_every_variant() {
        let ids: Vec<String> = every_variant()
            .iter()
            .map(|d| d.event_id().to_string())
            .collect();
        assert_eq!(
            ids,
            [
                "e-delivered",
                "e-duplicate",
                "e-unknown",
                "e-stale",
                "e-transport"
            ]
        );
    }

    #[test]
    fn delivery_json_includes_reason_only_for_failures() {
        let ok = SyncDelivery::Delivered {
            event_id: "abc".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
        };
        let v = delivery_json(&ok, "alice");
        assert_eq!(v["status"], "delivered");
        assert_eq!(v["event_id"], "abc");
        assert_eq!(v["peer"], "alice");
        assert_eq!(v["relay_url"], "https://r");
        assert!(v.get("reason").is_none(), "happy path has no reason field");

        // Duplicate is a success too: slot coords, no reason.
        let dup = SyncDelivery::Duplicate {
            event_id: "abc".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
        };
        let v = delivery_json(&dup, "alice");
        assert_eq!(v["status"], "duplicate");
        assert_eq!(v["relay_url"], "https://r");
        assert_eq!(v["slot_id"], "s");
        assert!(v.get("reason").is_none(), "duplicate has no reason field");

        let bad = SyncDelivery::TransportError {
            event_id: "abc".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
            detail: "TLS error: UnknownIssuer".into(),
        };
        let v = delivery_json(&bad, "alice");
        assert_eq!(v["status"], "transport_error");
        assert_eq!(v["reason"], "TLS error: UnknownIssuer");

        let stale = SyncDelivery::SlotStale {
            event_id: "abc".into(),
            relay_url: "https://r".into(),
            slot_id: "s".into(),
            detail: "410 Gone".into(),
        };
        let v = delivery_json(&stale, "alice");
        assert_eq!(v["status"], "slot_stale");
        assert_eq!(v["relay_url"], "https://r");
        assert_eq!(v["slot_id"], "s");
        assert_eq!(v["reason"], "410 Gone");

        let unknown = SyncDelivery::PeerUnknown {
            event_id: "abc".into(),
        };
        let v = delivery_json(&unknown, "alice");
        assert_eq!(v["status"], "peer_unknown");
        assert!(
            v["reason"]
                .as_str()
                .unwrap_or("")
                .contains("wire dial alice")
        );
        // No slot was ever resolved, so no coords may be reported.
        assert!(v.get("relay_url").is_none());
        assert!(v.get("slot_id").is_none());
    }
}
361}