wire/daemon_stream.rs
1//! Daemon-side SSE stream subscriber (R1 phase 2, v0.5.6).
2//!
3//! Opens a long-lived `GET /v1/events/:slot_id/stream` connection to the
4//! relay using the operator's own slot_token, parses SSE `data:` lines as
5//! they arrive, and pings a wake-channel for each event. The daemon's main
6//! loop replaces `std::thread::sleep(interval)` with `recv_timeout(interval)`
7//! against this channel, so a posted event traverses sender → relay →
8//! subscriber → local inbox in ~10-50ms instead of waiting for the next
9//! ~5s poll tick.
10//!
11//! Failure model: if the stream errors or disconnects, the subscriber
//! reconnects with exponential backoff (1s → 2s → 4s → 8s → 16s → 30s cap). The
13//! daemon's regular polling loop is unaffected and continues as a safety
14//! net — stream-down does NOT mean events-down. Operator running
15//! `wire daemon` with no relay reachability sees both signals (stream
16//! reconnect retries + poll errors) and can diagnose.
17//!
18//! Design note: this is a one-way wake signal, not the data path. The
19//! actual `run_sync_pull` re-fetches via `list_events` so we get
20//! signature verification, dedup, and inbox write through the exact same
21//! code path as polling. The stream only changes WHEN pull runs, not HOW.
22
23use anyhow::Result;
24use std::io::{BufRead, BufReader};
25use std::sync::mpsc::Sender;
26use std::time::Duration;
27
28/// Spawn the stream-subscriber thread. Returns immediately; the thread
29/// runs until process exit. `wake_tx` is signaled on every received SSE
30/// `data:` line (any event, no parsing of body). Errors during connect or
31/// stream-read trigger reconnect-with-backoff, never panic.
32pub fn spawn_stream_subscriber(wake_tx: Sender<()>) {
33 std::thread::Builder::new()
34 .name("wire-stream-sub".into())
35 .spawn(move || run_subscriber(wake_tx))
36 .expect("spawn wire-stream-sub thread");
37}
38
39fn run_subscriber(wake_tx: Sender<()>) {
40 let mut backoff_secs = 1u64;
41 loop {
42 match connect_and_read(&wake_tx) {
43 Ok(()) => {
44 // Stream closed cleanly (e.g., server reload). Quick reconnect.
45 backoff_secs = 1;
46 eprintln!("daemon-stream: connection closed cleanly, reconnecting");
47 }
48 Err(e) => {
49 eprintln!("daemon-stream: error {e:#}; reconnecting in {backoff_secs}s");
50 std::thread::sleep(Duration::from_secs(backoff_secs));
51 backoff_secs = (backoff_secs * 2).min(30);
52 }
53 }
54 }
55}
56
57fn connect_and_read(wake_tx: &Sender<()>) -> Result<()> {
58 // Re-read relay-state on each reconnect so a fresh slot allocation /
59 // rotation picks up automatically without daemon restart.
60 let state = crate::config::read_relay_state()?;
61 let self_state = state
62 .get("self")
63 .cloned()
64 .unwrap_or(serde_json::Value::Null);
65 let url = self_state
66 .get("relay_url")
67 .and_then(|v| v.as_str())
68 .unwrap_or("");
69 let slot_id = self_state
70 .get("slot_id")
71 .and_then(|v| v.as_str())
72 .unwrap_or("");
73 let slot_token = self_state
74 .get("slot_token")
75 .and_then(|v| v.as_str())
76 .unwrap_or("");
77 if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
78 return Err(anyhow::anyhow!(
79 "stream-sub: relay-state missing self.{{relay_url,slot_id,slot_token}} — sleep until next reconnect"
80 ));
81 }
82
83 let stream_url = format!("{url}/v1/events/{slot_id}/stream");
84 // v0.5.13: honor WIRE_INSECURE_SKIP_TLS_VERIFY on the stream sub too,
85 // matching the rest of the wire HTTPS surface (issue #6).
86 let client = {
87 let mut b = reqwest::blocking::Client::builder()
88 // No total timeout: stream is expected to stay open indefinitely.
89 // TCP keepalive catches a hung connection (server crashed, network
90 // black hole) — the BufReader::lines loop returns Err and the
91 // outer reconnect-with-backoff kicks in.
92 .tcp_keepalive(Some(Duration::from_secs(60)));
93 if std::env::var(crate::relay_client::INSECURE_SKIP_TLS_ENV)
94 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
95 .unwrap_or(false)
96 {
97 b = b.danger_accept_invalid_certs(true);
98 }
99 b.build()?
100 };
101
102 let resp = client
103 .get(&stream_url)
104 .header("Accept", "text/event-stream")
105 .bearer_auth(slot_token)
106 .send()?;
107
108 if !resp.status().is_success() {
109 return Err(anyhow::anyhow!(
110 "stream-sub: server returned {} on connect",
111 resp.status()
112 ));
113 }
114
115 let reader = BufReader::new(resp);
116 for line in reader.lines() {
117 let line = line?;
118 // SSE protocol: each event is one or more `field: value` lines
119 // followed by a blank line. We only care about `data:` lines —
120 // every event the relay sends is a `data: <json>` line. Any other
121 // field (comments via `:keepalive`, etc.) is ignored. Empty line
122 // is the event separator; benign to ignore.
123 if line.starts_with("data:") {
124 // Fire wake signal. If the main loop is busy, the channel
125 // backs up to a small buffer; we don't block — drop on full
126 // since multiple wakes coalesce into a single pull anyway.
127 let _ = wake_tx.send(());
128 }
129 }
130 Ok(())
131}