// configvault_sdk/watcher.rs

use std::sync::{Mutex, PoisonError};
use std::time::Duration;

use eventsource_stream::Eventsource;
use futures_util::StreamExt;
use reqwest::header::{HeaderMap, HeaderValue};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

use crate::models::ConfigChangedEvent;
9
/// Pause between the end of one SSE connection attempt and the next, unless
/// overridden through `ConfigWatcher::with_reconnect_delay`.
const DEFAULT_RECONNECT_DELAY: Duration = Duration::from_secs(5);

/// Capacity passed to `broadcast::channel` when a watcher is created.
const CHANNEL_CAPACITY: usize = 64;
12
13/// SSE-based watcher that delivers `ConfigChangedEvent`s via a broadcast channel.
14///
15/// Multiple subscribers can `subscribe()` to receive events independently.
16/// Call `start()` to begin streaming. Call `stop()` to shut down.
17pub struct ConfigWatcher {
18    base_url: String,
19    api_key: String,
20    filter: Option<String>,
21    reconnect_delay: Duration,
22    sender: broadcast::Sender<ConfigChangedEvent>,
23}
24
25impl ConfigWatcher {
26    /// Create a new watcher (called internally by `ConfigVaultClient::watch()`).
27    pub(crate) fn new(
28        base_url: &str,
29        api_key: &str,
30        filter: Option<&str>,
31        _timeout: Duration,
32    ) -> Self {
33        let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
34        Self {
35            base_url: base_url.trim_end_matches('/').to_string(),
36            api_key: api_key.to_string(),
37            filter: filter.map(str::to_string),
38            reconnect_delay: DEFAULT_RECONNECT_DELAY,
39            sender,
40        }
41    }
42
43    /// Subscribe to config-changed events.
44    ///
45    /// Returns a `broadcast::Receiver`. Call `start()` to begin streaming events.
46    pub fn subscribe(&self) -> broadcast::Receiver<ConfigChangedEvent> {
47        self.sender.subscribe()
48    }
49
50    /// Start the SSE streaming loop in the background.
51    ///
52    /// This spawns a Tokio task that connects to the `/events` endpoint and
53    /// delivers deserialized `ConfigChangedEvent`s to all subscribers.
54    /// Reconnects automatically on connection failure after `reconnect_delay`.
55    pub fn start(&self) {
56        let base_url = self.base_url.clone();
57        let api_key = self.api_key.clone();
58        let filter = self.filter.clone();
59        let reconnect_delay = self.reconnect_delay;
60        let sender = self.sender.clone();
61
62        tokio::spawn(async move {
63            loop {
64                let result = stream_events(&base_url, &api_key, &filter, &sender).await;
65                if let Err(e) = result {
66                    tracing_or_eprintln(format!("SSE connection error: {e}"));
67                }
68                // Reconnect after delay if there are still active receivers
69                if sender.receiver_count() == 0 {
70                    break;
71                }
72                tokio::time::sleep(reconnect_delay).await;
73            }
74        });
75    }
76
77    /// Stop the watcher by dropping the sender, which closes all receivers.
78    pub fn stop(&self) {
79        // Closing is handled by dropping; expose this as a no-op for API parity.
80        // The background task exits when there are no more receivers.
81    }
82
83    /// Set a custom reconnect delay (default: 5 seconds).
84    pub fn with_reconnect_delay(mut self, delay: Duration) -> Self {
85        self.reconnect_delay = delay;
86        self
87    }
88}
89
90/// Stream SSE events, parsing `config-changed` events and sending them to subscribers.
91async fn stream_events(
92    base_url: &str,
93    api_key: &str,
94    filter: &Option<String>,
95    sender: &broadcast::Sender<ConfigChangedEvent>,
96) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
97    let mut url = format!("{base_url}/events");
98    if let Some(f) = filter {
99        url = format!("{url}?filter={}", urlencoding_encode(f));
100    }
101
102    let mut headers = HeaderMap::new();
103    headers.insert(
104        "X-Api-Key",
105        HeaderValue::from_str(api_key)?,
106    );
107    headers.insert(
108        reqwest::header::ACCEPT,
109        HeaderValue::from_static("text/event-stream"),
110    );
111
112    let client = reqwest::Client::builder()
113        .use_rustls_tls()
114        .default_headers(headers)
115        .build()?;
116
117    let response = client.get(&url).send().await?;
118
119    let stream = response.bytes_stream().eventsource();
120    futures_util::pin_mut!(stream);
121
122    while let Some(result) = stream.next().await {
123        let event = result?;
124        if event.event == "config-changed" {
125            if let Ok(parsed) = serde_json::from_str::<ConfigChangedEvent>(&event.data) {
126                // Ignore send errors (no active receivers is OK)
127                let _ = sender.send(parsed);
128            }
129        }
130    }
131
132    Ok(())
133}
134
/// Percent-encode `s` for use as a query parameter value.
///
/// ASCII letters, digits and the bytes `-`, `_`, `.`, `~`, `/` and `*` are
/// copied through unchanged; every other byte of the UTF-8 encoding is written
/// as `%XX` with uppercase hexadecimal digits.
fn urlencoding_encode(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(s.len());
    for &octet in s.as_bytes() {
        let passthrough = octet.is_ascii_alphanumeric()
            || matches!(octet, b'-' | b'_' | b'.' | b'~' | b'/' | b'*');
        if passthrough {
            out.push(char::from(octet));
        } else {
            out.push('%');
            out.push(char::from(HEX_DIGITS[usize::from(octet >> 4)]));
            out.push(char::from(HEX_DIGITS[usize::from(octet & 0x0F)]));
        }
    }
    out
}
154
/// Report an error message on standard error, prefixed with `[configvault-sdk]`.
///
/// Despite the name, no `tracing` call is made here: the message always goes
/// to stderr so the library does not force a logging framework on its users.
#[allow(dead_code)]
fn tracing_or_eprintln(msg: String) {
    let line = format!("[configvault-sdk] {msg}");
    eprintln!("{line}");
}