Skip to main content

wire/
send.rs

1//! Synchronous event delivery — collapses the legacy
2//! `wire send → outbox → daemon push → relay` 3-step into a single
3//! direct relay POST.
4//!
5//! ## Why this exists
6//!
7//! Paul (2026-06-01): *"Why are we dealing with this whole outbox
8//! queued delivered thing it's a headache and always breaks can we
9//! streamline and collapse steps."*
10//!
11//! Pre-fix, every `wire send` (CLI and MCP) wrote to
12//! `<outbox_dir>/<peer>.jsonl` and returned `status: "queued"`. The
13//! daemon's 5s push loop later POSTed the event to the relay. Three
14//! distinct silent-drop classes hide in those steps:
15//!
16//! 1. **outbox write succeeds, daemon never pushes** — daemon dead,
17//!    daemon on wrong WIRE_HOME, TLS broken (the #176 → #183 saga),
18//!    operator never ran `wire push`. `queued` looked like success
19//!    but no byte ever left the box.
20//! 2. **daemon pushed, peer's relay slot stale** — earlier
21//!    half-paired state, peer rotated slot, slot_token expired (the
22//!    brisk-iris case). Push got 4xx, marked as skipped in the daemon
23//!    log, operator never sees it from the `wire send` side.
24//! 3. **content-hash dedup blocks retries** — `event_id` is
25//!    `sha256(canonical(body))`. Sending the same body twice produces
26//!    the same event_id; relay drops the second as `duplicate`. Retry
27//!    feels like success but never reaches the peer.
28//!
29//! ## The new contract
30//!
31//! - **Default** (`wire send`, `tool_send`): synchronous POST to the
32//!   peer's pinned relay slot. Returns `Delivered` / `Duplicate` /
33//!   `Failed` inline. No outbox write on the happy path. Operator
34//!   sees the actual verdict, not a fake `queued`.
35//!
36//! - **`--queue` opt-in** (CLI flag; MCP `queue: true` arg):
37//!   preserves the legacy outbox-write path for explicit batching /
38//!   offline-buffer / pre-pair queue use cases. The daemon's
39//!   `run_sync_push` loop continues to drain the outbox so anything
40//!   written via this path still delivers.
41//!
42//! - **Peer not pinned**: the relay coords are unknown — sync POST
43//!   is impossible. We error explicitly with a hint to run
44//!   `wire dial <peer>` (or pass `--queue` if the operator wants
45//!   pre-pair queueing). Pre-fix this case silently wrote to outbox
46//!   and the daemon would never push it; now it's loud.
47//!
48//! - **Stale slot (4xx from relay)**: return `Failed` with the slot
49//!   error string. The existing `cli::error_smells_like_slot_4xx`
50//!   classifier already detects this shape; the caller surfaces the
51//!   re-resolve hint. We do NOT auto-re-pair without the operator's
52//!   consent (that's `wire dial`'s job).
53
54use anyhow::{Context, Result};
55use serde::Serialize;
56use serde_json::{Value, json};
57
58/// Result of attempting a synchronous delivery to a peer.
59#[derive(Debug, Clone, Serialize)]
60#[serde(tag = "status", rename_all = "snake_case")]
61pub enum SyncDelivery {
62    /// Relay accepted the event. First-time landing on the peer's slot.
63    Delivered {
64        event_id: String,
65        relay_url: String,
66        slot_id: String,
67    },
68    /// Relay said `duplicate` — same `event_id` already on the slot.
69    /// Not a failure: the relay HAS the event, the peer can pull it.
70    /// Surfaced distinctly so the caller can decide whether to nudge
71    /// content uniqueness on the next attempt.
72    Duplicate {
73        event_id: String,
74        relay_url: String,
75        slot_id: String,
76    },
77    /// Peer isn't in `relay_state.peers` — no slot coords to POST to.
78    /// This is the explicit "you haven't paired yet" case. The
79    /// caller should either suggest `wire dial <peer>` or write
80    /// to outbox via the `--queue` opt-in.
81    PeerUnknown { event_id: String },
82    /// Relay returned a 4xx/410 — slot has rotated, token expired,
83    /// peer half-paired and never completed bilateral. The caller
84    /// surfaces a hint to `wire dial <peer>`.
85    SlotStale {
86        event_id: String,
87        relay_url: String,
88        slot_id: String,
89        detail: String,
90    },
91    /// Transport failure (TLS, DNS, connect timeout, 5xx). The
92    /// caller decides whether to fall back to `--queue` or surface
93    /// the error.
94    TransportError {
95        event_id: String,
96        relay_url: String,
97        slot_id: String,
98        detail: String,
99    },
100}
101
102impl SyncDelivery {
103    /// Compact status string for callers that just want the verdict.
104    /// Same shape as the JSON `status` field.
105    pub fn status_str(&self) -> &'static str {
106        match self {
107            SyncDelivery::Delivered { .. } => "delivered",
108            SyncDelivery::Duplicate { .. } => "duplicate",
109            SyncDelivery::PeerUnknown { .. } => "peer_unknown",
110            SyncDelivery::SlotStale { .. } => "slot_stale",
111            SyncDelivery::TransportError { .. } => "transport_error",
112        }
113    }
114
115    /// True when the event reached the relay (Delivered or
116    /// Duplicate). Both states mean the peer CAN pull it.
117    pub fn reached_relay(&self) -> bool {
118        matches!(
119            self,
120            SyncDelivery::Delivered { .. } | SyncDelivery::Duplicate { .. }
121        )
122    }
123
124    pub fn event_id(&self) -> &str {
125        match self {
126            SyncDelivery::Delivered { event_id, .. }
127            | SyncDelivery::Duplicate { event_id, .. }
128            | SyncDelivery::PeerUnknown { event_id }
129            | SyncDelivery::SlotStale { event_id, .. }
130            | SyncDelivery::TransportError { event_id, .. } => event_id,
131        }
132    }
133}
134
135/// Attempt synchronous delivery of `signed_event` to `peer_handle`.
136///
137/// Reads the peer's slot coords from `relay_state.peers`, builds a
138/// `RelayClient`, POSTs the event. Maps every observable outcome onto
139/// a [`SyncDelivery`] variant.
140///
141/// On success (`Delivered` or `Duplicate`), appends a row to the
142/// per-peer pushed log (`<outbox_dir>/<peer>.pushed.jsonl`) so the
143/// `pending_push_count` counter in `wire status` stays accurate
144/// across both code paths (sync send + legacy daemon push).
145pub fn attempt_deliver(peer_handle: &str, signed_event: &Value) -> Result<SyncDelivery> {
146    let event_id = signed_event
147        .get("event_id")
148        .and_then(Value::as_str)
149        .unwrap_or("")
150        .to_string();
151
152    // Resolve the peer's slot coords. Missing peer / missing fields →
153    // PeerUnknown so the caller can act.
154    let state = crate::config::read_relay_state().context("reading relay state")?;
155    let peer_obj = match state
156        .get("peers")
157        .and_then(Value::as_object)
158        .and_then(|m| m.get(peer_handle))
159    {
160        Some(v) => v,
161        None => return Ok(SyncDelivery::PeerUnknown { event_id }),
162    };
163    let relay_url = peer_obj
164        .get("relay_url")
165        .and_then(Value::as_str)
166        .unwrap_or("")
167        .to_string();
168    let slot_id = peer_obj
169        .get("slot_id")
170        .and_then(Value::as_str)
171        .unwrap_or("")
172        .to_string();
173    let slot_token = peer_obj
174        .get("slot_token")
175        .and_then(Value::as_str)
176        .unwrap_or("")
177        .to_string();
178    if relay_url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
179        return Ok(SyncDelivery::PeerUnknown { event_id });
180    }
181
182    // POST.
183    let client = crate::relay_client::RelayClient::new(&relay_url);
184    match client.post_event(&slot_id, &slot_token, signed_event) {
185        Ok(resp) => {
186            // Append a row to the per-peer pushed log so
187            // `pending_push_count` decrements regardless of whether
188            // the event reached the relay via sync send (this path)
189            // or via daemon push. Non-fatal on append failure.
190            let now = time::OffsetDateTime::now_utc()
191                .format(&time::format_description::well_known::Rfc3339)
192                .unwrap_or_default();
193            if let Err(e) = crate::config::append_pushed_log(peer_handle, &event_id, &now) {
194                eprintln!(
195                    "wire send: pushed-log append for {peer_handle}/{event_id} failed (non-fatal): {e:#}"
196                );
197            }
198            if resp.status == "duplicate" {
199                Ok(SyncDelivery::Duplicate {
200                    event_id,
201                    relay_url,
202                    slot_id,
203                })
204            } else {
205                Ok(SyncDelivery::Delivered {
206                    event_id,
207                    relay_url,
208                    slot_id,
209                })
210            }
211        }
212        Err(e) => {
213            let detail = crate::relay_client::format_transport_error(&e);
214            // Classify 4xx/410 (stale slot) distinctly from transport
215            // errors. The existing `cli::error_smells_like_slot_4xx`
216            // helper matches the relay's error text shape; reuse it
217            // so both code paths share the same classifier.
218            if crate::cli::error_smells_like_slot_4xx(&detail) {
219                Ok(SyncDelivery::SlotStale {
220                    event_id,
221                    relay_url,
222                    slot_id,
223                    detail,
224                })
225            } else {
226                Ok(SyncDelivery::TransportError {
227                    event_id,
228                    relay_url,
229                    slot_id,
230                    detail,
231                })
232            }
233        }
234    }
235}
236
237/// Render a `SyncDelivery` as the JSON value `wire send --json` /
238/// `tool_send` return. Fields are flat (no nested struct) so JSON
239/// consumers can read `.status` + `.event_id` directly without
240/// pattern-matching the variant tag.
241pub fn delivery_json(d: &SyncDelivery, peer: &str) -> Value {
242    let base = json!({
243        "status": d.status_str(),
244        "peer": peer,
245        "event_id": d.event_id(),
246    });
247    let mut obj = base.as_object().cloned().unwrap_or_default();
248    match d {
249        SyncDelivery::Delivered {
250            relay_url, slot_id, ..
251        }
252        | SyncDelivery::Duplicate {
253            relay_url, slot_id, ..
254        } => {
255            obj.insert("relay_url".into(), json!(relay_url));
256            obj.insert("slot_id".into(), json!(slot_id));
257        }
258        SyncDelivery::SlotStale {
259            relay_url,
260            slot_id,
261            detail,
262            ..
263        }
264        | SyncDelivery::TransportError {
265            relay_url,
266            slot_id,
267            detail,
268            ..
269        } => {
270            obj.insert("relay_url".into(), json!(relay_url));
271            obj.insert("slot_id".into(), json!(slot_id));
272            obj.insert("reason".into(), json!(detail));
273        }
274        SyncDelivery::PeerUnknown { .. } => {
275            obj.insert(
276                "reason".into(),
277                json!(format!(
278                    "peer '{peer}' not pinned — run `wire dial {peer}` to pair, or pass --queue (CLI) / queue:true (MCP) to write to outbox for the daemon to attempt later"
279                )),
280            );
281        }
282    }
283    Value::Object(obj)
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289
290    #[test]
291    fn status_str_matches_variant() {
292        let d = SyncDelivery::Delivered {
293            event_id: "x".into(),
294            relay_url: "https://r".into(),
295            slot_id: "s".into(),
296        };
297        assert_eq!(d.status_str(), "delivered");
298        assert!(d.reached_relay());
299
300        let d = SyncDelivery::Duplicate {
301            event_id: "x".into(),
302            relay_url: "https://r".into(),
303            slot_id: "s".into(),
304        };
305        assert_eq!(d.status_str(), "duplicate");
306        assert!(
307            d.reached_relay(),
308            "duplicate counts as relay-reached: peer can pull it"
309        );
310
311        let d = SyncDelivery::PeerUnknown {
312            event_id: "x".into(),
313        };
314        assert_eq!(d.status_str(), "peer_unknown");
315        assert!(!d.reached_relay());
316
317        let d = SyncDelivery::SlotStale {
318            event_id: "x".into(),
319            relay_url: "https://r".into(),
320            slot_id: "s".into(),
321            detail: "410".into(),
322        };
323        assert_eq!(d.status_str(), "slot_stale");
324        assert!(!d.reached_relay());
325
326        let d = SyncDelivery::TransportError {
327            event_id: "x".into(),
328            relay_url: "https://r".into(),
329            slot_id: "s".into(),
330            detail: "tls".into(),
331        };
332        assert_eq!(d.status_str(), "transport_error");
333        assert!(!d.reached_relay());
334    }
335
336    #[test]
337    fn delivery_json_includes_reason_only_for_failures() {
338        let ok = SyncDelivery::Delivered {
339            event_id: "abc".into(),
340            relay_url: "https://r".into(),
341            slot_id: "s".into(),
342        };
343        let v = delivery_json(&ok, "alice");
344        assert_eq!(v["status"], "delivered");
345        assert_eq!(v["event_id"], "abc");
346        assert_eq!(v["peer"], "alice");
347        assert_eq!(v["relay_url"], "https://r");
348        assert!(v.get("reason").is_none(), "happy path has no reason field");
349
350        let bad = SyncDelivery::TransportError {
351            event_id: "abc".into(),
352            relay_url: "https://r".into(),
353            slot_id: "s".into(),
354            detail: "TLS error: UnknownIssuer".into(),
355        };
356        let v = delivery_json(&bad, "alice");
357        assert_eq!(v["status"], "transport_error");
358        assert_eq!(v["reason"], "TLS error: UnknownIssuer");
359
360        let unknown = SyncDelivery::PeerUnknown {
361            event_id: "abc".into(),
362        };
363        let v = delivery_json(&unknown, "alice");
364        assert_eq!(v["status"], "peer_unknown");
365        assert!(
366            v["reason"]
367                .as_str()
368                .unwrap_or("")
369                .contains("wire dial alice")
370        );
371    }
372}