Skip to main content

wire/
daemon_stream.rs

1//! Daemon-side SSE stream subscriber (R1 phase 2, v0.5.6).
2//!
3//! Opens a long-lived `GET /v1/events/:slot_id/stream` connection to the
4//! relay using the operator's own slot_token, parses SSE `data:` lines as
5//! they arrive, and pings a wake-channel for each event. The daemon's main
6//! loop replaces `std::thread::sleep(interval)` with `recv_timeout(interval)`
7//! against this channel, so a posted event traverses sender → relay →
8//! subscriber → local inbox in ~10-50ms instead of waiting for the next
9//! ~5s poll tick.
10//!
11//! Failure model: if the stream errors or disconnects, the subscriber
12//! reconnects with exponential backoff (1s → 2s → 4s → 8s → 30s cap). The
13//! daemon's regular polling loop is unaffected and continues as a safety
14//! net — stream-down does NOT mean events-down. Operator running
15//! `wire daemon` with no relay reachability sees both signals (stream
16//! reconnect retries + poll errors) and can diagnose.
17//!
18//! Design note: this is a one-way wake signal, not the data path. The
19//! actual `run_sync_pull` re-fetches via `list_events` so we get
20//! signature verification, dedup, and inbox write through the exact same
21//! code path as polling. The stream only changes WHEN pull runs, not HOW.
22
23use anyhow::Result;
24use std::io::{BufRead, BufReader};
25use std::sync::mpsc::Sender;
26use std::time::Duration;
27
28/// Stream-state file written by `run_subscriber` on every state
29/// transition. Surfaced via `tool_status` so an operator can tell
30/// "stream alive" (live monitor will fire on inbound) from
31/// "polling-only" (daemon up, monitor will wait until next poll). The
32/// file is best-effort; missing/unreadable counts as "unknown" and the
33/// reader degrades gracefully.
34fn stream_state_path() -> Option<std::path::PathBuf> {
35    crate::config::state_dir()
36        .ok()
37        .map(|d| d.join("stream_state.json"))
38}
39
40/// Write the current stream-state snapshot. Best-effort; an unwritable
41/// state-dir does not block the subscriber loop. Schema-versioned so
42/// future fields (per-event count, reconnect attempt index) can land
43/// additively without breaking older readers.
44fn write_stream_state(state: &str, last_event_at: Option<&str>, reconnects: u64) {
45    if let Some(path) = stream_state_path() {
46        if let Some(parent) = path.parent() {
47            let _ = std::fs::create_dir_all(parent);
48        }
49        let ts = time::OffsetDateTime::now_utc()
50            .format(&time::format_description::well_known::Rfc3339)
51            .unwrap_or_default();
52        let body = serde_json::json!({
53            "schema": "wire-daemon-stream-state-v1",
54            "ts": ts,
55            "state": state,
56            "last_event_at": last_event_at,
57            "reconnect_count": reconnects,
58        });
59        let _ = std::fs::write(&path, serde_json::to_vec_pretty(&body).unwrap_or_default());
60    }
61}
62
63/// Spawn the stream-subscriber thread. Returns immediately; the thread
64/// runs until process exit. `wake_tx` is signaled on every received SSE
65/// `data:` line (any event, no parsing of body). Errors during connect or
66/// stream-read trigger reconnect-with-backoff, never panic.
67pub fn spawn_stream_subscriber(wake_tx: Sender<()>) {
68    std::thread::Builder::new()
69        .name("wire-stream-sub".into())
70        .spawn(move || run_subscriber(wake_tx))
71        .expect("spawn wire-stream-sub thread");
72}
73
74fn run_subscriber(wake_tx: Sender<()>) {
75    let mut backoff_secs = 1u64;
76    let mut reconnects: u64 = 0;
77    let mut last_event_at: Option<String> = None;
78    write_stream_state("connecting", last_event_at.as_deref(), reconnects);
79    loop {
80        // We wrap a closure so the connect-and-read inner can stamp
81        // last_event_at into our outer scope on every wake without
82        // restructuring the existing signature. The Vec<String> carries
83        // at most one timestamp (latest); polled by reference below.
84        let mut latest_event_ts: Vec<String> = Vec::new();
85        // v0.14.3 (coral dogfood 2026-06-01): pass accumulated
86        // `last_event_at` + `reconnects` so the "connected" write
87        // inside connect_and_read preserves them. Pre-fix, every
88        // successful reconnect overwrote stream_state.json with
89        // `last_event_at:null, reconnect_count:0` even after
90        // events had arrived + previous reconnects had occurred.
91        // Operator surface always read "last event never" on
92        // long-running daemons.
93        let outcome = connect_and_read(
94            &wake_tx,
95            &mut latest_event_ts,
96            last_event_at.as_deref(),
97            reconnects,
98        );
99        if let Some(ts) = latest_event_ts.into_iter().last() {
100            last_event_at = Some(ts);
101        }
102        match outcome {
103            Ok(()) => {
104                // Stream closed cleanly (e.g., server reload). Quick reconnect.
105                backoff_secs = 1;
106                reconnects += 1;
107                eprintln!("daemon-stream: connection closed cleanly, reconnecting");
108                write_stream_state("reconnecting", last_event_at.as_deref(), reconnects);
109            }
110            Err(e) => {
111                reconnects += 1;
112                eprintln!("daemon-stream: error {e:#}; reconnecting in {backoff_secs}s");
113                write_stream_state("error", last_event_at.as_deref(), reconnects);
114                std::thread::sleep(Duration::from_secs(backoff_secs));
115                backoff_secs = (backoff_secs * 2).min(30);
116            }
117        }
118    }
119}
120
121fn connect_and_read(
122    wake_tx: &Sender<()>,
123    last_event_ts: &mut Vec<String>,
124    accumulated_last_event_at: Option<&str>,
125    accumulated_reconnects: u64,
126) -> Result<()> {
127    // Re-read relay-state on each reconnect so a fresh slot allocation /
128    // rotation picks up automatically without daemon restart.
129    let state = crate::config::read_relay_state()?;
130    let self_state = state
131        .get("self")
132        .cloned()
133        .unwrap_or(serde_json::Value::Null);
134    let url = self_state
135        .get("relay_url")
136        .and_then(|v| v.as_str())
137        .unwrap_or("");
138    let slot_id = self_state
139        .get("slot_id")
140        .and_then(|v| v.as_str())
141        .unwrap_or("");
142    let slot_token = self_state
143        .get("slot_token")
144        .and_then(|v| v.as_str())
145        .unwrap_or("");
146    if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
147        return Err(anyhow::anyhow!(
148            "stream-sub: relay-state missing self.{{relay_url,slot_id,slot_token}} — sleep until next reconnect"
149        ));
150    }
151
152    let stream_url = format!("{url}/v1/events/{slot_id}/stream");
153    // v0.5.13: honor WIRE_INSECURE_SKIP_TLS_VERIFY on the stream sub too,
154    // matching the rest of the wire HTTPS surface (issue #6).
155    let client = {
156        let cfg = crate::tls::shared_client_config();
157        let mut b = reqwest::blocking::Client::builder()
158            // v0.14.2 #177: same dual-roots config the rest of wire's
159            // HTTPS surface uses. SSE used to build its own bare
160            // client which inherited reqwest's default root source
161            // (webpki only under #176's feature flag); now both
162            // surfaces share `tls::shared_client_config`.
163            .use_preconfigured_tls((*cfg).clone())
164            // No total timeout: stream is expected to stay open indefinitely.
165            // TCP keepalive catches a hung connection (server crashed, network
166            // black hole) — the BufReader::lines loop returns Err and the
167            // outer reconnect-with-backoff kicks in.
168            // v0.14.2 (#162 fix #7): tightened TCP keepalive from 60s to
169            // 30s so the kernel-level dead-connection check kicks in sooner
170            // when the SSE upstream goes silent. reqwest's blocking client
171            // doesn't expose a per-read body timeout (the obvious shape for
172            // this) — `Client::timeout` is a total-request timeout, the
173            // wrong primitive for a long-lived stream. A more surgical
174            // per-read timeout via the underlying socket needs a custom
175            // reader and is deferred to v0.15; tightening keepalive is the
176            // observable improvement we can ship today. Honey-pine field
177            // guide failure-mode #2 ("daemon alive but stream wedged") is
178            // also surfaced via the new `stream_state.json` so callers can
179            // detect the polling-only degradation without waiting for the
180            // wedge to clear.
181            .tcp_keepalive(Some(Duration::from_secs(30)));
182        if std::env::var(crate::relay_client::INSECURE_SKIP_TLS_ENV)
183            .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
184            .unwrap_or(false)
185        {
186            b = b.danger_accept_invalid_certs(true);
187        }
188        b.build()?
189    };
190
191    let resp = client
192        .get(&stream_url)
193        .header("Accept", "text/event-stream")
194        .bearer_auth(slot_token)
195        .send()?;
196
197    if !resp.status().is_success() {
198        return Err(anyhow::anyhow!(
199            "stream-sub: server returned {} on connect",
200            resp.status()
201        ));
202    }
203
204    // v0.14.2 (#162 fix #7): mark the stream "connected" once the
205    // server has accepted the slot_token + the body read starts. The
206    // outer loop transitions to "reconnecting" on clean close or
207    // "error" on failure; this is the only place we can confidently
208    // claim "stream is live and pulling events for monitor".
209    //
210    // v0.14.3 (coral dogfood 2026-06-01): preserve accumulated
211    // last_event_at + reconnect counter instead of writing null/0
212    // and clobbering history every reconnect.
213    write_stream_state(
214        "connected",
215        accumulated_last_event_at,
216        accumulated_reconnects,
217    );
218    let reader = BufReader::new(resp);
219    for line in reader.lines() {
220        let line = line?;
221        // SSE protocol: each event is one or more `field: value` lines
222        // followed by a blank line. We only care about `data:` lines —
223        // every event the relay sends is a `data: <json>` line. Any other
224        // field (comments via `:keepalive`, etc.) is ignored. Empty line
225        // is the event separator; benign to ignore.
226        if line.starts_with("data:") {
227            // Fire wake signal. If the main loop is busy, the channel
228            // backs up to a small buffer; we don't block — drop on full
229            // since multiple wakes coalesce into a single pull anyway.
230            let _ = wake_tx.send(());
231            // v0.14.2 (#162 fix #7): stamp the most-recent event-arrival
232            // timestamp for `stream_state.json`. Push, don't replace;
233            // outer loop reads .last() so we only keep the latest. Best-
234            // effort format; failure here = no stamp this cycle.
235            let now = time::OffsetDateTime::now_utc()
236                .format(&time::format_description::well_known::Rfc3339)
237                .unwrap_or_default();
238            if !now.is_empty() {
239                last_event_ts.push(now);
240            }
241        }
242    }
243    Ok(())
244}