use anyhow::Result;
use std::io::{BufRead, BufReader};
use std::sync::mpsc::Sender;
use std::time::Duration;
/// Starts the background stream-subscriber thread.
///
/// The thread is named `wire-stream-sub` and runs `run_subscriber` with
/// `wake_tx`. Its join handle is dropped right away, so the thread is never
/// joined by this function.
///
/// # Panics
///
/// Panics if the operating system fails to spawn the thread.
pub fn spawn_stream_subscriber(wake_tx: Sender<()>) {
    let builder = std::thread::Builder::new().name("wire-stream-sub".into());
    let spawned = builder.spawn(move || run_subscriber(wake_tx));
    // Nobody joins the subscriber; letting go of the handle detaches it.
    drop(spawned.expect("spawn wire-stream-sub thread"));
}
/// Reconnect loop for the stream subscriber; never returns.
///
/// Calls `connect_and_read` over and over:
///
/// * On a clean close (`Ok`) the backoff is reset and the next attempt
///   starts immediately.
/// * On an error the loop sleeps for the current backoff, then doubles it,
///   capped at `MAX_BACKOFF_SECS`.
///
/// A failed attempt that nevertheless lasted at least `STABLE_SESSION` is
/// treated as a previously healthy connection that dropped: the backoff is
/// reset *before* sleeping. Without this, only a clean close ever reset the
/// delay, so a stream that had been up for hours would still wait out the
/// backoff accumulated by unrelated earlier failures.
fn run_subscriber(wake_tx: Sender<()>) {
    const INITIAL_BACKOFF_SECS: u64 = 1;
    const MAX_BACKOFF_SECS: u64 = 30;
    // How long an attempt must last before its failure counts as "an
    // established session dropped" rather than "could not connect".
    // NOTE(review): 60 s was picked to sit above reqwest's default 30 s
    // blocking timeout so a timed-out connect attempt does not qualify;
    // tune if the client timeout is changed.
    const STABLE_SESSION: Duration = Duration::from_secs(60);

    let mut backoff_secs = INITIAL_BACKOFF_SECS;
    loop {
        let started = std::time::Instant::now();
        match connect_and_read(&wake_tx) {
            Ok(()) => {
                backoff_secs = INITIAL_BACKOFF_SECS;
                eprintln!("daemon-stream: connection closed cleanly, reconnecting");
            }
            Err(e) => {
                if started.elapsed() >= STABLE_SESSION {
                    // The connection was alive for a while; earlier failures
                    // are stale, so start the backoff ladder from the bottom.
                    backoff_secs = INITIAL_BACKOFF_SECS;
                }
                eprintln!("daemon-stream: error {e:#}; reconnecting in {backoff_secs}s");
                std::thread::sleep(Duration::from_secs(backoff_secs));
                backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
            }
        }
    }
}
/// Opens one event-stream connection to the relay and forwards wake-ups.
///
/// Reads `self.relay_url`, `self.slot_id` and `self.slot_token` from the
/// relay state, issues a bearer-authenticated
/// `GET {relay_url}/v1/events/{slot_id}/stream` with
/// `Accept: text/event-stream`, and then reads the response body line by
/// line. Every line starting with `data:` sends one `()` on `wake_tx`; the
/// payload itself is not inspected.
///
/// Returns `Ok(())` when the body ends without a read error.
///
/// # Errors
///
/// Returns an error when the relay state cannot be read, when any of the
/// three `self.*` fields is missing, empty or not a string, when the HTTP
/// client cannot be built or the request fails, when the server answers with
/// a non-success status, or when reading a line from the body fails.
fn connect_and_read(wake_tx: &Sender<()>) -> Result<()> {
    /// Looks up `key` in `obj` as a string; anything missing or non-string
    /// becomes `""` so the caller can validate all fields in one place.
    fn str_field<'a>(obj: Option<&'a serde_json::Value>, key: &str) -> &'a str {
        obj.and_then(|o| o.get(key))
            .and_then(|v| v.as_str())
            .unwrap_or("")
    }

    let state = crate::config::read_relay_state()?;
    // Borrow the `self` object rather than cloning the whole subtree; a
    // missing `self` just makes every lookup below come back empty.
    let self_state = state.get("self");
    let url = str_field(self_state, "relay_url");
    let slot_id = str_field(self_state, "slot_id");
    let slot_token = str_field(self_state, "slot_token");
    if url.is_empty() || slot_id.is_empty() || slot_token.is_empty() {
        return Err(anyhow::anyhow!(
            "stream-sub: relay-state missing self.{{relay_url,slot_id,slot_token}} — sleep until next reconnect"
        ));
    }

    // Tolerate a configured base URL with trailing slashes so the request
    // path does not turn into `//v1/events/...`.
    let base_url = url.trim_end_matches('/');
    let stream_url = format!("{base_url}/v1/events/{slot_id}/stream");

    // NOTE(review): reqwest's blocking client applies a default 30 s timeout
    // unless overridden. Confirm the relay sends traffic often enough to keep
    // an idle stream open, or whether `.timeout(None)` is wanted here.
    let client = {
        let mut b = reqwest::blocking::Client::builder()
            .tcp_keepalive(Some(Duration::from_secs(60)));
        // Opt-in escape hatch: certificate validation is skipped only when
        // the environment variable holds an explicit truthy value.
        if std::env::var(crate::relay_client::INSECURE_SKIP_TLS_ENV)
            .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
            .unwrap_or(false)
        {
            b = b.danger_accept_invalid_certs(true);
        }
        b.build()?
    };

    let resp = client
        .get(&stream_url)
        .header("Accept", "text/event-stream")
        .bearer_auth(slot_token)
        .send()?;
    if !resp.status().is_success() {
        return Err(anyhow::anyhow!(
            "stream-sub: server returned {} on connect",
            resp.status()
        ));
    }

    let reader = BufReader::new(resp);
    for line in reader.lines() {
        let line = line?;
        if line.starts_with("data:") {
            // Best effort: a failed send (receiver gone) is deliberately
            // ignored and the stream keeps being read.
            let _ = wake_tx.send(());
        }
    }
    Ok(())
}